用Python构建基于FOFA API的智能资产监控系统

在网络安全和运维领域,及时掌握资产变化是防御的第一道防线。想象一下,当公司新上线了一台测试服务器却忘记关闭调试端口,或是某个老旧设备突然出现在公网,这些都可能成为攻击者的突破口。传统的人工巡检方式不仅效率低下,还容易遗漏关键变化。本文将带你用Python打造一个自动化资产监控系统,通过FOFA API实现资产发现、变更监控和实时告警的全流程自动化。

1. 准备工作:理解FOFA API的核心能力

FOFA作为专业的网络空间测绘引擎,其API提供了远超基础搜索的功能深度。在编写代码前,我们需要明确几个关键概念:

  • 资产指纹 :FOFA通过特征识别技术,能够准确识别超过10万种设备类型和应用程序,从物联网摄像头到企业级防火墙都能分类
  • 历史快照 :每次扫描结果都会生成时间戳记录,这是实现变更检测的基础
  • 语法组合 :通过 && || 等逻辑运算符,可以构建复杂的查询条件,例如 domain="example.com" && port="8080"

申请API密钥时要注意权限分级,个人免费账户通常有调用频率限制(如每月1万条数据),而企业版支持更高的并发和更丰富的字段返回。建议先在 FOFA官网 的API文档中测试查询语句,确保语法正确后再集成到代码中。

2. 构建基础API客户端

让我们从最核心的API交互模块开始。以下代码封装了请求构造、错误处理和基础解析:

import requests
import json
from datetime import datetime

class FofaClient:
    def __init__(self, email, key):
        self.base_url = "https://fofa.info/api/v1"
        self.auth = (email, key)
        self.session = requests.Session()
        
    def search(self, query, fields="ip,port,host,title", page=1, size=100):
        params = {
            "qbase64": base64.b64encode(query.encode()).decode(),
            "fields": fields,
            "page": page,
            "size": size
        }
        
        try:
            resp = self.session.get(
                f"{self.base_url}/search/all",
                params=params,
                auth=self.auth,
                timeout=30
            )
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.RequestException as e:
            print(f"API请求失败: {str(e)}")
            return None

关键实现细节:

  • 使用 base64 编码查询语句,这是FOFA API的特殊要求
  • 通过 requests.Session 保持连接池,提高多次请求的效率
  • 默认获取ip、port等基础字段,可根据需要扩展为 protocol,country,city

提示:在实际项目中,建议添加重试机制和更完善的错误日志,处理API限流等异常情况

3. 实现资产变更检测引擎

单纯的资产采集还不够,我们需要识别出新增、消失和发生变化的资产。下面是变更检测的核心逻辑:

def diff_assets(current_assets, previous_assets):
    current_set = {f"{a['ip']}:{a['port']}" for a in current_assets}
    previous_set = {f"{a['ip']}:{a['port']}" for a in previous_assets}
    
    added = [a for a in current_assets 
             if f"{a['ip']}:{a['port']}" not in previous_set]
    removed = [a for a in previous_assets 
               if f"{a['ip']}:{a['port']}" not in current_set]
    
    changed = []
    for curr in current_assets:
        for prev in previous_assets:
            if (curr['ip'] == prev['ip'] and 
                curr['port'] == prev['port'] and 
                curr != prev):
                changed.append({
                    'before': prev,
                    'after': curr
                })
    
    return {
        'added': added,
        'removed': removed,
        'changed': changed
    }

配合这个差分引擎,我们可以构建完整的监控流程:

  1. 首次运行 :建立资产基线,保存初始快照到数据库
  2. 后续运行
    • 获取最新扫描结果
    • 与上次结果进行差异比对
    • 生成变更报告
    • 更新数据库记录

4. 多通道告警通知系统

检测到变更后,需要将关键信息推送到相关人员。以下是集成企业微信机器人的示例:

import requests

def send_wecom_alert(webhook_url, changes):
    added_count = len(changes['added'])
    removed_count = len(changes['removed'])
    
    markdown_content = f"""
    ## 资产变更告警
    **新增资产**: {added_count}个
    **下线资产**: {removed_count}个
    
    ### 关键变化详情
    {format_changes(changes)}
    """
    
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "content": markdown_content
        }
    }
    
    requests.post(webhook_url, json=payload)

类似的,可以扩展邮件、钉钉、Slack等通知方式。对于安全团队,建议将高危变更(如暴露了管理后台)设置为最高优先级告警。

5. 进阶优化与生产级部署

要让系统真正稳定运行,还需要考虑以下增强点:

定时任务管理

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()
@scheduler.scheduled_job('interval', hours=4)
def monitoring_job():
    run_monitoring_cycle()

scheduler.start()

性能优化技巧

  • 对大范围IP段扫描使用分页查询,避免单次返回过多数据
  • 对稳定资产(如企业官网)减少扫描频率
  • 使用缓存减少API调用次数

安全注意事项

  • API密钥应存储在环境变量或加密配置中
  • 扫描结果需根据敏感程度设置访问权限
  • 遵守FOFA的使用条款,避免高频请求

6. 典型应用场景扩展

这套系统可以灵活适配多种业务需求:

  • 漏洞影响范围评估 :当爆出新漏洞时,快速定位内部受影响资产

    # 查找所有暴露的Jenkins服务
    query = 'app="Jenkins" && port="8080"'
    
  • 第三方供应商监控 :持续关注合作方的资产暴露面变化

  • 合规审计 :定期检查是否有未经批准的云服务或数据库暴露在公网

对于大型企业,可以考虑将这些功能封装为内部安全平台的子系统,与CMDB、SIEM等系统集成,形成完整的资产治理闭环。

更多推荐