手把手教你用FOFA API写个自动化资产监控脚本(Python版)
用Python构建基于FOFA API的智能资产监控系统
在网络安全和运维领域,及时掌握资产变化是防御的第一道防线。想象一下,当公司新上线了一台测试服务器却忘记关闭调试端口,或是某个老旧设备突然出现在公网,这些都可能成为攻击者的突破口。传统的人工巡检方式不仅效率低下,还容易遗漏关键变化。本文将带你用Python打造一个自动化资产监控系统,通过FOFA API实现资产发现、变更监控和实时告警的全流程自动化。
1. 准备工作:理解FOFA API的核心能力
FOFA作为专业的网络空间测绘引擎,其API提供了远超基础搜索的功能深度。在编写代码前,我们需要明确几个关键概念:
- 资产指纹 :FOFA通过特征识别技术,能够准确识别超过10万种设备类型和应用程序,从物联网摄像头到企业级防火墙都能分类
- 历史快照 :每次扫描结果都会生成时间戳记录,这是实现变更检测的基础
- 语法组合 :通过
&&、||等逻辑运算符,可以构建复杂的查询条件,例如domain="example.com" && port="8080"
申请API密钥时要注意权限分级,个人免费账户通常有调用频率限制(如每月1万条数据),而企业版支持更高的并发和更丰富的字段返回。建议先在 FOFA官网 的API文档中测试查询语句,确保语法正确后再集成到代码中。
2. 构建基础API客户端
让我们从最核心的API交互模块开始。以下代码封装了请求构造、错误处理和基础解析:
import requests
import json
from datetime import datetime
class FofaClient:
def __init__(self, email, key):
self.base_url = "https://fofa.info/api/v1"
self.auth = (email, key)
self.session = requests.Session()
def search(self, query, fields="ip,port,host,title", page=1, size=100):
params = {
"qbase64": base64.b64encode(query.encode()).decode(),
"fields": fields,
"page": page,
"size": size
}
try:
resp = self.session.get(
f"{self.base_url}/search/all",
params=params,
auth=self.auth,
timeout=30
)
resp.raise_for_status()
return resp.json()
except requests.exceptions.RequestException as e:
print(f"API请求失败: {str(e)}")
return None
关键实现细节:
- 使用
base64编码查询语句,这是FOFA API的特殊要求 - 通过
requests.Session保持连接池,提高多次请求的效率 - 默认获取ip、port等基础字段,可根据需要扩展为
protocol,country,city等
提示:在实际项目中,建议添加重试机制和更完善的错误日志,处理API限流等异常情况
3. 实现资产变更检测引擎
单纯的资产采集还不够,我们需要识别出新增、消失和发生变化的资产。下面是变更检测的核心逻辑:
def diff_assets(current_assets, previous_assets):
current_set = {f"{a['ip']}:{a['port']}" for a in current_assets}
previous_set = {f"{a['ip']}:{a['port']}" for a in previous_assets}
added = [a for a in current_assets
if f"{a['ip']}:{a['port']}" not in previous_set]
removed = [a for a in previous_assets
if f"{a['ip']}:{a['port']}" not in current_set]
changed = []
for curr in current_assets:
for prev in previous_assets:
if (curr['ip'] == prev['ip'] and
curr['port'] == prev['port'] and
curr != prev):
changed.append({
'before': prev,
'after': curr
})
return {
'added': added,
'removed': removed,
'changed': changed
}
配合这个差分引擎,我们可以构建完整的监控流程:
- 首次运行 :建立资产基线,保存初始快照到数据库
- 后续运行 :
- 获取最新扫描结果
- 与上次结果进行差异比对
- 生成变更报告
- 更新数据库记录
4. 多通道告警通知系统
检测到变更后,需要将关键信息推送到相关人员。以下是集成企业微信机器人的示例:
import requests
def send_wecom_alert(webhook_url, changes):
added_count = len(changes['added'])
removed_count = len(changes['removed'])
markdown_content = f"""
## 资产变更告警
**新增资产**: {added_count}个
**下线资产**: {removed_count}个
### 关键变化详情
{format_changes(changes)}
"""
payload = {
"msgtype": "markdown",
"markdown": {
"content": markdown_content
}
}
requests.post(webhook_url, json=payload)
类似的,可以扩展邮件、钉钉、Slack等通知方式。对于安全团队,建议将高危变更(如暴露了管理后台)设置为最高优先级告警。
5. 进阶优化与生产级部署
要让系统真正稳定运行,还需要考虑以下增强点:
定时任务管理 :
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@scheduler.scheduled_job('interval', hours=4)
def monitoring_job():
run_monitoring_cycle()
scheduler.start()
性能优化技巧 :
- 对大范围IP段扫描使用分页查询,避免单次返回过多数据
- 对稳定资产(如企业官网)减少扫描频率
- 使用缓存减少API调用次数
安全注意事项 :
- API密钥应存储在环境变量或加密配置中
- 扫描结果需根据敏感程度设置访问权限
- 遵守FOFA的使用条款,避免高频请求
6. 典型应用场景扩展
这套系统可以灵活适配多种业务需求:
-
漏洞影响范围评估 :当爆出新漏洞时,快速定位内部受影响资产
# 查找所有暴露的Jenkins服务 query = 'app="Jenkins" && port="8080"' -
第三方供应商监控 :持续关注合作方的资产暴露面变化
-
合规审计 :定期检查是否有未经批准的云服务或数据库暴露在公网
对于大型企业,可以考虑将这些功能封装为内部安全平台的子系统,与CMDB、SIEM等系统集成,形成完整的资产治理闭环。
更多推荐

所有评论(0)