用Python构建企业级资产监控系统:FOFA API实战指南

在数字化时代,企业资产暴露面管理已成为安全运维的核心挑战。传统人工巡检方式效率低下,而商业监控工具又往往价格昂贵且缺乏定制性。本文将带你从零构建一个基于FOFA API的自动化资产监控系统,通过Python实现数据采集、分析预警和可视化呈现的全流程。

1. 准备工作:理解FOFA API的核心能力

FOFA作为专业的网络空间测绘引擎,其API提供了远超基础搜索的高级功能。在开始编码前,我们需要明确几个关键概念:

  • 资产指纹识别 :通过特定语法组合精准定位目标资产
  • 历史数据对比 :追踪资产变更轨迹,识别异常变动
  • 风险量化分析 :结合端口、服务等信息评估暴露面风险等级

申请API密钥的流程非常简单:

  1. 登录FOFA官网并进入个人中心
  2. 在"API管理"页面选择适合的套餐(免费版已有基础调用权限)
  3. 生成并妥善保存API Key和Email这组认证信息

提示:生产环境建议使用付费套餐以获得更高的请求配额和更完整的数据字段

2. API调用核心模块开发

2.1 基础请求封装

我们先构建一个可复用的请求类,处理认证、限流等基础问题:

import requests
import time
from collections import deque

class FofaAPI:
    def __init__(self, email, key, max_retry=3):
        self.base_url = "https://fofa.info/api/v1"
        self.auth = (email, key)
        self.request_history = deque(maxlen=20)  # 维护最近20次请求时间
        
    def _rate_limit(self):
        """确保每分钟请求不超过30次"""
        now = time.time()
        if len(self.request_history) >= 20:
            oldest = self.request_history[0]
            if now - oldest < 60:
                time.sleep(60 - (now - oldest))
        return now
    
    def search(self, query, fields="ip,port,host,title,server", page=1, size=100):
        current_time = self._rate_limit()
        params = {
            "qbase64": base64.b64encode(query.encode()).decode(),
            "fields": fields,
            "page": page,
            "size": size
        }
        try:
            resp = requests.get(f"{self.base_url}/search/all", 
                              params=params,
                              auth=self.auth)
            self.request_history.append(current_time)
            return resp.json()
        except Exception as e:
            print(f"API请求异常: {str(e)}")
            return None

2.2 高级查询技巧

掌握FOFA搜索语法能显著提升监控精度,以下是几个实用技巧:

  • 组合查询 domain="example.com" && port="443" 定位特定域名的HTTPS服务
  • 排除干扰 title=="Login" && !body="test" 过滤测试页面
  • 时间范围 after="2023-01-01" 只关注新增资产
# 示例:获取所有暴露的Redis服务
def find_exposed_redis(api):
    results = api.search('protocol=="redis" && port=="6379"', 
                        fields="ip,port,host")
    if results and results['error'] is False:
        return results['results']
    return []

3. 数据存储与分析模块

3.1 资产数据库设计

使用SQLite实现轻量级存储方案:

import sqlite3
from contextlib import closing

def init_db(db_path):
    with closing(sqlite3.connect(db_path)) as conn:
        cursor = conn.cursor()
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS assets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ip TEXT NOT NULL,
            port INTEGER,
            host TEXT,
            service_type TEXT,
            first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_seen TIMESTAMP,
            risk_score INTEGER DEFAULT 0,
            UNIQUE(ip, port)
        )
        """)
        # 创建变更记录表
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS asset_changes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asset_id INTEGER,
            change_type TEXT,  -- '新增'/'消失'/'修改'
            change_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            details TEXT,
            FOREIGN KEY(asset_id) REFERENCES assets(id)
        )
        """)
        conn.commit()

3.2 差异检测算法

核心是比较新旧扫描结果,识别关键变化:

def detect_changes(current_scan, db_conn):
    cursor = db_conn.cursor()
    
    # 获取已知资产快照
    cursor.execute("SELECT ip, port FROM assets")
    existing_assets = {(row[0], row[1]) for row in cursor.fetchall()}
    
    current_assets = {(item['ip'], item['port']) for item in current_scan}
    
    # 计算变化集
    new_assets = current_assets - existing_assets
    disappeared = existing_assets - current_assets
    
    # 处理新增资产
    for ip, port in new_assets:
        cursor.execute("""
        INSERT INTO assets(ip, port, last_seen) 
        VALUES(?, ?, datetime('now'))
        ON CONFLICT(ip, port) DO UPDATE SET last_seen=excluded.last_seen
        """, (ip, port))
        
        # 记录变更
        cursor.execute("""
        INSERT INTO asset_changes(asset_id, change_type, details)
        SELECT id, '新增', json_object('ip', ?, 'port', ?)
        FROM assets WHERE ip=? AND port=?
        """, (ip, port, ip, port))
    
    # 处理消失资产
    for ip, port in disappeared:
        cursor.execute("""
        INSERT INTO asset_changes(asset_id, change_type)
        SELECT id, '消失' FROM assets WHERE ip=? AND port=?
        """, (ip, port))
    
    db_conn.commit()
    return len(new_assets), len(disappeared)

4. 可视化与告警系统

4.1 Flask可视化面板

构建一个简洁的监控仪表盘:

from flask import Flask, render_template
import pandas as pd

app = Flask(__name__)

@app.route('/dashboard')
def show_dashboard():
    conn = sqlite3.connect('assets.db')
    
    # 获取资产统计
    stats = pd.read_sql("""
    SELECT service_type, COUNT(*) as count 
    FROM assets 
    GROUP BY service_type
    """, conn).to_dict('records')
    
    # 获取最近变更
    changes = pd.read_sql("""
    SELECT a.ip, a.port, c.change_type, c.change_time
    FROM asset_changes c
    JOIN assets a ON c.asset_id = a.id
    ORDER BY c.change_time DESC
    LIMIT 20
    """, conn)
    
    return render_template('dashboard.html', 
                         stats=stats,
                         changes=changes)

4.2 企业微信机器人集成

实现实时告警推送:

import json
import requests

def send_wechat_alert(webhook_url, changes):
    markdown_content = "## 资产变更告警\n"
    for change in changes:
        markdown_content += f"- {change['change_time']} {change['ip']}:{change['port']} {change['change_type']}\n"
    
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "content": markdown_content
        }
    }
    
    try:
        resp = requests.post(webhook_url, 
                           data=json.dumps(payload),
                           headers={'Content-Type': 'application/json'})
        return resp.status_code == 200
    except Exception as e:
        print(f"告警发送失败: {str(e)}")
        return False

5. 系统优化与进阶技巧

5.1 性能调优策略

当监控目标规模较大时,需要考虑以下优化方案:

  • 分布式扫描 :将目标按IP段或业务单元拆分,多进程并行处理
  • 增量更新 :对稳定资产降低扫描频率��重点监控变更频繁区域
  • 缓存机制 :对静态资源(如备案信息)实施本地缓存
# 使用Celery实现分布式任务队列
from celery import Celery

app = Celery('fofa_monitor', broker='redis://localhost:6379/0')

@app.task
def scan_ip_segment(ip_range):
    api = FofaAPI(FOFA_EMAIL, FOFA_KEY)
    results = api.search(f'ip="{ip_range}"')
    process_results.delay(results)

5.2 风险评分模型

建立简单的风险评估体系:

风险指标 权重 评分标准
敏感端口暴露 30% 22/3389/6379等端口各+10分
已知漏洞服务版本 25% 根据CVE数据库匹配程度评分
业务关键性 20% 核心业务系统+20分
暴露时长 15% 超过30天未处理+15分
历史攻击记录 10% 有记录+10分

实现代码示例:

def calculate_risk_score(asset):
    score = 0
    # 检查敏感端口
    if asset['port'] in [22, 3389, 6379]:
        score += 10
    
    # 检查服务版本漏洞
    if is_vulnerable_version(asset['server']):
        score += 25
    
    # 其他评估逻辑...
    return min(score, 100)  # 确保不超过100分

def is_vulnerable_version(server_header):
    # 这里应接入漏洞数据库进行匹配
    vulnerable_versions = {
        'nginx': ['1.18.0', '1.20.1'],
        'Apache': ['2.4.49', '2.4.51']
    }
    for software, versions in vulnerable_versions.items():
        if software in server_header:
            for v in versions:
                if v in server_header:
                    return True
    return False

6. 实战案例:某企业监控系统改造

某金融企业原有监控系统存在以下痛点:

  • 人工维护资产清单,更新滞后
  • 无法感知第三方服务商资产变动
  • 缺少统一风险视图

通过引入FOFA API解决方案后:

  1. 自动化发现 :每周自动扫描企业相关域名和IP段
  2. 变更追踪 :识别出3个未经报备的测试环境暴露公网
  3. 风险处置 :发现2台存在Heartbleed漏洞的遗留服务器

关键实现代码:

# 企业专属监控策略
def enterprise_monitor():
    domains = ['company.com', 'payment-company.com']
    ip_ranges = ['192.168.1.0/24', '10.10.0.0/16']
    
    all_results = []
    for domain in domains:
        all_results.extend(api.search(f'domain="{domain}"'))
    
    for ip_range in ip_ranges:
        all_results.extend(api.search(f'ip="{ip_range}"'))
    
    # 执行风险分析
    risky_assets = []
    for asset in all_results:
        if calculate_risk_score(asset) > 70:
            risky_assets.append(asset)
    
    # 生成周报
    generate_weekly_report(risky_assets)

这套系统最终帮助企业将资产发现效率提升80%,平均风险处置时间从72小时缩短至4小时。

更多推荐