CAI: An Open-Source Cybersecurity AI Framework for Building Autonomous Security-Testing Agents
Project Overview
CAI (Cybersecurity AI) is an open-source cybersecurity AI framework for building agent systems that can autonomously execute a complete cyberattack chain. Through a modular AI-agent architecture, it automates security testing end to end, from reconnaissance to privilege escalation, and has performed strongly in CTF challenges, solving them on average 11x faster than human experts.
Features
- Multi-agent architecture: specialized security agents for red teaming, blue teaming, DFIR, code execution, memory analysis, and more
- Autonomous attack-chain execution: automates the full cyberattack chain, including reconnaissance, exploitation, and privilege escalation
- Benchmark suite: integrates multiple security evaluation benchmarks such as CyberMetric, SecEval, and CTIBench
- Privacy-aware evaluation: provides CyberPII-Bench, a benchmark dedicated to assessing how LLMs handle PII in security scenarios
- Parallel execution mode: supports running multiple agents in parallel and in collaborative configurations
- Memory management system: RAG-based memory storage and retrieval, enabling knowledge reuse across security exercises
- Rich tool integration: integrates mainstream security tools such as Shodan, Nmap, and Burp Suite
Installation
Requirements
- Python 3.8+
- Docker (optional, for virtualized environments)
- A mainstream operating system (Linux/macOS/Windows)
Installation Steps
- Clone the repository:
git clone https://github.com/aliasrobotics/CAI.git
cd CAI
- Install build dependencies:
pip install setuptools wheel twine build
- Build and install the core framework:
python3 -m build
pip install -e .
- Install benchmark dependencies:
pip install cvss
git submodule update --init --recursive
- Configure environment variables (in a .env file; a quick verification sketch follows this list):
OPENAI_API_KEY="your_api_key"
ANTHROPIC_API_KEY="your_anthropic_key"
OPENROUTER_API_KEY="your_openrouter_key"
OLLAMA_API_BASE="http://localhost:11434"
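To confirm these keys are actually picked up, a quick check with python-dotenv (which the agent code further below also relies on) can be run. This is only an illustrative sketch; the variable names simply mirror the .env entries above:
# Illustrative sanity check: verify that the .env configuration is loaded.
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory
for key in ("OPENAI_API_KEY", "ANTHROPIC_API_KEY", "OPENROUTER_API_KEY", "OLLAMA_API_BASE"):
    print(f"{key}: {'set' if os.getenv(key) else 'missing'}")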
Usage
Basic Usage
Run a benchmark evaluation:
python benchmarks/eval.py --model ollama/qwen2.5:14b \
--dataset_file benchmarks/cybermetric/CyberMetric-80-v1.json \
--eval cybermetric --backend ollama
Launch the interactive CLI:
python -m cai.cli
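The model used by the CLI agents appears to be selected via the CAI_MODEL environment variable (the agent code further below falls back to "alias0" when it is unset), so a session against a local Ollama model might be started like this; the model string is only an example:
CAI_MODEL="ollama/qwen2.5:14b" python -m cai.cli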
Agent Usage Examples
Run a penetration-testing task with the red team agent:
from cai.agents.red_teamer import redteam_agent
# Run a reconnaissance task
result = redteam_agent.run("Perform a port scan and service identification on the target network")
Run defensive analysis with the blue team agent:
from cai.agents.blue_team import blueteam_agent
# Run security monitoring
result = blueteam_agent.run("Check the system logs for anomalous login activity")
Parallel Execution Mode
Configure parallel agent execution:
from cai.repl.commands.parallel import ParallelConfig
# Configure the red and blue team agents to run in parallel
parallel_configs = [
    ParallelConfig("redteam_agent"),
    ParallelConfig("blueteam_agent"),
]
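These configs are intended for CAI's REPL parallel mode (note the cai.repl.commands.parallel import). As a generic illustration of the underlying pattern rather than the CAI API itself, two agent tasks can be driven concurrently with plain asyncio; run_agent below is a hypothetical stand-in for whatever entry point your agents expose:
# Generic concurrency sketch; run_agent is a hypothetical placeholder, not a CAI function.
import asyncio

async def run_agent(name: str, task: str) -> str:
    await asyncio.sleep(0)  # placeholder for the real agent call
    return f"{name} finished: {task}"

async def main():
    results = await asyncio.gather(
        run_agent("redteam_agent", "port scan the target network"),
        run_agent("blueteam_agent", "monitor auth logs for anomalous logins"),
    )
    print(results)

asyncio.run(main())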
Core Code
Red Team / Bug Bounty Agent Implementation
"""Red Team Base Agent"""
import os
from dotenv import load_dotenv
from cai.sdk.agents import Agent, OpenAIChatCompletionsModel
from openai import AsyncOpenAI
from cai.util import load_prompt_template, create_system_prompt_renderer
from cai.tools.command_and_control.sshpass import run_ssh_command_with_credentials
from cai.tools.reconnaissance.generic_linux_command import generic_linux_command
from cai.tools.web.search_web import make_google_search
from cai.tools.reconnaissance.exec_code import execute_code
from cai.tools.reconnaissance.shodan import shodan_search, shodan_host_info
load_dotenv()
# Load the system prompt
bug_bounter_system_prompt = load_prompt_template("prompts/system_bug_bounter.md")

# Define the tool list
tools = [
    generic_linux_command,
    execute_code,
    shodan_search,
    shodan_host_info,
]

if os.getenv('GOOGLE_SEARCH_API_KEY') and os.getenv('GOOGLE_SEARCH_CX'):
    tools.append(make_google_search)

# Create the bug bounty agent instance
bug_bounter_agent = Agent(
    name="Bug Bounter",
    instructions=create_system_prompt_renderer(bug_bounter_system_prompt),
    description="""Agent that specializes in bug bounty hunting and vulnerability discovery.
    Expert in web security, API testing, and responsible disclosure.""",
    tools=tools,
    model=OpenAIChatCompletionsModel(
        model=os.getenv('CAI_MODEL', "alias0"),
        openai_client=AsyncOpenAI(),
    )
)
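Note how make_google_search is only registered when its API keys are present. The same guard pattern could be applied to other key-gated tools; the SHODAN_API_KEY check below is purely illustrative (the file above registers the Shodan tools unconditionally):
# Illustrative variant of the conditional-registration pattern shown above.
if os.getenv('SHODAN_API_KEY'):
    tools.extend([shodan_search, shodan_host_info])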
Code Execution Agent
"""A Coding Agent (CodeAgent)"""
import copy
import platform
import re
import signal
import threading
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from cai.agents.meta.local_python_executor import LocalPythonInterpreter
from cai.sdk.agents import Agent, Result, OpenAIChatCompletionsModel
from dotenv import load_dotenv
from openai import AsyncOpenAI
class CodeAgentException(Exception):
    """Base exception class for CodeAgent-related errors."""
    pass

class CodeAgent(Agent):
    """CodeAgent that uses executable Python code to consolidate LLM agents' actions."""

    def __init__(self, model_name: str = "alias0"):
        # Initialize the local Python interpreter
        self.interpreter = LocalPythonInterpreter()
        # Initialize the parent Agent
        super().__init__(
            name="CodeAgent",
            description="Agent that uses executable Python code for security testing",
            instructions="""You are a CodeAgent specialized in cybersecurity tasks.
            Use Python code to analyze, exploit, and assess security vulnerabilities.""",
            model=OpenAIChatCompletionsModel(
                model=model_name,
                openai_client=AsyncOpenAI(),
            ),
            tools=[self.execute_python_code]
        )

    def execute_python_code(self, code: str) -> str:
        """Execute Python code in a secure environment."""
        try:
            result = self.interpreter.execute(code)
            return str(result)
        except Exception as e:
            return f"Execution error: {str(e)}"
Benchmark Evaluation System
"""Benchmark Evaluation Script"""
import argparse
import json
from typing import Dict, List
def evaluate_model(model_name: str, dataset_file: str, eval_type: str, backend: str):
    """
    Evaluate a language model on cybersecurity multiple-choice benchmarks.

    Args:
        model_name: Model name (e.g. "gpt-4", "qwen2.5:14b")
        dataset_file: Path to the dataset file (JSON or TSV format)
        eval_type: Evaluation type ("cybermetric", "seceval", "cti_bench")
        backend: Backend type ("openai", "openrouter", "ollama", "anthropic")
    """
    # Load the dataset (load_tsv_dataset is a helper defined elsewhere in the script)
    if dataset_file.endswith('.json'):
        with open(dataset_file, 'r') as f:
            dataset = json.load(f)
    else:
        dataset = load_tsv_dataset(dataset_file)
    # Initialize the evaluator for the chosen benchmark
    evaluator = get_evaluator(eval_type)
    # Run the evaluation item by item
    results = []
    for item in dataset:
        response = query_model(model_name, item['question'], backend)
        score = evaluator.evaluate(response, item['answer'])
        results.append(score)
    # Compute the overall score
    final_score = sum(results) / len(results)
    return final_score

def main():
    parser = argparse.ArgumentParser(description='Cybersecurity AI benchmark evaluation')
    parser.add_argument('-m', '--model', required=True, help='Name of the model to evaluate')
    parser.add_argument('-d', '--dataset_file', required=True, help='Path to the dataset file')
    parser.add_argument('-e', '--eval', required=True, help='Benchmark type')
    parser.add_argument('-B', '--backend', required=True, help='Backend type')
    args = parser.parse_args()
    score = evaluate_model(args.model, args.dataset_file, args.eval, args.backend)
    print(f"Model {args.model} scored {score:.2%} on the {args.eval} benchmark")

if __name__ == '__main__':
    main()
Memory Management System
"""Memory agent for CAI with RAG support"""
import os
from typing import Dict, List
from cai.sdk.agents import Agent
from cai.rag.vector_db import VectorDatabase
class MemoryAgent(Agent):
    """Memory-management agent supporting long-term experience storage and retrieval."""

    def __init__(self):
        # Initialize the vector database
        self.vector_db = VectorDatabase()
        super().__init__(
            name="Memory Agent",
            description="Manages long-term memory experiences across security exercises",
            instructions="""You manage episodic and semantic memory for security exercises.
            Store experiences in vector database and retrieve using RAG pipeline.""",
            tools=[self.store_memory, self.retrieve_memory]
        )

    def store_memory(self, experience: str, exercise_type: str) -> str:
        """Store a security-exercise experience in the memory store."""
        embedding = self.vector_db.embed_text(experience)
        memory_id = self.vector_db.store(embedding, experience, exercise_type)
        return f"Memory stored with ID: {memory_id}"

    def retrieve_memory(self, query: str, exercise_type: str = None) -> List[Dict]:
        """Retrieve relevant experiences from the memory store."""
        query_embedding = self.vector_db.embed_text(query)
        results = self.vector_db.search(query_embedding, exercise_type)
        return results
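A store-then-retrieve round trip through the two tools defined above might look like this, assuming VectorDatabase is configured with a working embedding backend:
# Illustrative round trip using the methods defined on MemoryAgent above.
memory = MemoryAgent()
print(memory.store_memory(
    "Port 8080 exposed an outdated Tomcat manager; default credentials worked.",
    exercise_type="ctf",
))
for hit in memory.retrieve_memory("Tomcat default credentials", exercise_type="ctf"):
    print(hit)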