项目概述

CAI(Cybersecurity AI)是一个开源的网络安全AI框架,旨在构建能够自主执行完整网络安全攻击链的智能体系统。该项目通过模块化的AI智能体架构,实现了从侦察到权限提升的全流程自动化安全测试,在CTF挑战中表现优异,平均比人类专家快11倍。

功能特性

  • 多智能体架构:包含红队、蓝队、DFIR、代码执行、内存分析等专业安全智能体
  • 自主攻击链执行:支持完整的网络安全攻击链自动化,包括侦察、漏洞利用、权限提升
  • 基准测试体系:集成CyberMetric、SecEval、CTIBench等多个安全评估基准
  • 隐私保护评估:提供CyberPII-Bench基准,专门评估LLM在安全场景中的PII处理能力
  • 并行执行模式:支持多智能体并行执行和协作模式
  • 记忆管理系统:基于RAG的记忆存储和检索,支持跨安全演练的知识复用
  • 工具集成丰富:集成Shodan、Nmap、Burp Suite等主流安全工具

安装指南

环境要求

  • Python 3.8+
  • Docker(可选,用于虚拟化环境)
  • 主流操作系统(Linux/macOS/Windows)

安装步骤

  1. 克隆项目仓库:
git clone https://github.com/aliasrobotics/CAI.git
cd CAI
  1. 安装依赖包:
pip install setuptools wheel twine build
  1. 安装核心框架:
python3 -m build
pip install -e .
  1. 安装基准测试依赖:
pip install cvss
git submodule update --init --recursive
  1. 配置环境变量(在.env文件中):
OPENAI_API_KEY="your_api_key"
ANTHROPIC_API_KEY="your_anthropic_key"
OPENROUTER_API_KEY="your_openrouter_key"
OLLAMA_API_BASE="http://localhost:11434"

使用说明

基本使用

运行基准测试评估:

python benchmarks/eval.py --model ollama/qwen2.5:14b \
    --dataset_file benchmarks/cybermetric/CyberMetric-80-v1.json \
    --eval cybermetric --backend ollama

启动CLI交互界面:

python -m cai.cli

智能体使用示例

红队智能体执行渗透测试:

from cai.agents.red_teamer import redteam_agent

# 执行侦察任务
result = redteam_agent.run("对目标网络进行端口扫描和服务识别")

蓝队智能体进行防御分析:

from cai.agents.blue_team import blueteam_agent

# 执行安全监控
result = blueteam_agent.run("检查系统日志中的异常登录行为")

并行执行模式

配置并行智能体执行:

from cai.repl.commands.parallel import ParallelConfig

# 配置红蓝队并行执行
parallel_configs = [
    ParallelConfig("redteam_agent"),
    ParallelConfig("blueteam_agent")
]

核心代码

红队智能体实现

"""Red Team Base Agent"""
import os
from dotenv import load_dotenv
from cai.sdk.agents import Agent, OpenAIChatCompletionsModel
from openai import AsyncOpenAI
from cai.util import load_prompt_template, create_system_prompt_renderer
from cai.tools.command_and_control.sshpass import run_ssh_command_with_credentials
from cai.tools.reconnaissance.generic_linux_command import generic_linux_command
from cai.tools.web.search_web import make_google_search
from cai.tools.reconnaissance.exec_code import execute_code
from cai.tools.reconnaissance.shodan import shodan_search, shodan_host_info

load_dotenv()

# 加载系统提示词
bug_bounter_system_prompt = load_prompt_template("prompts/system_bug_bounter.md")

# 定义工具列表
tools = [
    generic_linux_command,
    execute_code,
    shodan_search,
    shodan_host_info
]

if os.getenv('GOOGLE_SEARCH_API_KEY') and os.getenv('GOOGLE_SEARCH_CX'):
    tools.append(make_google_search)

# 创建红队智能体实例
bug_bounter_agent = Agent(
    name="Bug Bounter",
    instructions=create_system_prompt_renderer(bug_bounter_system_prompt),
    description="""Agent that specializes in bug bounty hunting and vulnerability discovery.
                   Expert in web security, API testing, and responsible disclosure.""",
    tools=tools,
    model=OpenAIChatCompletionsModel(
        model=os.getenv('CAI_MODEL', "alias0"),
        openai_client=AsyncOpenAI(),
    )
)

代码执行智能体

"""A Coding Agent (CodeAgent)"""
import copy
import platform
import re
import signal
import threading
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from cai.agents.meta.local_python_executor import LocalPythonInterpreter
from cai.sdk.agents import Agent, Result, OpenAIChatCompletionsModel
from dotenv import load_dotenv
from openai import AsyncOpenAI

class CodeAgentException(Exception):
    """Base exception class for CodeAgent-related errors."""
    pass

class CodeAgent(Agent):
    """CodeAgent that uses executable Python code to consolidate LLM agents' actions."""
    
    def __init__(self, model_name: str = "alias0"):
        # 初始化Python解释器
        self.interpreter = LocalPythonInterpreter()
        
        # 初始化父类
        super().__init__(
            name="CodeAgent",
            description="Agent that uses executable Python code for security testing",
            instructions="""You are a CodeAgent specialized in cybersecurity tasks.
                         Use Python code to analyze, exploit, and assess security vulnerabilities.""",
            model=OpenAIChatCompletionsModel(
                model=model_name,
                openai_client=AsyncOpenAI(),
            ),
            tools=[self.execute_python_code]
        )
    
    def execute_python_code(self, code: str) -> str:
        """Execute Python code in a secure environment."""
        try:
            result = self.interpreter.execute(code)
            return str(result)
        except Exception as e:
            return f"Execution error: {str(e)}"

基准测试评估系统

"""Benchmark Evaluation Script"""
import argparse
import json
from typing import Dict, List

def evaluate_model(model_name: str, dataset_file: str, eval_type: str, backend: str):
    """
    评估语言模型在网络安全相关多选题基准上的表现
    
    Args:
        model_name: 模型名称(如 "gpt-4", "qwen2.5:14b")
        dataset_file: 数据集文件路径(JSON或TSV格式)
        eval_type: 评估类型("cybermetric", "seceval", "cti_bench")
        backend: 后端类型("openai", "openrouter", "ollama", "anthropic")
    """
    # 加载数据集
    if dataset_file.endswith('.json'):
        with open(dataset_file, 'r') as f:
            dataset = json.load(f)
    else:
        dataset = load_tsv_dataset(dataset_file)
    
    # 初始化评估器
    evaluator = get_evaluator(eval_type)
    
    # 运行评估
    results = []
    for item in dataset:
        response = query_model(model_name, item['question'], backend)
        score = evaluator.evaluate(response, item['answer'])
        results.append(score)
    
    # 计算总体得分
    final_score = sum(results) / len(results)
    return final_score

def main():
    parser = argparse.ArgumentParser(description='网络安全AI基准评估')
    parser.add_argument('-m', '--model', required=True, help='要评估的模型名称')
    parser.add_argument('-d', '--dataset_file', required=True, help='数据集文件路径')
    parser.add_argument('-e', '--eval', required=True, help='评估基准类型')
    parser.add_argument('-B', '--backend', required=True, help='后端类型')
    
    args = parser.parse_args()
    score = evaluate_model(args.model, args.dataset_file, args.eval, args.backend)
    print(f"模型 {args.model}{args.eval} 基准上的得分: {score:.2%}")

记忆管理系统

"""Memory agent for CAI with RAG support"""
import os
from typing import Dict, List
from cai.sdk.agents import Agent
from cai.rag.vector_db import VectorDatabase

class MemoryAgent(Agent):
    """记忆管理智能体,支持长期经验存储和检索"""
    
    def __init__(self):
        # 初始化向量数据库
        self.vector_db = VectorDatabase()
        
        super().__init__(
            name="Memory Agent",
            description="Manages long-term memory experiences across security exercises",
            instructions="""You manage episodic and semantic memory for security exercises.
                         Store experiences in vector database and retrieve using RAG pipeline.""",
            tools=[self.store_memory, self.retrieve_memory]
        )
    
    def store_memory(self, experience: str, exercise_type: str) -> str:
        """存储安全演练经验到记忆库"""
        embedding = self.vector_db.embed_text(experience)
        memory_id = self.vector_db.store(embedding, experience, exercise_type)
        return f"Memory stored with ID: {memory_id}"
    
    def retrieve_memory(self, query: str, exercise_type: str = None) -> List[Dict]:
        """从记忆库检索相关经验"""
        query_embedding = self.vector_db.embed_text(query)
        results = self.vector_db.search(query_embedding, exercise_type)
        return results

更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)
公众号二维码
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

Logo

更多推荐