安全工程师的“外挂”:用FOFA API + Python脚本自动化你的日常巡检
安全工程师的“外挂”:用FOFA API + Python脚本自动化你的日常巡检
在网络安全领域,效率往往决定着防御的成败。想象一下这样的场景:凌晨三点,一个关键服务的新端口突然暴露在公网,而你的团队直到被攻击者利用后才后知后觉。这种被动防御的局面,正是自动化巡检工具要解决的核心痛点。
对于每天需要监控数百个资产的安全团队来说,手动检查每个IP的端口变化或新出现的漏洞影响范围,不仅耗时耗力,而且容易遗漏关键风险点。这就是为什么越来越多的安全工程师开始将FOFA这样的网络空间测绘引擎与Python自动化脚本结合,打造属于自己的"安全外挂"系统。
本文将面向有一定Python基础的安全从业人员,展示如何通过FOFA API实现三个典型场景的自动化:
- 关键资产变更的实时监控与告警
- 漏洞影响范围的快速评估
- 影子资产的自动化发现
1. FOFA API基础:从申请到第一个查询
1.1 获取API访问权限
要使用FOFA的API功能,首先需要注册账号并获取API Key:
- 访问FOFA官网并登录
- 进入"个人中心"→"API接口"
- 根据需求选择适合的套餐(免费版有一定调用限制)
获取到API Key后,建议将其存储在环境变量中而非直接硬编码在脚本里:
# 在Linux/macOS中
export FOFA_KEY="your_api_key_here"
export FOFA_EMAIL="your_email@example.com"
# 在Windows中
set FOFA_KEY=your_api_key_here
set FOFA_EMAIL=your_email@example.com
1.2 理解API的基本调用方式
FOFA API的核心端点是通过HTTP GET请求访问的,基本URL结构如下:
https://fofa.info/api/v1/search/all?email=EMAIL&key=KEY&qbase64=QUERY
其中 qbase64 参数需要将搜索语法进行Base64编码。例如查询域名包含"example.com"的资产:
import base64
import requests
query = 'domain="example.com"'
encoded_query = base64.b64encode(query.encode()).decode()
url = f"https://fofa.info/api/v1/search/all?email={email}&key={api_key}&qbase64={encoded_query}"
response = requests.get(url)
1.3 处理API返回数据
典型的API响应是JSON格式,包含以下关键字段:
| 字段名 | 描述 | 示例值 |
|---|---|---|
error |
是否出错 | false |
size |
结果总数 | 42 |
page |
当前页码 | 1 |
results |
结果数组 | [["1.1.1.1", "443", "https"], ...] |
一个完整的初始查询函数可能如下:
def query_fofa(search_query, page=1, size=100):
encoded = base64.b64encode(search_query.encode()).decode()
url = f"https://fofa.info/api/v1/search/all?email={os.getenv('FOFA_EMAIL')}&key={os.getenv('FOFA_KEY')}&qbase64={encoded}&page={page}&size={size}"
try:
response = requests.get(url)
data = response.json()
if data.get('error'):
raise ValueError(f"API Error: {data.get('errmsg')}")
return data
except Exception as e:
print(f"Query failed: {str(e)}")
return None
2. 构建资产变更监控系统
2.1 设计资产变更检测逻辑
资产监控的核心是比较当前状态与历史基线之间的差异。一个健壮的监控系统应该:
- 定期执行FOFA查询(如每6小时)
- 将结果与上次记录进行比较
- 识别新增/减少的端口和服务
- 对重要变更发送告警
实现这一流程需要解决几个技术难点:
- 数据存储 :需要一个轻量级数据库来保存历史记录
- 变更检测 :需要高效比较两次查询结果的差异
- 告警去重 :避免对同一变更重复告警
2.2 使用SQLite存储历史数据
SQLite是Python内置的轻量级数据库,非常适合这种场景:
import sqlite3
def init_db():
conn = sqlite3.connect('asset_monitor.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS asset_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
asset TEXT NOT NULL,
port INTEGER,
service TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE(asset, port, service)
)
''')
conn.commit()
conn.close()
2.3 实现变更检测算法
以下是一个检测新增端口的核心函数:
def detect_new_ports(domain):
# 获取当前FOFA结果
current_results = query_fofa(f'domain="{domain}"')
current_assets = {(item[0], item[1]) for item in current_results['results']}
# 获取历史记录
conn = sqlite3.connect('asset_monitor.db')
cursor = conn.cursor()
cursor.execute('SELECT asset, port FROM asset_history WHERE asset=?', (domain,))
historical_assets = {(row[0], row[1]) for row in cursor.fetchall()}
# 计算新增端口
new_ports = current_assets - historical_assets
# 更新数据库
for asset, port in current_assets:
cursor.execute(
'INSERT OR IGNORE INTO asset_history (asset, port) VALUES (?, ?)',
(asset, port)
)
conn.commit()
conn.close()
return new_ports
2.4 集成告警通知
检测到变更后,可以通过多种方式发送告警。以下是使用SMTP发送邮件的示例:
import smtplib
from email.mime.text import MIMEText
def send_alert(subject, body):
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = 'monitor@yourcompany.com'
msg['To'] = 'security-team@yourcompany.com'
with smtplib.SMTP('smtp.yourcompany.com') as server:
server.send_message(msg)
3. 漏洞影响评估自动化
3.1 构建漏洞特征库
不同漏洞在FOFA中可以通过特定特征识别。例如:
| 漏洞名称 | FOFA搜索语法示例 |
|---|---|
| Log4j | app="Apache Log4j" && version<"2.15.0" |
| Spring4Shell | app="Spring Framework" && body="org.springframework.web.servlet" |
| WordPress插件漏洞 | app="WordPress" && body="plugin-slug" |
将这些特征存储在配置文件中便于维护:
{
"log4j": {
"query": "app=\"Apache Log4j\" && version<\"2.15.0\"",
"severity": "critical"
},
"spring4shell": {
"query": "app=\"Spring Framework\" && body=\"org.springframework.web.servlet\"",
"severity": "high"
}
}
3.2 自动化漏洞扫描脚本
import json
def scan_for_vulnerabilities(config_file):
with open(config_file) as f:
vuln_config = json.load(f)
results = {}
for vuln_name, config in vuln_config.items():
data = query_fofa(config['query'])
if data and data['size'] > 0:
results[vuln_name] = {
'count': data['size'],
'severity': config['severity'],
'sample': data['results'][:3] # 取前三个作为样本
}
return results
3.3 生成可视化报告
使用Pandas和Matplotlib可以生成直观的报告:
import pandas as pd
import matplotlib.pyplot as plt
def generate_report(vuln_results, output_file):
df = pd.DataFrame.from_dict({
name: [info['count'], info['severity']]
for name, info in vuln_results.items()
}, orient='index', columns=['Count', 'Severity'])
df.sort_values('Count', ascending=False).plot.bar(
y='Count',
color=df['Severity'].map({
'critical': 'red',
'high': 'orange',
'medium': 'yellow'
}),
figsize=(10, 6)
)
plt.title('Vulnerability Impact Assessment')
plt.ylabel('Affected Assets')
plt.savefig(output_file)
4. 影子资产发现技术
4.1 定义影子资产识别策略
影子资产通常指那些存在于公网但未被公司资产管理体系记录的IT资源。识别它们需要:
- 从CMDB导出已知资产列表
- 通过FOFA查询公司相关的所有资产
- 比较两者差异
关键搜索维度包括:
- 公司域名(
domain="example.com") - 公司备案号(
icp="京ICP备XXXXXX号") - 公司特有的技术栈(
app="内部系统名称")
4.2 实现资产比对脚本
def find_shadow_assets(known_assets_file):
# 加载已知资产
with open(known_assets_file) as f:
known_assets = {line.strip() for line in f}
# 查询FOFA获取所有相关资产
fofa_results = query_fofa('domain="example.com" || icp="京ICP备XXXXXX号"', size=1000)
all_assets = {result[0] for result in fofa_results['results']}
# 计算差异
shadow_assets = all_assets - known_assets
return shadow_assets
4.3 处理误报与验证
自动发现的影子资产可能存在误报,需要验证步骤:
- DNS验证 :检查IP是否确实解析到公司域名
- Whois查询 :验证IP或域名的注册信息
- 网络扫描 :对可疑资产进行轻量级扫描确认归属
import socket
def validate_shadow_asset(ip):
try:
hostnames = socket.gethostbyaddr(ip)
return any('example.com' in name for name in hostnames)
except socket.herror:
return False
5. 进阶技巧与性能优化
5.1 处理API限制与分页
FOFA API对免费用户有严格的调用限制,付费用户也有配额。优化策略包括:
- 缓存结果 :对不常变化的查询结果进行本地缓存
- 智能分页 :根据
size和page参数分批获取数据 - 错峰调用 :在非高峰时段执行大批量查询
from datetime import datetime
import time
def paginated_query(search_query, max_results=1000):
all_results = []
page = 1
size = 100 # 每页最大数量
while len(all_results) < max_results:
# 在每小时的前50分钟执行查询,避免整点限制重置时拥挤
if datetime.now().minute > 50:
time.sleep(600) # 等待到下一个小时
data = query_fofa(search_query, page=page, size=size)
if not data or not data['results']:
break
all_results.extend(data['results'])
page += 1
time.sleep(2) # 避免触发速率限制
return all_results[:max_results]
5.2 构建自动化任务调度
使用APScheduler可以创建定时任务:
from apscheduler.schedulers.blocking import BlockingScheduler
def daily_monitor():
new_ports = detect_new_ports("example.com")
if new_ports:
send_alert(
"New ports detected",
f"Found new open ports: {', '.join(f'{ip}:{port}' for ip, port in new_ports)}"
)
scheduler = BlockingScheduler()
scheduler.add_job(daily_monitor, 'interval', hours=6)
scheduler.start()
5.3 错误处理与重试机制
健壮的自动化脚本需要完善的错误处理:
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def robust_fofa_query(query):
result = query_fofa(query)
if not result or result.get('error'):
raise ValueError(result.get('errmsg', 'Unknown error'))
return result
更多推荐
所有评论(0)