安全工程师的“外挂”:用FOFA API + Python脚本自动化你的日常巡检

在网络安全领域,效率往往决定着防御的成败。想象一下这样的场景:凌晨三点,一个关键服务的新端口突然暴露在公网,而你的团队直到被攻击者利用后才后知后觉。这种被动防御的局面,正是自动化巡检工具要解决的核心痛点。

对于每天需要监控数百个资产的安全团队来说,手动检查每个IP的端口变化或新出现的漏洞影响范围,不仅耗时耗力,而且容易遗漏关键风险点。这就是为什么越来越多的安全工程师开始将FOFA这样的网络空间测绘引擎与Python自动化脚本结合,打造属于自己的"安全外挂"系统。

本文将面向有一定Python基础的安全从业人员,展示如何通过FOFA API实现三个典型场景的自动化:

  1. 关键资产变更的实时监控与告警
  2. 漏洞影响范围的快速评估
  3. 影子资产的自动化发现

1. FOFA API基础:从申请到第一个查询

1.1 获取API访问权限

要使用FOFA的API功能,首先需要注册账号并获取API Key:

  1. 访问FOFA官网并登录
  2. 进入"个人中心"→"API接口"
  3. 根据需求选择适合的套餐(免费版有一定调用限制)

获取到API Key后,建议将其存储在环境变量中而非直接硬编码在脚本里:

# 在Linux/macOS中
export FOFA_KEY="your_api_key_here"
export FOFA_EMAIL="your_email@example.com"

# 在Windows中
set FOFA_KEY=your_api_key_here
set FOFA_EMAIL=your_email@example.com

1.2 理解API的基本调用方式

FOFA API的核心端点是通过HTTP GET请求访问的,基本URL结构如下:

https://fofa.info/api/v1/search/all?email=EMAIL&key=KEY&qbase64=QUERY

其中 qbase64 参数需要将搜索语法进行Base64编码。例如查询域名包含"example.com"的资产:

import base64
import requests

query = 'domain="example.com"'
encoded_query = base64.b64encode(query.encode()).decode()

url = f"https://fofa.info/api/v1/search/all?email={email}&key={api_key}&qbase64={encoded_query}"
response = requests.get(url)

1.3 处理API返回数据

典型的API响应是JSON格式,包含以下关键字段:

字段名 描述 示例值
error 是否出错 false
size 结果总数 42
page 当前页码 1
results 结果数组 [["1.1.1.1", "443", "https"], ...]

一个完整的初始查询函数可能如下:

def query_fofa(search_query, page=1, size=100):
    encoded = base64.b64encode(search_query.encode()).decode()
    url = f"https://fofa.info/api/v1/search/all?email={os.getenv('FOFA_EMAIL')}&key={os.getenv('FOFA_KEY')}&qbase64={encoded}&page={page}&size={size}"
    
    try:
        response = requests.get(url)
        data = response.json()
        if data.get('error'):
            raise ValueError(f"API Error: {data.get('errmsg')}")
        return data
    except Exception as e:
        print(f"Query failed: {str(e)}")
        return None

2. 构建资产变更监控系统

2.1 设计资产变更检测逻辑

资产监控的核心是比较当前状态与历史基线之间的差异。一个健壮的监控系统应该:

  1. 定期执行FOFA查询(如每6小时)
  2. 将结果与上次记录进行比较
  3. 识别新增/减少的端口和服务
  4. 对重要变更发送告警

实现这一流程需要解决几个技术难点:

  • 数据存储 :需要一个轻量级数据库来保存历史记录
  • 变更检测 :需要高效比较两次查询结果的差异
  • 告警去重 :避免对同一变更重复告警

2.2 使用SQLite存储历史数据

SQLite是Python内置的轻量级数据库,非常适合这种场景:

import sqlite3

def init_db():
    conn = sqlite3.connect('asset_monitor.db')
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS asset_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asset TEXT NOT NULL,
            port INTEGER,
            service TEXT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(asset, port, service)
        )
    ''')
    conn.commit()
    conn.close()

2.3 实现变更检测算法

以下是一个检测新增端口的核心函数:

def detect_new_ports(domain):
    # 获取当前FOFA结果
    current_results = query_fofa(f'domain="{domain}"')
    current_assets = {(item[0], item[1]) for item in current_results['results']}
    
    # 获取历史记录
    conn = sqlite3.connect('asset_monitor.db')
    cursor = conn.cursor()
    cursor.execute('SELECT asset, port FROM asset_history WHERE asset=?', (domain,))
    historical_assets = {(row[0], row[1]) for row in cursor.fetchall()}
    
    # 计算新增端口
    new_ports = current_assets - historical_assets
    
    # 更新数据库
    for asset, port in current_assets:
        cursor.execute(
            'INSERT OR IGNORE INTO asset_history (asset, port) VALUES (?, ?)',
            (asset, port)
        )
    
    conn.commit()
    conn.close()
    return new_ports

2.4 集成告警通知

检测到变更后,可以通过多种方式发送告警。以下是使用SMTP发送邮件的示例:

import smtplib
from email.mime.text import MIMEText

def send_alert(subject, body):
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = 'monitor@yourcompany.com'
    msg['To'] = 'security-team@yourcompany.com'
    
    with smtplib.SMTP('smtp.yourcompany.com') as server:
        server.send_message(msg)

3. 漏洞影响评估自动化

3.1 构建漏洞特征库

不同漏洞在FOFA中可以通过特定特征识别。例如:

漏洞名称 FOFA搜索语法示例
Log4j app="Apache Log4j" && version<"2.15.0"
Spring4Shell app="Spring Framework" && body="org.springframework.web.servlet"
WordPress插件漏洞 app="WordPress" && body="plugin-slug"

将这些特征存储在配置文件中便于维护:

{
    "log4j": {
        "query": "app=\"Apache Log4j\" && version<\"2.15.0\"",
        "severity": "critical"
    },
    "spring4shell": {
        "query": "app=\"Spring Framework\" && body=\"org.springframework.web.servlet\"",
        "severity": "high"
    }
}

3.2 自动化漏洞扫描脚本

import json

def scan_for_vulnerabilities(config_file):
    with open(config_file) as f:
        vuln_config = json.load(f)
    
    results = {}
    for vuln_name, config in vuln_config.items():
        data = query_fofa(config['query'])
        if data and data['size'] > 0:
            results[vuln_name] = {
                'count': data['size'],
                'severity': config['severity'],
                'sample': data['results'][:3]  # 取前三个作为样本
            }
    
    return results

3.3 生成可视化报告

使用Pandas和Matplotlib可以生成直观的报告:

import pandas as pd
import matplotlib.pyplot as plt

def generate_report(vuln_results, output_file):
    df = pd.DataFrame.from_dict({
        name: [info['count'], info['severity']] 
        for name, info in vuln_results.items()
    }, orient='index', columns=['Count', 'Severity'])
    
    df.sort_values('Count', ascending=False).plot.bar(
        y='Count',
        color=df['Severity'].map({
            'critical': 'red',
            'high': 'orange',
            'medium': 'yellow'
        }),
        figsize=(10, 6)
    )
    plt.title('Vulnerability Impact Assessment')
    plt.ylabel('Affected Assets')
    plt.savefig(output_file)

4. 影子资产发现技术

4.1 定义影子资产识别策略

影子资产通常指那些存在于公网但未被公司资产管理体系记录的IT资源。识别它们需要:

  1. 从CMDB导出已知资产列表
  2. 通过FOFA查询公司相关的所有资产
  3. 比较两者差异

关键搜索维度包括:

  • 公司域名( domain="example.com"
  • 公司备案号( icp="京ICP备XXXXXX号"
  • 公司特有的技术栈( app="内部系统名称"

4.2 实现资产比对脚本

def find_shadow_assets(known_assets_file):
    # 加载已知资产
    with open(known_assets_file) as f:
        known_assets = {line.strip() for line in f}
    
    # 查询FOFA获取所有相关资产
    fofa_results = query_fofa('domain="example.com" || icp="京ICP备XXXXXX号"', size=1000)
    all_assets = {result[0] for result in fofa_results['results']}
    
    # 计算差异
    shadow_assets = all_assets - known_assets
    return shadow_assets

4.3 处理误报与验证

自动发现的影子资产可能存在误报,需要验证步骤:

  1. DNS验证 :检查IP是否确实解析到公司域名
  2. Whois查询 :验证IP或域名的注册信息
  3. 网络扫描 :对可疑资产进行轻量级扫描确认归属
import socket

def validate_shadow_asset(ip):
    try:
        hostnames = socket.gethostbyaddr(ip)
        return any('example.com' in name for name in hostnames)
    except socket.herror:
        return False

5. 进阶技巧与性能优化

5.1 处理API限制与分页

FOFA API对免费用户有严格的调用限制,付费用户也有配额。优化策略包括:

  • 缓存结果 :对不常变化的查询结果进行本地缓存
  • 智能分页 :根据 size page 参数分批获取数据
  • 错峰调用 :在非高峰时段执行大批量查询
from datetime import datetime
import time

def paginated_query(search_query, max_results=1000):
    all_results = []
    page = 1
    size = 100  # 每页最大数量
    
    while len(all_results) < max_results:
        # 在每小时的前50分钟执行查询,避免整点限制重置时拥挤
        if datetime.now().minute > 50:
            time.sleep(600)  # 等待到下一个小时
        
        data = query_fofa(search_query, page=page, size=size)
        if not data or not data['results']:
            break
            
        all_results.extend(data['results'])
        page += 1
        time.sleep(2)  # 避免触发速率限制
    
    return all_results[:max_results]

5.2 构建自动化任务调度

使用APScheduler可以创建定时任务:

from apscheduler.schedulers.blocking import BlockingScheduler

def daily_monitor():
    new_ports = detect_new_ports("example.com")
    if new_ports:
        send_alert(
            "New ports detected",
            f"Found new open ports: {', '.join(f'{ip}:{port}' for ip, port in new_ports)}"
        )

scheduler = BlockingScheduler()
scheduler.add_job(daily_monitor, 'interval', hours=6)
scheduler.start()

5.3 错误处理与重试机制

健壮的自动化脚本需要完善的错误处理:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def robust_fofa_query(query):
    result = query_fofa(query)
    if not result or result.get('error'):
        raise ValueError(result.get('errmsg', 'Unknown error'))
    return result

更多推荐