告别低效排查:用ZoomEye API + Python脚本自动化你的资产发现与监控流程
告别低效排查:用ZoomEye API + Python脚本自动化你的资产发现与监控流程
在网络安全和运维领域,资产发现与监控是日常工作中最基础却最耗时的环节之一。传统的手动搜索方式不仅效率低下,还容易遗漏关键信息。想象一下,当你需要监控公司数百个IP段中的新开放端口或服务变更时,手动操作几乎是一场噩梦。这正是自动化工具链的价值所在——将重复性劳动交给机器,让工程师专注于真正需要人类智慧的决策环节。
ZoomEye作为专业的网络空间测绘引擎,其API接口为自动化流程提供了强大支持。结合Python的灵活性和丰富的生态库,我们可以构建一套完整的资产监控系统,实现从发现、分析到告警的全流程自动化。本文将带你从零开始,开发一套可集成到现有运维体系中的自动化工具链。
1. 环境准备与ZoomEye API基础
1.1 获取API访问权限
首先需要注册ZoomEye账号并获取API Key:
- 访问ZoomEye官网完成注册
- 进入个人中心找到"API"选项卡
- 生成专属API Key并妥善保存
注意:免费账号有API调用次数限制,企业级需求建议考虑商业授权
1.2 安装必要的Python库
推荐使用Python 3.8+环境,安装以下核心依赖:
pip install requests pandas schedule python-dotenv
各库的作用:
requests:处理HTTP API请求pandas:数据分析与报表生成schedule:定时任务管理python-dotenv:环境变量管理
1.3 API基础请求示例
下面是一个简单的API测试脚本,验证环境配置是否正确:
import requests
from dotenv import load_dotenv
import os
load_dotenv()
API_KEY = os.getenv('ZOOMEYE_API_KEY')
headers = {
"API-KEY": API_KEY
}
response = requests.get(
"https://api.zoomeye.org/resources-info",
headers=headers
)
print(response.json())
将API Key保存在项目根目录的 .env 文件中:
ZOOMEYE_API_KEY=your_api_key_here
2. 构建自动化资产发现系统
2.1 设计资产搜索策略
根据不同的监控需求,可以设计多种搜索策略:
| 搜索类型 | 适用场景 | 示例查询 |
|---|---|---|
| IP段监控 | 企业内网资产发现 | cidr:192.168.1.0/24 |
| 服务发现 | 特定服务版本监控 | app:nginx port:80 |
| 漏洞扫描 | CVE相关设备发现 | app:Apache Tomcat ver:9.0.0 |
2.2 实现基础搜索功能
以下代码实现了基本的资产搜索功能:
def search_assets(query, page=1):
url = "https://api.zoomeye.org/host/search"
params = {
"query": query,
"page": page
}
try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"API请求失败: {e}")
return None
2.3 结果解析与存储
将API返回的JSON数据转换为结构化DataFrame:
import pandas as pd
def parse_results(data):
records = []
for item in data.get('matches', []):
record = {
'ip': item.get('ip'),
'port': item.get('portinfo', {}).get('port'),
'service': item.get('portinfo', {}).get('service'),
'app': item.get('portinfo', {}).get('app'),
'version': item.get('portinfo', {}).get('version'),
'timestamp': item.get('timestamp')
}
records.append(record)
return pd.DataFrame(records)
3. 实现资产变更监控系统
3.1 设计变更检测算法
核心思路是通过定期扫描对比历史数据,检测以下变更类型:
- 新增资产 :之前不存在的IP:Port组合
- 服务变更 :同一IP端口上的服务类型或版本变化
- 资产下线 :之前存在的服务不再响应
3.2 实现差异检测功能
def detect_changes(current_df, previous_df):
# 合并两期数据
merged = pd.merge(
current_df,
previous_df,
on=['ip', 'port'],
how='outer',
suffixes=('_current', '_previous')
)
# 识别变更类型
changes = []
for _, row in merged.iterrows():
if pd.isna(row['service_previous']):
changes.append({
'type': '新增',
'ip': row['ip'],
'port': row['port'],
'detail': f"新增服务: {row['service_current']}"
})
elif pd.isna(row['service_current']):
changes.append({
'type': '下线',
'ip': row['ip'],
'port': row['port'],
'detail': f"服务下线: {row['service_previous']}"
})
elif row['service_current'] != row['service_previous']:
changes.append({
'type': '变更',
'ip': row['ip'],
'port': row['port'],
'detail': f"{row['service_previous']} → {row['service_current']}"
})
return pd.DataFrame(changes)
3.3 定时任务集成
使用schedule库实现定时扫描:
import schedule
import time
def monitoring_job():
print(f"开始执行监控任务: {time.ctime()}")
current_data = search_assets("cidr:192.168.1.0/24")
current_df = parse_results(current_data)
try:
previous_df = pd.read_csv('last_scan.csv')
except FileNotFoundError:
previous_df = pd.DataFrame()
if not previous_df.empty:
changes = detect_changes(current_df, previous_df)
if not changes.empty:
alert_on_changes(changes)
current_df.to_csv('last_scan.csv', index=False)
# 每天凌晨2点执行
schedule.every().day.at("02:00").do(monitoring_job)
while True:
schedule.run_pending()
time.sleep(60)
4. 告警与报表系统集成
4.1 多通道告警实现
根据企业环境选择适当的告警方式:
- 邮件告警 :适合非紧急通知
- 即时通讯工具 :如企业微信、钉钉机器人
- SIEM系统集成 :通过Webhook对接安全事件管理系统
示例邮件告警实现:
import smtplib
from email.mime.text import MIMEText
def send_alert_email(changes_df):
msg = MIMEText(changes_df.to_html(), 'html')
msg['Subject'] = '资产变更告警'
msg['From'] = 'monitor@example.com'
msg['To'] = 'security-team@example.com'
with smtplib.SMTP('smtp.example.com') as server:
server.send_message(msg)
4.2 自动化报表生成
使用pandas的样式功能创建专业报表:
def generate_report(changes_df):
report = changes_df.style\
.applymap(highlight_critical, subset=['type'])\
.set_properties(**{'text-align': 'left'})\
.set_table_styles([
{'selector': 'th', 'props': [('background-color', '#f5f5f5')]}
])
with open('report.html', 'w') as f:
f.write(report.render())
def highlight_critical(val):
color = 'red' if val == '新增' else 'orange' if val == '变更' else 'gray'
return f'color: {color}'
5. 高级应用与优化技巧
5.1 性能优化策略
当监控大量资产时,需要考虑API调用效率:
- 并行请求 :使用多线程/协程并发处理
- 结果缓存 :减少重复查询
- 增量检查 :只检查可能发生变更的IP段
5.2 与企业系统集成
将监控系统融入现有DevOps工具链:
# Jenkins集成示例
def jenkins_callback(build_url):
payload = {
"text": f"资产扫描完成,查看报告: {build_url}"
}
requests.post(JENKINS_WEBHOOK, json=payload)
# Prometheus监控指标导出
from prometheus_client import start_http_server, Gauge
assets_gauge = Gauge('zoomeye_assets', 'Discovered assets count')
def export_metrics():
data = search_assets("cidr:192.168.1.0/24")
assets_gauge.set(len(data.get('matches', [])))
5.3 异常处理与日志
健壮的生产级代码需要完善的错误处理:
import logging
logging.basicConfig(
filename='monitor.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def safe_search(query):
try:
return search_assets(query)
except Exception as e:
logging.error(f"搜索失败: {str(e)}")
notify_admin(f"搜索失败: {query}")
return None
在实际项目中,这套系统帮助我们将资产发现时间从原来的数小时缩短到几分钟,变更检测的实时性也从天级别提升到小时级别。一个特别有用的技巧是为不同类型的资产设置不同的扫描频率——对核心业务系统提高扫描频率,而对相对静态的基础设施则降低频率以节省API配额。
更多推荐
所有评论(0)