Chapter 3 The Self-Evolving Super-Agent: System Architecture Design and Blueprint

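"Architecture is not about choosing technologies; it is about organizing complexity so that a system can keep evolving in an uncertain environment."
- Martin Fowler, Patterns of Enterprise Application Architecture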


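Chapter Overview

This is the book's core technical chapter. We start from what an agent fundamentally is and work layer by layer through the complete architectural blueprint of the self-evolving super-agent. This chapter does not stay at the level of concepts. It provides:

  • Implementable architecture design: every design decision comes with a trade-off analysis and alternatives
  • Complete code: more than 500 lines of core Python framework code
  • Real industry cases: hands-on experience from Airbnb, Shopify, and Stripe
  • Ready-to-use infrastructure configuration: a complete Kubernetes Helm chart

After reading this chapter, you will be able to:

  1. Understand the fundamental difference between agents and traditional automation
  2. Design with the five-layer architecture (perception, cognition, decision, execution, evolution)
  3. Build a runnable foundational agent framework
  4. Tailor the architecture to real business scenarios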

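3.1 The Nature of Agents: From Programs to Autonomous Entities

3.1.1 Defining and Classifying Agents

What Is an Agent?

The definition of an agent in artificial intelligence has evolved over several decades. In Artificial Intelligence: A Modern Approach, Russell and Norvig give the classic definition:

An agent is anything that can be viewed as perceiving its environment through sensors and acting upon that environment through actuators.

The definition looks simple, but it contains three key elements: perceiving, deciding (mapping what was perceived to an action), and acting.

In software engineering, our notion of an agent has to be more concrete and practical. A software delivery agent must be able to:

  1. Perceive the state of the environment: read Prometheus metrics, parse log streams, understand the system topology
  2. Form an internal representation: turn raw data into a structured understanding of the situation
  3. Plan actions: choose the best sequence of actions given the current state and the goal
  4. Execute and observe: carry out actions and monitor their effects
  5. Learn from experience: update its own knowledge and strategies based on the results of execution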

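Four Basic Agent Types

By the complexity of their internal architecture, agents fall into four basic types:

┌───────────────────────────────────────────────────────────────────────────────┐
│  Agent Type Classification                                                    │
├───────────────────────┬───────────────────────────────────────────────────────┤
│  Type                 │  Characteristics                                      │
├───────────────────────┼───────────────────────────────────────────────────────┤
│  Simple reflex agent  │  Condition→action rules; no internal state            │
│  Model-based agent    │  Maintains a world model for partial observability    │
│  Goal-based agent     │  Represents goals; plans multi-step action sequences  │
│  Learning agent       │  Improves from experience; adapts to new environments │
└───────────────────────┴───────────────────────────────────────────────────────┘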

Simple Reflex Agent

This is the most basic form of agent. A set of condition-action rules drives all of its decision logic:

# Simple reflex agent example: threshold-based alert response
class SimpleReflexAgent:
    def __init__(self):
        self.rules = [
            {"condition": lambda m: m["cpu_usage"] > 90, 
             "action": "scale_up"},
            {"condition": lambda m: m["error_rate"] > 0.05,
             "action": "rollback"},
            {"condition": lambda m: m["latency_p99"] > 500,
             "action": "alert_oncall"},
        ]
    
    def act(self, metrics: dict) -> str:
        for rule in self.rules:
            if rule["condition"](metrics):
                return rule["action"]
        return "no_action"

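Pros: simple to implement, fast to respond, predictable behavior.
Cons: rules are checked in a fixed order, so the position of a rule in the list, not reasoning, decides between conflicting rules. The agent does not understand the consequences of its actions, and it breaks down in complex scenarios.

In day-to-day operations, the simple reflex agent is the familiar PagerDuty alert rule: page someone when CPU exceeds 90%. Rules like this produce many false positives. A CPU spike alone says nothing about whether users are affected.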
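A concrete case shows the rule-conflict weakness. The sketch below reuses the three rules from SimpleReflexAgent, with a made-up metrics snapshot for illustration. A faulty release pushes CPU and error rate up at the same time. First-match evaluation then returns scale_up only because that rule comes first in the list. The agent adds capacity to a bad version when it should roll it back.

```python
# The same three rules as SimpleReflexAgent, as (condition, action) pairs
RULES = [
    (lambda m: m["cpu_usage"] > 90, "scale_up"),
    (lambda m: m["error_rate"] > 0.05, "rollback"),
    (lambda m: m["latency_p99"] > 500, "alert_oncall"),
]

def first_match(metrics: dict) -> str:
    """Return the action of the first rule whose condition holds (SimpleReflexAgent's behavior)."""
    for condition, action in RULES:
        if condition(metrics):
            return action
    return "no_action"

def all_matches(metrics: dict) -> list:
    """Return every action whose condition holds, which exposes the conflict."""
    return [action for condition, action in RULES if condition(metrics)]

# Hypothetical bad release: high CPU and high error rate at the same time
snapshot = {"cpu_usage": 95, "error_rate": 0.12, "latency_p99": 800}
print(first_match(snapshot))   # scale_up
print(all_matches(snapshot))   # ['scale_up', 'rollback', 'alert_oncall']
```

Listing every matching rule makes the conflict visible. Resolving it still needs something beyond the rules themselves, such as a world model or an explicit goal, and the next agent types add exactly that.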

Model-Based Agent

A model-based agent maintains an internal representation of the world (its internal state). It uses this state to cope with an environment that is only partially observable:

import time

class ModelBasedAgent:
    def __init__(self):
        self.world_model = {
            "services": {},      # latest state snapshot per service
            "deployments": [],   # deployments in progress
            "dependencies": {},  # dependency graph: service -> upstream services that call it
        }
    
    def update_model(self, observation: dict):
        """Update the world model with a new observation"""
        service_id = observation["service_id"]
        self.world_model["services"][service_id] = {
            "health": observation["health_score"],
            "last_deploy": observation["last_deploy_time"],
            "error_rate": observation["error_rate"],
            "updated_at": time.time()
        }
    
    def act(self, current_observation: dict) -> str:
        self.update_model(current_observation)
        # Decide from the full world model, not just the current observation
        return self._decide_based_on_model()
    
    def _decide_based_on_model(self) -> str:
        # Check inter-service dependencies to avoid cascading failures
        for service_id, state in self.world_model["services"].items():
            if state["error_rate"] > 0.05:
                # Are there upstream services that depend on this service?
                upstreams = self.world_model["dependencies"].get(service_id, [])
                if upstreams:
                    return f"quarantine_{service_id}"
        return "normal"

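Goal-Based Agent

A goal-based agent knows the current state and also knows the desired state explicitly. From the two it can plan a sequence of actions that reaches the goal:

class GoalBasedAgent:
    # Metrics where falling below the target is the violation (higher is better)
    HIGHER_IS_BETTER = {"availability"}

    def __init__(self, goal_slo: dict):
        self.goal_slo = goal_slo  # target service level objectives (SLOs)
        # e.g. {"error_rate": 0.001, "p99_latency_ms": 200, "availability": 0.9999}
    
    def plan(self, current_state: dict) -> list:
        """Generate an action sequence that moves the system toward its goal"""
        plan = []
        
        # Measure the gap to each target
        gaps = self._compute_gaps(current_state)
        
        # Handle the most severe gaps first
        prioritized_gaps = sorted(gaps, key=lambda x: x["severity"], reverse=True)
        
        for gap in prioritized_gaps:
            actions = self._generate_actions_for_gap(gap)
            plan.extend(actions)
        
        return plan
    
    def _compute_gaps(self, state: dict) -> list:
        gaps = []
        for metric, target in self.goal_slo.items():
            if metric not in state:
                continue  # not observed: assume neither compliance nor violation
            current = state[metric]
            if metric in self.HIGHER_IS_BETTER:
                if current >= target:
                    continue
                # Measure the shortfall against the error budget (1 - target)
                budget = 1 - target
                severity = (target - current) / budget if budget > 0 else float("inf")
            else:
                if current <= target:
                    continue
                severity = (current - target) / target if target > 0 else float("inf")
            gaps.append({
                "metric": metric,
                "current": current,
                "target": target,
                "severity": severity
            })
        return gaps

    def _generate_actions_for_gap(self, gap: dict) -> list:
        # Illustrative playbook: maps each violated metric to remediation
        # actions from the action space defined in Section 3.1.2
        playbook = {
            "error_rate": ["pause_canary", "rollback_to_previous"],
            "p99_latency_ms": ["scale_up_pods"],
            "availability": ["page_on_call"],
        }
        return playbook.get(gap["metric"], ["request_human_approval"])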

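Learning Agent

The learning agent is the most complex and most powerful form. It extracts patterns from past experience and keeps improving the quality of its decisions:

import random
import time
from collections import deque

class LearningAgent:
    def __init__(self, policy_model, batch_size: int = 64, buffer_size: int = 10_000):
        # Experience replay buffer; the oldest experiences are evicted when it is full
        # (batch_size and buffer_size defaults are illustrative)
        self.experience_buffer = deque(maxlen=buffer_size)
        self.policy_model = policy_model  # policy model (an ML model or an LLM) exposing predict() and fit()
        self.batch_size = batch_size
        self.performance_metrics = []     # performance history
    
    def act(self, observation: dict) -> str:
        action = self.policy_model.predict(observation)
        return action
    
    def learn(self, observation: dict, action: str, reward: float, next_obs: dict):
        """Record the experience and update the policy"""
        experience = {
            "observation": observation,
            "action": action,
            "reward": reward,
            "next_observation": next_obs,
            "timestamp": time.time()
        }
        self.experience_buffer.append(experience)
        
        # Once enough experience has accumulated, trigger a model update
        if len(self.experience_buffer) >= self.batch_size:
            self._update_policy()
    
    def _update_policy(self):
        """Sample a mini-batch from the replay buffer and update the policy model"""
        batch = random.sample(list(self.experience_buffer), self.batch_size)
        # Random replay sampling suits off-policy algorithms (e.g. DQN, SAC);
        # on-policy methods such as PPO instead train on the most recent
        # trajectories collected with the current policy
        self.policy_model.fit(batch)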
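LearningAgent leaves policy_model abstract. The sketch below is a deliberately simpler stand-in than PPO or SAC: an epsilon-greedy multi-armed bandit that learns which deployment strategy earns the highest average reward. The strategy names, success probabilities and the +1/-1 reward are assumptions made only for illustration.

```python
import random
from collections import defaultdict

class EpsilonGreedyPolicy:
    """Epsilon-greedy bandit: explore with probability epsilon, otherwise pick the best-known action."""

    def __init__(self, actions, epsilon=0.1, seed=None):
        self.actions = list(actions)
        self.epsilon = epsilon
        self.rng = random.Random(seed)
        self.counts = defaultdict(int)      # times each action was taken
        self.values = defaultdict(float)    # running mean reward per action

    def select(self) -> str:
        if self.rng.random() < self.epsilon:
            return self.rng.choice(self.actions)                 # explore
        return max(self.actions, key=lambda a: self.values[a])  # exploit

    def update(self, action: str, reward: float) -> None:
        # Incremental mean: Q <- Q + (r - Q) / n
        self.counts[action] += 1
        self.values[action] += (reward - self.values[action]) / self.counts[action]

# Simulated environment (assumption): canary releases fail far less often
def simulated_reward(action, rng):
    success_prob = {"canary": 0.95, "rolling_update": 0.60, "blue_green": 0.70}[action]
    return 1.0 if rng.random() < success_prob else -1.0

policy = EpsilonGreedyPolicy(["canary", "rolling_update", "blue_green"], epsilon=0.1, seed=7)
env_rng = random.Random(42)
for _ in range(2000):
    a = policy.select()
    policy.update(a, simulated_reward(a, env_rng))

best = max(policy.values, key=policy.values.get)
print(best, dict(policy.counts))
```

The bandit ignores the observation entirely. Contextual bandits or full reinforcement learning close that gap. The sketch does show the act, receive reward, update loop that every learning agent runs.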

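Core Characteristics of a Software Delivery Agent

In software delivery, an agent needs the following characteristics, which fundamentally set it apart from a simple automation script:

┌────────────────────────┬──────────────────────────────────────────┬──────────────────────────────────────────────┐
│  Dimension             │  Traditional automation script           │  Software delivery agent                     │
├────────────────────────┼──────────────────────────────────────────┼──────────────────────────────────────────────┤
│  Decision-making       │  Predefined rules, no exception handling │  Context-aware, dynamic decisions            │
│  Failure handling      │  Stops on failure                        │  Adaptive retries, fallback plans            │
│  Learning              │  None; every run is independent          │  Learns from history, keeps improving        │
│  Goal understanding    │  Executes steps without knowing why      │  Understands business goals, flexible paths  │
│  Environment awareness │  Sees only its trigger condition         │  Perceives multi-dimensional state           │
│  Collaboration         │  Runs in isolation                       │  Collaborates with other agents              │
│  Explainability        │  Logs the steps it executed              │  Explains the reasons for its decisions      │
│  Boundary recognition  │  Executes whatever is configured         │  Recognizes out-of-scope cases and escalates │
└────────────────────────┴──────────────────────────────────────────┴──────────────────────────────────────────────┘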

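Agent Maturity Model (Level 0-5)

We define maturity levels for software delivery agents, modeled on the SAE levels of driving automation:

Level 0 - Full Manual
  • Every operation is performed by a human
  • Tools assist but do not automate
  • Example: Confluence runbooks plus manual kubectl commands

Level 1 - Assisted Automation
  • Script automation for individual scenarios
  • Human-triggered, machine-executed
  • Examples: Jenkins pipelines, GitHub Actions

Level 2 - Partial Automation
  • Rule-driven automation across multiple scenarios
  • Human supervision, with confirmation at key checkpoints
  • Example: a CD pipeline with approval gates

Level 3 - Conditional Automation
  • Environment awareness and dynamic decisions
  • Fully automatic in some scenarios; humans intervene in complex ones
  • Example: a canary release system with intelligent rollback

Level 4 - High Automation
  • Fully automatic in most scenarios, including exception handling
  • Humans step in only outside defined boundaries
  • Example: an adaptive release system that learns

Level 5 - Full Automation
  • Fully automatic in all scenarios
  • Self-optimizing and continuously evolving
  • Example: the self-evolving super-agent described in this book

Most companies today are at Level 1-2. Leading companies (Netflix, Google, Airbnb) are exploring Level 3-4. Level 5 is the target architecture of this book.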


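3.1.2 Modeling Agents for Software Delivery

An effective software delivery agent starts with a precise model of the environment it operates in and of the tasks it faces. This step is the foundation of the whole architecture design and directly determines the system's capability boundaries.

Environment Modeling: Defining the Agent's World

Reinforcement learning theory describes the interaction between an agent and its environment as a Markov decision process (MDP):

MDP = (S, A, P, R, γ)
  S  - state space
  A  - action space
  P  - transition probability P(s' | s, a)
  R  - reward function
  γ  - discount factor, 0 ≤ γ ≤ 1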
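The discount factor γ sets how much future rewards count against immediate ones. The agent maximizes the discounted return G = r_1 + γ·r_2 + γ²·r_3 + ... The minimal sketch below uses a hypothetical episode of three small gains followed by an incident penalty. It shows why γ matters in delivery: with a small γ the later incident barely registers.

```python
def discounted_return(rewards, gamma):
    """G = sum over k of gamma**k * rewards[k], accumulated from the last reward backwards."""
    g = 0.0
    for r in reversed(rewards):
        g = r + gamma * g
    return g

# Hypothetical episode: three small gains, then an incident penalty
episode = [1.0, 1.0, 1.0, -10.0]
print(discounted_return(episode, gamma=0.5))    # 0.5: the incident is discounted to -1.25
print(discounted_return(episode, gamma=0.99))   # about -6.73: the incident dominates
```

A myopic agent (small γ) would happily trade a future incident for short-term gains. This is one reason delivery agents usually need γ close to 1.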

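For a software delivery agent, this abstract framework becomes concrete as follows.

Defining the State Space (S)

The state space of software delivery can be described along four dimensions:

from __future__ import annotations  # domain types (Deployment, Incident, ...) are defined elsewhere

from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional

# The component states come first so DeliveryEnvironmentState can use them as default factories

@dataclass
class PipelineState:
    """Current state of the CI/CD pipeline"""
    active_deployments: List[Deployment] = field(default_factory=list)    # deployments in progress
    pending_approvals: List[ApprovalGate] = field(default_factory=list)   # gates awaiting approval
    test_results: Dict[str, TestResult] = field(default_factory=dict)     # cached test results
    artifact_registry: Dict[str, Artifact] = field(default_factory=dict)  # artifact registry

@dataclass
class ProductionState:
    """Production environment state"""
    services: Dict[str, ServiceHealth] = field(default_factory=dict)             # health per service
    error_rates: Dict[str, float] = field(default_factory=dict)                  # error rate per service
    latency_percentiles: Dict[str, LatencyStats] = field(default_factory=dict)   # latency distribution
    traffic_patterns: Optional[TrafficPattern] = None                            # current traffic pattern
    active_incidents: List[Incident] = field(default_factory=list)               # open incidents

@dataclass
class ResourceState:
    """Resource and cost state"""
    cpu_utilization: Dict[str, float] = field(default_factory=dict)     # CPU utilization
    memory_utilization: Dict[str, float] = field(default_factory=dict)  # memory utilization
    pod_counts: Dict[str, int] = field(default_factory=dict)            # pod counts
    hourly_cost: float = 0.0                                            # current cost per hour
    cost_budget_remaining: float = 0.0                                  # remaining budget

@dataclass
class EnvironmentContext:
    """Environment context"""
    timestamp: datetime = field(default_factory=datetime.now)            # current time
    is_peak_hours: bool = False                                          # within peak hours?
    active_marketing_campaigns: List[str] = field(default_factory=list)  # active marketing campaigns
    dependency_health: Dict[str, bool] = field(default_factory=dict)     # external dependency health
    on_call_engineer: str = ""                                           # current on-call engineer

@dataclass
class DeliveryEnvironmentState:
    """Complete state representation of the software delivery environment"""
    
    # Dimension 1: CI/CD pipeline state
    pipeline_state: PipelineState = field(default_factory=PipelineState)
    
    # Dimension 2: production environment state
    production_state: ProductionState = field(default_factory=ProductionState)
    
    # Dimension 3: cost and resource state
    resource_state: ResourceState = field(default_factory=ResourceState)
    
    # Dimension 4: external context (time, campaigns, dependent services)
    context: EnvironmentContext = field(default_factory=EnvironmentContext)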

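Perception Interface: The Data Collection Pipeline

The agent's "senses" are the interfaces through which it perceives its environment. In software delivery, these cover four categories of core data sources:

┌───────────────────────────────────────────────────────────────────────────┐
│  Perception Data Categories                                               │
├────────────────┬──────────────────────────────────────────────────────────┤
│  Data type     │  Specific metrics / events                               │
├────────────────┼──────────────────────────────────────────────────────────┤
│  Metrics       │  RED metrics (Rate / Errors / Duration)                  │
│                │  USE metrics (Utilization / Saturation / Errors)         │
│                │  Business metrics (conversion rate / GMV / active users) │
├────────────────┼──────────────────────────────────────────────────────────┤
│  Logs          │  Application error logs                                  │
│                │  Access logs (Nginx/ALB)                                 │
│                │  Audit logs                                              │
├────────────────┼──────────────────────────────────────────────────────────┤
│  Traces        │  Distributed traces (Jaeger/Zipkin)                      │
│                │  Service call paths                                      │
│                │  Database query performance                              │
├────────────────┼──────────────────────────────────────────────────────────┤
│  Events        │  Kubernetes events                                       │
│                │  Deployment status changes                               │
│                │  Alert firing/resolution                                 │
└────────────────┴──────────────────────────────────────────────────────────┘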

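Defining the Action Space (A)

The action space defines the interventions an agent can make on its environment. For a software delivery agent, actions fall into five categories:

class ActionSpace:
    """Complete action space of the software delivery agent"""
    
    # Deployment operations
    DEPLOY_ACTIONS = [
        "start_canary_deployment",      # start a canary release (initial 5% of traffic)
        "advance_canary",               # advance the canary (5%→20%→50%→100%)
        "pause_canary",                 # pause the canary release
        "abort_canary",                 # abort the canary, shift all traffic back
        "blue_green_cutover",           # blue-green cutover
        "rolling_update",               # rolling update
    ]
    
    # Rollback operations
    ROLLBACK_ACTIONS = [
        "rollback_to_previous",         # roll back to the previous version
        "rollback_to_stable",           # roll back to the most recent stable version
        "partial_rollback",             # partial rollback (specific region/cluster)
        "emergency_rollback",           # emergency full rollback
    ]
    
    # Scaling operations
    SCALING_ACTIONS = [
        "scale_up_pods",                # add pods
        "scale_down_pods",              # remove pods
        "scale_up_nodes",               # add nodes
        "enable_hpa",                   # enable autoscaling (HPA)
        "adjust_resource_limits",       # adjust resource limits
    ]
    
    # Traffic management
    TRAFFIC_ACTIONS = [
        "shift_traffic_percentage",     # adjust the traffic split
        "enable_circuit_breaker",       # enable a circuit breaker
        "add_rate_limiting",            # add rate limiting
        "enable_bulkhead",              # isolate the failing service (bulkhead)
    ]
    
    # Notification and escalation
    NOTIFICATION_ACTIONS = [
        "send_slack_alert",             # send a Slack alert
        "page_on_call",                 # page the on-call engineer via PagerDuty
        "create_incident",              # open an incident ticket
        "request_human_approval",       # request human approval
        "escalate_to_manager",          # escalate to a manager
    ]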
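Because the catalog is stored as plain class-level lists, lookups are easy to derive from it. One practical use is to reject any action name a policy emits that is not in the catalog. An LLM-backed policy in particular can produce plausible but nonexistent actions. The sketch below assumes the catalog is the source of truth for executable actions. ActionCatalog is a hypothetical trimmed copy that holds two of the five categories.

```python
class ActionCatalog:
    """Hypothetical trimmed copy of ActionSpace with two of its categories."""
    DEPLOY_ACTIONS = ["start_canary_deployment", "advance_canary", "pause_canary",
                      "abort_canary", "blue_green_cutover", "rolling_update"]
    ROLLBACK_ACTIONS = ["rollback_to_previous", "rollback_to_stable",
                        "partial_rollback", "emergency_rollback"]

    @classmethod
    def category_index(cls) -> dict:
        """Map every action name to its category, derived from the *_ACTIONS attributes."""
        index = {}
        for attr, actions in vars(cls).items():
            if attr.endswith("_ACTIONS"):
                for action in actions:
                    index[action] = attr.removesuffix("_ACTIONS").lower()
        return index

    @classmethod
    def validate(cls, action: str) -> str:
        """Return the action's category, or raise if the policy proposed an unknown action."""
        index = cls.category_index()
        if action not in index:
            raise ValueError(f"unknown action: {action!r}")
        return index[action]

print(ActionCatalog.validate("advance_canary"))       # deploy
print(ActionCatalog.validate("emergency_rollback"))   # rollback
```

Deriving the index from the lists keeps one definition of each action. A new category such as SCALING_ACTIONS is picked up automatically once it is added.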

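Designing the Reward Signal

The reward function is the hardest part of reinforcement learning to design. A poorly designed reward function pulls the agent's behavior away from what we intended, a failure known as reward hacking.

For software delivery, we design a multi-objective reward function:

class RewardFunction:
    """
    Reward function design for a software delivery agent
    
    Core principles:
    1. Align with business goals (SLO attainment)
    2. Penalize negative impact (incidents, latency, wasted cost)
    3. Encourage proactive optimization (rather than passive reaction)
    
    The state keys (error_rate, p99_latency_ms, availability, hourly_cost,
    mttr_minutes) and the outcome labels used below are illustrative.
    """
    
    def __init__(self):
        # Weights (adjustable to match business priorities)
        self.weights = {
            "slo_achievement": 0.4,      # SLO attainment: highest weight
            "deployment_success": 0.3,   # deployment success rate
            "cost_efficiency": 0.15,     # cost efficiency
            "mttr_improvement": 0.15,    # MTTR improvement
        }
    
    def compute_reward(
        self,
        pre_action_state: dict,
        action: str,
        post_action_state: dict,
        outcome: str
    ) -> float:
        """Compute the reward for a single step"""
        
        reward = 0.0
        
        # 1. SLO attainment, in [-10, 10]
        slo_score = self._compute_slo_reward(pre_action_state, post_action_state)
        reward += self.weights["slo_achievement"] * slo_score
        
        # 2. Deployment success, in [-1, 1]
        deploy_score = self._compute_deployment_reward(outcome, action)
        reward += self.weights["deployment_success"] * deploy_score
        
        # 3. Cost efficiency, in [-1, 1]
        cost_score = self._compute_cost_reward(pre_action_state, post_action_state)
        reward += self.weights["cost_efficiency"] * cost_score
        
        # 4. MTTR improvement, in [-1, 1]
        mttr_score = self._compute_mttr_reward(pre_action_state, post_action_state)
        reward += self.weights["mttr_improvement"] * mttr_score
        
        # 5. Severe penalties. The weighted terms above sum to at most
        #    0.4*10 + 0.3 + 0.15 + 0.15 = 4.6, so they can never offset these
        if outcome == "production_incident":
            reward -= 10.0   # production incident: severe penalty
        if outcome == "data_loss":
            reward -= 100.0  # data loss: extreme penalty
            
        return reward
    
    @staticmethod
    def _clip(x: float) -> float:
        return max(-1.0, min(1.0, x))
    
    def _compute_slo_reward(self, pre: dict, post: dict) -> float:
        """Reward for SLO improvement"""
        pre_slo = self._slo_compliance_score(pre)
        post_slo = self._slo_compliance_score(post)
        return (post_slo - pre_slo) * 10  # both scores are in [0, 1], so this is in [-10, 10]
    
    def _compute_deployment_reward(self, outcome: str, action: str) -> float:
        """+1 for a successful deployment, -1 for a failed one, 0 otherwise"""
        if outcome == "deployment_succeeded":
            return 1.0
        if outcome == "deployment_failed":
            return -1.0
        return 0.0
    
    def _compute_cost_reward(self, pre: dict, post: dict) -> float:
        """Relative reduction in hourly cost (positive means cheaper)"""
        pre_cost, post_cost = pre.get("hourly_cost"), post.get("hourly_cost")
        if not pre_cost or post_cost is None:
            return 0.0
        return self._clip((pre_cost - post_cost) / pre_cost)
    
    def _compute_mttr_reward(self, pre: dict, post: dict) -> float:
        """Relative reduction in mean time to recovery (positive means faster)"""
        pre_mttr, post_mttr = pre.get("mttr_minutes"), post.get("mttr_minutes")
        if not pre_mttr or post_mttr is None:
            return 0.0
        return self._clip((pre_mttr - post_mttr) / pre_mttr)
    
    def _slo_compliance_score(self, state: dict) -> float:
        """
        SLO headroom score in [0, 1]: 1 is perfect, 0 means the metric is
        at or beyond its SLO threshold
        """
        scores = []
        # Error rate: target < 0.1%
        error_rate = state.get("error_rate", 0)
        scores.append(max(0.0, 1 - error_rate / 0.001))
        # P99 latency: target < 500 ms
        p99 = state.get("p99_latency_ms", 0)
        scores.append(max(0.0, 1 - p99 / 500))
        # Availability: target > 99.9%; unavailability is scored against the
        # 0.1% error budget so this term is also bounded by [0, 1]
        availability = state.get("availability", 1.0)
        scores.append(max(0.0, 1 - (1 - availability) / 0.001))
        
        return sum(scores) / len(scores)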

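3.1.3 Multi-Agent Collaboration Architecture

A single agent struggles to handle the full complexity of software delivery. Large systems usually adopt a multi-agent architecture instead. The problem is decomposed so that several specialized agents solve it together.

Trade-offs: Specialized vs. General-Purpose Agents

┌──────────────────────────────────────────────────────────────────────────────────────┐
│  Trade-offs of Agent Specialization                                                  │
├───────────────────────────────────────────┬──────────────────────────────────────────┤
│  Specialized agent                        │  General-purpose agent                   │
├───────────────────────────────────────────┼──────────────────────────────────────────┤
│  Best performance within its domain       │  Flexible across domains                 │
│  Low inference cost                       │  High inference cost                     │
│  Clear boundaries, easy to test           │  Variable behavior, hard to predict      │
│  Weak knowledge transfer                  │  Strong knowledge transfer               │
│  Maintaining many agents is costly        │  Single entry point, simpler maintenance │
│  Fits: high-frequency deterministic tasks │  Fits: low-frequency complex tasks       │
└───────────────────────────────────────────┴──────────────────────────────────────────┘

Our architecture uses a hybrid strategy. Core operations use specialized agents for efficiency and safety. Cross-domain coordination uses an Orchestrator Agent for flexibility.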

Designing the Inter-Agent Communication Protocol

Inter-agent communication must meet three core requirements: asynchronous decoupling, reliable message delivery, and type safety. We define a unified Agent Communication Protocol (ACP):

from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
import uuid
import time

class MessageType(Enum):
    """Agent message types"""
    REQUEST = "request"           # request that an action be performed
    RESPONSE = "response"         # result of a request
    EVENT = "event"               # state-change event
    QUERY = "query"               # information query
    BROADCAST = "broadcast"       # message broadcast to all agents

class Priority(Enum):
    """Message priority"""
    CRITICAL = 0    # production incident: handle immediately
    HIGH = 1        # deployment failure: handle within 30 seconds
    NORMAL = 2      # routine operation: handle within 5 minutes
    LOW = 3         # optimization suggestion: handle within 1 hour

@dataclass
class AgentMessage:
    """Standard message format for inter-agent communication"""
    
    # Message metadata
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: Optional[str] = None    # ID of the request this message relates to
    timestamp: float = field(default_factory=time.time)
    
    # Routing
    sender_id: str = ""                      # sending agent ID
    receiver_id: str = ""                    # receiving agent ID (may be a broadcast)
    message_type: MessageType = MessageType.EVENT
    priority: Priority = Priority.NORMAL
    
    # Content
    action: str = ""                         # requested operation
    payload: dict = field(default_factory=dict)  # operation parameters
    context: dict = field(default_factory=dict)  # contextual information
    
    # Result (for RESPONSE messages)
    success: Optional[bool] = None
    result: Optional[Any] = None
    error: Optional[str] = None
    
    # Reliability
    ttl_seconds: int = 300                   # message time-to-live
    retry_count: int = 0                     # current retry count
    max_retries: int = 3                     # maximum number of retries
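The reliability fields only take effect if the message bus honors them. The sketch below shows logic a bus might apply. It assumes the TTL is measured from the message's creation timestamp and that retries use exponential backoff with a 1-second base; the protocol above specifies neither. Retries keep the original message_id so that a receiver can de-duplicate redeliveries, which is also an assumption. ReliableMessage is a hypothetical trimmed-down stand-in for AgentMessage, and make_response shows how correlation_id ties a reply to its request.

```python
import time
import uuid
from dataclasses import dataclass, field, replace
from typing import Any, Optional

@dataclass
class ReliableMessage:
    """Hypothetical subset of AgentMessage: routing, reliability and result fields only."""
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: Optional[str] = None
    timestamp: float = field(default_factory=time.time)
    sender_id: str = ""
    receiver_id: str = ""
    ttl_seconds: int = 300
    retry_count: int = 0
    max_retries: int = 3
    success: Optional[bool] = None
    result: Optional[Any] = None

    def is_expired(self, now: Optional[float] = None) -> bool:
        # TTL counted from creation time, not from the latest retry (assumption)
        now = time.time() if now is None else now
        return now - self.timestamp > self.ttl_seconds

    def can_retry(self, now: Optional[float] = None) -> bool:
        return self.retry_count < self.max_retries and not self.is_expired(now)

    def next_retry_delay(self, base_seconds: float = 1.0) -> float:
        # Exponential backoff: 1s, 2s, 4s, ... for retry_count 0, 1, 2, ...
        return base_seconds * (2 ** self.retry_count)

    def for_retry(self) -> "ReliableMessage":
        # Same message_id and timestamp; only the retry counter changes
        return replace(self, retry_count=self.retry_count + 1)

def make_response(request: ReliableMessage, success: bool, result: Any = None) -> ReliableMessage:
    """Build a reply whose correlation_id points back at the request."""
    return ReliableMessage(
        correlation_id=request.message_id,
        sender_id=request.receiver_id,
        receiver_id=request.sender_id,
        success=success,
        result=result,
    )

req = ReliableMessage(sender_id="orchestrator", receiver_id="risk_agent", timestamp=1000.0)
print(req.is_expired(now=1200.0), req.is_expired(now=1301.0))   # False True
```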

The Orchestrator + Worker Pattern

This is the core collaboration pattern of our architecture. The Orchestrator handles task decomposition and coordination, and the Worker Agents do the actual execution:

                    ┌──────────────────────┐
                    │     Orchestrator     │
                    │  Agent (controller)  │
                    └──────────┬───────────┘
                               │ decomposes tasks
                    ┌──────────┼───────────┐
                    ▼          ▼           ▼
          ┌─────────────┐  ┌───────┐  ┌──────────────┐
          │  Deployment │  │  Risk │  │   Rollback   │
          │    Agent    │  │ Agent │  │    Agent     │
          └─────────────┘  └───────┘  └──────────────┘
                 Worker Agents (specialized executors)

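from __future__ import annotations

import asyncio
from typing import Dict, Optional

# MessageBus, WorkerCapability, TaskExecution, DeploymentRequest,
# DeploymentPlan, DeploymentStrategy, CanaryStrategy, RollingUpdateStrategy,
# AgentResponse and AgentDelegationError are framework types assumed to be
# defined elsewhere; AgentMessage, MessageType and Priority are defined above.

class OrchestratorAgent:
    """
    Orchestrator agent: decomposes high-level tasks and coordinates
    multiple worker agents
    """
    
    def __init__(self, message_bus: MessageBus):
        self.message_bus = message_bus
        self.worker_registry: Dict[str, WorkerCapability] = {}
        self.active_tasks: Dict[str, TaskExecution] = {}
    
    async def handle_deployment_request(
        self, 
        request: DeploymentRequest
    ) -> Optional[DeploymentPlan]:
        """
        Handle a deployment request: decompose it and coordinate several
        worker agents. Returns None when the pre-checks fail and the
        request has been handed over to a human.
        """
        
        # Step 1: risk assessment (delegated to the Risk Agent)
        risk_report = await self._delegate(
            worker="risk_agent",
            action="assess_deployment_risk",
            payload={
                "service": request.service_name,
                "version": request.target_version,
                "environment": request.environment
            }
        )
        
        # Step 2: choose a deployment strategy based on the risk
        strategy = self._select_deployment_strategy(
            risk_score=risk_report.overall_score,
            service_criticality=request.criticality
        )
        
        # Step 3: run the pre-deployment checks in parallel.
        # A worker that cannot run its check raises AgentDelegationError
        # (see _delegate); a check that runs but fails reports passed=False.
        pre_checks = await asyncio.gather(
            self._delegate("health_agent", "check_cluster_health", {}),
            self._delegate("capacity_agent", "verify_capacity", {
                "replicas": strategy.target_replicas
            }),
            self._delegate("dependency_agent", "check_dependencies", {
                "service": request.service_name
            })
        )
        
        # Step 4: start the deployment only if every pre-check passed
        if all(check.passed for check in pre_checks):
            deployment_task = await self._delegate(
                "deployment_agent",
                "execute_deployment",
                {"strategy": strategy, "request": request}
            )
            
            # Step 5: keep monitoring the deployment's progress
            # (_monitor_deployment is assumed to be implemented elsewhere)
            return await self._monitor_deployment(deployment_task.id)
        
        # Pre-checks failed: request human intervention
        await self._delegate(
            "notification_agent",
            "request_human_review",
            {"reason": "pre_checks_failed", "details": pre_checks}
        )
        return None
    
    def _select_deployment_strategy(
        self, 
        risk_score: float, 
        service_criticality: str
    ) -> DeploymentStrategy:
        """Choose a deployment strategy from the risk score"""
        
        if service_criticality == "critical" or risk_score > 0.7:
            return CanaryStrategy(
                initial_weight=1,         # start at 1% of traffic
                increment_step=5,         # add 5% per step
                observation_duration=300  # observe each step for 5 minutes
            )
        elif risk_score > 0.4:
            return CanaryStrategy(
                initial_weight=5,         # start at 5% of traffic
                increment_step=20,        # add 20% per step
                observation_duration=120  # observe each step for 2 minutes
            )
        else:
            return RollingUpdateStrategy(
                max_surge=25,       # up to 25% extra pods during the update
                max_unavailable=0   # never drop below the desired replica count
            )
    
    async def _delegate(
        self, 
        worker: str, 
        action: str, 
        payload: dict
    ) -> AgentResponse:
        """Delegate a task to a worker agent"""
        
        msg = AgentMessage(
            sender_id="orchestrator",
            receiver_id=worker,
            message_type=MessageType.REQUEST,
            action=action,
            payload=payload,
            priority=Priority.HIGH
        )
        
        # Send the request and wait for the response (with a timeout)
        response = await self.message_bus.send_and_wait(
            msg, 
            timeout_seconds=60
        )
        
        if not response.success:
            raise AgentDelegationError(
                f"Worker {worker} failed to execute {action}: {response.error}"
            )
        
        return response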
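The sketch below expands the canary parameters into a traffic schedule, to show what the strategies imply in practice. It assumes each step adds increment_step percentage points until traffic reaches 100%. It also assumes every step below 100% is observed for observation_duration seconds. The code above does not specify CanaryStrategy's exact stepping rule, so both functions here are hypothetical.

```python
def canary_schedule(initial_weight: int, increment_step: int) -> list:
    """Traffic percentages a canary passes through, ending at 100 (assumed stepping rule)."""
    if increment_step <= 0:
        raise ValueError("increment_step must be positive")
    weights = []
    w = initial_weight
    while w < 100:
        weights.append(w)
        w += increment_step
    weights.append(100)
    return weights

def time_to_full_rollout(initial_weight: int, increment_step: int, observation_duration: int) -> int:
    """Seconds spent observing the partial-traffic steps before reaching 100%."""
    partial_steps = len(canary_schedule(initial_weight, increment_step)) - 1
    return partial_steps * observation_duration

# High-risk branch: start at 1%, +5% per step, 5 minutes per step
print(canary_schedule(1, 5)[:4])               # [1, 6, 11, 16]
print(time_to_full_rollout(1, 5, 300) / 60)    # 100.0 (minutes)
# Medium-risk branch: start at 5%, +20% per step, 2 minutes per step
print(canary_schedule(5, 20))                  # [5, 25, 45, 65, 85, 100]
print(time_to_full_rollout(5, 20, 120) / 60)   # 10.0 (minutes)
```

Under these assumptions the high-risk path takes about ten times as long as the medium-risk path. That is the price of limiting the initial blast radius to 1% of traffic and observing each small increment.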

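Consensus: Resolving Decision Conflicts Between Agents

When several agents reach different judgments on the same question, we need a reliable consensus mechanism:

from __future__ import annotations

import asyncio
from typing import List

class ConsensusEngine:
    """
    Multi-agent consensus engine
    Resolves decision conflicts between agents
    """
    
    def __init__(self, quorum_threshold: float = 0.6):
        self.quorum_threshold = quorum_threshold  # consensus when at least 60% vote yes
    
    async def reach_consensus(
        self,
        question: str,
        participating_agents: List[str],
        context: dict
    ) -> ConsensusResult:
        """
        Poll several agents and try to reach consensus
        
        Use cases:
        - Should the current deployment be rolled back?
        - Should PagerDuty be triggered?
        - May we release in the current high-risk time window?
        
        _get_agent_vote and ConsensusResult are assumed to be defined
        elsewhere; each vote carries a decision ("yes"/"no") and a
        confidence in [0, 1].
        """
        
        # Collect every agent's opinion in parallel
        votes = await asyncio.gather(*[
            self._get_agent_vote(agent_id, question, context)
            for agent_id in participating_agents
        ])
        
        if not votes:
            # No participants: nothing to count, so hand the question to a human
            return ConsensusResult(
                question=question, decision="escalate_to_human",
                votes=[], confidence=0.0, requires_human=True
            )
        
        # Tally the votes
        positive_votes = [v for v in votes if v.decision == "yes"]
        # Sum of the "yes" confidences divided by the total number of voters
        confidence_weighted_score = sum(
            v.confidence for v in positive_votes
        ) / len(votes)
        
        # Share of agents voting yes
        simple_majority = len(positive_votes) / len(votes)
        
        if simple_majority >= self.quorum_threshold:
            decision = "approved"
        elif simple_majority < (1 - self.quorum_threshold):
            decision = "rejected"
        else:
            decision = "escalate_to_human"  # no consensus reached: escalate
        
        return ConsensusResult(
            question=question,
            decision=decision,
            votes=votes,
            confidence=confidence_weighted_score,
            requires_human=decision == "escalate_to_human"
        )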
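The decision rule reduces to a pure function of the vote shares, so its three bands are easy to check in isolation. The sketch below assumes votes are plain "yes"/"no" strings instead of the vote objects that _get_agent_vote returns. With the default quorum of 0.6, a yes share of at least 60% approves, a share below 40% rejects, and anything in between escalates.

```python
def tally(decisions: list, quorum_threshold: float = 0.6) -> str:
    """Apply ConsensusEngine's thresholds to a list of 'yes'/'no' votes."""
    if not decisions:
        return "escalate_to_human"  # no votes: nothing to decide on
    yes_share = sum(1 for d in decisions if d == "yes") / len(decisions)
    if yes_share >= quorum_threshold:
        return "approved"
    if yes_share < 1 - quorum_threshold:
        return "rejected"
    return "escalate_to_human"

print(tally(["yes", "yes", "no"]))        # approved (2/3 >= 0.6)
print(tally(["yes", "no", "no", "no"]))   # rejected (1/4 < 0.4)
print(tally(["yes", "no"]))               # escalate_to_human (1/2 is in the middle band)
```

The size of the panel matters. With exactly three voters the possible yes shares are 0, 1/3, 2/3 and 1, and none of them falls in the middle band. A three-agent panel therefore never escalates to a human. Escalation needs an even-sized or larger panel.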

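3.2 Core Design Principles of the Self-Evolving Architecture

This section details the five-layer architecture of the self-evolving super-agent. Each layer has clear responsibility boundaries and interface specifications. Together the layers form a complete system that can keep learning.

3.2.1 Perception Layer Design

The perception layer is the agent's "sensory system" for interacting with the outside world. Its core goal is to turn scattered, heterogeneous, noisy raw data into structured, high-quality situational information.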

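Unified Telemetry Standard: OpenTelemetry in Depth

OpenTelemetry (OTel) is the industry standard for cloud-native observability. Its biggest advantage is vendor neutrality. Applications emit data through the OTel API and SDK, so switching backends, from Jaeger to Zipkin for example, means changing exporter or Collector configuration. The application code stays the same.

Core OpenTelemetry concepts:

OpenTelemetry data model

Traces
  └── Span (a single operation)
        ├── TraceID (globally unique trace ID)
        ├── SpanID (ID of this span)
        ├── ParentSpanID (parent span; links spans into a call chain)
        ├── Name (operation name)
        ├── StartTime / EndTime
        ├── Attributes (key-value pairs; the SDK's default limit is 128 per span)
        ├── Events (ordered list of timestamped messages)
        └── Status (Unset/OK/Error)

Metrics
  └── Five kinds of instruments:
        ├── Counter (monotonically increasing counter)
        ├── Gauge (a value that can go up and down; older SDKs expose it only as ObservableGauge)
        ├── Histogram (distribution of values)
        ├── UpDownCounter (a counter that can increase or decrease)
        └── ObservableXxx (asynchronous, callback-based variants)

Logs
  └── LogRecord
        ├── Timestamp
        ├── SeverityNumber (1-24, in bands for TRACE/DEBUG/INFO/WARN/ERROR/FATAL)
        ├── Body (log message)
        └── Attributes (structured fields)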
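The OpenTelemetry log data model splits the 1-24 SeverityNumber range into six bands of four. The lowest value in each band is the canonical level: TRACE=1, DEBUG=5, INFO=9, WARN=13, ERROR=17, FATAL=21. The higher values in a band (for example ERROR2 to ERROR4) are finer gradations. The small sketch below maps a number to its short name. The helper names are hypothetical, but the mapping is useful when an agent filters log records by severity.

```python
_SEVERITY_BANDS = ["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"]

def severity_short_name(severity_number: int) -> str:
    """Map an OpenTelemetry SeverityNumber (1-24) to its short name, e.g. 17 -> ERROR, 18 -> ERROR2."""
    if not 1 <= severity_number <= 24:
        raise ValueError(f"SeverityNumber must be in 1..24, got {severity_number}")
    band, offset = divmod(severity_number - 1, 4)
    name = _SEVERITY_BANDS[band]
    return name if offset == 0 else f"{name}{offset + 1}"

def is_error_or_worse(severity_number: int) -> bool:
    # The ERROR band starts at 17 and FATAL follows it
    return severity_number >= 17

print([severity_short_name(n) for n in (1, 9, 13, 17, 18, 24)])
# ['TRACE', 'INFO', 'WARN', 'ERROR', 'ERROR2', 'FATAL4']
```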

Integrating OpenTelemetry into a Python application:

# requirements.txt
# opentelemetry-api==1.20.0
# opentelemetry-sdk==1.20.0
# opentelemetry-exporter-otlp==1.20.0
# opentelemetry-instrumentation-fastapi==0.41b0

import os

from opentelemetry import trace, metrics
from opentelemetry.metrics import CallbackOptions, Observation
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

def setup_telemetry(service_name: str, otlp_endpoint: str):
    """Initialize OpenTelemetry tracing and metrics"""
    
    # Resource attributes shared by traces and metrics
    resource = Resource.create({
        "service.name": service_name,
        "service.version": os.getenv("APP_VERSION", "unknown"),
        "deployment.environment": os.getenv("ENVIRONMENT", "production"),
    })
    
    # Configure the tracer provider
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(endpoint=otlp_endpoint)
        )
    )
    trace.set_tracer_provider(trace_provider)
    
    # Configure the meter provider
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=otlp_endpoint),
        export_interval_millis=10000  # export every 10 seconds
    )
    metrics.set_meter_provider(
        MeterProvider(resource=resource, metric_readers=[metric_reader])
    )
    
    return trace.get_tracer(service_name), metrics.get_meter(service_name)

# Usage in application code
tracer, meter = setup_telemetry("payment-service", "http://otel-collector:4317")

# Custom business metrics
deployment_counter = meter.create_counter(
    "deployments_total",
    description="Total number of deployments",
    unit="1"
)

deployment_duration = meter.create_histogram(
    "deployment_duration_seconds",
    description="Deployment duration in seconds",
    unit="s"
)

# Latest 7-day rolling success rate, refreshed elsewhere by the pipeline
deployment_success_rate_value = 1.0

def observe_deployment_success_rate(options: CallbackOptions):
    """Called by the SDK on every collection cycle; reports the current value"""
    return [Observation(deployment_success_rate_value)]

deployment_success_rate = meter.create_observable_gauge(
    "deployment_success_rate",
    callbacks=[observe_deployment_success_rate],
    description="Deployment success rate (7 day rolling)",
    unit="1"
)

# Record when a deployment completes
async def record_deployment_completion(
    service: str,
    version: str,
    duration: float,
    success: bool
):
    deployment_counter.add(1, {
        "service": service,
        "version": version,
        "status": "success" if success else "failure",
        "environment": "production"
    })
    
    deployment_duration.record(duration, {"service": service})

Multi-Source Data Fusion Architecture

No single data source gives a complete view of the environment. Our perception layer has to fuse four kinds of data sources:

┌────────────────────────────────────────────────────────────────────┐
│  Multi-source data fusion pipeline                                 │
├────────────────────────────────────────────────────────────────────┤
│  Source layer                                                      │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌────────────────────┐  │
│  │Prometheus│  │ Loki/ELK │  │  Jaeger  │  │  Business metrics  │  │
│  │(Metrics) │  │  (Logs)  │  │ (Traces) │  │  API (GMV/users)   │  │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘  └─────────┬──────────┘  │
│       ▼             ▼             ▼                  ▼             │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │  Kafka message bus (unified data channel)                    │  │
│  │  topics: metrics.raw | logs.error | traces.spans | biz.events│  │
│  └──────────────────────────────┬───────────────────────────────┘  │
│                                 ▼                                  │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │  Flink stream processing engine (real-time computation)      │  │
│  │  • Time alignment (event time + watermarks)                  │  │
│  │  • Data correlation (cross-source JOIN of traces and metrics)│  │
│  │  • Feature computation (sliding-window aggregation)          │  │
│  │  • Anomaly detection (online learning models)                │  │
│  └──────────────────────────────┬───────────────────────────────┘  │
│                                 ▼                                  │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │  Structured environment state (consumed by the agents)       │  │
│  └──────────────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────────────┘
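Flink handles event-time alignment natively. The toy, in-process sketch below makes the idea concrete without Flink. It is not the Flink API, though it is similar in spirit to Flink's bounded-out-of-orderness watermark strategy. Each event goes into the tumbling window that its own timestamp falls into, regardless of arrival order. A window is emitted once the watermark passes its end; the watermark is the maximum event time seen minus the allowed lateness. Events that arrive for a window already past the watermark are counted as late. The 60-second window, the 10-second lateness and the sample events are assumptions.

```python
from collections import defaultdict

class TumblingWindowAggregator:
    """Toy event-time tumbling windows with a bounded-out-of-orderness watermark (not Flink)."""

    def __init__(self, window_seconds: int = 60, allowed_lateness: int = 10):
        self.window_seconds = window_seconds
        self.allowed_lateness = allowed_lateness
        self.max_event_time = float("-inf")
        self.open_windows = defaultdict(list)   # window start -> values
        self.late_events = 0

    def _window_start(self, event_time: float) -> int:
        return int(event_time // self.window_seconds) * self.window_seconds

    def watermark(self) -> float:
        return self.max_event_time - self.allowed_lateness

    def add(self, event_time: float, value: float) -> list:
        """Ingest one event; return any windows closed by the advancing watermark."""
        start = self._window_start(event_time)
        if start + self.window_seconds <= self.watermark():
            self.late_events += 1          # its window is already closed
            return []
        self.open_windows[start].append(value)
        self.max_event_time = max(self.max_event_time, event_time)
        return self._emit_closed()

    def _emit_closed(self) -> list:
        closed = []
        for start in sorted(self.open_windows):
            if start + self.window_seconds <= self.watermark():
                values = self.open_windows.pop(start)
                closed.append((start, sum(values) / len(values), len(values)))
        return closed

# (event_time_seconds, error_rate); the event at t=55 arrives after t=62
agg = TumblingWindowAggregator(window_seconds=60, allowed_lateness=10)
events = [(5, 0.01), (30, 0.03), (62, 0.02), (55, 0.05), (75, 0.04), (20, 0.09)]
emitted = []
for t, v in events:
    emitted.extend(agg.add(t, v))
print(emitted, agg.late_events)   # one closed window [0, 60) averaging about 0.03; one late event
```

The out-of-order event at t=55 still lands in the [0, 60) window because it arrives within the allowed lateness. The event at t=20 arrives after the watermark has passed 60 and is dropped as late. Real stream processors let you route such events to a side output instead of discarding them.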

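Data Quality Assurance: Denoising, Gap Filling, Normalization

Raw telemetry is full of noise, and feeding it straight to the agent causes frequent erroneous actions. Data quality processing is therefore an indispensable module of the perception layer:

from dataclasses import dataclass

import pandas as pd


@dataclass
class ProcessedMetric:
    """Result of the data quality pipeline"""
    values: pd.Series           # gap-filled, smoothed, normalized values
    anomaly_flags: pd.Series    # True where the raw value is a z-score outlier
    data_quality_score: float   # share of raw samples that are present
    has_significant_gaps: bool  # more than 10% of raw samples missing


class DataQualityProcessor:
    """
    Telemetry data quality processor
    
    Processing order:
    1. Gap filling (gap lengths are measured on the raw samples)
    2. Denoising (smooth out spikes)
    3. Normalization (bring metrics onto a common scale)
    4. Anomaly detection (flag suspicious points in the raw data)
    
    Input series are expected to have a DatetimeIndex.
    """
    
    def __init__(self, z_score_threshold: float = 3.0, ewma_alpha: float = 0.3):
        self.z_score_threshold = z_score_threshold  # beyond 3σ counts as anomalous
        self.ewma_alpha = ewma_alpha                # EWMA smoothing factor
    
    def denoise(self, series: pd.Series) -> pd.Series:
        """
        Denoise by smoothing with an exponentially weighted moving average (EWMA)
        
        Why EWMA rather than a simple moving average:
        - recent data gets higher weight
        - it reacts faster to genuine state changes
        - it still filters out random noise
        
        pandas' EWMA carries the last value forward through NaNs, so the
        result is re-masked to keep unfilled (long) gaps visible as NaN.
        """
        smoothed = series.ewm(alpha=self.ewma_alpha, adjust=False).mean()
        return smoothed.where(series.notna())
    
    def fill_missing(self, series: pd.Series) -> pd.Series:
        """
        Gap-filling policy, by the time between the valid samples on
        either side of the gap:
        - short gaps (<= 5 minutes): time-weighted linear interpolation
        - medium gaps (5-30 minutes): forward fill
        - long gaps (> 30 minutes), and gaps at either end of the series:
          left as NaN; downstream agents must treat this data as unavailable
        """
        short_gap = pd.Timedelta(minutes=5)
        medium_gap = pd.Timedelta(minutes=30)
        
        # Timestamps of the nearest valid samples before and after every point
        valid_times = series.index.to_series().where(series.notna())
        gap_length = valid_times.bfill() - valid_times.ffill()
        
        missing = series.isna()
        short = missing & (gap_length <= short_gap)
        medium = missing & (gap_length > short_gap) & (gap_length <= medium_gap)
        
        result = series.copy()
        result[short] = series.interpolate(method="time")[short]
        result[medium] = series.ffill()[medium]
        return result
    
    def normalize(
        self, 
        series: pd.Series, 
        method: str = "min_max"
    ) -> pd.Series:
        """
        Normalize metrics with different units onto a common scale:
        "min_max" maps to [0, 1]; "z_score" centers on 0 with unit variance
        """
        if method == "min_max":
            min_val = series.min()
            max_val = series.max()
            if max_val == min_val:
                return pd.Series([0.5] * len(series), index=series.index)
            return (series - min_val) / (max_val - min_val)
        
        elif method == "z_score":
            mean = series.mean()
            std = series.std()
            if std == 0:
                return pd.Series([0.0] * len(series), index=series.index)
            return (series - mean) / std
        
        raise ValueError(f"unknown normalization method: {method!r}")
    
    def detect_anomalies(self, series: pd.Series) -> pd.Series:
        """
        Anomaly detection: flag outliers by z-score
        Returns a boolean series; True marks an anomaly (NaN points are never flagged)
        """
        # Population standard deviation (ddof=0), as in scipy.stats.zscore
        std = series.std(ddof=0)
        if pd.isna(std) or std == 0:
            return pd.Series(False, index=series.index)
        z_scores = (series - series.mean()) / std
        return z_scores.abs() > self.z_score_threshold
    
    def process_pipeline(self, raw_series: pd.Series) -> ProcessedMetric:
        """Complete data quality pipeline"""
        
        # 1. Gap filling
        filled = self.fill_missing(raw_series)
        
        # 2. Denoising
        denoised = self.denoise(filled)
        
        # 3. Normalization
        normalized = self.normalize(denoised)
        
        # 4. Anomaly detection on the raw data, so smoothing cannot hide spikes
        anomalies = self.detect_anomalies(raw_series)
        
        missing_ratio = float(raw_series.isna().mean()) if len(raw_series) else 1.0
        return ProcessedMetric(
            values=normalized,
            anomaly_flags=anomalies,
            data_quality_score=1 - missing_ratio,
            has_significant_gaps=missing_ratio > 0.1
        )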
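There is an ordering subtlety when these steps are chained. pandas' EWMA does not propagate NaN: at a missing timestamp it emits the last smoothed value. A series that is smoothed before gap handling has no gaps left for fill_missing to classify, and a 45-minute outage would silently look like a flat line. The minimal check below runs on synthetic one-minute data; the values are made up.

```python
import numpy as np
import pandas as pd

# One-minute samples with a single missing point
idx = pd.date_range("2024-01-01 00:00", periods=5, freq="min")
raw = pd.Series([1.0, 2.0, np.nan, 4.0, 5.0], index=idx)

smoothed = raw.ewm(alpha=0.3, adjust=False).mean()
print(raw.isna().sum())       # 1
print(smoothed.isna().sum())  # 0: the gap was bridged by carrying the EWMA value forward
```

To keep missing data visible, handle gaps on the raw samples first. If you must smooth a series that still has holes, re-apply the original NaN mask afterwards, for example with smoothed.where(raw.notna()).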

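3.2.2 Cognition Layer Design

The cognition layer is the Agent's "brain". It combines perceived information with historical experience to build the complete situational understanding that decisions require. At its core is the memory management system.

Four-Tier Memory Architecture

Inspired by human cognitive psychology, we designed a four-tier memory system:

┌─────────────────────────────────────────────────────────────────┐
│                  Four-Tier Memory Architecture                  │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Working Memory                                         │    │
│  │  • Capacity: full context of the current session        │    │
│  │  • Storage: in-memory (Redis)                           │    │
│  │  • Lifetime: current deployment session (hours)         │    │
│  │  • Content: everything relevant to the current decision │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Short-Term Memory                                      │    │
│  │  • Capacity: deployment history of the last 30 days     │    │
│  │  • Storage: PostgreSQL (hot data)                       │    │
│  │  • Lifetime: 30 days                                    │    │
│  │  • Content: recent decision-outcome pairs,              │    │
│  │    used for fast pattern matching                       │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Long-Term Memory                                       │    │
│  │  • Capacity: complete deployment history                │    │
│  │  • Storage: PostgreSQL + time-series DB                 │    │
│  │  • Lifetime: permanent (periodically compacted)         │    │
│  │  • Content: service profiles, historical patterns,      │    │
│  │    incident records                                     │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Knowledge Graph                                        │    │
│  │  • Capacity: global system knowledge                    │    │
│  │  • Storage: Neo4j / vector database                     │    │
│  │  • Lifetime: continuously updated                       │    │
│  │  • Content: service dependencies, risk history,         │    │
│  │    best practices                                       │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘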
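To make the tiering concrete, here is a minimal in-process sketch. Plain dictionaries stand in for Redis and PostgreSQL, and the names `TieredMemory`, `remember` and `recall` are hypothetical. The 4-hour working-memory TTL is an assumed value for "hours". The knowledge-graph tier is left out. The sketch shows only two behaviours: the lookup order, from the most recent tier to the most durable one, and TTL-based expiry.

```python
import time
from typing import Any, Dict, Optional, Tuple


class TieredMemory:
    """Hypothetical sketch: each tier maps key -> (value, expires_at or None)."""

    # (tier name, time-to-live in seconds; None = keep forever)
    TIERS = [
        ("working", 4 * 3600),           # assumed: one deployment session of a few hours
        ("short_term", 30 * 24 * 3600),  # 30 days
        ("long_term", None),             # permanent
    ]

    def __init__(self, clock=time.time):
        self._clock = clock
        self._stores: Dict[str, Dict[str, Tuple[Any, Optional[float]]]] = {
            name: {} for name, _ in self.TIERS
        }

    def remember(self, tier: str, key: str, value: Any) -> None:
        ttl = dict(self.TIERS)[tier]
        expires_at = None if ttl is None else self._clock() + ttl
        self._stores[tier][key] = (value, expires_at)

    def recall(self, key: str) -> Optional[Tuple[str, Any]]:
        """Search tiers from most recent to most durable; return (tier, value)."""
        now = self._clock()
        for name, _ in self.TIERS:
            entry = self._stores[name].get(key)
            if entry is None:
                continue
            value, expires_at = entry
            if expires_at is not None and expires_at <= now:
                del self._stores[name][key]  # lazily evict expired entries
                continue
            return name, value
        return None


# Usage with a controllable clock
now = [0.0]
mem = TieredMemory(clock=lambda: now[0])
mem.remember("working", "payment-svc:error_rate", 0.02)
mem.remember("long_term", "payment-svc:error_rate", 0.001)
print(mem.recall("payment-svc:error_rate"))  # ('working', 0.02)
now[0] += 5 * 3600  # the working-memory TTL has passed
print(mem.recall("payment-svc:error_rate"))  # ('long_term', 0.001)
```

In the real system each tier is a separate store, so a miss in working memory becomes a Redis round trip followed by a PostgreSQL query. The lookup order stays the same.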

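Knowledge Graph: Modeling Service Dependencies

Service dependencies are among the most important knowledge for risk assessment. We model them as a directed graph: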

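from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Dict, List, Optional, Set

import networkx as nx

@dataclass
class ServiceNode:
    """Service node"""
    service_id: str
    service_name: str
    tier: str                    # "frontend" | "backend" | "data"
    criticality: str             # "critical" | "high" | "medium" | "low"
    slo_targets: Dict[str, float]
    
    # Historical data
    avg_deployment_duration: float    # average deployment duration (seconds)
    deployment_success_rate: float    # deployment success rate over the last 30 days
    last_incident_date: Optional[datetime]
    incident_count_30d: int

@dataclass  
class ServiceDependency:
    """Service dependency edge"""
    upstream: str       # dependent side (caller)
    downstream: str     # dependency (provider)
    dependency_type: str  # "hard" | "soft" | "async"
    # hard:  synchronous call; if downstream is unavailable, upstream requests fail
    # soft:  has degradation logic; a fallback exists when downstream is unavailable
    # async: asynchronous consumption; delay is acceptable
    
    traffic_percentage: float  # share of traffic that depends on this path
    fallback_strategy: Optional[str]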

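@dataclass
class BlastRadiusReport:
    """Result of a blast-radius analysis"""
    epicenter: str
    direct_impact: List[dict]
    secondary_impact: List[dict]
    total_affected_count: int
    highest_criticality: str

class ServiceDependencyGraph:
    """
    Service dependency graph: used to analyze the impact scope of a deployment
    """
    
    def __init__(self):
        self.graph = nx.DiGraph()  # directed graph: edge upstream -> downstream
        self.nodes: Dict[str, ServiceNode] = {}
    
    def add_service(self, node: ServiceNode):
        self.nodes[node.service_id] = node
        self.graph.add_node(node.service_id, **asdict(node))
    
    def add_dependency(self, dep: ServiceDependency):
        self.graph.add_edge(
            dep.upstream, 
            dep.downstream,
            **asdict(dep)
        )
    
    def get_blast_radius(
        self, 
        service_id: str, 
        failure_type: str = "total",
        _visited: Optional[Set[str]] = None
    ) -> BlastRadiusReport:
        """
        Compute the blast radius: if service_id fails, which services are affected?
        
        This is the core algorithm of deployment risk assessment.
        _visited guards against infinite recursion when dependencies form a cycle.
        """
        visited = _visited if _visited is not None else set()
        visited.add(service_id)
        
        # Every upstream service that depends on this one (potentially affected)
        affected_services = []
        
        for upstream in self.graph.predecessors(service_id):
            if upstream in visited:
                continue  # already assessed; also breaks cycles
            edge_data = self.graph.edges[upstream, service_id]
            dep_type = edge_data["dependency_type"]
            
            if failure_type == "total" and dep_type == "hard":
                # Hard dependency on a total outage: upstream is fully interrupted
                impact_level = "critical"
            elif failure_type == "degraded" and dep_type == "hard":
                # Degraded performance: upstream is affected but not interrupted
                impact_level = "high"
            elif dep_type == "soft":
                # Soft dependency: upstream falls back to degraded service
                impact_level = "medium"
            else:
                # Async dependency: only delayed processing
                impact_level = "low"
            
            affected_services.append({
                "service_id": upstream,
                "impact_level": impact_level,
                "dependency_type": dep_type,
                "traffic_affected": edge_data["traffic_percentage"]
            })
        
        # Mark direct impacts as visited so they are not counted again downstream
        visited.update(s["service_id"] for s in affected_services)
        
        # Propagate recursively (second-order and further impact)
        secondary_affected = []
        for service in affected_services:
            if service["impact_level"] in ("critical", "high"):
                secondary = self.get_blast_radius(
                    service["service_id"], 
                    failure_type="degraded",
                    _visited=visited
                )
                secondary_affected.extend(secondary.direct_impact)
                secondary_affected.extend(secondary.secondary_impact)
        
        severity = ["none", "low", "medium", "high", "critical"]
        return BlastRadiusReport(
            epicenter=service_id,
            direct_impact=affected_services,
            secondary_impact=secondary_affected,
            total_affected_count=len(affected_services) + len(secondary_affected),
            highest_criticality=max(
                [s["impact_level"] for s in affected_services + secondary_affected], 
                default="none",
                key=severity.index
            )
        )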
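The recursive version needs networkx and the data classes above. The same propagation rule can be checked with a dependency-free sketch: a breadth-first search over a plain adjacency map. The function names `blast_radius` and `highest` and the `callers` encoding are hypothetical. The impact rules mirror `get_blast_radius`:

- a hard dependency on a failed service is critical;
- a hard dependency on a degraded service is high;
- a soft dependency is medium;
- an async dependency is low;
- only critical and high impacts propagate further.

The `seen` set is what keeps a cyclic dependency graph from looping forever.

```python
from collections import deque
from typing import Dict, List, Tuple

# callers[x] lists the services that call x, with the dependency type
Callers = Dict[str, List[Tuple[str, str]]]

SEVERITY = ["none", "low", "medium", "high", "critical"]


def impact_level(failure_type: str, dep_type: str) -> str:
    if dep_type == "hard":
        return "critical" if failure_type == "total" else "high"
    if dep_type == "soft":
        return "medium"
    return "low"  # async


def blast_radius(callers: Callers, epicenter: str) -> Dict[str, str]:
    """Return {affected_service: impact_level}, excluding the epicenter."""
    impacts: Dict[str, str] = {}
    seen = {epicenter}
    queue = deque([(epicenter, "total")])
    while queue:
        service, failure_type = queue.popleft()
        for upstream, dep_type in callers.get(service, []):
            if upstream in seen:
                continue  # already assessed (this also breaks cycles)
            seen.add(upstream)
            level = impact_level(failure_type, dep_type)
            impacts[upstream] = level
            if level in ("critical", "high"):
                # A seriously affected caller is treated as degraded for the next hop
                queue.append((upstream, "degraded"))
    return impacts


def highest(impacts: Dict[str, str]) -> str:
    return max(impacts.values(), default="none", key=SEVERITY.index)


# checkout (hard) and analytics (async) call payment; web calls checkout (hard);
# payment also calls checkout (soft), which closes a cycle
callers = {
    "payment": [("checkout", "hard"), ("analytics", "async")],
    "checkout": [("web", "hard"), ("payment", "soft")],
}
r = blast_radius(callers, "payment")
print(r)           # {'checkout': 'critical', 'analytics': 'low', 'web': 'high'}
print(highest(r))  # critical
```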

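3.2.3 Decision Layer Design

The decision layer is the core of the whole system: it turns situational information into concrete action plans. We adopt a 3-Path Decision Architecture, which selects a decision mechanism according to the complexity and urgency of the problem.

Layered Decision Architecture

┌─────────────────────────────────────────────────────────────────┐
│                  Layered Decision Architecture                  │
│                                                                 │
│  Strategic Layer                                                │
│  • Time horizon: weeks / months                                 │
│  • Decisions: release-strategy adjustments, architecture        │
│    optimization advice, team process improvements               │
│  • Decision maker: LLM reasoning + final human approval         │
│                                                                 │
│  Tactical Layer                                                 │
│  • Time horizon: hours / days                                   │
│  • Decisions: release planning, risk scoring,                   │
│    deployment window selection                                  │
│  • Decision maker: ML models + rule engine                      │
│                                                                 │
│  Operational Layer                                              │
│  • Time horizon: seconds / minutes                              │
│  • Decisions: scaling, traffic shifting, alert response,        │
│    automatic rollback                                           │
│  • Decision maker: rule engine (fast, predictable)              │
└─────────────────────────────────────────────────────────────────┘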

Three-Path Decision Routing

import time

class ThreePathDecisionRouter:
    """
    Three-path decision router
    
    Core question: given a decision request, which decision mechanism should handle it?
    
    Path 1 - Rule engine (Fast Path): known scenarios, <100 ms response
    Path 2 - ML model (Learn Path): complex but backed by historical data, <1 s response
    Path 3 - LLM reasoning (Complex Path): semantic understanding, <30 s response
    """
    
    def __init__(
        self,
        rule_engine: RuleEngine,
        ml_model: DeploymentMLModel,
        llm_reasoner: LLMReasoner
    ):
        self.rule_engine = rule_engine
        self.ml_model = ml_model
        self.llm_reasoner = llm_reasoner
    
    async def route_decision(
        self, 
        context: DecisionContext
    ) -> Decision:
        """Routing: choose the most suitable decision path, cheapest first"""
        
        # Routing condition 1: does the request hit the rule base (a known, well-defined scenario)?
        rule_match = self.rule_engine.match(context)
        if rule_match.confidence >= 0.95:
            return await self._fast_path(context, rule_match)
        
        # Routing condition 2: is there enough historical data to support the ML model?
        has_sufficient_data = await self._check_ml_eligibility(context)
        if has_sufficient_data and not context.requires_explanation:
            return await self._learn_path(context)
        
        # Routing condition 3: complex scenarios, or requests that need a natural-language explanation, go to the LLM
        return await self._complex_path(context)
    
    async def _fast_path(
        self, 
        context: DecisionContext,
        rule_match: RuleMatch
    ) -> Decision:
        """
        Fast path: the rule engine decides directly
        
        Typical scenarios:
        - CPU > 90% → scale up
        - Error rate > 5% → roll back
        - Deployment time > 30 minutes → alert
        """
        start = time.perf_counter()
        action = rule_match.triggered_rule.action
        
        return Decision(
            action=action,
            confidence=rule_match.confidence,
            reasoning=f"Rule triggered: {rule_match.triggered_rule.name}",
            path_used="rule_engine",
            latency_ms=(time.perf_counter() - start) * 1000
        )
    
    async def _learn_path(self, context: DecisionContext) -> Decision:
        """
        Learn path: the ML model decides
        
        Typical scenarios:
        - Risk score computation (multi-factor modeling)
        - Release window prediction (time-series model)
        - Anomaly root-cause classification
        """
        start = time.perf_counter()
        
        features = self._extract_features(context)
        prediction = await self.ml_model.predict(features)
        
        return Decision(
            action=prediction.action,
            confidence=prediction.confidence,
            reasoning=f"ML model prediction (confidence: {prediction.confidence:.2f})",
            feature_importance=prediction.feature_importance,
            path_used="ml_model",
            latency_ms=(time.perf_counter() - start) * 1000
        )
    
    async def _complex_path(self, context: DecisionContext) -> Decision:
        """
        Complex path: LLM reasoning
        
        Typical scenarios:
        - Entirely new failure modes (no historical cases)
        - Reasoning that requires cross-domain knowledge
        - Decisions that need a natural-language explanation
        - Edge cases
        """
        start = time.perf_counter()
        
        # Build a structured prompt
        prompt = self._build_decision_prompt(context)
        
        # Call the LLM
        llm_response = await self.llm_reasoner.reason(
            prompt=prompt,
            temperature=0.1,  # low temperature: more deterministic output
            max_tokens=1000
        )
        
        # Parse the LLM response
        decision = self._parse_llm_response(llm_response)
        decision.path_used = "llm_reasoning"
        decision.latency_ms = (time.perf_counter() - start) * 1000
        
        return decision
    
    def _build_decision_prompt(self, context: DecisionContext) -> str:
        """Build the decision prompt"""
        return f"""You are a decision expert for a software delivery system. Analyze the following deployment scenario and give a decision.

## Current system state
- Service: {context.service_name}
- Target version: {context.target_version}
- Environment: {context.environment}
- Current error rate: {context.current_error_rate:.2%}
- P99 latency: {context.p99_latency_ms}ms
- Availability: {context.availability:.4%}

## Deployment history (last 5)
{self._format_history(context.deployment_history)}

## Current anomaly signals
{self._format_anomalies(context.anomaly_signals)}

## Impact through service dependencies
{self._format_blast_radius(context.blast_radius)}

Output your decision in the following JSON format:
{{
  "action": "continue_deployment|pause_deployment|rollback|request_human_review",
  "confidence": 0.0-1.0,
  "reasoning": "rationale for the decision (1-3 sentences)",
  "risk_factors": ["risk factor 1", "risk factor 2"],
  "monitoring_focus": ["metrics that need close monitoring"]
}}"""
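`_parse_llm_response` is referenced above but not shown. Below is a minimal sketch of what it might do, assuming the reply contains the JSON object requested by the prompt, possibly surrounded by extra text. It takes four steps:

1. Extract the object from the reply.
2. Validate `action` against the four allowed values.
3. Clamp `confidence` to [0, 1].
4. Fall back to `request_human_review` whenever the output cannot be trusted.

`ParsedDecision` and `parse_llm_response` are hypothetical stand-ins for the chapter's `Decision` type and method.

```python
import json
import re
from dataclasses import dataclass, field
from typing import List

ALLOWED_ACTIONS = {"continue_deployment", "pause_deployment", "rollback", "request_human_review"}


@dataclass
class ParsedDecision:
    action: str
    confidence: float
    reasoning: str
    risk_factors: List[str] = field(default_factory=list)
    monitoring_focus: List[str] = field(default_factory=list)


def parse_llm_response(text: str) -> ParsedDecision:
    # Take the outermost {...} span, tolerating prose or code fences around it
    match = re.search(r"\{.*\}", text, re.DOTALL)
    try:
        data = json.loads(match.group(0)) if match else None
    except json.JSONDecodeError:
        data = None
    if not isinstance(data, dict) or data.get("action") not in ALLOWED_ACTIONS:
        # Unparseable output or unknown action: never act on it automatically
        return ParsedDecision("request_human_review", 0.0, "Unparseable LLM output")
    try:
        confidence = min(max(float(data.get("confidence", 0.0)), 0.0), 1.0)
    except (TypeError, ValueError):
        confidence = 0.0
    return ParsedDecision(
        action=data["action"],
        confidence=confidence,
        reasoning=str(data.get("reasoning", "")),
        risk_factors=list(data.get("risk_factors", [])),
        monitoring_focus=list(data.get("monitoring_focus", [])),
    )


reply = 'Decision:\n{"action": "rollback", "confidence": 1.7, "reasoning": "Error rate doubled"}'
d = parse_llm_response(reply)
print(d.action, d.confidence)                      # rollback 1.0
print(parse_llm_response("I am not sure").action)  # request_human_review
```

Defaulting to human review, rather than guessing, keeps the slowest path from also becoming the least safe one.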

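3.2.4 Execution Layer Design

The execution layer turns the decision layer's instructions into actual system operations, safely and reliably. In a distributed environment it has to cope with network failures, partial failures, concurrency conflicts and similar complications.

Guaranteeing Idempotency

Every execution operation must be idempotent: running the same operation several times has the same effect as running it once. Retries after timeouts and network errors are unavoidable in a distributed system, so idempotency is the foundation of its reliability: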

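import time

from redis.asyncio import Redis

class IdempotentExecutor:
    """
    Idempotent executor
    
    Core mechanism: idempotency keyed on a unique operation ID (Idempotency Key)
    """
    
    def __init__(self, state_store: Redis):
        self.state_store = state_store
        self.ttl_seconds = 86400      # state records expire after 24 hours
        self.lock_ttl_seconds = 600   # execution lock expires if its holder crashes
    
    async def execute(
        self,
        operation: DeliveryOperation,
        idempotency_key: str
    ) -> OperationResult:
        """
        Execute an operation idempotently
        
        Flow:
        1. Check whether the operation has already run (query Redis)
        2. If it completed, return the stored result (idempotent return)
        3. If it is in progress, wait for it to finish
        4. If it failed, retry while the retry budget allows
        5. Otherwise acquire the execution lock, execute and store the result
        """
        
        state_key = f"operation:{idempotency_key}"
        lock_key = f"{state_key}:lock"
        retry_count = 0
        
        # Check the existing state
        existing = await self.state_store.get(state_key)
        
        if existing:
            state = OperationState.from_json(existing)
            
            if state.status == "completed":
                # Idempotent: return the stored result
                return state.result
            
            elif state.status == "in_progress":
                # Another worker is executing it: wait for completion
                return await self._wait_for_completion(state_key)
            
            elif state.status == "failed":
                # The previous attempt failed: retry unless the budget is exhausted
                if state.retry_count >= state.max_retries:
                    raise MaxRetriesExceeded(idempotency_key)
                retry_count = state.retry_count + 1
        
        # Acquire the execution right atomically (SET NX). A separate lock key is
        # used because the state key may already hold a "failed" record, in which
        # case SET NX on the state key itself could never succeed.
        acquired = await self.state_store.set(
            lock_key, "1", nx=True, ex=self.lock_ttl_seconds
        )
        
        if not acquired:
            # Another process holds the execution right
            return await self._wait_for_completion(state_key)
        
        try:
            # Double-check: another process may have finished between our read
            # and acquiring the lock
            latest = await self.state_store.get(state_key)
            if latest:
                latest_state = OperationState.from_json(latest)
                if latest_state.status == "completed":
                    return latest_state.result
            
            # Mark as in progress
            await self.state_store.set(
                state_key,
                OperationState(
                    status="in_progress",
                    started_at=time.time(),
                    retry_count=retry_count
                ).to_json(),
                ex=self.ttl_seconds
            )
            
            try:
                # Execute the actual operation
                result = await operation.execute()
            except Exception as e:
                # Mark as failed, keeping the retry count for the next attempt
                await self.state_store.set(
                    state_key,
                    OperationState(
                        status="failed",
                        error=str(e),
                        failed_at=time.time(),
                        retry_count=retry_count
                    ).to_json(),
                    ex=self.ttl_seconds
                )
                raise
            
            # Mark as completed
            await self.state_store.set(
                state_key,
                OperationState(
                    status="completed",
                    result=result,
                    completed_at=time.time(),
                    retry_count=retry_count
                ).to_json(),
                ex=self.ttl_seconds
            )
            
            return result
        finally:
            # Release the execution lock
            await self.state_store.delete(lock_key)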
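The idempotency-key idea can also be exercised without Redis. The sketch below is a hypothetical single-process stand-in: an `asyncio.Lock` per key plays the role of `SET NX`, and a dict plays the role of the stored result. It demonstrates the property that matters. Concurrent and repeated calls with the same key run the side effect exactly once, and every caller receives the same result. It neither persists anything nor coordinates across processes.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class InMemoryIdempotentExecutor:
    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    async def execute(self, key: str, operation: Callable[[], Awaitable[Any]]) -> Any:
        if key in self._results:            # fast path: already completed
            return self._results[key]
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:                    # only one coroutine executes per key
            if key in self._results:        # double-check after acquiring the lock
                return self._results[key]
            result = await operation()      # failures are not stored, so a retry re-executes
            self._results[key] = result
            return result


async def demo() -> None:
    calls = 0

    async def scale_up() -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # simulate a slow API call
        return "replicas=6"

    executor = InMemoryIdempotentExecutor()
    results = await asyncio.gather(
        *(executor.execute("scale:payment:42", scale_up) for _ in range(5))
    )
    print(results[0], calls)  # replicas=6 1


asyncio.run(demo())
```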

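Saga Pattern: Handling Distributed Transactions

A deployment involves coordinated changes across several services and systems, which makes it a distributed-transaction problem. Kubernetes, Istio and the monitoring stack share no common transaction mechanism, so we use the Saga pattern: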

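import inspect
import logging
from typing import Any, Callable, List, Optional

logger = logging.getLogger(__name__)

async def _maybe_await(value: Any) -> Any:
    """Await the value if it is awaitable, so steps may be sync or async callables"""
    if inspect.isawaitable(value):
        return await value
    return value

class DeploymentSaga:
    """
    Deployment Saga: wraps a complex multi-step deployment in a transaction that can be rolled back
    
    Saga = a sequence of local transactions + one compensating transaction per step
    
    Example: the Saga of a Canary release
    Step 1: deploy Canary Pods          → compensation: delete Canary Pods
    Step 2: shift 5% traffic to Canary  → compensation: switch 100% back to Stable
    Step 3: observe for 5 minutes       → no compensation (observation step)
    Step 4: shift 20% traffic           → compensation: switch back to Stable
    Step 5: shift 100% traffic          → compensation: roll back to the old version
    Step 6: delete old-version Pods     → compensation: restore old-version Pods (last resort)
    """
    
    def __init__(self, saga_id: str, state_store: PostgreSQL):
        self.saga_id = saga_id
        self.state_store = state_store
        self.steps: List[SagaStep] = []
        self.completed_steps: List[int] = []
    
    def add_step(
        self,
        name: str,
        transaction: Callable,
        compensation: Optional[Callable] = None
    ) -> "DeploymentSaga":
        """Add a Saga step; compensation=None means the step needs no compensation"""
        self.steps.append(SagaStep(
            index=len(self.steps),
            name=name,
            transaction=transaction,
            compensation=compensation
        ))
        return self  # supports method chaining
    
    async def execute(self) -> SagaResult:
        """Execute the Saga; on failure, compensation is triggered automatically"""
        
        for step in self.steps:
            try:
                # Execute the step (sync or async callable)
                result = await _maybe_await(step.transaction())
                self.completed_steps.append(step.index)
                
                # Persist progress
                await self._save_progress(step.index, "completed", result)
                
            except Exception as e:
                # The step failed: compensate completed steps in reverse order
                await self._compensate()
                raise SagaExecutionError(
                    saga_id=self.saga_id,
                    failed_step=step.name,
                    cause=e,
                    compensated_steps=self.completed_steps
                ) from e
        
        return SagaResult(saga_id=self.saga_id, status="completed")
    
    async def _compensate(self):
        """Run the compensating transactions of completed steps in reverse order"""
        
        for step_index in reversed(self.completed_steps):
            step = self.steps[step_index]
            if step.compensation is None:
                continue  # observation step: nothing to undo
            try:
                await _maybe_await(step.compensation())
                await self._save_progress(step_index, "compensated")
            except Exception as e:
                # Compensation failed: log and continue (needs human intervention)
                logger.critical(
                    f"Saga compensation failed for step {step.name}",
                    extra={"saga_id": self.saga_id, "error": str(e)}
                )
                # Send an urgent alert
                await self._alert_human(step, e)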

# Usage example: building a Canary-release Saga
# (state_store, k8s_client, istio_client and monitor are assumed to be
#  clients defined elsewhere in the application)
async def create_canary_deployment_saga(
    service: str,
    version: str,
    saga_id: str
) -> DeploymentSaga:
    
    saga = DeploymentSaga(saga_id, state_store)
    
    saga.add_step(
        name="deploy_canary_pods",
        transaction=lambda: k8s_client.deploy_canary(service, version, replicas=1),
        compensation=lambda: k8s_client.delete_canary(service)
    ).add_step(
        name="shift_5_percent_traffic",
        transaction=lambda: istio_client.set_traffic_split(service, canary=5, stable=95),
        compensation=lambda: istio_client.set_traffic_split(service, canary=0, stable=100)
    ).add_step(
        name="observe_5_minutes",
        transaction=lambda: monitor.observe_and_validate(service, duration_minutes=5),
        compensation=lambda: None  # observation step: nothing to compensate
    ).add_step(
        name="shift_20_percent_traffic",
        transaction=lambda: istio_client.set_traffic_split(service, canary=20, stable=80),
        compensation=lambda: istio_client.set_traffic_split(service, canary=0, stable=100)
    ).add_step(
        name="shift_100_percent_traffic",
        transaction=lambda: istio_client.set_traffic_split(service, canary=100, stable=0),
        compensation=lambda: istio_client.set_traffic_split(service, canary=0, stable=100)
    )
    
    return saga
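The compensation order is easy to check in isolation. Below is a minimal synchronous sketch with a hypothetical `run_saga` helper and no persistence or alerting. It records every action in a log. When step 3 fails, the compensations of steps 2 and 1 run in reverse order, and the failed step itself is not compensated. This matches the Saga semantics described above.

```python
from typing import Callable, List, Optional, Tuple

# (name, transaction, compensation or None)
Step = Tuple[str, Callable[[], None], Optional[Callable[[], None]]]


def run_saga(steps: List[Step]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse. Returns success."""
    compensations: List[Optional[Callable[[], None]]] = []
    for _name, transaction, compensation in steps:
        try:
            transaction()
        except Exception:
            # Undo completed steps in reverse order; the failed step is not compensated
            for undo in reversed(compensations):
                if undo is not None:
                    undo()
            return False
        compensations.append(compensation)
    return True


log: List[str] = []


def observe_and_fail() -> None:
    raise RuntimeError("canary metrics out of SLO")


steps: List[Step] = [
    ("deploy_canary", lambda: log.append("deploy canary"), lambda: log.append("delete canary")),
    ("shift_5_percent", lambda: log.append("traffic 5%"), lambda: log.append("traffic 0%")),
    ("observe", observe_and_fail, None),
]
print(run_saga(steps))  # False
print(log)              # ['deploy canary', 'traffic 5%', 'traffic 0%', 'delete canary']
```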

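3.2.5 Self-Evolution Mechanism (Evolution Layer)

Self-evolution is the defining feature of this book's architecture and what fundamentally separates it from traditional automation systems. The evolution layer lets the Agent learn from every experience and keep improving the quality of its decisions.

Online Learning: Continuous Policy Updates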

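import logging
import time
from typing import Optional

logger = logging.getLogger(__name__)

class OnlineLearningEngine:
    """
    Online learning engine
    
    Core idea: every deployment is a learning opportunity.
    Each decision → execution → observed-outcome cycle is used to update the model.
    """
    
    def __init__(self, model: RiskScoringModel):
        self.model = model
        self.experience_buffer = ExperienceReplayBuffer(max_size=10000)
        self.update_frequency = 100  # trigger an update after every 100 new experience samples
        self.gamma = 0.95            # discount factor γ
        self._new_since_update = 0   # new samples since the last update
    
    async def record_experience(
        self,
        state: dict,
        action: str,
        reward: float,
        next_state: Optional[dict],  # None marks a terminal state
        metadata: dict
    ):
        """Record one deployment experience"""
        
        experience = Experience(
            state=state,
            action=action,
            reward=reward,
            next_state=next_state,
            timestamp=time.time(),
            metadata=metadata
        )
        
        self.experience_buffer.add(experience)
        self._new_since_update += 1
        
        # Trigger an incremental update. A separate counter is needed: once the
        # buffer is full, len(buffer) stays at 10000, and 10000 % 100 == 0 would
        # fire an update on every single add.
        if self._new_since_update >= self.update_frequency:
            self._new_since_update = 0
            await self._incremental_update()
    
    async def _incremental_update(self):
        """
        Incremental model update.
        Random experience replay breaks the correlation between consecutive
        samples and reuses past experience, which stabilizes training.
        """
        
        # Sample a batch of experiences from the buffer
        batch = self.experience_buffer.sample(batch_size=64)
        
        # Compute target Q values (Bellman equation)
        target_values = []
        for exp in batch:
            if exp.next_state is None:  # terminal state
                target = exp.reward
            else:
                # Q-learning target: r + γ · max_a' Q(s', a')
                next_q = self.model.predict_q_value(exp.next_state)
                target = exp.reward + self.gamma * max(next_q)
            target_values.append(target)
        
        # Gradient update
        states = [exp.state for exp in batch]
        actions = [exp.action for exp in batch]
        
        loss = await self.model.update(
            states=states,
            actions=actions,
            target_values=target_values
        )
        
        logger.info(f"Online learning update: loss={loss:.4f}")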
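Here is a worked example of the target computed in `_incremental_update`, using assumed numbers. A deployment that succeeded earns reward 1.0, and the model's Q-values for the next state's two actions are [0.2, 0.6]. With γ = 0.95 the target is 1.0 + 0.95 × 0.6 = 1.57. A terminal transition uses the reward alone. The helper `q_target` is hypothetical.

```python
from typing import Optional, Sequence

GAMMA = 0.95  # discount factor, as in OnlineLearningEngine


def q_target(reward: float, next_q: Optional[Sequence[float]], gamma: float = GAMMA) -> float:
    """Q-learning (Bellman) target: r for a terminal state, else r + gamma * max_a' Q(s', a')."""
    if next_q is None:  # terminal state: no future return to bootstrap from
        return reward
    return reward + gamma * max(next_q)


# Assumed: reward 1.0 for a successful deployment, next-state Q-values [0.2, 0.6]
print(round(q_target(1.0, [0.2, 0.6]), 4))  # 1.57

# Assumed terminal transition, e.g. a rollback that ends the episode with reward -1.0
print(q_target(-1.0, None))  # -1.0

# Why a dedicated update counter: once the replay buffer is full, len(buffer)
# stays at 10000, and this condition would hold on every add
print(10000 % 100 == 0)  # True
```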

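A/B Testing Framework: Validating Strategies Against Each Other

Before a newly learned strategy is rolled out, an A/B test must confirm that it actually performs better: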

import hashlib
import math
import time

from scipy import stats

class DeploymentStrategyABTest:
    """
    A/B testing framework for deployment strategies
    
    Purpose: verify whether a new decision strategy outperforms the current one
    Method: randomly assign deployments to a control group (old strategy) or a treatment group (new strategy)
    Evaluation: compare success rate, MTTR, deployment duration and similar metrics
    """
    
    def __init__(self, experiment_id: str, traffic_split: float = 0.1):
        self.experiment_id = experiment_id
        self.traffic_split = traffic_split  # share of deployments using the new strategy (10%)
        self.control_results = []
        self.treatment_results = []
    
    def assign_group(self, deployment_id: str) -> str:
        """Deterministic hash-based assignment: a deployment always lands in the same group"""
        hash_val = int(hashlib.md5(
            f"{self.experiment_id}:{deployment_id}".encode()
        ).hexdigest(), 16)
        
        if (hash_val % 100) < (self.traffic_split * 100):
            return "treatment"  # treatment group (new strategy)
        else:
            return "control"    # control group (old strategy)
    
    def record_result(
        self,
        group: str,
        deployment_id: str,
        outcome: dict
    ):
        """Record an experiment result"""
        result = {
            "deployment_id": deployment_id,
            "success": outcome["success"],
            "duration_seconds": outcome["duration"],
            "error_rate_delta": outcome["error_rate_delta"],
            "timestamp": time.time()
        }
        
        if group == "control":
            self.control_results.append(result)
        else:
            self.treatment_results.append(result)
    
    def compute_significance(self) -> ABTestResult:
        """Significance of the success-rate difference (two-proportion Z-test)"""
        
        # Rule of thumb: at least 30 samples per group for the normal approximation
        if len(self.control_results) < 30 or len(self.treatment_results) < 30:
            return ABTestResult(
                status="insufficient_data",
                control_size=len(self.control_results),
                treatment_size=len(self.treatment_results)
            )
        
        # Success rates
        control_successes = sum(1 for r in self.control_results if r["success"])
        treatment_successes = sum(1 for r in self.treatment_results if r["success"])
        
        n1 = len(self.control_results)
        n2 = len(self.treatment_results)
        p1 = control_successes / n1
        p2 = treatment_successes / n2
        
        # Z statistic with the pooled proportion
        pooled_p = (control_successes + treatment_successes) / (n1 + n2)
        se = math.sqrt(pooled_p * (1 - pooled_p) * (1/n1 + 1/n2))
        
        if se == 0:
            z_score = 0
        else:
            z_score = (p2 - p1) / se
        
        # Two-sided p-value
        p_value = 2 * (1 - stats.norm.cdf(abs(z_score)))
        
        # Effect size (Cohen's h); h >= 0.2 is conventionally a "small" effect
        effect_size = 2 * math.asin(math.sqrt(p2)) - 2 * math.asin(math.sqrt(p1))
        
        # Promote only if the improvement is statistically significant AND at least small in size
        is_significant = p_value < 0.05 and effect_size > 0.2
        
        return ABTestResult(
            status="completed",
            control_success_rate=p1,
            treatment_success_rate=p2,
            relative_improvement=(p2 - p1) / p1 if p1 > 0 else None,
            p_value=p_value,
            effect_size=effect_size,
            is_significant=is_significant,
            recommendation="promote" if is_significant else "reject"
        )
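To see what the test reports, here is a dependency-free version of the same calculation on assumed numbers. In this example, 80 of 100 control deployments and 90 of 100 treatment deployments succeeded. The normal CDF is computed with `math.erf` instead of `scipy.stats.norm.cdf`. The function name `two_proportion_test` is hypothetical.

```python
import math
from typing import Tuple


def normal_cdf(x: float) -> float:
    return 0.5 * (1 + math.erf(x / math.sqrt(2)))


def two_proportion_test(s1: int, n1: int, s2: int, n2: int) -> Tuple[float, float, float]:
    """Return (z, two-sided p-value, Cohen's h) for control s1/n1 vs treatment s2/n2."""
    p1, p2 = s1 / n1, s2 / n2
    pooled = (s1 + s2) / (n1 + n2)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n1 + 1 / n2))
    z = 0.0 if se == 0 else (p2 - p1) / se
    p_value = 2 * (1 - normal_cdf(abs(z)))
    h = 2 * math.asin(math.sqrt(p2)) - 2 * math.asin(math.sqrt(p1))
    return z, p_value, h


z, p, h = two_proportion_test(80, 100, 90, 100)
print(f"z={z:.2f} p={p:.3f} h={h:.2f}")  # z=1.98 p=0.048 h=0.28
```

Here p < 0.05 and h ≈ 0.28 > 0.2, so `compute_significance` would recommend promotion. With the default 10% treatment split, collecting 100 treatment deployments takes roughly 1,000 deployments in total. This is worth keeping in mind when planning how long an experiment runs.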

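3.3 Detailed Technical Architecture

3.3.1 Overall Architecture Diagram

The following diagrams show the complete architecture of the self-evolving super Agent and the sequence of a typical Canary deployment: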

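Architecture overview (components by layer):

External input layer
  - Pull Request: GitHub/GitLab
  - Deployment request: API/CLI
  - Alert event: PagerDuty
  - Scheduled trigger: Cron

API gateway layer
  - API Gateway: Kong/Nginx
  - Authentication and authorization: OAuth2/JWT

Orchestrator service (core coordination)
  - Orchestrator Agent: FastAPI + Celery
  - State machine engine: Deployment FSM
  - Task scheduler: Celery Beat

Perception Layer
  - Collector service: telemetry collection
  - OpenTelemetry Collector
  - Prometheus: metric scraping
  - Apache Kafka: event bus
  - Apache Flink: stream processing engine

Cognition Layer
  - Memory service: state and knowledge management
  - Redis Cluster: working memory / short-term memory
  - PostgreSQL: long-term memory / profiles
  - Milvus: vector database
  - Neo4j: knowledge graph

Decision Layer
  - Analyzer service: real-time analysis engine
  - Planner service: decision plan generation
  - Rule engine: fast path
  - ML model service: learn path
  - LLM service: Claude/GPT

Execution Layer
  - Executor service: operation execution engine
  - Kubernetes: container orchestration
  - Istio: service mesh
  - Helm: application deployment

Evolution Layer
  - Learner service: model training and updates
  - MLflow: experiment tracking
  - Feast: feature store

Observability
  - Grafana: visualization
  - Jaeger: distributed tracing
  - Loki: log aggregation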

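Deployment sequence (participants: Developer, API Gateway, Orchestrator, Risk Agent, Executor, Kubernetes, Monitor Agent, Learner):

   1. Developer → API Gateway: POST /api/v1/deployments
   2. API Gateway → Orchestrator: forward the deployment request
   3. Orchestrator → Risk Agent: assess deployment risk
   4. Risk Agent → Orchestrator: risk report (score=0.35)
   5. Orchestrator: select strategy (Canary, starting at 5%)
   6. Orchestrator → Executor: execute Canary deployment
   7. Executor → Kubernetes: create Canary Pod
   8. Kubernetes → Executor: Pod Running
   9. Executor → Kubernetes: set Istio traffic to 5%
  10. Kubernetes → Executor: traffic shift complete
  loop [monitoring observation (5 minutes)]
  11. Monitor Agent → Kubernetes: collect error rate / latency
  12. Kubernetes → Monitor Agent: metrics
  13. Monitor Agent → Orchestrator: health report
  alt [metrics normal]
  14. Orchestrator → Executor: advance to 20% traffic
  15. Executor → Kubernetes: update Istio rules
  alt [metrics abnormal]
  16. Orchestrator → Executor: trigger rollback
  17. Executor → Kubernetes: restore 100% to Stable
  18. Orchestrator → Developer: send rollback notification
  19. Orchestrator → Learner: record deployment experience
  20. Learner: update the risk model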

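3.3.2 Core Technology Stack

Container Orchestration: Kubernetes 1.28+

We chose Kubernetes not only because it is the industry standard, but also because its declarative, API-driven design is well suited to control by an Agent: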

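# Kubernetes version requirements (comment-only header, not a manifest)
# Core feature dependencies:
# - CronJob batch/v1 (GA since 1.21): stable scheduling
# - KEP-3157: Allow informers for getting a stream of data instead of
#   chunking (WatchList, behind a feature gate; optional optimization)
# - Server-Side Apply (GA since 1.22): avoids conflicts between multiple controllers
# - TopologySpreadConstraints (GA since 1.19): cross-zone high availability
# Minimum version: 1.26, recommended: 1.28+

---
# Agent-specific ServiceAccount (principle of least privilege)
apiVersion: v1
kind: ServiceAccount
metadata:
  name: harness-agent
  namespace: harness-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: harness-agent-role
rules:
# Deployment control (delete is needed to remove Canary workloads)
- apiGroups: ["apps"]
  resources: ["deployments", "replicasets", "statefulsets", "daemonsets"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
# Pod management (pods/exec deliberately omitted: least privilege)
- apiGroups: [""]
  resources: ["pods", "pods/log", "pods/status"]
  verbs: ["get", "list", "watch"]
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["delete"]
# Services and configuration (read-only)
- apiGroups: [""]
  resources: ["services", "configmaps", "endpoints"]
  verbs: ["get", "list", "watch"]
# HPA autoscaling
- apiGroups: ["autoscaling"]
  resources: ["horizontalpodautoscalers"]
  verbs: ["get", "list", "watch", "create", "update", "patch"]
# Istio traffic shifting (used by the Canary Saga)
- apiGroups: ["networking.istio.io"]
  resources: ["virtualservices", "destinationrules"]
  verbs: ["get", "list", "watch", "update", "patch"]
# Event reading (needed by the perception layer)
- apiGroups: [""]
  resources: ["events"]
  verbs: ["get", "list", "watch"]
---
# Bind the role to the Agent's ServiceAccount
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: harness-agent-binding
subjects:
- kind: ServiceAccount
  name: harness-agent
  namespace: harness-system
roleRef:
  kind: ClusterRole
  name: harness-agent-role
  apiGroup: rbac.authorization.k8s.io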

Message Queue: Apache Kafka Topic Design

The Kafka topic design determines how data flows through the whole system and how far it can scale:

Kafka cluster configuration:
  brokers: 3 (minimum for high availability)
  replication_factor: 3
  min_insync_replicas: 2   (with acks=all, writes survive the loss of one broker)

Topics:

┌───────────────────────────────────────────────────────────────────┐
│  Topic                         Partitions  Replicas  Retention    │
├───────────────────────────────────────────────────────────────────┤
│  harness.deployments.events    12          3         7 days       │
│  harness.metrics.raw           24          3         1 day        │
│  harness.metrics.processed     6           3         7 days       │
│  harness.logs.errors           6           3         3 days       │
│  harness.alerts.triggered      3           3         30 days      │
│  harness.decisions.made        3           3         90 days      │
│  harness.decisions.feedback    3           3         365 days     │
│  harness.rollback.triggered    3           3         90 days      │
│  harness.learning.experiences  6           3         365 days     │
│  harness.human.feedback        3           3         unlimited    │
└───────────────────────────────────────────────────────────────────┘

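Partitioning strategy:
  - deployments.events: keyed by service_name (preserves ordering per service)
  - metrics.raw: hash-partitioned by service_name
  - decisions.*: keyed by deployment_id (preserves the order of decisions within a deployment)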
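Keying by `service_name` preserves ordering for a simple reason. A keyed producer maps each key to a fixed partition, and Kafka guarantees order only within a partition. The sketch below shows that mapping. It uses an MD5-based hash purely for demonstration; Kafka's default partitioner applies murmur2 to the serialized key, so these partition numbers will not match a real cluster. The function name `partition_for` is hypothetical, and the partition count of 12 comes from the table above.

```python
import hashlib
from collections import defaultdict
from typing import Dict, List


def partition_for(key: str, num_partitions: int) -> int:
    """Illustrative stable hash -> partition (NOT Kafka's murmur2 partitioner)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


events = [
    ("payment-svc", "deploy_started"), ("search-svc", "deploy_started"),
    ("payment-svc", "canary_5"), ("payment-svc", "canary_20"),
    ("search-svc", "completed"), ("payment-svc", "completed"),
]

partitions: Dict[int, List[str]] = defaultdict(list)
for service, event in events:
    partitions[partition_for(service, 12)].append(f"{service}:{event}")

# All events of one service land in the same partition, in production order
for p, items in sorted(partitions.items()):
    print(p, items)
```

The same reasoning explains why decisions are keyed by `deployment_id`: every decision of one deployment is consumed in the order it was made.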

Time-Series Database: VictoriaMetrics Configuration

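# VictoriaMetrics cluster configuration
# Intended retention tiers:
#   - Raw data: 15 days (fine granularity, one point every 15 s)
#   - 5-minute downsampling: 90 days
#   - 1-hour downsampling: 2 years
# Note: retentionPeriod deletes all data older than the period, so the values
# below keep only the 15-day raw tier. The longer tiers require VictoriaMetrics
# Enterprise downsampling (flag -downsampling.period=15d:5m,90d:1h) together
# with retentionPeriod: "2y".

# victoriametrics-cluster values.yaml
vmcluster:
  spec:
    # Data retention (raw tier)
    retentionPeriod: "15d"
    
    vminsert:
      replicaCount: 2
      resources:
        requests:
          cpu: "500m"
          memory: "512Mi"
    
    vmstorage:
      replicaCount: 3
      storageDataPath: /vm-data
      storage:
        volumeClaimTemplate:
          spec:
            resources:
              requests:
                storage: 200Gi
            storageClassName: fast-ssd
    
    vmselect:
      replicaCount: 2
      cacheMountPath: /select-cache

# Pre-aggregation (recording) rules to reduce query latency.
# Separate rule file evaluated by vmalert (Prometheus-compatible format).
groups:
  - name: harness-recording-rules
    rules:
      - record: harness:deployment:success_rate:5m
        expr: |
          sum(rate(harness_deployment_total{status="success"}[5m])) by (service)
          /
          sum(rate(harness_deployment_total[5m])) by (service)

      - record: harness:service:error_rate:5m
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m])) by (service)
          /
          sum(rate(http_requests_total[5m])) by (service)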
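3.3.3 Microservice Architecture Design

Orchestrator Service: Core Coordinator

Orchestrator service responsibilities:
  1. Receive deployment requests (REST API)
  2. Coordinate multiple Worker Agents
  3. Maintain the deployment state machine
  4. Route decision requests to the decision layer
  5. Handle exceptions and escalation logic

Technology choices:
  - Framework: FastAPI (asynchronous, high performance)
  - Task queue: Celery + Redis (distributed task scheduling)
  - State persistence: PostgreSQL (ACID guarantees)
  - Service discovery: Kubernetes DNS
  - Configuration management: ConfigMap + Secrets

Key metrics (SLOs):
  - API response time: P99 < 200ms
  - Task scheduling latency: P99 < 1s
  - Availability: 99.9%
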
# orchestrator/main.py - Orchestrator service entry point
import asyncio
from contextlib import asynccontextmanager, suppress

import structlog
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from .routers import deployments, rollbacks, decisions, health
from .dependencies import get_message_bus
from .config import settings
from .telemetry import setup_telemetry
from .monitoring import run_health_monitor  # assumed module path; not shown in this chapter

logger = structlog.get_logger()

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle management"""
    logger.info("Orchestrator starting up", version=settings.version)
    
    # Initialize components
    await setup_telemetry(settings.service_name, settings.otlp_endpoint)
    await get_message_bus().connect()
    
    # Start background tasks
    app.state.health_monitor = asyncio.create_task(
        run_health_monitor()
    )
    
    yield
    
    # Graceful shutdown: cancel the background task and wait for it to exit
    logger.info("Orchestrator shutting down")
    app.state.health_monitor.cancel()
    with suppress(asyncio.CancelledError):
        await app.state.health_monitor
    await get_message_bus().disconnect()

app = FastAPI(
    title="Harness Orchestrator API",
    version=settings.version,
    lifespan=lifespan
)

# Middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.allowed_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Router registration
app.include_router(deployments.router, prefix="/api/v1/deployments", tags=["deployments"])
app.include_router(rollbacks.router, prefix="/api/v1/rollbacks", tags=["rollbacks"])
app.include_router(decisions.router, prefix="/api/v1/decisions", tags=["decisions"])
app.include_router(health.router, prefix="/api/v1/health", tags=["health"])
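The Orchestrator's third responsibility is maintaining the deployment state machine. It can be sketched as a transition table over the `status` values of the `deployments` table in 3.3.4. The allowed transitions below are assumed for illustration and are not a specification. For example, the table assumes that only `risk_assessment` can lead to `awaiting_approval`, and that terminal states have no outgoing transitions. `DeploymentFSM` and `InvalidTransition` are hypothetical names.

```python
from typing import Dict, FrozenSet, List

# Assumed transition table over the deployment statuses
TRANSITIONS: Dict[str, FrozenSet[str]] = {
    "pending": frozenset({"risk_assessment", "cancelled"}),
    "risk_assessment": frozenset({"awaiting_approval", "in_progress", "cancelled"}),
    "awaiting_approval": frozenset({"in_progress", "cancelled"}),
    "in_progress": frozenset({"verifying", "failed", "rolled_back"}),
    "verifying": frozenset({"completed", "failed", "rolled_back"}),
    "failed": frozenset({"rolled_back"}),
    "completed": frozenset(),
    "rolled_back": frozenset(),
    "cancelled": frozenset(),
}


class InvalidTransition(Exception):
    pass


class DeploymentFSM:
    def __init__(self, status: str = "pending") -> None:
        if status not in TRANSITIONS:
            raise ValueError(f"unknown status: {status}")
        self.status = status
        self.history: List[str] = [status]

    def transition(self, new_status: str) -> None:
        if new_status not in TRANSITIONS[self.status]:
            raise InvalidTransition(f"{self.status} -> {new_status}")
        self.status = new_status
        self.history.append(new_status)

    @property
    def is_terminal(self) -> bool:
        return not TRANSITIONS[self.status]


fsm = DeploymentFSM()
for s in ["risk_assessment", "in_progress", "verifying", "rolled_back"]:
    fsm.transition(s)
print(fsm.history, fsm.is_terminal)
try:
    fsm.transition("in_progress")  # a rolled-back deployment cannot resume
except InvalidTransition as e:
    print("rejected:", e)          # rejected: rolled_back -> in_progress
```

Keeping the table as data makes it easy to mirror in the database, for instance as the `CHECK` constraint on `status`.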

3.3.4 Data Model Design

The complete PostgreSQL schema that supports all core business functions:

-- ============================================================
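-- Harness Platform database schema
-- Version: 1.0.0
-- Contents: 10 core business tables + indexes + triggers
-- ============================================================

-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pg_trgm";     -- fuzzy text search (trigrams)
CREATE EXTENSION IF NOT EXISTS "btree_gin";   -- GIN operator classes for scalar columns (composite GIN indexes)
CREATE EXTENSION IF NOT EXISTS "pg_stat_statements"; -- slow-query analysis (also requires shared_preload_libraries)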

-- ============================================================
-- Table 1: service profiles (service_profiles)
-- Static information and historical characteristics of each service
-- ============================================================
CREATE TABLE service_profiles (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    service_name    VARCHAR(255) NOT NULL UNIQUE,
    service_alias   VARCHAR(255),
    team_owner      VARCHAR(255) NOT NULL,
    tier            VARCHAR(50) NOT NULL CHECK (tier IN ('frontend', 'backend', 'data', 'infra')),
    criticality     VARCHAR(50) NOT NULL CHECK (criticality IN ('critical', 'high', 'medium', 'low')),
    
    -- SLO targets
    slo_error_rate_target   DECIMAL(10, 6) NOT NULL DEFAULT 0.001,  -- 0.1%
    slo_p99_latency_ms      INTEGER NOT NULL DEFAULT 500,
    slo_availability_target DECIMAL(10, 6) NOT NULL DEFAULT 0.9999,
    
    -- Technology stack
    language            VARCHAR(50),
    framework           VARCHAR(100),
    container_image     VARCHAR(500),
    kubernetes_namespace VARCHAR(100),
    kubernetes_deployment VARCHAR(100),
    
    -- Statistics (refreshed periodically)
    avg_deployment_duration_sec  INTEGER DEFAULT 0,
    deployment_success_rate_30d  DECIMAL(5, 4) DEFAULT 1.0,
    incident_count_30d           INTEGER DEFAULT 0,
    last_incident_at             TIMESTAMP WITH TIME ZONE,
    
    -- Metadata
    tags        JSONB DEFAULT '{}',
    created_at  TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at  TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    deleted_at  TIMESTAMP WITH TIME ZONE  -- soft delete
);

CREATE INDEX idx_service_profiles_criticality ON service_profiles(criticality);
CREATE INDEX idx_service_profiles_team ON service_profiles(team_owner);
CREATE INDEX idx_service_profiles_namespace ON service_profiles(kubernetes_namespace);

-- ============================================================
-- Table 2: deployment records (deployments)
-- Full lifecycle record of every deployment
-- ============================================================
CREATE TABLE deployments (
    id                  UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    service_id          UUID NOT NULL REFERENCES service_profiles(id),
    
    -- Version information
    current_version     VARCHAR(255) NOT NULL,   -- version currently running
    target_version      VARCHAR(255) NOT NULL,   -- version to deploy
    git_commit_sha      CHAR(40),
    docker_image_tag    VARCHAR(500),
    
    -- Trigger information
    triggered_by        VARCHAR(100) NOT NULL,   -- user_id or 'system'
    trigger_type        VARCHAR(50) NOT NULL CHECK (
        trigger_type IN ('manual', 'scheduled', 'auto_rollback', 'hotfix', 'ci_cd')
    ),
    trigger_reason      TEXT,
    
    -- Deployment configuration
    strategy            VARCHAR(50) NOT NULL CHECK (
        strategy IN ('canary', 'blue_green', 'rolling', 'recreate')
    ),
    environment         VARCHAR(50) NOT NULL CHECK (
        environment IN ('dev', 'staging', 'production', 'canary')
    ),
    
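    -- State machine
    status              VARCHAR(50) NOT NULL DEFAULT 'pending' CHECK (
        status IN (
            'pending',           -- waiting to run
            'risk_assessment',   -- risk assessment in progress
            'awaiting_approval', -- waiting for human approval
            'in_progress',       -- deployment running
            'verifying',         -- post-deployment verification/observation
            'completed',         -- deployed successfully
            'failed',            -- deployment failed
            'rolling_back',      -- rollback in progress (set by the Orchestrator)
            'rolled_back',       -- rolled back
            'cancelled'          -- cancelled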
        )
    ),
    
    -- Risk information
    risk_score          DECIMAL(5, 4),           -- risk score in [0, 1]
    risk_factors        JSONB DEFAULT '[]',      -- list of risk factors
    
    -- Timestamps
    scheduled_at        TIMESTAMP WITH TIME ZONE,
    started_at          TIMESTAMP WITH TIME ZONE,
    completed_at        TIMESTAMP WITH TIME ZONE,
    duration_seconds    INTEGER,
    
    -- Outcome
    rollback_of         UUID REFERENCES deployments(id),  -- set when this deployment rolls back another one
    rollback_reason     TEXT,
    outcome_metrics     JSONB DEFAULT '{}',  -- snapshot of key metrics after the deployment
    
    -- Metadata
    metadata            JSONB DEFAULT '{}',
    created_at          TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at          TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_deployments_service ON deployments(service_id);
CREATE INDEX idx_deployments_status ON deployments(status);
CREATE INDEX idx_deployments_environment ON deployments(environment);
CREATE INDEX idx_deployments_created_at ON deployments(created_at DESC);
CREATE INDEX idx_deployments_git_commit ON deployments(git_commit_sha) WHERE git_commit_sha IS NOT NULL;
-- Composite index: most recent N deployments of a service (hot query)
CREATE INDEX idx_deployments_service_created ON deployments(service_id, created_at DESC);

-- ============================================================
-- Table 3: deployment steps (deployment_steps)
-- Detailed execution steps of each deployment
-- ============================================================
CREATE TABLE deployment_steps (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    deployment_id   UUID NOT NULL REFERENCES deployments(id) ON DELETE CASCADE,
    
    step_name       VARCHAR(100) NOT NULL,
    step_order      INTEGER NOT NULL,
    step_type       VARCHAR(50) NOT NULL CHECK (
        step_type IN ('pre_check', 'deploy', 'verify', 'promote', 'rollback', 'notify')
    ),
    
    status          VARCHAR(50) NOT NULL DEFAULT 'pending' CHECK (
        status IN ('pending', 'running', 'completed', 'failed', 'skipped', 'compensated')
    ),
    
    -- Execution details
    input_params    JSONB DEFAULT '{}',
    output          JSONB DEFAULT '{}',
    error_message   TEXT,
    
    -- Timing
    started_at      TIMESTAMP WITH TIME ZONE,
    completed_at    TIMESTAMP WITH TIME ZONE,
    duration_ms     INTEGER,
    
    -- Executor
    executor_type   VARCHAR(50),  -- rule_engine, ml_model, llm, human
    
    created_at      TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_deployment_steps_deployment ON deployment_steps(deployment_id, step_order);
CREATE INDEX idx_deployment_steps_status ON deployment_steps(status) WHERE status IN ('running', 'failed');

-- ============================================================
-- Table 4: decision records (decisions)
-- Complete record of every decision the Agent makes
-- ============================================================
CREATE TABLE decisions (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    deployment_id   UUID REFERENCES deployments(id),
    
    -- Decision context
    decision_type   VARCHAR(100) NOT NULL,  -- e.g. deploy/rollback/scale/alert/risk_assessment
    context_snapshot JSONB NOT NULL,        -- full environment snapshot at decision time
    
    -- Decision process
    path_used       VARCHAR(50) NOT NULL CHECK (
        path_used IN ('rule_engine', 'ml_model', 'llm_reasoning', 'human')
    ),
    rule_triggered  VARCHAR(200),           -- rule path: name of the rule that fired
    model_version   VARCHAR(100),           -- ML path: model version used
    llm_prompt_hash VARCHAR(64),            -- LLM path: hash of the prompt
    
    -- Decision result
    action          VARCHAR(100) NOT NULL,
    confidence      DECIMAL(5, 4),          -- confidence in [0, 1]
    reasoning       TEXT,                   -- rationale for the decision
    alternatives    JSONB DEFAULT '[]',     -- alternatives considered
    
    -- Execution result
    was_executed    BOOLEAN DEFAULT FALSE,
    actual_outcome  VARCHAR(50) CHECK (
        actual_outcome IN ('success', 'failure', 'partial', 'overridden', 'pending')
    ),
    outcome_metrics JSONB DEFAULT '{}',
    
    -- Human feedback
    human_feedback  VARCHAR(50) CHECK (
        human_feedback IN ('approved', 'rejected', 'modified', 'no_feedback')
    ),
    human_comment   TEXT,
    feedback_by     VARCHAR(100),
    feedback_at     TIMESTAMP WITH TIME ZONE,
    
    -- Timing
    created_at      TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    executed_at     TIMESTAMP WITH TIME ZONE
);

CREATE INDEX idx_decisions_deployment ON decisions(deployment_id);
CREATE INDEX idx_decisions_type ON decisions(decision_type);
CREATE INDEX idx_decisions_path ON decisions(path_used);
CREATE INDEX idx_decisions_created ON decisions(created_at DESC);
-- For the learning pipeline: find decisions that have a recorded real-world outcome
CREATE INDEX idx_decisions_with_outcome ON decisions(actual_outcome, created_at DESC)
    WHERE actual_outcome IS NOT NULL;

-- ============================================================
-- Table 5: rule library (rules)
-- Rule definitions for the rule engine (can be updated at runtime)
-- ============================================================
CREATE TABLE rules (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    rule_name       VARCHAR(200) NOT NULL UNIQUE,
    rule_type       VARCHAR(50) NOT NULL CHECK (
        rule_type IN ('deployment_gate', 'auto_rollback', 'scaling', 'alert', 'approval')
    ),
    
    -- Rule definition (DSL)
    conditions      JSONB NOT NULL,         -- trigger conditions
    actions         JSONB NOT NULL,         -- actions to take when triggered
    priority        INTEGER NOT NULL DEFAULT 100,  -- lower value = higher priority
    
    -- Scope
    applies_to_services  TEXT[] DEFAULT ARRAY[]::TEXT[],  -- empty = all services
    applies_to_envs      TEXT[] DEFAULT ARRAY['production']::TEXT[],
    
    -- State
    is_active       BOOLEAN NOT NULL DEFAULT TRUE,
    is_dry_run      BOOLEAN NOT NULL DEFAULT FALSE,  -- record matches only, do not execute
    
    -- Statistics
    trigger_count   INTEGER DEFAULT 0,
    last_triggered  TIMESTAMP WITH TIME ZONE,
    success_rate    DECIMAL(5, 4),
    
    -- Metadata
    description     TEXT,
    author          VARCHAR(100),
    created_at      TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at      TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_rules_active ON rules(is_active, priority) WHERE is_active = TRUE;
CREATE INDEX idx_rules_type ON rules(rule_type);

-- ============================================================
-- Table 6: risk assessment history (risk_assessments)
-- Risk assessment result for each deployment
-- ============================================================
CREATE TABLE risk_assessments (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    deployment_id   UUID NOT NULL REFERENCES deployments(id),
    
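    -- Assessment result
    overall_score   DECIMAL(5, 4) NOT NULL,  -- overall risk score in [0, 1]
    
    -- Component scores
    code_change_risk    DECIMAL(5, 4),   -- risk from the code change
    service_health_risk DECIMAL(5, 4),   -- risk from current service health
    dependency_risk     DECIMAL(5, 4),   -- risk from dependent services
    traffic_risk        DECIMAL(5, 4),   -- risk from traffic patterns
    timing_risk         DECIMAL(5, 4),   -- timing risk (e.g. peak hours)
    history_risk        DECIMAL(5, 4),   -- risk from historical failure patterns
    
    -- Risk factor details
    risk_factors        JSONB NOT NULL DEFAULT '[]',
    blast_radius        JSONB DEFAULT '{}',  -- blast-radius analysis result
    
    -- Recommendations
    recommended_strategy    VARCHAR(50),
    recommended_window      VARCHAR(100),  -- recommended deployment time window
    
    -- Assessment metadata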
    model_version       VARCHAR(100),
    assessment_duration_ms INTEGER,
    created_at          TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_risk_assessments_deployment ON risk_assessments(deployment_id);
CREATE INDEX idx_risk_assessments_score ON risk_assessments(overall_score);

-- ============================================================
-- Table 7: service dependencies (service_dependencies)
-- Dependency graph between services
-- ============================================================
CREATE TABLE service_dependencies (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    upstream_id     UUID NOT NULL REFERENCES service_profiles(id),
    downstream_id   UUID NOT NULL REFERENCES service_profiles(id),
    
    dependency_type VARCHAR(20) NOT NULL CHECK (
        dependency_type IN ('hard', 'soft', 'async', 'optional')
    ),
    
    traffic_percentage  DECIMAL(5, 2) DEFAULT 100.0,  -- percentage of traffic that depends on this path
    fallback_strategy   VARCHAR(100),
    
    -- Runtime observations
    avg_call_rate_per_min   INTEGER,
    avg_latency_ms          INTEGER,
    error_rate_last_7d      DECIMAL(5, 4),
    
    -- How the dependency was discovered
    discovery_method    VARCHAR(50) CHECK (
        discovery_method IN ('manual', 'service_mesh', 'code_analysis', 'network_analysis')
    ),
    last_verified_at    TIMESTAMP WITH TIME ZONE,
    
    is_active           BOOLEAN DEFAULT TRUE,
    created_at          TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    
    UNIQUE(upstream_id, downstream_id)
);

CREATE INDEX idx_service_deps_upstream ON service_dependencies(upstream_id);
CREATE INDEX idx_service_deps_downstream ON service_dependencies(downstream_id);

-- ============================================================
-- Table 8: incident records (incidents)
-- Links production incidents to the deployments that caused or resolved them
-- ============================================================
CREATE TABLE incidents (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    incident_number VARCHAR(50) UNIQUE NOT NULL,  -- format: INC-2024-001
    
    -- Related deployments
    triggered_by_deployment UUID REFERENCES deployments(id),
    resolved_by_deployment  UUID REFERENCES deployments(id),
    
    -- Incident details
    severity        VARCHAR(20) NOT NULL CHECK (
        severity IN ('P0', 'P1', 'P2', 'P3', 'P4')
    ),
    title           VARCHAR(500) NOT NULL,
    description     TEXT,
    
    -- Impact
    affected_services   TEXT[] NOT NULL,
    user_impact_count   INTEGER DEFAULT 0,
    revenue_impact_usd  DECIMAL(15, 2),
    
    -- Timeline
    detected_at         TIMESTAMP WITH TIME ZONE NOT NULL,
    acknowledged_at     TIMESTAMP WITH TIME ZONE,
    mitigated_at        TIMESTAMP WITH TIME ZONE,
    resolved_at         TIMESTAMP WITH TIME ZONE,
    
    -- Key metrics
    ttd_minutes         INTEGER,    -- Time to Detect
    tta_minutes         INTEGER,    -- Time to Acknowledge
    ttm_minutes         INTEGER,    -- Time to Mitigate
    ttr_minutes         INTEGER,    -- Time to Resolve
    
    -- RCA
    root_cause          TEXT,
    root_cause_category VARCHAR(50) CHECK (
        root_cause_category IN (
            'bad_deploy', 'config_change', 'capacity', 'dependency',
            'security', 'data_corruption', 'unknown'
        )
    ),
    prevention_actions  TEXT,
    
    -- Status
    status          VARCHAR(20) DEFAULT 'active' CHECK (
        status IN ('active', 'mitigated', 'resolved', 'postmortem_done')
    ),
    
    created_at      TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_incidents_deployment ON incidents(triggered_by_deployment);
CREATE INDEX idx_incidents_severity ON incidents(severity);
CREATE INDEX idx_incidents_detected ON incidents(detected_at DESC);
CREATE INDEX idx_incidents_status ON incidents(status) WHERE status NOT IN ('resolved', 'postmortem_done');

-- ============================================================
-- Table 9: model versions (model_versions)
-- Version management for machine learning models
-- ============================================================
CREATE TABLE model_versions (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    model_name      VARCHAR(200) NOT NULL,
    version         VARCHAR(50) NOT NULL,
    
    -- Model information
    model_type      VARCHAR(100) NOT NULL,  -- e.g. risk_scoring, anomaly_detection
    algorithm       VARCHAR(100),
    hyperparameters JSONB DEFAULT '{}',
    
    -- Training information
    training_data_start TIMESTAMP WITH TIME ZONE,
    training_data_end   TIMESTAMP WITH TIME ZONE,
    training_samples    INTEGER,
    
    -- Performance metrics
    validation_metrics  JSONB DEFAULT '{}',  -- AUC, F1, precision, recall, etc.
    
    -- Lifecycle
    status          VARCHAR(50) DEFAULT 'training' CHECK (
        status IN ('training', 'evaluating', 'shadow', 'canary', 'production', 'retired')
    ),
    
    -- Artifact storage
    model_artifact_path VARCHAR(500),
    feature_schema      JSONB DEFAULT '{}',
    
    -- A/B test results
    ab_test_id          VARCHAR(100),
    ab_test_result      JSONB DEFAULT '{}',
    
    deployed_at         TIMESTAMP WITH TIME ZONE,
    retired_at          TIMESTAMP WITH TIME ZONE,
    created_at          TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    
    UNIQUE(model_name, version)  -- one row per version of a model
);

CREATE INDEX idx_model_versions_name_status ON model_versions(model_name, status);
CREATE INDEX idx_model_versions_production ON model_versions(model_name, deployed_at DESC)
    WHERE status = 'production';

-- ============================================================
-- Table 10: human feedback (human_feedback)
-- Human experts' assessments of Agent decisions
-- ============================================================
CREATE TABLE human_feedback (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    decision_id     UUID NOT NULL REFERENCES decisions(id),
    
    -- Reviewer
    reviewer_id     VARCHAR(100) NOT NULL,
    reviewer_role   VARCHAR(50),  -- engineer, sre, manager
    
    -- Feedback
    rating          INTEGER CHECK (rating BETWEEN 1 AND 5),
    verdict         VARCHAR(20) NOT NULL CHECK (
        verdict IN ('correct', 'mostly_correct', 'incorrect', 'dangerous')
    ),
    
    -- If the Agent's decision was wrong, the action it should have taken
    correct_action  VARCHAR(100),
    correction_reason TEXT,
    
    -- Improvement suggestions
    improvement_suggestion TEXT,
    additional_context     TEXT,
    
    -- Whether this feedback may be used for training
    approved_for_training BOOLEAN DEFAULT FALSE,
    
    created_at      TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_human_feedback_decision ON human_feedback(decision_id);
CREATE INDEX idx_human_feedback_training ON human_feedback(approved_for_training, created_at)
    WHERE approved_for_training = TRUE;
CREATE INDEX idx_human_feedback_reviewer ON human_feedback(reviewer_id);

-- ============================================================
-- Trigger: keep updated_at current automatically
-- ============================================================
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_service_profiles_updated_at
    BEFORE UPDATE ON service_profiles
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_deployments_updated_at
    BEFORE UPDATE ON deployments
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_rules_updated_at
    BEFORE UPDATE ON rules
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

-- ============================================================
-- Materialized view: deployment health dashboard
-- ============================================================
CREATE MATERIALIZED VIEW deployment_health_dashboard AS
SELECT
    sp.service_name,
    sp.criticality,
    sp.team_owner,
    COUNT(d.id) AS total_deployments_30d,
    COUNT(d.id) FILTER (WHERE d.status = 'completed') AS successful_deployments_30d,
    ROUND(
        (COUNT(d.id) FILTER (WHERE d.status = 'completed'))::DECIMAL / 
        NULLIF(COUNT(d.id), 0) * 100, 2
    ) AS success_rate_pct,
    AVG(d.duration_seconds) AS avg_duration_sec,
    MAX(d.created_at) AS last_deployment_at,
    -- Incidents and risk scores are pre-aggregated per deployment (LATERAL subqueries),
    -- so a deployment with several incidents or assessments is not counted more than once
    COALESCE(SUM(inc.incident_count), 0) AS incidents_30d,
    AVG(COALESCE(ra.avg_score, 0)) AS avg_risk_score
FROM service_profiles sp
LEFT JOIN deployments d ON d.service_id = sp.id 
    AND d.created_at >= CURRENT_TIMESTAMP - INTERVAL '30 days'
    AND d.environment = 'production'
LEFT JOIN LATERAL (
    SELECT COUNT(*) AS incident_count
    FROM incidents i
    WHERE i.triggered_by_deployment = d.id
) inc ON TRUE
LEFT JOIN LATERAL (
    SELECT AVG(r.overall_score) AS avg_score
    FROM risk_assessments r
    WHERE r.deployment_id = d.id
) ra ON TRUE
WHERE sp.deleted_at IS NULL
GROUP BY sp.id, sp.service_name, sp.criticality, sp.team_owner
ORDER BY sp.criticality, success_rate_pct ASC;

-- REFRESH MATERIALIZED VIEW CONCURRENTLY requires a unique index on the view.
-- Refresh it hourly from an external scheduler (e.g. pg_cron or a Kubernetes CronJob):
--   REFRESH MATERIALIZED VIEW CONCURRENTLY deployment_health_dashboard;
CREATE UNIQUE INDEX ON deployment_health_dashboard(service_name);
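The `rules` table above encodes a priority (a lower value wins), a scope (an empty `applies_to_services` means every service) and a dry-run flag. The sketch below shows one way a rule engine might evaluate such rows once they are loaded into memory. It is illustrative only: the condition format (a list of `{"metric", "op", "value"}` clauses combined with AND) and the names `Rule`, `evaluate_rules` and `_OPS` are assumptions, since the schema only states that `conditions` holds a JSONB DSL.

```python
import operator
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple

# Hypothetical DSL (assumption): `conditions` is a list of
# {"metric", "op", "value"} clauses that must ALL hold.
_OPS = {
    ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
    "==": operator.eq, "!=": operator.ne,
}


@dataclass
class Rule:
    """In-memory copy of the rules-table columns that matter for evaluation."""
    rule_name: str
    conditions: List[Dict[str, Any]]
    actions: List[str]
    priority: int = 100                    # lower value = higher priority
    applies_to_services: List[str] = field(default_factory=list)  # empty = all
    applies_to_envs: List[str] = field(default_factory=lambda: ["production"])
    is_active: bool = True
    is_dry_run: bool = False               # record the match, never execute


def _matches(rule: Rule, metrics: Dict[str, float]) -> bool:
    """True if every clause holds; a missing metric counts as no match."""
    for clause in rule.conditions:
        value = metrics.get(clause["metric"])
        if value is None or not _OPS[clause["op"]](value, clause["value"]):
            return False
    return True


def evaluate_rules(
    rules: List[Rule], service: str, env: str, metrics: Dict[str, float]
) -> Tuple[List[str], List[str]]:
    """Return (actions to execute in priority order, names of matching dry-run rules)."""
    in_scope = [
        r for r in rules
        if r.is_active
        and env in r.applies_to_envs
        and (not r.applies_to_services or service in r.applies_to_services)
    ]
    actions: List[str] = []
    dry_run_hits: List[str] = []
    for rule in sorted(in_scope, key=lambda r: r.priority):
        if not _matches(rule, metrics):
            continue
        if rule.is_dry_run:
            dry_run_hits.append(rule.rule_name)
        else:
            actions.extend(rule.actions)
    return actions, dry_run_hits


if __name__ == "__main__":
    demo_rules = [
        Rule("high-error-rollback",
             [{"metric": "error_rate", "op": ">", "value": 0.05}], ["rollback"], priority=10),
        Rule("slow-p99-alert",
             [{"metric": "latency_p99", "op": ">", "value": 500}], ["alert_oncall"], priority=50),
        Rule("candidate-cpu-rule",
             [{"metric": "cpu_usage", "op": ">", "value": 80}], ["scale_up"], is_dry_run=True),
    ]
    # prints: (['rollback', 'alert_oncall'], ['candidate-cpu-rule'])
    print(evaluate_rules(demo_rules, "checkout", "production",
                         {"error_rate": 0.08, "latency_p99": 620, "cpu_usage": 85}))
```

Sorting by `priority` keeps the order of actions deterministic. Returning dry-run matches separately lets a new rule be observed against live traffic (for example by updating its `trigger_count`) before it is allowed to act.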

3.4 Hands-On: Building a Basic Agent Framework

3.4.1 Environment Setup

Setting Up a Local Kubernetes Cluster (Kind)

# Install Kind
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.20.0/kind-linux-amd64
chmod +x ./kind
sudo mv ./kind /usr/local/bin/kind

# Create a multi-node cluster (1 control-plane node + 2 workers)
cat << 'EOF' > kind-cluster.yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
name: harness-dev
nodes:
- role: control-plane
  kubeadmConfigPatches:
  - |
    kind: InitConfiguration
    nodeRegistration:
      kubeletExtraArgs:
        node-labels: "ingress-ready=true"
  extraPortMappings:
  - containerPort: 80
    hostPort: 80
    protocol: TCP
  - containerPort: 443
    hostPort: 443
    protocol: TCP
- role: worker
  extraMounts:
  - hostPath: /tmp/harness-data
    containerPath: /data
- role: worker
  extraMounts:
  - hostPath: /tmp/harness-data
    containerPath: /data
EOF

kind create cluster --config kind-cluster.yaml

# Verify the cluster
kubectl cluster-info --context kind-harness-dev
kubectl get nodes

Installing Base Components with Helm

# Add the Helm repositories used below
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo add grafana https://grafana.github.io/helm-charts
helm repo add strimzi https://strimzi.io/charts/
helm repo update

# Create namespaces
kubectl create namespace harness-system
kubectl create namespace harness-data
kubectl create namespace harness-monitoring
kubectl create namespace kafka

# 1. Install PostgreSQL
helm install postgres bitnami/postgresql \
  --namespace harness-data \
  --set auth.postgresPassword=harness_dev_pwd \
  --set auth.database=harness \
  --set primary.persistence.size=20Gi

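# 2. Install Redis (one primary + one replica). Current versions of the
#    bitnami/redis chart configure this with architecture/replica.replicaCount;
#    the old cluster.enabled/cluster.slaveCount values are no longer used.
#    (A sharded Redis Cluster uses the separate bitnami/redis-cluster chart.)
helm install redis bitnami/redis \
  --namespace harness-data \
  --set auth.password=redis_dev_pwd \
  --set architecture=replication \
  --set replica.replicaCount=1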

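# 3. Install Kafka (via the Strimzi operator)
#    The Kafka resource below runs in ZooKeeper mode with Kafka 3.6.0. Recent
#    Strimzi releases have dropped ZooKeeper support, so pin the operator chart
#    with --version to a release that supports both.
helm install strimzi-operator strimzi/strimzi-kafka-operator \
  --namespace kafka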

kubectl apply -n kafka -f - << 'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: harness-kafka
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF

# 4. Install the Prometheus + Grafana monitoring stack
helm install monitoring prometheus-community/kube-prometheus-stack \
  --namespace harness-monitoring \
  --set grafana.adminPassword=admin_dev \
  --set prometheus.prometheusSpec.retention=15d \
  --set prometheus.prometheusSpec.storageSpec.volumeClaimTemplate.spec.resources.requests.storage=50Gi

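RBAC Configuration

# Create the ServiceAccount, ClusterRole and ClusterRoleBinding for the Harness agent
kubectl apply -f - << 'EOF'
apiVersion: v1
kind: ServiceAccount
metadata:
  name: harness-agent
  namespace: harness-system
---
# The binding below references this ClusterRole, so it must exist.
# These rules are an assumed minimal set for this tutorial (read cluster
# state, patch workloads); adjust them to what your executor actually does.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: harness-agent-role
rules:
- apiGroups: ["apps"]
  resources: ["deployments", "replicasets", "statefulsets"]
  verbs: ["get", "list", "watch", "update", "patch"]
- apiGroups: [""]
  resources: ["pods", "services", "endpoints", "events"]
  verbs: ["get", "list", "watch"]
- apiGroups: [""]
  resources: ["pods/log"]
  verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: harness-agent-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: harness-agent-role
subjects:
- kind: ServiceAccount
  name: harness-agent
  namespace: harness-system
EOF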

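3.4.2 Core Service Implementation

The core Orchestrator logic follows. The service and repository classes it imports are separate modules of the framework, and the deployment state machine it drives is implemented afterwards.

# orchestrator/core/orchestrator.py
"""
Harness Orchestrator core implementation.
Coordinates the entire deployment lifecycle.
"""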

import asyncio
import uuid
from enum import Enum
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
import structlog
from datetime import datetime, timezone

from ..models.deployment import (
    Deployment, DeploymentStatus, DeploymentStrategy,
    DeploymentRequest, DeploymentResult
)
from ..services.risk_assessor import RiskAssessor
from ..services.strategy_selector import StrategySelector
from ..services.health_checker import HealthChecker
from ..services.executor import DeploymentExecutor
from ..services.monitor import DeploymentMonitor
from ..services.notifier import NotificationService
from ..repositories.deployment_repo import DeploymentRepository
from ..repositories.decision_repo import DecisionRepository
from .state_machine import DeploymentStateMachine, DeploymentEvent

logger = structlog.get_logger(__name__)


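class OrchestratorError(Exception):
    """Base exception for the Orchestrator"""
    pass


@dataclass
class OrchestratorConfig:
    """Orchestrator configuration"""
    max_concurrent_deployments: int = 10
    risk_score_threshold_for_approval: float = 0.7
    canary_observation_minutes: int = 5
    rollback_timeout_seconds: int = 300
    enable_auto_rollback: bool = True
    dry_run_mode: bool = False


@dataclass
class MockExecutionResult:
    """Stand-in execution result for dry-run mode (nothing is actually deployed)"""
    success: bool
    error: Optional[str] = None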


class DeploymentOrchestrator:
    """
    部署编排器:自进化智能体的核心协调组件
    
    职责:
    1. 接收并验证部署请求
    2. 协调风险评估
    3. 选择部署策略
    4. 管理部署状态机
    5. 协调回滚决策
    6. 记录决策供学习使用
    """
    
    def __init__(
        self,
        config: OrchestratorConfig,
        risk_assessor: RiskAssessor,
        strategy_selector: StrategySelector,
        health_checker: HealthChecker,
        executor: DeploymentExecutor,
        monitor: DeploymentMonitor,
        notifier: NotificationService,
        deployment_repo: DeploymentRepository,
        decision_repo: DecisionRepository,
    ):
        self.config = config
        self.risk_assessor = risk_assessor
        self.strategy_selector = strategy_selector
        self.health_checker = health_checker
        self.executor = executor
        self.monitor = monitor
        self.notifier = notifier
        self.deployment_repo = deployment_repo
        self.decision_repo = decision_repo
        
        # Concurrency control
        self._semaphore = asyncio.Semaphore(config.max_concurrent_deployments)
        self._active_deployments: Dict[str, asyncio.Task] = {}
        
        self.log = logger.bind(component="orchestrator")
    
    async def submit_deployment(
        self,
        request: DeploymentRequest
    ) -> Deployment:
        """
        提交部署请求
        
        这是外部调用的唯一入口,负责:
        1. 创建部署记录
        2. 异步启动部署流程
        3. 返回部署对象(客户端通过轮询状态)
        """
        
        self.log.info(
            "Deployment request received",
            service=request.service_name,
            version=request.target_version,
            environment=request.environment,
            triggered_by=request.triggered_by
        )
        
        # Create the deployment record
        deployment = await self.deployment_repo.create({
            "id": str(uuid.uuid4()),
            "service_name": request.service_name,
            "target_version": request.target_version,
            "environment": request.environment,
            "triggered_by": request.triggered_by,
            "trigger_type": request.trigger_type,
            "status": DeploymentStatus.PENDING,
            "created_at": datetime.now(timezone.utc)
        })
        
        # Run the pipeline in the background (does not block the caller)
        task = asyncio.create_task(
            self._execute_deployment_pipeline(deployment)
        )
        self._active_deployments[deployment.id] = task
        
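        # Clean up when the task finishes. Also retrieve its exception (the pipeline
        # has already logged and handled it) so asyncio does not warn
        # "Task exception was never retrieved".
        def _on_done(t: asyncio.Task) -> None:
            self._active_deployments.pop(deployment.id, None)
            if not t.cancelled():
                t.exception()
        
        task.add_done_callback(_on_done)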
        
        return deployment
    
    async def _execute_deployment_pipeline(
        self,
        deployment: Deployment
    ) -> DeploymentResult:
        """
        部署流水线主流程
        
        这是核心状态机的实现,按顺序执行每个阶段:
        pending → risk_assessment → [awaiting_approval] → in_progress 
        → verifying → completed/rolled_back
        """
        
        async with self._semaphore:
            try:
                return await self._run_pipeline_steps(deployment)
            except asyncio.CancelledError:
                self.log.warning("Deployment cancelled", deployment_id=deployment.id)
                await self._handle_cancellation(deployment)
                raise
            except Exception as e:
                self.log.error(
                    "Deployment pipeline failed",
                    deployment_id=deployment.id,
                    error=str(e),
                    exc_info=True
                )
                await self._handle_pipeline_failure(deployment, e)
                raise
    
    async def _run_pipeline_steps(
        self,
        deployment: Deployment
    ) -> DeploymentResult:
        """执行完整的部署流水线步骤"""
        
        sm = DeploymentStateMachine(deployment)
        
        # ─────────────────────────────────────────────────────
        # Step 1: pre-deployment health checks
        # ─────────────────────────────────────────────────────
        self.log.info("Running pre-deployment health checks", 
                      deployment_id=deployment.id)
        
        health_result = await self.health_checker.check_all(
            service=deployment.service_name,
            environment=deployment.environment
        )
        
        if not health_result.is_healthy:
            await self._update_status(deployment, DeploymentStatus.FAILED, 
                                      reason=f"Pre-check failed: {health_result.reason}")
            await self.notifier.notify_failure(deployment, "pre_check_failed", health_result)
            return DeploymentResult.failure(deployment.id, "pre_check_failed")
        
        # ─────────────────────────────────────────────────────
        # Step 2: risk assessment
        # ─────────────────────────────────────────────────────
        await self._update_status(deployment, DeploymentStatus.RISK_ASSESSMENT)
        sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
        
        risk_report = await self.risk_assessor.assess(deployment)
        
        self.log.info(
            "Risk assessment completed",
            deployment_id=deployment.id,
            risk_score=risk_report.overall_score,
            risk_factors=risk_report.risk_factors
        )
        
        # Record the risk-assessment decision
        await self.decision_repo.record({
            "deployment_id": deployment.id,
            "decision_type": "risk_assessment",
            "context_snapshot": deployment.to_dict(),
            "action": f"risk_score:{risk_report.overall_score:.3f}",
            "confidence": 1 - risk_report.overall_score,
            "reasoning": f"Risk factors: {', '.join(risk_report.risk_factors[:3])}",
            "path_used": "ml_model"
        })
        
        # ─────────────────────────────────────────────────────
        # Step 3: high-risk deployments require human approval
        # ─────────────────────────────────────────────────────
        if risk_report.overall_score >= self.config.risk_score_threshold_for_approval:
            self.log.info(
                "High risk deployment requires approval",
                deployment_id=deployment.id,
                risk_score=risk_report.overall_score
            )
            
            await self._update_status(deployment, DeploymentStatus.AWAITING_APPROVAL)
            
            # Send the approval request
            approval_result = await self.notifier.request_approval(
                deployment=deployment,
                risk_report=risk_report,
                timeout_seconds=3600  # 1-hour timeout
            )
            
            if not approval_result.approved:
                await self._update_status(deployment, DeploymentStatus.CANCELLED,
                                          reason="Rejected by reviewer")
                return DeploymentResult.cancelled(deployment.id, approval_result.reviewer)
        
        # ─────────────────────────────────────────────────────
        # Step 4: select the deployment strategy
        # ─────────────────────────────────────────────────────
        strategy = await self.strategy_selector.select(
            deployment=deployment,
            risk_report=risk_report
        )
        
        self.log.info(
            "Deployment strategy selected",
            deployment_id=deployment.id,
            strategy=strategy.type,
            config=strategy.config
        )
        
        # ─────────────────────────────────────────────────────
        # Step 5: execute the deployment
        # ─────────────────────────────────────────────────────
        await self._update_status(deployment, DeploymentStatus.IN_PROGRESS)
        sm.transition(DeploymentEvent.START_DEPLOYMENT)
        
        if self.config.dry_run_mode:
            self.log.info("DRY RUN: Skipping actual deployment")
            execution_result = MockExecutionResult(success=True)
        else:
            execution_result = await self.executor.execute(deployment, strategy)
        
        if not execution_result.success:
            self.log.error("Deployment execution failed", 
                          deployment_id=deployment.id,
                          error=execution_result.error)
            await self._trigger_rollback(deployment, reason=execution_result.error)
            return DeploymentResult.failure(deployment.id, execution_result.error)
        
        # ─────────────────────────────────────────────────────
        # Step 6: verification/observation window
        # ─────────────────────────────────────────────────────
        await self._update_status(deployment, DeploymentStatus.VERIFYING)
        sm.transition(DeploymentEvent.START_VERIFICATION)
        
        verify_result = await self.monitor.observe_and_validate(
            deployment=deployment,
            duration_minutes=self.config.canary_observation_minutes,
            thresholds=deployment.service_profile.slo_targets
        )
        
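        if not verify_result.passed:
            self.log.warning(
                "Deployment verification failed",
                deployment_id=deployment.id,
                failed_checks=verify_result.failed_checks
            )
            
            if self.config.enable_auto_rollback:
                await self._trigger_rollback(
                    deployment, 
                    reason=f"Verification failed: {verify_result.summary}"
                )
                return DeploymentResult.rolled_back(deployment.id, verify_result.summary)
            
            # Auto-rollback disabled: never fall through to COMPLETED. Mark the
            # deployment as failed and leave the rollback decision to a human.
            await self._update_status(deployment, DeploymentStatus.FAILED,
                                      reason=f"Verification failed: {verify_result.summary}")
            await self.notifier.notify_failure(deployment, "verification_failed", verify_result)
            return DeploymentResult.failure(deployment.id, verify_result.summary)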
        
        # ─────────────────────────────────────────────────────
        # Step 7: deployment succeeded
        # ─────────────────────────────────────────────────────
        await self._update_status(deployment, DeploymentStatus.COMPLETED)
        sm.transition(DeploymentEvent.COMPLETE)
        
        # Send the success notification
        await self.notifier.notify_success(deployment, verify_result)
        
        self.log.info(
            "Deployment completed successfully",
            deployment_id=deployment.id,
            duration_seconds=deployment.duration_seconds
        )
        
        return DeploymentResult.success(deployment.id, verify_result.metrics)
    
    async def _trigger_rollback(
        self,
        deployment: Deployment,
        reason: str
    ) -> None:
        """触发回滚流程"""
        
        self.log.warning(
            "Triggering rollback",
            deployment_id=deployment.id,
            reason=reason
        )
        
        await self._update_status(deployment, DeploymentStatus.ROLLING_BACK)
        
        try:
            rollback_result = await self.executor.rollback(
                deployment=deployment,
                timeout_seconds=self.config.rollback_timeout_seconds
            )
            
            if rollback_result.success:
                await self._update_status(deployment, DeploymentStatus.ROLLED_BACK,
                                          reason=reason)
                await self.notifier.notify_rollback(deployment, reason)
            else:
                # The rollback failed as well: this is a critical situation
                await self._update_status(deployment, DeploymentStatus.FAILED,
                                          reason=f"Rollback also failed: {rollback_result.error}")
                await self.notifier.notify_critical(
                    deployment, 
                    f"CRITICAL: Rollback failed! Manual intervention required. "
                    f"Original error: {reason}. Rollback error: {rollback_result.error}"
                )
        except Exception as e:
            self.log.critical(
                "Rollback encountered exception",
                deployment_id=deployment.id,
                error=str(e),
                exc_info=True
            )
            raise
    
    async def _update_status(
        self,
        deployment: Deployment,
        status: DeploymentStatus,
        reason: Optional[str] = None
    ) -> None:
        """Update the deployment status and persist it"""
        
        deployment.status = status
        deployment.updated_at = datetime.now(timezone.utc)
        
        if reason:
            deployment.status_reason = reason
        
        await self.deployment_repo.update_status(
            deployment_id=deployment.id,
            status=status,
            reason=reason
        )
    
    async def cancel_deployment(self, deployment_id: str, cancelled_by: str) -> bool:
        """Cancel an in-progress deployment"""
        
        task = self._active_deployments.get(deployment_id)
        if task and not task.done():
            task.cancel()
            self.log.info("Deployment cancelled", 
                         deployment_id=deployment_id,
                         cancelled_by=cancelled_by)
            return True
        return False
    
    def get_active_deployments(self) -> List[str]:
        """Return the IDs of the currently active deployments"""
        return list(self._active_deployments.keys())
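
The orchestrator looks tasks up in `_active_deployments`, but this section does not show how entries get there or how they are removed. A minimal sketch, assuming each deployment runs as an `asyncio.Task` that is registered when it is submitted and removed by a done-callback; `ActiveDeploymentRegistry` and its methods are hypothetical names, not part of the orchestrator above:

```python
import asyncio
from typing import Any, Coroutine, Dict, List


class ActiveDeploymentRegistry:
    """Hypothetical helper that tracks running deployment tasks by ID."""

    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}

    def submit(self, deployment_id: str, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        """Start a deployment coroutine; must be called inside a running event loop."""
        task = asyncio.create_task(coro)
        self._tasks[deployment_id] = task

        def _cleanup(finished: asyncio.Task) -> None:
            # Remove the entry only if it still points at this task
            # (a newer task may have been registered under the same ID)
            if self._tasks.get(deployment_id) is finished:
                del self._tasks[deployment_id]

        # Runs when the task finishes, fails or is cancelled
        task.add_done_callback(_cleanup)
        return task

    def cancel(self, deployment_id: str) -> bool:
        task = self._tasks.get(deployment_id)
        if task and not task.done():
            task.cancel()
            return True
        return False

    def active_ids(self) -> List[str]:
        return list(self._tasks.keys())


async def demo() -> List[List[str]]:
    registry = ActiveDeploymentRegistry()
    registry.submit("dep-1", asyncio.sleep(10))   # long-running deployment
    registry.submit("dep-2", asyncio.sleep(0))    # finishes almost immediately
    snapshots = [registry.active_ids()]
    await asyncio.sleep(0.01)                     # dep-2 completes and is removed
    snapshots.append(registry.active_ids())
    registry.cancel("dep-1")
    await asyncio.sleep(0.01)                     # cancellation and cleanup run
    snapshots.append(registry.active_ids())
    return snapshots
```

Running `asyncio.run(demo())` returns `[['dep-1', 'dep-2'], ['dep-1'], []]`: because cleanup is tied to task completion, cancelled and crashed deployments leave the registry just like successful ones.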

Deployment state machine implementation

# orchestrator/core/state_machine.py
"""
Deployment state machine: manages state transitions across the deployment lifecycle.
A finite state machine guarantees that every state change is legal.
"""

import time
from enum import Enum, auto
from typing import Dict, List, Set, Tuple

import structlog

logger = structlog.get_logger(__name__)


class DeploymentState(Enum):
    """Deployment states"""
    PENDING = "pending"
    RISK_ASSESSMENT = "risk_assessment"
    AWAITING_APPROVAL = "awaiting_approval"
    IN_PROGRESS = "in_progress"
    VERIFYING = "verifying"
    ROLLING_BACK = "rolling_back"
    COMPLETED = "completed"
    ROLLED_BACK = "rolled_back"
    FAILED = "failed"
    CANCELLED = "cancelled"


class DeploymentEvent(Enum):
    """Events that trigger state transitions"""
    START_RISK_ASSESSMENT = auto()
    RISK_HIGH = auto()           # high risk: approval required
    RISK_ACCEPTABLE = auto()     # acceptable risk: deploy directly
    APPROVE = auto()             # manual approval granted
    REJECT = auto()              # manual approval rejected
    START_DEPLOYMENT = auto()
    DEPLOYMENT_FAILED = auto()
    START_VERIFICATION = auto()
    VERIFICATION_PASSED = auto()
    VERIFICATION_FAILED = auto()
    COMPLETE = auto()
    START_ROLLBACK = auto()
    ROLLBACK_COMPLETED = auto()
    ROLLBACK_FAILED = auto()
    CANCEL = auto()
    FATAL_ERROR = auto()


class InvalidStateTransitionError(Exception):
    """Raised when a state transition is not allowed"""
    pass


class DeploymentStateMachine:
    """
    Deployment state machine
    
    Ensures that every state change is legal and prevents inconsistent states
    """
    
    # Legal transition table: {(current state, event): target state}
    TRANSITIONS: Dict[Tuple[DeploymentState, DeploymentEvent], DeploymentState] = {
        # Start risk assessment
        (DeploymentState.PENDING, DeploymentEvent.START_RISK_ASSESSMENT):
            DeploymentState.RISK_ASSESSMENT,
        
        # Risk assessment finished
        (DeploymentState.RISK_ASSESSMENT, DeploymentEvent.RISK_HIGH):
            DeploymentState.AWAITING_APPROVAL,
        (DeploymentState.RISK_ASSESSMENT, DeploymentEvent.RISK_ACCEPTABLE):
            DeploymentState.IN_PROGRESS,
        
        # Approval handling
        (DeploymentState.AWAITING_APPROVAL, DeploymentEvent.APPROVE):
            DeploymentState.IN_PROGRESS,
        (DeploymentState.AWAITING_APPROVAL, DeploymentEvent.REJECT):
            DeploymentState.CANCELLED,
        
        # Deployment execution
        (DeploymentState.IN_PROGRESS, DeploymentEvent.START_VERIFICATION):
            DeploymentState.VERIFYING,
        (DeploymentState.IN_PROGRESS, DeploymentEvent.DEPLOYMENT_FAILED):
            DeploymentState.ROLLING_BACK,
        
        # Verification and observation
        (DeploymentState.VERIFYING, DeploymentEvent.VERIFICATION_PASSED):
            DeploymentState.COMPLETED,
        (DeploymentState.VERIFYING, DeploymentEvent.VERIFICATION_FAILED):
            DeploymentState.ROLLING_BACK,
        (DeploymentState.VERIFYING, DeploymentEvent.COMPLETE):
            DeploymentState.COMPLETED,
        
        # Rollback flow (START_ROLLBACK covers explicit, e.g. manual, rollback requests)
        (DeploymentState.IN_PROGRESS, DeploymentEvent.START_ROLLBACK):
            DeploymentState.ROLLING_BACK,
        (DeploymentState.VERIFYING, DeploymentEvent.START_ROLLBACK):
            DeploymentState.ROLLING_BACK,
        (DeploymentState.ROLLING_BACK, DeploymentEvent.ROLLBACK_COMPLETED):
            DeploymentState.ROLLED_BACK,
        (DeploymentState.ROLLING_BACK, DeploymentEvent.ROLLBACK_FAILED):
            DeploymentState.FAILED,
        
        # Cancellation (allowed in several phases)
        (DeploymentState.PENDING, DeploymentEvent.CANCEL):
            DeploymentState.CANCELLED,
        (DeploymentState.RISK_ASSESSMENT, DeploymentEvent.CANCEL):
            DeploymentState.CANCELLED,
        (DeploymentState.AWAITING_APPROVAL, DeploymentEvent.CANCEL):
            DeploymentState.CANCELLED,
        # Cancelling mid-rollout rolls back the partially applied change
        (DeploymentState.IN_PROGRESS, DeploymentEvent.CANCEL):
            DeploymentState.ROLLING_BACK,
        
        # Fatal errors
        (DeploymentState.RISK_ASSESSMENT, DeploymentEvent.FATAL_ERROR):
            DeploymentState.FAILED,
        (DeploymentState.IN_PROGRESS, DeploymentEvent.FATAL_ERROR):
            DeploymentState.FAILED,
        (DeploymentState.VERIFYING, DeploymentEvent.FATAL_ERROR):
            DeploymentState.FAILED,
        (DeploymentState.ROLLING_BACK, DeploymentEvent.FATAL_ERROR):
            DeploymentState.FAILED,
    }
    
    def __init__(self, deployment):
        self.deployment = deployment
        # deployment.status may be a plain string or an Enum member (such as the
        # orchestrator's DeploymentStatus); normalize it to its string value first
        raw_status = getattr(deployment.status, "value", deployment.status)
        self.current_state = DeploymentState(raw_status)
        self.history: List[dict] = []
        self.log = logger.bind(deployment_id=deployment.id)
    
    def transition(self, event: DeploymentEvent) -> DeploymentState:
        """
        Perform a state transition
        
        Args:
            event: the triggering event
        
        Returns:
            The new state
        
        Raises:
            InvalidStateTransitionError: if the transition is not allowed
        """
        
        transition_key = (self.current_state, event)
        
        if transition_key not in self.TRANSITIONS:
            raise InvalidStateTransitionError(
                f"Invalid transition: {self.current_state.value} + {event.name} "
                f"has no defined target state"
            )
        
        new_state = self.TRANSITIONS[transition_key]
        
        self.log.info(
            "State transition",
            from_state=self.current_state.value,
            event=event.name,
            to_state=new_state.value
        )
        
        # Record the transition history
        self.history.append({
            "from": self.current_state.value,
            "event": event.name,
            "to": new_state.value,
            "timestamp": time.time()
        })
        
        self.current_state = new_state
        return new_state
    
    def can_transition(self, event: DeploymentEvent) -> bool:
        """Check whether the given event is allowed in the current state"""
        return (self.current_state, event) in self.TRANSITIONS
    
    def available_events(self) -> Set[DeploymentEvent]:
        """Return the set of events allowed in the current state"""
        return {
            event for (state, event), _ in self.TRANSITIONS.items()
            if state == self.current_state
        }
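
Because the transition table is plain data, gaps such as a state with no way out can be caught statically, for example in a unit test. A minimal sketch, assuming states and events are written as plain strings; `reachable_states` and `dead_end_states` are hypothetical helpers, and `SUBSET` is an abbreviated copy of the table (happy path plus rollback) rather than the full `TRANSITIONS`:

```python
from collections import deque
from typing import Dict, Set, Tuple

# (state, event) -> next state, with states and events as plain strings
Table = Dict[Tuple[str, str], str]


def reachable_states(table: Table, start: str) -> Set[str]:
    """States reachable from `start` (breadth-first search over the table)."""
    seen = {start}
    queue = deque([start])
    while queue:
        state = queue.popleft()
        for (src, _event), dst in table.items():
            if src == state and dst not in seen:
                seen.add(dst)
                queue.append(dst)
    return seen


def dead_end_states(table: Table, terminal: Set[str]) -> Set[str]:
    """Non-terminal states that appear in the table but have no outgoing transition."""
    sources = {src for src, _event in table}
    all_states = sources | set(table.values())
    return all_states - sources - terminal


# Abbreviated subset of DeploymentStateMachine.TRANSITIONS, using state values
SUBSET: Table = {
    ("pending", "START_RISK_ASSESSMENT"): "risk_assessment",
    ("risk_assessment", "RISK_ACCEPTABLE"): "in_progress",
    ("risk_assessment", "RISK_HIGH"): "awaiting_approval",
    ("awaiting_approval", "APPROVE"): "in_progress",
    ("awaiting_approval", "REJECT"): "cancelled",
    ("in_progress", "START_VERIFICATION"): "verifying",
    ("verifying", "VERIFICATION_PASSED"): "completed",
    ("verifying", "VERIFICATION_FAILED"): "rolling_back",
    ("rolling_back", "ROLLBACK_COMPLETED"): "rolled_back",
    ("rolling_back", "ROLLBACK_FAILED"): "failed",
}

TERMINAL = {"completed", "rolled_back", "failed", "cancelled"}
```

Here all ten states are reachable from `pending` and no non-terminal state is a dead end; deleting the two `rolling_back` rows would make `dead_end_states` report `{'rolling_back'}`, exactly the kind of "stuck in rollback" bug the state machine is meant to prevent.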

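3.4.3 Message Bus Integration

# orchestrator/messaging/kafka_client.py
"""
Kafka message bus client
Supports both producer and consumer modes
"""

import asyncio
import json
import time
from typing import Any, Awaitable, Callable, Dict, List, Optional

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import structlog

logger = structlog.get_logger(__name__)


class HarnessMessageBus:
    """
    Harness platform message bus
    
    Wraps Kafka's produce/consume logic behind a high-level event
    publish/subscribe interface
    """
    
    # Topic constants
    TOPIC_DEPLOYMENT_EVENTS = "harness.deployments.events"
    TOPIC_DECISIONS = "harness.decisions.made"
    TOPIC_ALERTS = "harness.alerts.triggered"
    TOPIC_ROLLBACKS = "harness.rollback.triggered"
    TOPIC_LEARNING = "harness.learning.experiences"
    # Dead-letter topic for messages whose handler failed (topic name is an assumption)
    TOPIC_DLQ = "harness.dead_letter"
    
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers
        self._producer: Optional[AIOKafkaProducer] = None
        self._consumers: Dict[str, AIOKafkaConsumer] = {}
        # Keep references to the consume-loop tasks so they are not garbage-collected
        self._consume_tasks: List[asyncio.Task] = []
        self.log = logger.bind(component="message_bus")
    
    async def connect(self):
        """Open the Kafka producer connection"""
        self._producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            # default=str keeps datetimes and similar values from breaking serialization
            value_serializer=lambda v: json.dumps(v, default=str).encode(),
            key_serializer=lambda k: k.encode() if isinstance(k, str) else k,
            # Reliability settings
            acks='all',                    # wait for all in-sync replicas
            enable_idempotence=True,       # idempotent producer
            compression_type='gzip',       # compression reduces network overhead
            # Retries: aiokafka's producer has no `retries` or
            # `max_in_flight_requests_per_connection` options; retriable send
            # errors are retried internally, waiting retry_backoff_ms in between
            retry_backoff_ms=200,
        )
        
        await self._producer.start()
        self.log.info("Kafka producer connected", servers=self.bootstrap_servers)
    
    async def disconnect(self):
        """Close all connections"""
        for task in self._consume_tasks:
            task.cancel()
        for consumer in self._consumers.values():
            await consumer.stop()
        if self._producer:
            await self._producer.stop()
        self.log.info("Kafka connections closed")
    
    async def publish_deployment_event(
        self,
        deployment_id: str,
        event_type: str,
        payload: dict
    ) -> None:
        """Publish a deployment event"""
        
        message = {
            "event_type": event_type,
            "deployment_id": deployment_id,
            "payload": payload,
            "timestamp": time.time(),
            "source": "orchestrator"
        }
        
        await self._publish(
            topic=self.TOPIC_DEPLOYMENT_EVENTS,
            key=deployment_id,  # partition by deployment_id so each deployment's events stay in order
            message=message
        )
    
    async def publish_learning_experience(
        self,
        deployment_id: str,
        state_before: dict,
        action: str,
        reward: float,
        state_after: dict,
        metadata: Optional[dict] = None
    ) -> None:
        """Publish a learning experience (consumed by the evolution layer)"""
        
        message = {
            "deployment_id": deployment_id,
            "experience": {
                "state": state_before,
                "action": action,
                "reward": reward,
                "next_state": state_after,
            },
            "metadata": metadata or {},
            "timestamp": time.time()
        }
        
        await self._publish(
            topic=self.TOPIC_LEARNING,
            key=deployment_id,
            message=message
        )
    
    async def subscribe(
        self,
        topic: str,
        handler: Callable[[Any], Awaitable[None]],
        group_id: str,
        from_beginning: bool = False
    ) -> None:
        """Subscribe to a topic and register a handler for it"""
        
        consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda v: json.loads(v.decode()),
            # Consumer settings. auto_offset_reset only applies when the
            # group has no committed offset yet
            auto_offset_reset='earliest' if from_beginning else 'latest',
            enable_auto_commit=False,  # commit manually, only after processing
            max_poll_interval_ms=300000,  # 5-minute processing timeout
        )
        
        await consumer.start()
        self._consumers[f"{topic}:{group_id}"] = consumer
        
        # Start the consume loop and keep a reference to its task
        task = asyncio.create_task(
            self._consume_loop(consumer, handler, topic)
        )
        self._consume_tasks.append(task)
        
        self.log.info("Subscribed to topic", topic=topic, group_id=group_id)
    
    async def _consume_loop(
        self,
        consumer: AIOKafkaConsumer,
        handler: Callable[[Any], Awaitable[None]],
        topic: str
    ) -> None:
        """Consume loop"""
        
        async for msg in consumer:
            try:
                await handler(msg.value)
            except Exception as e:
                self.log.error(
                    "Message handling failed",
                    topic=topic,
                    offset=msg.offset,
                    error=str(e),
                    exc_info=True
                )
                # Park the message in the dead-letter topic so that one bad
                # message does not block the rest of the partition
                await self._send_to_dlq(msg, str(e))
            # Commit once the message is either handled or parked in the DLQ.
            # commit() without arguments commits the consumer's current position
            await consumer.commit()
    
    async def _send_to_dlq(self, msg, error: str) -> None:
        """Forward a message that could not be processed to the dead-letter topic"""
        try:
            await self._publish(
                topic=self.TOPIC_DLQ,
                key=msg.key.decode() if msg.key else None,
                message={
                    "original_topic": msg.topic,
                    "partition": msg.partition,
                    "offset": msg.offset,
                    "value": msg.value,
                    "error": error,
                    "timestamp": time.time(),
                },
            )
        except Exception:
            self.log.error("Failed to publish to dead-letter topic",
                           topic=msg.topic, offset=msg.offset, exc_info=True)
    
    async def _publish(self, topic: str, key: Optional[str], message: dict) -> None:
        """Internal publish helper"""
        if not self._producer:
            raise RuntimeError("Message bus not connected")
        
        await self._producer.send_and_wait(
            topic=topic,
            key=key,
            value=message
        )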
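
Because offsets are committed manually after processing, delivery is at-least-once: if the process crashes between handling a message and committing its offset, the message is delivered again after a restart. Handlers passed to `subscribe` should therefore be idempotent. A minimal sketch of a de-duplicating wrapper, assuming a message can be identified by fields such as `deployment_id`, `event_type` and `timestamp`; the `idempotent` helper is hypothetical, and its in-memory LRU only protects a single process (a shared store would be needed across replicas):

```python
import asyncio
from collections import OrderedDict
from typing import Awaitable, Callable, Hashable

Handler = Callable[[dict], Awaitable[None]]


def idempotent(
    handler: Handler,
    key_fn: Callable[[dict], Hashable],
    max_entries: int = 10_000,
) -> Handler:
    """Wrap an async handler so that a redelivered message is processed only once.

    Only keys whose handler completed successfully are remembered, so a message
    whose handler raised is processed again on redelivery. Messages are assumed
    to be handled one at a time, as in the sequential consume loop.
    """
    seen: "OrderedDict[Hashable, None]" = OrderedDict()

    async def wrapper(message: dict) -> None:
        key = key_fn(message)
        if key in seen:
            seen.move_to_end(key)
            return                          # duplicate delivery: skip
        await handler(message)
        seen[key] = None
        if len(seen) > max_entries:
            seen.popitem(last=False)        # evict the oldest key
    return wrapper


async def demo() -> list:
    processed = []

    async def record(message: dict) -> None:
        processed.append(message["event_type"])

    handle = idempotent(
        record,
        key_fn=lambda m: (m["deployment_id"], m["event_type"], m["timestamp"]),
    )
    msg = {"deployment_id": "dep-1", "event_type": "started", "timestamp": 1.0}
    await handle(msg)
    await handle(dict(msg))                              # redelivered duplicate
    await handle({**msg, "event_type": "completed"})
    return processed
```

`asyncio.run(demo())` returns `['started', 'completed']`: the redelivered copy of the first event is dropped.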

3.4.4 Basic Perception Capabilities

# collector/perception/metrics_collector.py
"""
Metrics collector: pulls runtime metrics from Prometheus
"""

import asyncio
import math
import time
from dataclasses import dataclass
from typing import Dict, Optional

import aiohttp
import structlog

logger = structlog.get_logger(__name__)


@dataclass
class ServiceMetrics:
    """Snapshot of a service's current metrics"""
    service_name: str
    timestamp: float
    
    # RED metrics
    request_rate: float = 0.0        # requests per second
    error_rate: float = 0.0          # error ratio (0-1)
    p50_latency_ms: float = 0.0
    p95_latency_ms: float = 0.0
    p99_latency_ms: float = 0.0
    
    # Resource metrics
    cpu_usage_pct: float = 0.0
    memory_usage_pct: float = 0.0
    
    # Availability
    availability: float = 1.0
    
    # Raw data (for feature engineering)
    raw_metrics: Optional[Dict] = None


class PrometheusCollector:
    """
    Prometheus metrics collector
    
    Queries key metrics with PromQL and assembles the service's health status
    """
    
    def __init__(self, prometheus_url: str, timeout_seconds: int = 10):
        self.prometheus_url = prometheus_url.rstrip('/')
        self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)
        self.log = logger.bind(component="prometheus_collector")
    
    async def collect_service_metrics(
        self,
        service_name: str,
        time_window_minutes: int = 5
    ) -> ServiceMetrics:
        """Collect the key metrics of the given service"""
        
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            # Run all queries concurrently to cut collection latency
            results = await asyncio.gather(
                # Request rate (requests/second)
                self._query(session, f"""
                    sum(rate(http_requests_total{{service="{service_name}"}}[{time_window_minutes}m]))
                """),
                # Error rate: share of 5xx responses (NaN when there is no traffic)
                self._query(session, f"""
                    sum(rate(http_requests_total{{service="{service_name}",status=~"5.."}}[{time_window_minutes}m]))
                    / sum(rate(http_requests_total{{service="{service_name}"}}[{time_window_minutes}m]))
                """),
                # P99 latency in milliseconds
                self._query(session, f"""
                    histogram_quantile(0.99, sum(rate(
                        http_request_duration_seconds_bucket{{service="{service_name}"}}[{time_window_minutes}m]
                    )) by (le)) * 1000
                """),
                # CPU: container_cpu_usage_seconds_total is a counter, so take its
                # rate; the result is CPU cores in use, averaged over matching series
                self._query(session, f"""
                    avg(rate(container_cpu_usage_seconds_total{{pod=~"{service_name}-.*"}}[{time_window_minutes}m]))
                """),
                return_exceptions=True
            )
        
        request_rate, error_rate, p99_latency, cpu_usage = results
        
        return ServiceMetrics(
            service_name=service_name,
            timestamp=time.time(),
            request_rate=self._safe_float(request_rate),
            error_rate=self._safe_float(error_rate),
            p99_latency_ms=self._safe_float(p99_latency),
            # cores x 100 = percent of one CPU core, not of the container's CPU limit
            cpu_usage_pct=self._safe_float(cpu_usage) * 100,
        )
    
    async def _query(
        self,
        session: aiohttp.ClientSession,
        promql: str
    ) -> Optional[float]:
        """Run a PromQL instant query; return the first sample's value, or None"""
        
        try:
            async with session.get(
                f"{self.prometheus_url}/api/v1/query",
                params={"query": promql.strip()}
            ) as resp:
                if resp.status != 200:
                    return None
                
                data = await resp.json()
                
                if data["status"] == "success" and data["data"]["result"]:
                    return float(data["data"]["result"][0]["value"][1])
                
                return None
                
        except Exception as e:
            self.log.warning("Prometheus query failed",
                             error=str(e), query=promql[:100])
            return None
    
    def _safe_float(self, value) -> float:
        """
        Convert to float safely; return 0.0 for exceptions, None, NaN or Inf
        (for example, the error-rate ratio is NaN when the service has no traffic)
        """
        if isinstance(value, Exception) or value is None:
            return 0.0
        try:
            result = float(value)
        except (TypeError, ValueError):
            return 0.0
        return result if math.isfinite(result) else 0.0
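
`_query` reads only the first sample of the response. For reference, the Prometheus HTTP API returns instant-query results in the shape `{"status": "success", "data": {"resultType": "vector", "result": [{"metric": {...labels...}, "value": [<unix time>, "<value as a string>"]}]}}`, and non-finite values arrive as the strings `"NaN"`, `"+Inf"` and `"-Inf"`. The hypothetical pure function below extracts every series from such a payload, so the parsing can be unit-tested without a Prometheus server; the sample payload is made up:

```python
import math
from typing import Dict, Tuple

Labels = Tuple[Tuple[str, str], ...]


def parse_vector_result(payload: dict) -> Dict[Labels, float]:
    """Map each series' label set to its sample value, skipping non-finite samples."""
    if payload.get("status") != "success":
        return {}
    data = payload.get("data", {})
    if data.get("resultType") != "vector":
        return {}
    values: Dict[Labels, float] = {}
    for series in data.get("result", []):
        labels = tuple(sorted(series.get("metric", {}).items()))
        _timestamp, raw = series["value"]
        value = float(raw)                # float() accepts "NaN", "+Inf", "-Inf"
        if math.isfinite(value):
            values[labels] = value
    return values


SAMPLE = {
    "status": "success",
    "data": {
        "resultType": "vector",
        "result": [
            {"metric": {"pod": "checkout-7f9c"}, "value": [1700000000.0, "0.42"]},
            {"metric": {"pod": "checkout-2b1d"}, "value": [1700000000.0, "NaN"]},
        ],
    },
}
```

`parse_vector_result(SAMPLE)` keeps only the finite sample, `{(('pod', 'checkout-7f9c'),): 0.42}`; keeping per-series values is also what per-Pod features would need later.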

3.4.5 A Simple Decision Engine

# decision/rule_engine.py
"""
Rule engine: fast decisions based on rules defined in a YAML DSL
"""

import operator
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

import structlog
import yaml

logger = structlog.get_logger(__name__)


@dataclass
class Rule:
    """Rule definition"""
    name: str
    description: str
    priority: int         # lower number = higher priority
    
    # Conditions (combined with AND)
    conditions: List[dict]
    
    # Action to trigger
    action: str
    action_params: dict = field(default_factory=dict)
    
    # Metadata
    author: str = "system"
    is_active: bool = True
    tags: List[str] = field(default_factory=list)


@dataclass
class RuleMatchResult:
    """Result of matching rules against a context"""
    matched: bool
    triggered_rule: Optional[Rule] = None
    confidence: float = 0.0
    action: Optional[str] = None
    action_params: dict = field(default_factory=dict)
    reasoning: str = ""


class ConditionEvaluator:
    """
    Condition expression evaluator
    
    Supported operators:
    - gt, gte, lt, lte: numeric comparison
    - eq, neq: equality
    - contains, not_contains: list/string membership
    - exists, not_exists: field presence
    - between: range check, inclusive at both ends (value is [low, high])
    """
    
    OPERATORS: Dict[str, Callable] = {
        "gt": operator.gt,
        "gte": operator.ge,
        "lt": operator.lt,
        "lte": operator.le,
        "eq": operator.eq,
        "neq": operator.ne,
        "contains": lambda a, b: b in a,
        "not_contains": lambda a, b: b not in a,
        "exists": lambda a, _: a is not None,
        "not_exists": lambda a, _: a is None,
        "between": lambda a, b: b[0] <= a <= b[1],
    }
    
    def evaluate(self, condition: dict, context: dict) -> bool:
        """
        Evaluate a single condition
        
        Condition format:
        {
          "field": "metrics.error_rate",  # dot-separated paths are supported
          "op": "gt",
          "value": 0.05
        }
        
        A missing field resolves to None; if the operator cannot be applied
        to the value (e.g. None > 0.05), the condition evaluates to False
        """
        field_path = condition["field"]
        op = condition["op"]
        expected = condition.get("value")
        
        # Reject unknown operators before touching the context
        if op not in self.OPERATORS:
            raise ValueError(f"Unknown operator: {op}")
        
        # Dot-path access (e.g. metrics.error_rate)
        actual = self._get_nested(context, field_path)
        
        try:
            return self.OPERATORS[op](actual, expected)
        except (TypeError, KeyError, IndexError):
            return False
    
    def _get_nested(self, data: dict, path: str) -> Any:
        """Look up a value in nested dicts by dot-separated path; None if absent"""
        keys = path.split(".")
        current = data
        for key in keys:
            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                return None
        return current


class RuleEngine:
    """
    Rule engine: loads YAML rule files and matches their conditions
    """
    
    def __init__(self):
        self.rules: List[Rule] = []
        self.evaluator = ConditionEvaluator()
        self.log = logger.bind(component="rule_engine")
    
    def load_from_yaml(self, yaml_content: str) -> int:
        """Load a rule set from YAML and return the number of rules loaded"""
        
        # An empty YAML document parses to None
        data = yaml.safe_load(yaml_content) or {}
        loaded_count = 0
        
        for rule_data in data.get("rules", []):
            try:
                rule = Rule(
                    name=rule_data["name"],
                    description=rule_data.get("description", ""),
                    priority=rule_data.get("priority", 100),
                    conditions=rule_data["conditions"],
                    action=rule_data["action"],
                    action_params=rule_data.get("action_params", {}),
                    author=rule_data.get("author", "system"),
                    is_active=rule_data.get("is_active", True),
                    tags=rule_data.get("tags", [])
                )
                self.rules.append(rule)
                loaded_count += 1
            except KeyError as e:
                self.log.warning(f"Invalid rule definition, missing field: {e}",
                                 rule_name=rule_data.get("name", "unknown"))
        
        # Sort by priority (the sort is stable, so equal priorities keep file order)
        self.rules.sort(key=lambda r: r.priority)
        self.log.info(f"Loaded {loaded_count} rules")
        
        return loaded_count
    
    def match(self, context: dict) -> RuleMatchResult:
        """
        Match the rules against the given context
        
        Returns the first active rule, in priority order, whose conditions
        are all satisfied
        """
        
        active_rules = [r for r in self.rules if r.is_active]
        
        for rule in active_rules:
            if self._evaluate_conditions(rule.conditions, context):
                
                self.log.info(
                    "Rule matched",
                    rule_name=rule.name,
                    action=rule.action
                )
                
                return RuleMatchResult(
                    matched=True,
                    triggered_rule=rule,
                    confidence=0.95,  # fixed value; the match itself is deterministic
                    action=rule.action,
                    action_params=rule.action_params,
                    reasoning=f"Rule '{rule.name}' triggered: {rule.description}"
                )
        
        return RuleMatchResult(matched=False, reasoning="No rule matched")
    
    def _evaluate_conditions(
        self,
        conditions: List[dict],
        context: dict
    ) -> bool:
        """Evaluate all of a rule's conditions (AND); an empty list matches everything"""
        
        return all(
            self.evaluator.evaluate(condition, context)
            for condition in conditions
        )
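
`load_from_yaml` only catches missing keys: a misspelled operator surfaces later, when `ConditionEvaluator.evaluate` raises `ValueError` at match time, and a rule with an empty condition list silently matches every context. A minimal load-time validation sketch, assuming rules arrive as the dictionaries produced by `yaml.safe_load`; `validate_rule` is a hypothetical function and `SUPPORTED_OPS` mirrors the keys of `ConditionEvaluator.OPERATORS`:

```python
from typing import List

SUPPORTED_OPS = {
    "gt", "gte", "lt", "lte", "eq", "neq",
    "contains", "not_contains", "exists", "not_exists", "between",
}
# Operators whose right-hand side is ignored by the evaluator
UNARY_OPS = {"exists", "not_exists"}


def validate_rule(rule: dict) -> List[str]:
    """Return the problems found in one rule definition (an empty list means valid)."""
    errors: List[str] = []
    name = rule.get("name", "<unnamed>")
    for required in ("name", "conditions", "action"):
        if required not in rule:
            errors.append(f"{name}: missing required field '{required}'")
    conditions = rule.get("conditions", [])
    if not isinstance(conditions, list) or not conditions:
        # all([]) is True, so a rule without conditions would match every context
        errors.append(f"{name}: 'conditions' must be a non-empty list")
        return errors
    for i, cond in enumerate(conditions):
        op = cond.get("op")
        if "field" not in cond:
            errors.append(f"{name}: condition {i} has no 'field'")
        if op not in SUPPORTED_OPS:
            errors.append(f"{name}: condition {i} uses unknown operator {op!r}")
        elif op not in UNARY_OPS and "value" not in cond:
            errors.append(f"{name}: condition {i} ('{op}') needs a 'value'")
        elif op == "between" and not (
            isinstance(cond.get("value"), list) and len(cond["value"]) == 2
        ):
            errors.append(f"{name}: condition {i} ('between') needs a [low, high] list")
    return errors
```

Running it over every entry of `data["rules"]` before constructing `Rule` objects would reject a typo such as `op: "greater"` at deploy time of the rule file instead of during an incident.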


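# Example rule configuration (YAML)
EXAMPLE_RULES_YAML = """
rules:

  # P0: automatic rollback on production failures (highest priority)
  - name: auto_rollback_on_high_error_rate
    description: "Roll back automatically when the error rate exceeds 5%"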
    priority: 1
    conditions:
      - field: "metrics.error_rate"
        op: "gt"
        value: 0.05
      - field: "deployment.status"
        op: "eq"
        value: "in_progress"
      - field: "deployment.environment"
        op: "eq"
        value: "production"
    action: "rollback"
    action_params:
      reason: "error_rate_exceeded_threshold"
      urgency: "immediate"
    tags: ["production", "safety", "auto_rollback"]

  # P1: roll back on P99 latency degradation
  - name: rollback_on_latency_degradation
    description: "P99 latency above 1 second triggers a rollback"
    priority: 2
    conditions:
      - field: "metrics.p99_latency_ms"
        op: "gt"
        value: 1000
      - field: "deployment.status"
        op: "eq"
        value: "verifying"
    action: "rollback"
    action_params:
      reason: "latency_degradation"
      urgency: "high"

  # P2: alert on Pod startup failures
  - name: alert_on_pod_crash_loop
    description: "Pod has been in CrashLoopBackOff for more than 3 minutes"
    priority: 10
    conditions:
      - field: "k8s.pod_crash_loop_count"
        op: "gt"
        value: 3
      - field: "k8s.pod_crash_duration_minutes"
        op: "gt"
        value: 3
    action: "alert_oncall"
    action_params:
      severity: "P1"
      channel: "deployment-alerts"

  # P3: block deployments during peak hours
  - name: block_deployment_during_peak_hours
    description: "No deployments on weekdays between 10:00 and 14:00 (peak hours)"
    priority: 20
    conditions:
      - field: "context.is_peak_hours"
        op: "eq"
        value: true
      - field: "deployment.environment"
        op: "eq"
        value: "production"
      - field: "deployment.trigger_type"
        op: "neq"
        value: "hotfix"  # hotfixes are exempt
    action: "block_and_schedule"
    action_params:
      schedule_for: "off_peak"
      reason: "peak_hours_restriction"

  # P4: automatic scale-up
  - name: scale_up_on_high_cpu
    description: "CPU usage above 80% triggers a scale-up"
    priority: 30
    conditions:
      - field: "metrics.cpu_usage_pct"
        op: "gt"
        value: 80
      - field: "k8s.current_replicas"
        op: "lt"
        value: 10  # stay below the maximum replica count
    action: "scale_up"
    action_params:
      increment: 2
      max_replicas: 10
"""

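The peak-hours rule reads a precomputed `context.is_peak_hours` flag. A minimal sketch of deriving it, assuming the window is Monday to Friday from 10:00 (inclusive) to 14:00 (exclusive) in the business's own timezone, which the caller supplies; the function name and the boundary choices are assumptions, not part of the rule engine:

```python
from datetime import datetime, time, timedelta, timezone, tzinfo

PEAK_START = time(10, 0)   # inclusive
PEAK_END = time(14, 0)     # exclusive

# Example fixed-offset zone; a zoneinfo.ZoneInfo object can be passed the same way
UTC_MINUS_5 = timezone(timedelta(hours=-5))


def is_peak_hours(moment: datetime, tz: tzinfo) -> bool:
    """True on Monday-Friday between 10:00 and 14:00 local time in `tz`.

    `moment` must be timezone-aware, so that the conversion is unambiguous.
    """
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    local = moment.astimezone(tz)
    is_weekday = local.weekday() < 5          # Monday=0 ... Sunday=6
    return is_weekday and PEAK_START <= local.time() < PEAK_END
```

Computing the flag outside the YAML keeps the rule declarative; 16:00 UTC on a Monday, for instance, is 11:00 in `UTC_MINUS_5` and therefore counts as peak time there.
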
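3.5 Real-World Cases: Agent Architectures at Large Internet Companies

3.5.1 Case 1: Airbnb's Intelligent Delivery System

Background and challenges

In 2019 Airbnb faced a serious engineering challenge: more than 300 microservices and over 1,000 deployment requests a day, while the manually driven release process had become the single biggest bottleneck in engineering efficiency.

The challenges in numbers:

  • Each deployment required an engineer to monitor it manually for 45 minutes on average
  • 12-18 rollbacks a month were caused by human error
  • Senior engineers spent 30% of their time on manual, release-related interventions
  • The "release rush" ahead of holiday freezes (such as Thanksgiving) created a large amount of technical debt

Architecture evolution

Airbnb's intelligent release system evolved through three phases:

Phase 1 (2019-2020): Rule-based automation

The goal of the first phase was to automate the most common manual decisions. At its core is a system of rule-based safety gates: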

Airbnb Phase 1 architecture:
┌─────────────────────────────────────────────────────────┐
│               Deployment trigger (CI/CD)                │
└────────────────────────────┬────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────┐
│                      Safety Gates                       │
│  • Unit test pass rate > 98%                            │
│  • Integration tests pass                               │
│  • Code coverage does not decrease                      │
│  • No known security vulnerabilities (dependency scan)  │
└────────────────────────────┬────────────────────────────┘
                             │ pass
                             ▼
┌─────────────────────────────────────────────────────────┐
│               Canary release (rule-based)               │
│  • 5% → 25% → 50% → 100%                                │
│  • Observe for 10 minutes at each step                  │
│  • Automatically monitor error rate and latency         │
│  • Roll back automatically when a threshold is exceeded │
└─────────────────────────────────────────────────────────┘

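Results:
  - Engineer time per deployment: 45 minutes → 15 minutes (a 67% reduction)
  - Automation rate: 40% (low-risk deployments fully automated; high-risk ones still need a human)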
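
The Phase 1 canary flow (5% → 25% → 50% → 100%, observing each step and rolling back on a threshold breach) is a small control loop. A minimal sketch, assuming the caller supplies a function that shifts traffic, waits for the observation window and returns the metrics seen during it; `run_canary`, `StepMetrics` and the thresholds (1% error rate, 500 ms P99) are illustrative, not Airbnb's actual values:

```python
from dataclasses import dataclass
from typing import Callable, List, Sequence


@dataclass
class StepMetrics:
    error_rate: float        # 0-1
    p99_latency_ms: float


@dataclass
class CanaryOutcome:
    promoted: bool
    completed_steps: List[int]
    reason: str = ""


def run_canary(
    shift_and_observe: Callable[[int], StepMetrics],
    steps: Sequence[int] = (5, 25, 50, 100),
    max_error_rate: float = 0.01,        # illustrative threshold
    max_p99_latency_ms: float = 500.0,   # illustrative threshold
) -> CanaryOutcome:
    """Advance traffic step by step and stop at the first threshold breach.

    `shift_and_observe(pct)` is assumed to route pct% of traffic to the new
    version, wait for the observation window (e.g. 10 minutes) and return the
    metrics measured during that window. The caller rolls back if not promoted.
    """
    completed: List[int] = []
    for pct in steps:
        metrics = shift_and_observe(pct)
        if metrics.error_rate > max_error_rate:
            return CanaryOutcome(False, completed, f"error rate {metrics.error_rate:.2%} at {pct}%")
        if metrics.p99_latency_ms > max_p99_latency_ms:
            return CanaryOutcome(False, completed, f"P99 {metrics.p99_latency_ms:.0f} ms at {pct}%")
        completed.append(pct)
    return CanaryOutcome(True, completed)
```

If the error rate jumps to 3% once half the traffic reaches the new version, the loop stops with `completed_steps == [5, 25]` and the reason `"error rate 3.00% at 50%"`, which is the kind of structured outcome a later learning phase can consume.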

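Phase 2 (2020-2021): Risk scoring model

The second phase introduced machine-learned risk scoring, the core innovation of Airbnb's Intelligent Delivery system:

Core idea: can we predict whether a given deployment is going to cause a problem?

Data sources (feature engineering):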

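# Core features of Airbnb's risk scoring model (compiled from public sources)

RISK_FEATURES = {
    # Code dimension
    "code_change_size": "Lines of code changed (rule of thumb: larger means riskier)",
    "file_change_count": "Number of files modified",
    "test_coverage_delta": "Change in test coverage (a decrease means higher risk)",
    "previous_deployment_failure": "Whether any of the last 3 deployments failed",
    "days_since_last_deploy": "Days since the last successful deployment (too long = engineers are out of practice)",
    
    # Service dimension
    "service_criticality": "Service criticality tier",
    "service_age_days": "Days since the service was created (new services fail more easily)",
    "dependency_count": "Number of direct service dependencies",
    "historical_failure_rate": "Deployment failure rate of this service over the last 30 days",
    
    # Environment dimension
    "hour_of_day": "Release hour (0-23)",
    "day_of_week": "Day of the week (Friday afternoon is a high-risk window)",
    "active_incidents": "Whether there is currently an active incident",
    "traffic_multiplier": "Current traffic as a multiple of the normal level",
    
    # Engineer dimension (anonymized)
    "deployer_experience_score": "The deployer's historical success rate",
    "time_since_last_deploy": "Time since the engineer's last deployment operation",
}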

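Model choice:

Airbnb chose Gradient Boosting (LightGBM) over deep learning for three reasons:

  1. Interpretability: the model can explain why a particular deployment is high-risk
  2. Data volume: about 100,000 historical deployments are not enough to train a deep model well
  3. Latency: scoring must complete within 200 ms
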
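# Simplified implementation of Airbnb's risk scoring model

import lightgbm as lgb
import numpy as np
from sklearn.metrics import precision_recall_curve, roc_auc_score
from sklearn.model_selection import train_test_split

class DeploymentRiskModel:
    """
    LightGBM-based deployment risk scoring model
    
    Target: the probability that a deployment is rolled back (or fails)
    within 24 hours
    """
    
    def __init__(self):
        self.model = None
        # Uses RISK_FEATURES defined above; its key order fixes the feature-vector layout
        self.feature_names = list(RISK_FEATURES.keys())
        self.threshold = 0.35  # default cut-off (35% = high risk); re-tuned by train()
    
    def train(self, historical_deployments: list):
        """Train the model on historical deployment records"""
        
        # Extract features and labels
        X, y = [], []
        for dep in historical_deployments:
            features = self._extract_features(dep)
            outcome = dep["outcome"] if isinstance(dep, dict) else dep.outcome
            label = 1 if outcome in ('rolled_back', 'failed') else 0
            X.append(features)
            y.append(label)
        
        X = np.array(X, dtype=float)
        y = np.array(y)
        
        # Train/validation split, stratified because failed deployments are rare
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        
        # LightGBM parameters
        params = {
            'objective': 'binary',
            'metric': ['auc', 'binary_logloss'],
            'learning_rate': 0.05,
            'num_leaves': 31,
            'min_child_samples': 20,
            'feature_fraction': 0.8,
            'bagging_fraction': 0.8,
            'bagging_freq': 5,
            # Handle class imbalance (most deployments succeed)
            'scale_pos_weight': (y_train == 0).sum() / (y_train == 1).sum(),
            'verbose': -1
        }
        
        # Training with early stopping on the validation set
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
        
        self.model = lgb.train(
            params,
            train_data,
            num_boost_round=500,
            valid_sets=[train_data, val_data],
            callbacks=[lgb.early_stopping(50), lgb.log_evaluation(100)]
        )
        
        # Evaluation
        val_pred = self.model.predict(X_val)
        auc = roc_auc_score(y_val, val_pred)
        print(f"Validation AUC: {auc:.4f}")
        
        # Pick the threshold that maximizes F1 on the validation set.
        # precision and recall have one more element than thresholds (the
        # last point has no threshold), so drop that point before argmax
        precision, recall, thresholds = precision_recall_curve(y_val, val_pred)
        precision, recall = precision[:-1], recall[:-1]
        f1_scores = 2 * precision * recall / (precision + recall + 1e-10)
        self.threshold = float(thresholds[np.argmax(f1_scores)])
        
        return {"auc": auc, "optimal_threshold": self.threshold}
    
    def predict_risk(self, deployment_features: dict) -> dict:
        """
        Predict the risk of a single deployment
        
        Returns:
            {
                "risk_score": 0.0-1.0,
                "risk_level": "low|medium|high|critical",
                "top_risk_factors": [{"factor": "...", "value": ..., "importance": ...}, ...],
                "recommendation": "proceed|review|block"
            }
        """
        
        features = self._extract_features(deployment_features)
        risk_score = float(self.model.predict(np.array([features], dtype=float))[0])
        
        # Global feature importance (total gain), used as a coarse explanation.
        # Note: this ranks features by their importance to the model as a whole,
        # not by their contribution to this particular prediction
        importance = self.model.feature_importance(importance_type='gain')
        top_factors = sorted(
            zip(self.feature_names, features, importance),
            key=lambda x: x[2],
            reverse=True
        )[:5]
        
        return {
            "risk_score": risk_score,
            "risk_level": self._categorize_risk(risk_score),
            "top_risk_factors": [
                {"factor": f, "value": v, "importance": i}
                for f, v, i in top_factors
            ],
            "recommendation": self._get_recommendation(risk_score)
        }
    
    def _categorize_risk(self, score: float) -> str:
        if score < 0.2: return "low"
        if score < 0.4: return "medium"
        if score < 0.7: return "high"
        return "critical"
    
    def _get_recommendation(self, score: float) -> str:
        if score < self.threshold: return "proceed"
        if score < 0.7: return "review"
        return "block"
    
    def _extract_features(self, record) -> list:
        """
        Build a feature vector in the order of self.feature_names
        
        `record` may be a dict or an object exposing the features as
        attributes. Features are assumed to be numerically encoded already;
        missing values become NaN, which LightGBM handles natively
        """
        vector = []
        for name in self.feature_names:
            if isinstance(record, dict):
                value = record.get(name)
            else:
                value = getattr(record, name, None)
            vector.append(np.nan if value is None else float(value))
        return vector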
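
The threshold-tuning step in `train` picks the score cut-off that maximizes F1 on the validation set. A dependency-free sketch of the same idea, handy for checking the logic on a handful of scores; `best_f1_threshold` is a hypothetical helper that, unlike `precision_recall_curve`, simply tries every distinct score as a threshold (quadratic time, fine for illustration) and predicts "risky" when `score >= threshold`:

```python
from typing import List, Tuple


def best_f1_threshold(scores: List[float], labels: List[int]) -> Tuple[float, float]:
    """Return (threshold, f1) with the highest F1, predicting positive when score >= threshold."""
    positives = sum(labels)
    best_threshold, best_f1 = 1.0, 0.0
    for threshold in sorted(set(scores)):
        predicted = [(s >= threshold, y) for s, y in zip(scores, labels)]
        tp = sum(1 for hit, y in predicted if hit and y == 1)
        fp = sum(1 for hit, y in predicted if hit and y == 0)
        if tp == 0:
            continue                      # no true positives: F1 would be 0
        precision = tp / (tp + fp)
        recall = tp / positives
        f1 = 2 * precision * recall / (precision + recall)
        if f1 > best_f1:
            best_threshold, best_f1 = threshold, f1
    return best_threshold, best_f1
```

For scores `[0.1, 0.3, 0.35, 0.6, 0.8, 0.9]` with labels `[0, 0, 1, 0, 1, 1]`, the cut-off 0.35 catches all three failures with one false alarm (precision 0.75, recall 1.0), giving F1 = 6/7, the best of the six candidates.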

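Results

Airbnb's improvements after Phase 2:

  • Failure rate: down 60% (the risk model blocked a large number of high-risk deployments)
  • MTTR (mean time to recovery): from 45 minutes to 9 minutes (an 80% reduction)
  • Automation rate: up from 40% to 85%
  • Engineer satisfaction: up from 3.2/5 to 4.1/5

Phase 3 (2021-2022): LLM-augmented decisions

The third phase brought in large language models to handle edge cases and to generate human-readable explanations of decisions:

Key use cases:
1. Anomaly root-cause analysis: when metrics turn anomalous, the LLM analyzes logs and events to generate root-cause hypotheses
2. Understanding deployment notes: parse the notes engineers write for a deployment and extract risk factors
3. Historical incident retrieval: semantic search for similar past incidents to provide reference cases
4. Decision explanations: generate a natural-language explanation report for every automated decision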
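
Use case 3 (historical incident retrieval) comes down to nearest-neighbor search over embeddings. A minimal sketch, assuming incident summaries have already been turned into vectors by some embedding model; the three-dimensional vectors below are toy values, `top_k_similar` is a hypothetical helper, and a production system would typically use a vector index rather than a linear scan:

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k_similar(
    query: Sequence[float],
    incidents: Dict[str, Sequence[float]],
    k: int = 3,
) -> List[Tuple[str, float]]:
    """Rank stored incident embeddings by cosine similarity to the query (linear scan)."""
    scored = [(incident_id, cosine(query, vec)) for incident_id, vec in incidents.items()]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


# Toy 3-dimensional "embeddings" standing in for real model output
INCIDENTS: Dict[str, List[float]] = {
    "INC-101": [0.9, 0.1, 0.0],   # payment timeouts after a deploy
    "INC-202": [0.0, 0.2, 0.9],   # search index lag
    "INC-303": [0.8, 0.3, 0.1],   # payment DB connection pool exhausted
}
```

A query embedding close to the payment incidents, such as `[1.0, 0.2, 0.0]`, ranks INC-101 first and INC-303 second, and the retrieved summaries can then be handed to the LLM as reference cases.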

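Sample LLM decision report:
---
"Based on an analysis of this deployment:

Risk level: HIGH (0.73)

Main risk factors:
1. The code change is large (+1,847 lines), which raises the probability of introducing a regression
2. payment-service is a critical service; historically, 3 of its 20 deployments failed
3. It is currently 6 p.m. on a weekday, and traffic is 1.4× the normal level
4. Five services depend on payment-service, so a failure would have a wide blast radius

Recommendation:
Lower the initial canary traffic to 1% (instead of the default 5%) and extend the observation window from 5 minutes to 15 minutes, watching the payment success rate closely.

To proceed, click 'Confirm high-risk deployment', or choose 'Postpone to off-peak hours'."
---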
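
In a setup like this, the LLM is usually given the structured output of the risk model rather than raw telemetry. A minimal sketch of assembling that input into a prompt, assuming the dictionary shape returned by `DeploymentRiskModel.predict_risk`; `build_explanation_prompt`, its `service` argument and the prompt wording are illustrative, and the actual LLM call is out of scope:

```python
from typing import Dict, List


def build_explanation_prompt(service: str, risk: Dict) -> str:
    """Turn a risk-model result into a prompt asking an LLM for a readable report."""
    factor_lines: List[str] = [
        f"- {f['factor']} = {f['value']} (importance {f['importance']:.2f})"
        for f in risk["top_risk_factors"]
    ]
    return "\n".join([
        "You are a release-safety assistant. Explain the deployment risk below",
        "to the engineer in plain language and recommend concrete mitigations.",
        f"Service: {service}",
        f"Risk score: {risk['risk_score']:.2f} ({risk['risk_level'].upper()})",
        f"Model recommendation: {risk['recommendation']}",
        "Top risk factors:",
        *factor_lines,
    ])
```

Grounding the prompt in the model's score and factors keeps the generated report consistent with the automated decision it explains.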

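3.5.2 Case 2: Shopify's Intelligent Release Platform

The special challenges of big sales events

Shopify's engineering challenge is extreme traffic peaks: during Black Friday, peak traffic can reach 10 times the normal level. In this environment, every deployment decision can ripple through the entire system.

Shopify introduced an intelligent deployment platform called Shipit (partially open-sourced), whose core design philosophy is:

“Treat every deployment as a potential production incident to be prevented, rather than dealt with after the fact.”

Predictive scaling: sensing traffic ahead of time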

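import asyncio
import structlog

logger = structlog.get_logger(__name__)

# TrafficForecaster and K8sClient are this project's own client interfaces (not shown)


class PredictiveScalingAgent:
    """
    Predictive scaling Agent
    
    Core idea: scale out before the traffic arrives, instead of waiting for a CPU alert
    Why timing matters: a Pod typically needs 60-120 seconds to start, so if scaling waits
    until CPU > 80%, the first ~2 minutes of the surge will overwhelm the service
    """
    
    def __init__(
        self,
        traffic_forecaster: TrafficForecaster,
        kubernetes_client: K8sClient
    ):
        self.forecaster = traffic_forecaster
        self.k8s = kubernetes_client
        
        # Safety buffer: provision 20% more capacity than the forecast
        self.safety_buffer = 1.2
        
        # Lead time: how many minutes ahead to start scaling
        self.lead_time_minutes = 3
    
    async def run_prediction_loop(self):
        """Continuously running predictive scaling loop"""
        
        while True:
            try:
                # Forecast traffic lead_time_minutes into the future
                forecast = await self.forecaster.predict(
                    horizon_minutes=self.lead_time_minutes
                )
                
                for service, predicted_rps in forecast.items():
                    # Current maximum throughput of the service
                    current_capacity = await self._get_current_capacity(service)
                    
                    # Target capacity, including the safety buffer
                    required_capacity = predicted_rps * self.safety_buffer
                    
                    if required_capacity > current_capacity * 0.8:
                        # Buffered forecast exceeds 80% of current capacity: scale out
                        target_replicas = self._calculate_replicas(
                            required_capacity, 
                            service
                        )
                        
                        await self.k8s.scale_deployment(
                            service=service,
                            replicas=target_replicas,
                            reason=f"Predictive scaling: expected {predicted_rps:.0f} RPS in {self.lead_time_minutes}m"
                        )
            
            except Exception as e:
                # Log and keep looping; one bad forecast must not stop scaling for good
                logger.error("Prediction loop error", error=str(e))
            
            await asyncio.sleep(60)  # check once per minute
    
    async def _get_current_capacity(self, service: str) -> float:
        """Compute the service's current maximum throughput (RPS)"""
        
        deployment = await self.k8s.get_deployment(service)
        replicas = deployment.spec.replicas
        
        # Per-Pod throughput baseline (learned from historical data)
        throughput_per_pod = await self._get_pod_throughput_baseline(service)
        
        return replicas * throughput_per_pod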
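The helper `_calculate_replicas` is not shown in the original. A minimal sketch of what it could compute, assuming each Pod has a known throughput baseline (the same assumption `_get_current_capacity` makes). The standalone function name and the min/max clamp with its default values are hypothetical; in practice the bounds would usually mirror the HPA's `minReplicas`/`maxReplicas`.

```python
import math


def calculate_replicas(
    required_rps: float,
    throughput_per_pod: float,
    min_replicas: int = 2,
    max_replicas: int = 50,
) -> int:
    """Smallest replica count whose combined throughput covers required_rps,
    clamped to [min_replicas, max_replicas]."""
    if throughput_per_pod <= 0:
        raise ValueError("throughput_per_pod must be positive")
    # Round up: 7.2 Pods' worth of traffic needs 8 Pods, not 7
    needed = math.ceil(required_rps / throughput_per_pod)
    return max(min_replicas, min(max_replicas, needed))


predicted_rps = 1000
required = predicted_rps * 1.2  # 1200 RPS once the 20% safety buffer is applied
print(calculate_replicas(required, throughput_per_pod=150))  # 8
```

Rounding up rather than to the nearest integer matters here: rounding down would leave the service just short of the buffered forecast it was scaled for.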

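Implementing Real-Time Rollback Decisions

A key Shopify innovation is the "smart rollback trigger", which distinguishes problems caused by a deployment from environmental noise: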

# Deployment, ServiceMetrics, RollbackDecision and DegradationReport are this project's
# data models (src/models/, not shown)

class SmartRollbackDecisionEngine:
    """
    Smart rollback decision engine
    
    The core question: is a metric degradation caused by the new deployment, or by
    - a failing external dependency (database, third-party API)
    - seasonal traffic fluctuations
    - a problem propagating from another service
    
    Shopify's answer: causal attribution analysis
    """
    
    async def should_rollback(
        self,
        deployment: Deployment,
        current_metrics: ServiceMetrics
    ) -> RollbackDecision:
        
        # 1. Statistical significance test: is the degradation significant against the baseline?
        baseline = await self._get_baseline_metrics(
            service=deployment.service_name,
            lookback_hours=24
        )
        
        degradation = self._compute_degradation(current_metrics, baseline)
        
        if not degradation.is_statistically_significant():
            return RollbackDecision(
                should_rollback=False,
                confidence=0.9,
                reason="Metrics within normal variance"
            )
        
        # 2. Causal attribution: did this deployment cause the degradation?
        causal_score = await self._compute_causal_attribution(
            deployment=deployment,
            degradation=degradation
        )
        
        # 3. External factor check
        external_factors = await self._check_external_factors(
            deployment.service_name
        )
        
        # Combined decision
        if causal_score > 0.7 and not external_factors.has_issues:
            return RollbackDecision(
                should_rollback=True,
                confidence=causal_score,
                reason=f"Deployment likely caused degradation (causal score: {causal_score:.2f})"
            )
        elif external_factors.has_issues:
            return RollbackDecision(
                should_rollback=False,
                confidence=0.6,
                reason=f"External factors may explain degradation: {external_factors.description}"
            )
        else:
            return RollbackDecision(
                should_rollback=None,  # uncertain: ask a human to decide
                confidence=0.3,
                reason="Ambiguous signal, requesting human review"
            )
    
    async def _compute_causal_attribution(
        self,
        deployment: Deployment,
        degradation: DegradationReport
    ) -> float:
        """
        Causal attribution score (0-1; higher means the deployment is more likely the cause)
        
        Core logic:
        1. Temporal correlation: how close is the onset of degradation to the deployment time?
        2. Locality: is only this service degrading, or is it a system-wide problem?
        3. Change history: has this kind of change often caused this kind of problem before?
        """
        
        # Temporal correlation analysis
        degradation_start = degradation.start_timestamp
        deployment_complete = deployment.completed_at.timestamp()
        
        # 1.0 when the degradation starts exactly at deployment completion, decaying
        # linearly to 0 once the two are 5 minutes (300 s) or more apart
        time_correlation = max(0.0, 1 - abs(degradation_start - deployment_complete) / 300)
        
        # Locality analysis (are the other services healthy?)
        peer_health = await self._check_peer_services(deployment.service_name)
        locality_score = 1.0 if peer_health.all_healthy else 0.3
        
        # Historical pattern matching
        similar_incidents = await self._find_similar_historical_incidents(
            service=deployment.service_name,
            change_type=deployment.change_classification
        )
        history_score = min(1.0, len(similar_incidents) / 3)
        
        # Weighted combination
        causal_score = (
            time_correlation * 0.5 +
            locality_score * 0.3 +
            history_score * 0.2
        )
        
        return causal_score
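`degradation.is_statistically_significant()` is not shown, and the source does not say which test Shopify uses. For error rates, one common choice is a one-sided two-proportion z-test. A minimal sketch with a hypothetical function name and threshold, assuming request and error counts are large enough for the normal approximation; latency percentiles would need a different test (for example a rank-based test on latency samples).

```python
import math


def error_rate_degraded(
    baseline_errors: int,
    baseline_requests: int,
    current_errors: int,
    current_requests: int,
    z_threshold: float = 2.33,
) -> bool:
    """One-sided two-proportion z-test: is the current error rate significantly
    higher than the baseline? z_threshold=2.33 is roughly a 1% one-sided level."""
    if baseline_requests == 0 or current_requests == 0:
        return False  # not enough data to conclude anything
    p_base = baseline_errors / baseline_requests
    p_curr = current_errors / current_requests
    # Pooled error rate under the null hypothesis "both rates are equal"
    pooled = (baseline_errors + current_errors) / (baseline_requests + current_requests)
    se = math.sqrt(pooled * (1 - pooled) * (1 / baseline_requests + 1 / current_requests))
    if se == 0:
        return False  # no variance (e.g. zero errors everywhere): nothing to test
    z = (p_curr - p_base) / se
    return z > z_threshold


# Baseline 0.1% over 100k requests; canary 0.6% over 5k requests -> significant
print(error_rate_degraded(100, 100_000, 30, 5_000))  # True
# Canary 0.12% over 5k requests -> within normal variance
print(error_rate_degraded(100, 100_000, 6, 5_000))   # False
```

Gating on significance first is what lets the causal score be computed only for real regressions, rather than for every noisy minute.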

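3.5.3 Case Study 3: Zero-Downtime Releases for Stripe's Payment System

Architectural Requirements for Financial-Grade Reliability

Stripe faces the strictest challenge of the three: the payment system tolerates no downtime, it processes hundreds of thousands of transactions per second, and any configuration error can cause direct financial loss.

Stripe's release system has two distinctive features:

Feature 1: Formal verification on the critical path

Stripe uses TLA+ to formally specify and model-check its critical state machines, verifying at the design level that they are free of deadlocks and race conditions:

Core scenarios Stripe verifies with TLA+:
1. Handling concurrent requests during a rolling deployment
2. Consistency guarantees during multi-region failover
3. Atomicity of database migrations

The value of formal verification:
- Checks the correctness of the design logic mathematically, before any code runs
- Finds edge cases that manual code review misses
- Provides a precise tool for causal analysis during production incident investigations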
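TLA+ specifications are typically checked with the TLC model checker, which exhaustively explores every reachable state of a model and reports a counterexample trace when an invariant fails. The toy Python sketch below only illustrates that idea; it is neither TLA+ nor Stripe's model. The three-replica rolling update, the `max_unavailable` guard and the "at least two replicas serving" invariant are all hypothetical.

```python
from collections import deque
from typing import Callable, Deque, Dict, Iterable, Iterator, List, Optional, Tuple

State = Tuple[str, ...]  # one entry per replica: "old", "down" or "new"


def check_invariant(
    initial: State,
    next_states: Callable[[State], Iterable[State]],
    invariant: Callable[[State], bool],
) -> Optional[List[State]]:
    """Breadth-first search over every reachable state.
    Returns the shortest trace to a state that violates the invariant, or None."""
    parent: Dict[State, Optional[State]] = {initial: None}
    queue: Deque[State] = deque([initial])
    while queue:
        state = queue.popleft()
        if not invariant(state):
            # Walk parent links back to the initial state to build the counterexample
            trace: List[State] = []
            cur: Optional[State] = state
            while cur is not None:
                trace.append(cur)
                cur = parent[cur]
            return trace[::-1]
        for nxt in next_states(state):
            if nxt not in parent:  # visit each state only once
                parent[nxt] = state
                queue.append(nxt)
    return None


def rolling_update(max_unavailable: int) -> Callable[[State], Iterator[State]]:
    """In each step one replica either stops ("old" -> "down"), allowed only while fewer
    than max_unavailable replicas are down, or comes back on the new version."""
    def step(state: State) -> Iterator[State]:
        down = state.count("down")
        for i, replica in enumerate(state):
            if replica == "old" and down < max_unavailable:
                yield state[:i] + ("down",) + state[i + 1:]
            elif replica == "down":
                yield state[:i] + ("new",) + state[i + 1:]
    return step


def enough_serving(state: State) -> bool:
    # Zero-downtime requirement for this toy model: at least 2 of 3 replicas serving
    return sum(r != "down" for r in state) >= 2


start: State = ("old", "old", "old")
print(check_invariant(start, rolling_update(max_unavailable=1), enough_serving))  # None
print(check_invariant(start, rolling_update(max_unavailable=2), enough_serving))  # counterexample
```

With `max_unavailable=1` every interleaving keeps two replicas serving, so no trace is returned; relaxing the guard to 2 yields a three-state counterexample in which two replicas are down at once. Real model checkers add state-space reduction and temporal properties, but the exhaustive-exploration core is the same.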

Feature 2: Coordinated multi-region releases

import asyncio
import structlog

logger = structlog.get_logger(__name__)

# ReleaseBundle, GlobalDeployResult and RegionHealthResult are this project's data models (not shown)


class MultiRegionDeploymentCoordinator:
    """
    Multi-region deployment coordinator
    
    Stripe's global deployment strategy:
    1. Deploy first to the lowest-traffic region (e.g. Singapore)
    2. Observe for 30 minutes and verify the core metrics
    3. Expand step by step to Europe, US East and US West
    4. Keep the ability to roll back at every step
    
    Key constraints:
    - At any moment, at most two different versions run across regions (avoids API compatibility issues)
    - Versions across regions differ by no more than one release
    """
    
    DEPLOYMENT_ORDER = [
        {"region": "ap-southeast-1", "weight": 0.05},  # Singapore: 5% of traffic
        {"region": "eu-west-1", "weight": 0.25},        # Ireland: 25%
        {"region": "us-east-1", "weight": 0.45},        # N. Virginia: 45%
        {"region": "us-west-2", "weight": 0.25},        # Oregon: 25%
    ]
    
    async def deploy_globally(self, release: ReleaseBundle) -> GlobalDeployResult:
        """
        Global progressive deployment
        
        Args:
            release: the deployment bundle for all regions (a prebuilt, immutable artifact)
        """
        
        deployed_regions = []
        
        for region_config in self.DEPLOYMENT_ORDER:
            region = region_config["region"]
            
            logger.info(f"Starting deployment to {region}")
            
            # 1. Deploy to the current region
            result = await self._deploy_to_region(region, release)
            
            if not result.success:
                # Roll back immediately: every region already on the new release, plus this
                # one, which may be partially deployed
                logger.error(f"Deployment failed in {region}, initiating global rollback")
                await self._global_rollback(deployed_regions + [region], release)
                return GlobalDeployResult.failure(region, result.error)
            
            # 2. Region health verification
            health = await self._verify_region_health(
                region=region,
                duration_minutes=30,  # 30-minute observation window
                critical_metrics={
                    "payment_success_rate": ">= 0.9995",  # 99.95% success rate
                    "api_error_rate": "< 0.0001",          # 0.01% error rate
                    "p99_latency_ms": "< 300",             # P99 < 300 ms
                }
            )
            
            if not health.passed:
                logger.warning(f"Health check failed in {region}: {health.reason}")
                # This region already runs the new release, so it must be rolled back too
                await self._global_rollback(deployed_regions + [region], release)
                return GlobalDeployResult.failure(region, health.reason)
            
            deployed_regions.append(region)
            logger.info(f"Successfully deployed to {region}, "
                       f"proceeding to next region ({len(deployed_regions)}/{len(self.DEPLOYMENT_ORDER)})")
        
        return GlobalDeployResult.success(deployed_regions)
    
    async def _verify_region_health(
        self,
        region: str,
        duration_minutes: int,
        critical_metrics: dict
    ) -> RegionHealthResult:
        """
        Continuous health verification for a region
        
        Samples metrics once a minute for duration_minutes minutes. A single violation of a
        payment-critical metric (payment_success_rate, api_error_rate) fails immediately;
        violations of other metrics are collected and fail the check at the end of the window.
        """
        
        violations = []
        
        for minute in range(duration_minutes):
            await asyncio.sleep(60)
            
            metrics = await self.metrics_client.get_region_metrics(region)
            
            for metric, threshold_expr in critical_metrics.items():
                value = getattr(metrics, metric, None)
                if value is None:
                    # Missing data is skipped here; a stricter policy would count it as a violation
                    continue
                
                if not self._evaluate_threshold(value, threshold_expr):
                    violations.append({
                        "minute": minute + 1,
                        "metric": metric,
                        "value": value,
                        "threshold": threshold_expr
                    })
                    
                    # For core payment metrics, a single violation fails immediately
                    if metric in ["payment_success_rate", "api_error_rate"]:
                        return RegionHealthResult(
                            passed=False,
                            reason=f"Critical metric violation at minute {minute+1}: "
                                  f"{metric}={value} (threshold: {threshold_expr})",
                            violations=violations
                        )
        
        return RegionHealthResult(
            passed=len(violations) == 0,
            reason="All metrics within thresholds" if not violations else f"{len(violations)} violations detected",
            violations=violations
        )
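`_evaluate_threshold` is not shown in the source. A minimal sketch of one way to evaluate the string expressions used in `critical_metrics`, dispatching through the standard `operator` module instead of calling `eval()`; the operator table and the standalone function name are assumptions.

```python
import operator
from typing import Callable, Dict

# Supported comparison prefixes for expressions such as ">= 0.9995"
_OPERATORS: Dict[str, Callable[[float, float], bool]] = {
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    ">": operator.gt,
    "<": operator.lt,
}


def evaluate_threshold(value: float, threshold_expr: str) -> bool:
    """Return True if value satisfies an expression such as ">= 0.9995" or "< 300"."""
    expr = threshold_expr.strip()
    # Try two-character operators first so ">=" is not mistaken for ">"
    for symbol in sorted(_OPERATORS, key=len, reverse=True):
        if expr.startswith(symbol):
            bound = float(expr[len(symbol):])  # float() tolerates surrounding spaces
            return _OPERATORS[symbol](value, bound)
    raise ValueError(f"Unsupported threshold expression: {threshold_expr!r}")


print(evaluate_threshold(0.9997, ">= 0.9995"))  # True
print(evaluate_threshold(312, "< 300"))         # False
```

Avoiding `eval()` means a threshold string in configuration can only ever express a comparison, never execute arbitrary code.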

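SLA Guarantee Mechanisms

Stripe publicly commits to 99.9999% availability ("six nines"). Achieving it relies on:

What six nines means mathematically:
  Allowed downtime per year = 365 days × 24 hours × 3600 seconds × 0.000001 ≈ 31.5 seconds
  
Mechanisms (in order of importance):
1. Multi-region active-active architecture: the failure of any single region does not affect the system as a whole
2. Progressive delivery: limits the blast radius of any failure as far as possible
3. Automatic rollback: P99 failure recovery time < 2 minutes
4. Circuit breakers: prevent cascading failures
5. Idempotent API design: requests can be retried safely
6. Database read/write splitting + multiple replicas: high availability at the data layer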
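The downtime arithmetic above generalizes to any availability target. A small sketch (the function name is hypothetical; a 365-day year is assumed, as in the formula above):

```python
SECONDS_PER_YEAR = 365 * 24 * 3600  # 31,536,000 s; ignores leap years


def downtime_budget_seconds(availability: float, period_seconds: int = SECONDS_PER_YEAR) -> float:
    """Maximum downtime allowed within a period for a given availability target."""
    if not 0.0 <= availability <= 1.0:
        raise ValueError("availability must be between 0 and 1")
    return period_seconds * (1.0 - availability)


for nines in range(3, 7):
    target = 1 - 10 ** -nines  # 3 nines -> 0.999, 6 nines -> 0.999999
    print(f"{nines} nines: {downtime_budget_seconds(target):,.1f} s/year")
# Six nines leaves about 31.5 s per year; five nines about 315 s (roughly 5.3 minutes)
```

The numbers also show why mechanisms 1 and 2 come first: a single full outage lasting as long as the 2-minute P99 rollback window would exceed the entire six-nines budget. If availability is measured as the fraction of successful requests, the budget can only hold when most failures are confined to a small slice of traffic, such as the 5% canary region.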

3.6 Hands-On: A Complete Agent Skeleton

3.6.1 Project Structure Design

harness-agent/
├── README.md
├── pyproject.toml              # dependency management (Poetry)
├── Makefile                    # development commands
├── docker-compose.dev.yml      # local development environment
│
├── src/
│   ├── agents/                 # core Agent implementations
│   │   ├── __init__.py
│   │   ├── base_agent.py       # BaseAgent base class
│   │   ├── orchestrator_agent.py
│   │   ├── risk_agent.py
│   │   ├── deployment_agent.py
│   │   ├── rollback_agent.py
│   │   └── learning_agent.py
│   │
│   ├── modules/                # functional modules
│   │   ├── perception/
│   │   │   ├── __init__.py
│   │   │   ├── metrics_collector.py
│   │   │   ├── log_collector.py
│   │   │   └── event_collector.py
│   │   │
│   │   ├── cognition/
│   │   │   ├── __init__.py
│   │   │   ├── memory_manager.py
│   │   │   ├── knowledge_graph.py
│   │   │   └── context_builder.py
│   │   │
│   │   ├── decision/
│   │   │   ├── __init__.py
│   │   │   ├── rule_engine.py
│   │   │   ├── ml_model.py
│   │   │   ├── llm_reasoner.py
│   │   │   └── decision_router.py
│   │   │
│   │   ├── execution/
│   │   │   ├── __init__.py
│   │   │   ├── k8s_executor.py
│   │   │   ├── istio_executor.py
│   │   │   └── idempotent_executor.py
│   │   │
│   │   └── evolution/
│   │       ├── __init__.py
│   │       ├── online_learner.py
│   │       ├── offline_trainer.py
│   │       └── ab_tester.py
│   │
│   ├── models/                 # data models
│   │   ├── deployment.py
│   │   ├── decision.py
│   │   ├── service_profile.py
│   │   └── metrics.py
│   │
│   ├── infrastructure/         # infrastructure clients
│   │   ├── kafka_client.py
│   │   ├── redis_client.py
│   │   ├── postgres_client.py
│   │   └── k8s_client.py
│   │
│   └── api/                    # FastAPI entry point
│       ├── main.py
│       ├── routers/
│       └── middleware/
│
├── tests/
│   ├── unit/
│   ├── integration/
│   └── e2e/
│
├── helm/                       # Kubernetes Helm Chart
│   └── harness-agent/
│       ├── Chart.yaml
│       ├── values.yaml
│       ├── values-production.yaml
│       └── templates/
│           ├── deployment.yaml
│           ├── service.yaml
│           ├── configmap.yaml
│           ├── secret.yaml
│           ├── hpa.yaml
│           ├── serviceaccount.yaml
│           └── rbac.yaml
│
└── scripts/
    ├── setup_dev.sh
    ├── run_tests.sh
    └── deploy.sh

3.6.2 Core Agent Implementation (Full Code)

# src/agents/base_agent.py
"""
BaseAgent: base class for all Agents
Implements the complete perceive-decide-execute-learn loop
"""

import asyncio
import uuid
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
import structlog

from ..modules.perception.metrics_collector import MetricsCollector
from ..modules.cognition.memory_manager import MemoryManager
from ..modules.decision.decision_router import DecisionRouter
from ..modules.execution.idempotent_executor import IdempotentExecutor
from ..modules.evolution.online_learner import OnlineLearner
from ..infrastructure.kafka_client import HarnessMessageBus

logger = structlog.get_logger(__name__)


@dataclass
class AgentConfig:
    """Common Agent configuration"""
    agent_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    agent_name: str = "unnamed_agent"
    max_actions_per_minute: int = 60
    decision_timeout_seconds: float = 30.0
    execution_timeout_seconds: float = 300.0
    enable_learning: bool = True
    dry_run: bool = False  # log decisions without executing them
    log_level: str = "INFO"


@dataclass
class Observation:
    """Environment state observed by the Agent"""
    timestamp: float = field(default_factory=time.time)
    metrics: Dict[str, Any] = field(default_factory=dict)
    events: List[Dict] = field(default_factory=list)
    context: Dict[str, Any] = field(default_factory=dict)
    raw_data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Action:
    """Action the Agent has decided to take"""
    action_type: str
    params: Dict[str, Any] = field(default_factory=dict)
    confidence: float = 1.0
    reasoning: str = ""
    idempotency_key: str = field(default_factory=lambda: str(uuid.uuid4()))


@dataclass
class ActionResult:
    """Result of executing an action"""
    success: bool
    action: Action
    outcome: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None
    duration_ms: float = 0.0
    timestamp: float = field(default_factory=time.time)


class BaseAgent(ABC):
    """
    Agent base class: defines the perceive-decide-execute-learn loop shared by all Agents
    
    Subclasses must implement:
    - perceive(): how to sense the environment
    - should_act(): whether action is needed (avoids spinning the decision engine)
    - decide(): which action to take
    - execute(): how to carry out a concrete action
    - compute_reward(): the reward signal for an action
    
    The base class provides:
    - run_loop(): the main run loop
    - decision recording and learning
    - concurrency control and rate limiting
    - telemetry and observability
    """
    
    def __init__(
        self,
        config: AgentConfig,
        memory: MemoryManager,
        message_bus: HarnessMessageBus,
        decision_router: DecisionRouter,
        executor: IdempotentExecutor,
        learner: Optional[OnlineLearner] = None
    ):
        self.config = config
        self.memory = memory
        self.message_bus = message_bus
        self.decision_router = decision_router
        self.executor = executor
        self.learner = learner
        
        # Rate limiting: timestamps of recently executed actions
        self._action_timestamps: List[float] = []
        
        # Strong references to background learning tasks: the event loop keeps only
        # weak references, so an unreferenced task can be garbage-collected mid-run
        self._background_tasks: set = set()
        
        # Run state
        self._running = False
        self._loop_count = 0
        self._total_actions = 0
        self._total_errors = 0
        
        self.log = logger.bind(
            agent_id=config.agent_id,
            agent_name=config.agent_name
        )
    
    # ─────────────────────────────────────────────────────────
    # Abstract methods (subclasses must implement)
    # ─────────────────────────────────────────────────────────
    
    @abstractmethod
    async def perceive(self) -> Observation:
        """
        Sense the current state of the environment
        
        Subclasses should collect data from their sources (Prometheus, logs,
        the Kubernetes API, etc.) and combine it into an Observation
        """
        pass
    
    @abstractmethod
    async def should_act(self, observation: Observation) -> bool:
        """
        Decide whether any action is needed
        
        Keeps the Agent from running the decision engine needlessly while the
        environment is healthy, saving compute
        """
        pass
    
    @abstractmethod
    async def decide(self, observation: Observation) -> Optional[Action]:
        """
        Make an action decision based on the observation
        
        Calls decision_router to choose the appropriate decision path;
        returning None means "take no action"
        """
        pass
    
    @abstractmethod
    async def execute(self, action: Action) -> ActionResult:
        """
        Carry out the chosen action
        
        Goes through executor to guarantee idempotency; every operation must be safe to retry
        """
        pass
    
    @abstractmethod
    async def compute_reward(
        self,
        observation: Observation,
        action: Action,
        result: ActionResult,
        next_observation: Observation
    ) -> float:
        """
        Compute the reward signal for this action
        
        Used for online reinforcement-learning updates
        Positive: a good action (metrics improved)
        Negative: a bad action (metrics degraded or an incident was triggered)
        """
        pass
    
    # ─────────────────────────────────────────────────────────
    # Core run loop (implemented by the base class)
    # ─────────────────────────────────────────────────────────
    
    async def run_loop(
        self,
        interval_seconds: float = 10.0,
        max_iterations: Optional[int] = None
    ) -> None:
        """
        Main Agent loop
        
        Loop structure: perceive → check → decide → execute → learn
        
        Args:
            interval_seconds: loop interval (seconds)
            max_iterations: maximum number of iterations (None = run forever)
        """
        
        self._running = True
        self.log.info("Agent starting", interval_seconds=interval_seconds)
        
        try:
            while self._running:
                # "is not None" so that max_iterations=0 means zero iterations, not "forever"
                if max_iterations is not None and self._loop_count >= max_iterations:
                    break
                
                loop_start = time.monotonic()
                self._loop_count += 1
                
                try:
                    await self._single_iteration()
                except Exception as e:
                    self._total_errors += 1
                    self.log.error(
                        "Iteration failed",
                        loop=self._loop_count,
                        error=str(e),
                        exc_info=True
                    )
                    # Do not stop the loop; continue with the next iteration
                
                # Keep a fixed interval (monotonic clock: immune to wall-clock adjustments)
                elapsed = time.monotonic() - loop_start
                sleep_time = max(0, interval_seconds - elapsed)
                
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
        
        except asyncio.CancelledError:
            self.log.info("Agent loop cancelled")
        finally:
            self._running = False
            self.log.info(
                "Agent stopped",
                total_loops=self._loop_count,
                total_actions=self._total_actions,
                total_errors=self._total_errors
            )
    
    async def _single_iteration(self) -> None:
        """One perceive-decide-execute-learn iteration"""
        
        # 1. Perceive
        observation = await asyncio.wait_for(
            self.perceive(),
            timeout=self.config.decision_timeout_seconds
        )
        
        # Store the observation in working memory
        await self.memory.store_observation(observation)
        
        # 2. Check whether action is needed
        if not await self.should_act(observation):
            return
        
        # 3. Decide
        action = await asyncio.wait_for(
            self.decide(observation),
            timeout=self.config.decision_timeout_seconds
        )
        
        if action is None:
            return  # the Agent decided not to act
        
        self.log.info(
            "Action decided",
            action_type=action.action_type,
            confidence=action.confidence,
            reasoning=action.reasoning[:200]
        )
        
        # 4. Execute
        if self.config.dry_run:
            self.log.info("DRY RUN: Would execute", action=action.action_type)
            return
        
        # Rate limit check: only actions that will actually be executed count
        # toward max_actions_per_minute
        if not self._check_rate_limit():
            self.log.warning("Rate limit reached, skipping action")
            return
        
        result = await asyncio.wait_for(
            self.execute(action),
            timeout=self.config.execution_timeout_seconds
        )
        
        self._total_actions += 1
        
        if not result.success:
            self.log.error(
                "Action execution failed",
                action_type=action.action_type,
                error=result.error
            )
        
        # 5. Learn: the reward is computed inline; the learner update runs in the background
        if self.config.enable_learning and self.learner:
            # Re-perceiving immediately captures only the short-term effect of the action;
            # subclasses may want to wait for the system to settle first
            next_observation = await self.perceive()
            reward = await self.compute_reward(
                observation, action, result, next_observation
            )
            
            task = asyncio.create_task(
                self._async_learn(observation, action, reward, next_observation)
            )
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)
    
    async def _async_learn(
        self,
        observation: Observation,
        action: Action,
        reward: float,
        next_observation: Observation
    ) -> None:
        """异步学习:不阻塞主循环"""
        
        try:
            await self.learner.record_experience(
                state=observation.metrics,
                action=action.action_type,
                reward=reward,
                next_state=next_observation.metrics,
                metadata={
                    "agent_id": self.config.agent_id,
                    "loop_count": self._loop_count
                }
            )
            
            # Publish the experience to Kafka for offline training
            await self.message_bus.publish_learning_experience(
                deployment_id=action.params.get("deployment_id", "unknown"),
                state_before=observation.metrics,
                action=action.action_type,
                reward=reward,
                state_after=next_observation.metrics
            )
        
        except Exception as e:
            self.log.warning("Learning failed", error=str(e))
    
    def _check_rate_limit(self) -> bool:
        """检查速率限制(每分钟最大行动次数)"""
        now = time.time()
        
        # 清除1分钟前的记录
        self._action_timestamps = [
            ts for ts in self._action_timestamps 
            if now - ts < 60
        ]
        
        if len(self._action_timestamps) >= self.config.max_actions_per_minute:
            return False
        
        self._action_timestamps.append(now)
        return True
    
    def stop(self) -> None:
        """停止Agent运行"""
        self._running = False
    
    def get_stats(self) -> dict:
        """获取运行统计"""
        return {
            "agent_id": self.config.agent_id,
            "agent_name": self.config.agent_name,
            "is_running": self._running,
            "total_loops": self._loop_count,
            "total_actions": self._total_actions,
            "total_errors": self._total_errors,
            "error_rate": self._total_errors / max(1, self._loop_count)
        }
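`_check_rate_limit` above rebuilds the whole timestamp list on every call. A minimal alternative sketch of the same sliding-window policy (at most N events in any rolling 60-second window) using `collections.deque`, so expired entries are popped from the front in amortized O(1). The class name and the injectable `clock` parameter (added only to make it testable) are hypothetical, not part of the framework.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowRateLimiter:
    """Allow at most max_events within any rolling window of window_seconds."""

    def __init__(self, max_events: int, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_events = max_events
        self.window_seconds = window_seconds
        self._clock = clock
        self._events: Deque[float] = deque()

    def try_acquire(self) -> bool:
        """Record an event and return True if the limit allows it; otherwise return False."""
        now = self._clock()
        # Timestamps are appended in order, so expired ones always sit at the left end
        while self._events and now - self._events[0] >= self.window_seconds:
            self._events.popleft()
        if len(self._events) >= self.max_events:
            return False  # rejected attempts are not recorded
        self._events.append(now)
        return True


fake_now = [0.0]
limiter = SlidingWindowRateLimiter(max_events=2, window_seconds=60, clock=lambda: fake_now[0])
print(limiter.try_acquire(), limiter.try_acquire(), limiter.try_acquire())  # True True False
fake_now[0] = 60.0
print(limiter.try_acquire())  # True: both earlier events have left the window
```

At the default of 60 actions per minute the difference is negligible, but the deque version keeps the per-call cost flat if an Agent is configured with a much higher limit.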


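# ─────────────────────────────────────────────────────────────────
# src/agents/deployment_agent.py
# Concrete implementation: deployment monitoring Agent
# ─────────────────────────────────────────────────────────────────

import asyncio
import time
from typing import Dict, Optional

from .base_agent import BaseAgent, AgentConfig, Observation, Action, ActionResult
from ..modules.perception.metrics_collector import MetricsCollector
# RollbackOperation is assumed to be defined alongside IdempotentExecutor (not shown)
from ..modules.execution.idempotent_executor import RollbackOperation


class DeploymentMonitorAgent(BaseAgent):
    """
    Deployment monitoring Agent
    
    Responsibility: monitor in-flight deployments in real time and trigger a
    rollback or an alert when an anomaly is found
    """
    
    def __init__(
        self,
        config: AgentConfig,
        deployment_id: str,
        service_name: str,
        slo_targets: Dict[str, float],
        k8s_client,
        prometheus_collector: MetricsCollector,
        **kwargs
    ):
        super().__init__(config, **kwargs)
        
        self.deployment_id = deployment_id
        self.service_name = service_name
        self.slo_targets = slo_targets
        self.k8s_client = k8s_client
        self.prometheus_collector = prometheus_collector
        
        # Anomaly streak: act only after N consecutive anomalies, to avoid false positives
        self._anomaly_streak = 0
        self._anomaly_threshold = 3
        
        # Baseline metrics (snapshot taken before the deployment)
        self._baseline_metrics: Optional[Dict] = None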
    
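    async def perceive(self) -> Observation:
        """Collect all current metrics for the service"""
        
        # Collect from several sources in parallel
        metrics, k8s_state, events = await asyncio.gather(
            self.prometheus_collector.collect_service_metrics(self.service_name),
            self._get_k8s_state(),
            self._get_recent_events(),
            return_exceptions=True
        )
        
        # A failed metrics scrape must not look like a healthy service (every metric
        # would silently default to 0.0), so re-raise and let run_loop skip this iteration
        if isinstance(metrics, BaseException):
            raise metrics
        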
        return Observation(
            metrics={
                "error_rate": getattr(metrics, 'error_rate', 0.0),
                "p99_latency_ms": getattr(metrics, 'p99_latency_ms', 0.0),
                "request_rate": getattr(metrics, 'request_rate', 0.0),
                "cpu_usage_pct": getattr(metrics, 'cpu_usage_pct', 0.0),
                "memory_usage_pct": getattr(metrics, 'memory_usage_pct', 0.0),
            },
            events=events if isinstance(events, list) else [],
            context={
                "deployment_id": self.deployment_id,
                "service_name": self.service_name,
                "k8s_state": k8s_state if not isinstance(k8s_state, Exception) else {},
                "slo_targets": self.slo_targets,
                "baseline": self._baseline_metrics or {}
            }
        )
    
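    async def should_act(self, observation: Observation) -> bool:
        """Trigger a decision only after a potential problem persists for several checks"""
        
        metrics = observation.metrics
        
        # Fast path: check for obvious SLO violations
        if metrics.get("error_rate", 0) > self.slo_targets.get("error_rate", 0.01) * 2:
            anomalous = True  # error rate above 2x the SLO target
        elif metrics.get("p99_latency_ms", 0) > self.slo_targets.get("p99_latency_ms", 500) * 1.5:
            anomalous = True  # latency above 1.5x the SLO target
        else:
            # Check Kubernetes events (e.g. crashing Pods). CrashLoopBackOff and OOMKilled
            # usually surface as container status reasons; the matching kubelet event is BackOff
            anomalous = any(
                event.get("reason") in ["CrashLoopBackOff", "OOMKilled", "BackOff"]
                for event in observation.events
            )
        
        # Act only after _anomaly_threshold consecutive anomalous observations
        self._anomaly_streak = self._anomaly_streak + 1 if anomalous else 0
        return self._anomaly_streak >= self._anomaly_threshold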
    
    async def decide(self, observation: Observation) -> Optional[Action]:
        """使用三路径决策路由器制定行动"""
        
        context = {
            "metrics": observation.metrics,
            "deployment": {
                "id": self.deployment_id,
                "service": self.service_name,
                "status": "in_progress"
            },
            "slo_targets": self.slo_targets,
            "baseline": self._baseline_metrics,
            "k8s": observation.context.get("k8s_state", {})
        }
        
        decision = await self.decision_router.route_decision(context)
        
        if decision.action == "rollback":
            return Action(
                action_type="rollback",
                params={
                    "deployment_id": self.deployment_id,
                    "reason": decision.reasoning,
                    "urgency": "high" if observation.metrics.get("error_rate", 0) > 0.05 else "normal"
                },
                confidence=decision.confidence,
                reasoning=decision.reasoning
            )
        
        elif decision.action == "alert_oncall":
            return Action(
                action_type="alert",
                params={
                    "deployment_id": self.deployment_id,
                    "severity": "P1",
                    "message": decision.reasoning,
                    "metrics": observation.metrics
                },
                confidence=decision.confidence,
                reasoning=decision.reasoning
            )
        
        return None  # take no action
    
    async def execute(self, action: Action) -> ActionResult:
        """Execute a rollback or alert action"""
        
        start_time = time.time()
        
        try:
            if action.action_type == "rollback":
                result = await self.executor.execute(
                    operation=RollbackOperation(
                        deployment_id=action.params["deployment_id"],
                        k8s_client=self.k8s_client,
                        urgency=action.params.get("urgency", "normal")
                    ),
                    idempotency_key=f"rollback:{action.params['deployment_id']}"
                )
                
                return ActionResult(
                    success=result.success,
                    action=action,
                    outcome={"rollback_result": result.to_dict()},
                    duration_ms=(time.time() - start_time) * 1000
                )
            
            elif action.action_type == "alert":
                await self.message_bus.publish_deployment_event(
                    deployment_id=action.params["deployment_id"],
                    event_type="alert_triggered",
                    payload=action.params
                )
                
                return ActionResult(
                    success=True,
                    action=action,
                    outcome={"alert_sent": True},
                    duration_ms=(time.time() - start_time) * 1000
                )
            
            # Unknown action types must still return an ActionResult, never None
            return ActionResult(
                success=False,
                action=action,
                error=f"Unsupported action type: {action.action_type}",
                duration_ms=(time.time() - start_time) * 1000
            )
        
        except Exception as e:
            return ActionResult(
                success=False,
                action=action,
                error=str(e),
                duration_ms=(time.time() - start_time) * 1000
            )
    
    async def compute_reward(
        self,
        observation: Observation,
        action: Action,
        result: ActionResult,
        next_observation: Observation
    ) -> float:
        """计算行动奖励"""
        
        if not result.success:
            return -1.0  # 执行失败:负奖励
        
        # 对比行动前后的错误率
        pre_error_rate = observation.metrics.get("error_rate", 0)
        post_error_rate = next_observation.metrics.get("error_rate", 0)
        
        if action.action_type == "rollback":
            if post_error_rate < pre_error_rate:
                # 回滚有效地降低了错误率
                improvement = (pre_error_rate - post_error_rate) / pre_error_rate
                return improvement * 5  # 放大奖励信号
            else:
                return -0.5  # 回滚没有改善,轻微惩罚
        
        return 0.0
    
    async def _get_k8s_state(self) -> dict:
        """Get the Kubernetes deployment state"""
        try:
            deployment = await self.k8s_client.get_deployment(self.service_name)
            pods = await self.k8s_client.get_pods(
                label_selector=f"app={self.service_name}"
            )
            
            return {
                "desired_replicas": deployment.spec.replicas,
                "ready_replicas": deployment.status.ready_replicas or 0,
                "pod_statuses": [
                    {
                        "name": pod.metadata.name,
                        "phase": pod.status.phase,
                        # A Pod is ready when its "Ready" condition has status "True"
                        # (pod conditions expose .type/.status, not a .ready flag)
                        "ready": any(
                            c.type == "Ready" and c.status == "True"
                            for c in (pod.status.conditions or [])
                        )
                    }
                    for pod in pods.items
                ]
            }
        except Exception as e:
            self.log.warning("Failed to get k8s state", error=str(e))
            return {}
    
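    async def _get_recent_events(self) -> list:
        """Get Kubernetes events from the last 5 minutes"""
        try:
            # Note: this selector matches only events whose involved object is named exactly
            # service_name (e.g. the Deployment). Pod-level events such as BackOff are
            # attached to individual Pod names and need a per-Pod or namespace-wide query
            events = await self.k8s_client.get_events(
                field_selector=f"involvedObject.name={self.service_name}",
                since_minutes=5
            )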
            return [
                {
                    "reason": e.reason,
                    "message": e.message,
                    "count": e.count,
                    "last_time": e.last_timestamp.isoformat() if e.last_timestamp else None
                }
                for e in events.items
            ]
        except Exception:
            return []
    
    async def set_baseline(self) -> None:
        """设置部署前的基准指标"""
        obs = await self.perceive()
        self._baseline_metrics = obs.metrics.copy()
        self.log.info("Baseline metrics captured", baseline=self._baseline_metrics)

3.6.3 Kubernetes Deployment Configuration (Full Helm Chart)

# helm/harness-agent/Chart.yaml
apiVersion: v2
name: harness-agent
description: Harness AI Agent Platform
type: application
version: 0.1.0
appVersion: "1.0.0"
dependencies:
  - name: postgresql
    version: "12.x.x"
    repository: "https://charts.bitnami.com/bitnami"
    condition: postgresql.enabled
  - name: redis
    version: "17.x.x"
    repository: "https://charts.bitnami.com/bitnami"
    condition: redis.enabled

# helm/harness-agent/values.yaml
# Default values for harness-agent

global:
  imageRegistry: ""
  imagePullSecrets: []

# ─────────────────────────────────────────────
# Orchestrator service
# ─────────────────────────────────────────────
orchestrator:
  enabled: true
  replicaCount: 2
  
  image:
    repository: harness/orchestrator
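    pullPolicy: IfNotPresent
    tag: "latest"  # pin an immutable version in production: with IfNotPresent, nodes that cached "latest" never pull newer images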
  
  service:
    type: ClusterIP
    port: 8080
  
  resources:
    requests:
      cpu: 500m
      memory: 512Mi
    limits:
      cpu: 2000m
      memory: 2Gi
  
  autoscaling:
    enabled: true
    minReplicas: 2
    maxReplicas: 10
    targetCPUUtilizationPercentage: 70
    targetMemoryUtilizationPercentage: 80
  
  config:
    maxConcurrentDeployments: 10
    riskThresholdForApproval: 0.7
    canaryObservationMinutes: 5
    enableAutoRollback: true
    logLevel: "INFO"
  
  env:
    - name: DATABASE_URL
      valueFrom:
        secretKeyRef:
          name: harness-secrets
          key: database-url
    - name: REDIS_URL
      valueFrom:
        secretKeyRef:
          name: harness-secrets
          key: redis-url
    - name: KAFKA_BROKERS
      value: "kafka:9092"
    - name: ANTHROPIC_API_KEY
      valueFrom:
        secretKeyRef:
          name: harness-secrets
          key: anthropic-api-key

# ─────────────────────────────────────────────
# Analyzer service
# ─────────────────────────────────────────────
analyzer:
  enabled: true
  replicaCount: 2
  
  image:
    repository: harness/analyzer
    tag: "latest"
  
  resources:
    requests:
      cpu: 1000m
      memory: 1Gi
    limits:
      cpu: 4000m
      memory: 4Gi

# ─────────────────────────────────────────────
# Learner service (ML training)
# ─────────────────────────────────────────────
learner:
  enabled: true
  replicaCount: 1  # The training service usually runs as a single instance
  
  image:
    repository: harness/learner
    tag: "latest"
  
  resources:
    requests:
      cpu: 2000m
      memory: 4Gi
    limits:
      cpu: 8000m
      memory: 16Gi
  
  config:
    trainingSchedule: "0 2 * * *"  # Train every day at 02:00
    batchSize: 256
    learningRate: 0.001
    modelOutputPath: "/models"
  
  persistence:
    enabled: true
    size: 50Gi
    storageClass: fast-ssd

# ─────────────────────────────────────────────
# Dependency services
# ─────────────────────────────────────────────
postgresql:
  enabled: true
  auth:
    postgresPassword: ""  # Must be overridden in values-production.yaml
    database: harness
  primary:
    persistence:
      size: 100Gi
      storageClass: fast-ssd

redis:
  enabled: true
  auth:
    password: ""  # Must be overridden in values-production.yaml
  replica:
    replicaCount: 2

# ─────────────────────────────────────────────
# Monitoring and observability
# ─────────────────────────────────────────────
monitoring:
  enabled: true
  serviceMonitor:
    enabled: true
    namespace: harness-monitoring
  
  dashboards:
    enabled: true
    grafana:
      namespace: harness-monitoring

# Network policy
networkPolicy:
  enabled: true
  ingress:
    - from:
      - namespaceSelector:
          matchLabels:
            name: harness-system
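The empty `postgresPassword` and `password` defaults above are meant to be overridden in `values-production.yaml`. That works because of how Helm layers value files passed with `-f`: user-supplied values override chart defaults, nested maps are merged key by key, and lists (such as `env` or `networkPolicy.ingress`) replace the default list instead of being concatenated. The simplified Python model below covers only those two rules. It ignores Helm's "set a key to null to delete it" behavior and subchart scoping. It adds a hypothetical pre-flight check, `missing_required`, that reports secrets still left empty after merging.

```python
import copy
from typing import Any, Dict, List, Tuple


def merge_values(defaults: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Deep-merge override into defaults, approximating how Helm layers value files."""
    merged = copy.deepcopy(defaults)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Maps are merged key by key
            merged[key] = merge_values(merged[key], value)
        else:
            # Scalars and lists (e.g. env, networkPolicy.ingress) replace the default wholesale
            merged[key] = copy.deepcopy(value)
    return merged


# Hypothetical list of settings that must not stay empty in production
REQUIRED_NON_EMPTY: List[Tuple[str, ...]] = [
    ("postgresql", "auth", "postgresPassword"),
    ("redis", "auth", "password"),
]


def missing_required(values: Dict[str, Any]) -> List[str]:
    """Return dotted paths of required settings that are absent or empty."""
    missing: List[str] = []
    for path in REQUIRED_NON_EMPTY:
        section = values.get(path[0])
        if isinstance(section, dict) and section.get("enabled") is False:
            continue  # Subchart disabled: its password is not needed
        node: Any = values
        for part in path:
            node = node.get(part) if isinstance(node, dict) else None
        if not node:
            missing.append(".".join(path))
    return missing
```

Running a check like this in CI before `helm upgrade` surfaces a forgotten override early. Otherwise the subchart silently applies its own handling of an empty value.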
# helm/harness-agent/templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "harness-agent.fullname" . }}-orchestrator
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "harness-agent.labels" . | nindent 4 }}
    app.kubernetes.io/component: orchestrator
spec:
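  # When the HPA is enabled it manages the replica count; setting replicas here would reset it on every upgrade
  {{- if not .Values.orchestrator.autoscaling.enabled }}
  replicas: {{ .Values.orchestrator.replicaCount }}
  {{- end }}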
  selector:
    matchLabels:
      {{- include "harness-agent.selectorLabels" . | nindent 6 }}
      app.kubernetes.io/component: orchestrator
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0  # Zero-downtime rolling update
  template:
    metadata:
      labels:
        {{- include "harness-agent.selectorLabels" . | nindent 8 }}
        app.kubernetes.io/component: orchestrator
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9090"
        prometheus.io/path: "/metrics"
    spec:
      serviceAccountName: {{ include "harness-agent.serviceAccountName" . }}
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
      
      # Graceful shutdown settings
      terminationGracePeriodSeconds: 60
      
      containers:
      - name: orchestrator
        image: "{{ .Values.orchestrator.image.repository }}:{{ .Values.orchestrator.image.tag }}"
        imagePullPolicy: {{ .Values.orchestrator.image.pullPolicy }}
        
        ports:
        - name: http
          containerPort: 8080
          protocol: TCP
        - name: metrics
          containerPort: 9090
          protocol: TCP
        
        env:
        {{- toYaml .Values.orchestrator.env | nindent 8 }}
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        
        envFrom:
        - configMapRef:
            name: {{ include "harness-agent.fullname" . }}-config
        
        # Health checks
        livenessProbe:
          httpGet:
            path: /api/v1/health/live
            port: http
          initialDelaySeconds: 30
          periodSeconds: 10
          failureThreshold: 3
        
        readinessProbe:
          httpGet:
            path: /api/v1/health/ready
            port: http
          initialDelaySeconds: 10
          periodSeconds: 5
          failureThreshold: 3
        
        # Graceful shutdown: delay SIGTERM so the pod can be removed from Service endpoints first
        lifecycle:
          preStop:
            exec:
              command: ["/bin/sh", "-c", "sleep 10"]
        
        resources:
          {{- toYaml .Values.orchestrator.resources | nindent 10 }}
        
        volumeMounts:
        - name: config
          mountPath: /app/config
          readOnly: true
        - name: tmp
          mountPath: /tmp
      
      volumes:
      - name: config
        configMap:
          name: {{ include "harness-agent.fullname" . }}-config
      - name: tmp
        emptyDir: {}
      
      # Pod anti-affinity: prefer spreading replicas across different nodes (a soft rule, not a guarantee)
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app.kubernetes.io/component
                  operator: In
                  values:
                  - orchestrator
              topologyKey: kubernetes.io/hostname
      
      # Tolerations: allow scheduling onto nodes carrying the harness-workload taint (NoSchedule effect)
      tolerations:
      - key: "harness-workload"
        operator: "Exists"
        effect: "NoSchedule"
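The shutdown settings above interact. When a pod is deleted, Kubernetes runs the `preStop` hook first and sends SIGTERM only after the hook returns. The `terminationGracePeriodSeconds` countdown starts before the hook runs, so it covers both. With `sleep 10` and a 60-second grace period, the process therefore has roughly 60 − 10 = 50 seconds after SIGTERM before SIGKILL.

The sketch below shows one way an orchestrator process might cooperate with that budget: stop accepting new deployment work once shutdown begins, keep liveness healthy, and wait for in-flight work to drain within the remaining time. `HealthState`, `drain_budget_seconds` and the 5-second safety margin are hypothetical. Exposing them on `/api/v1/health/live` and `/api/v1/health/ready` is left to the web framework.

```python
import asyncio
import signal

GRACE_PERIOD_S = 60    # terminationGracePeriodSeconds
PRE_STOP_SLEEP_S = 10  # preStop: sleep 10
SAFETY_MARGIN_S = 5    # hypothetical headroom before SIGKILL


def drain_budget_seconds(grace: int = GRACE_PERIOD_S, pre_stop: int = PRE_STOP_SLEEP_S,
                         margin: int = SAFETY_MARGIN_S) -> float:
    """Time left for draining after SIGTERM; the grace period also covers preStop."""
    return max(0.0, float(grace - pre_stop - margin))


class HealthState:
    """Hypothetical liveness/readiness state for the orchestrator process."""

    def __init__(self) -> None:
        self.shutting_down = False
        self._in_flight = 0
        self._idle = asyncio.Event()
        self._idle.set()

    def live(self) -> bool:
        return True  # Liveness stays healthy while draining, so the kubelet does not restart us

    def ready(self) -> bool:
        return not self.shutting_down  # Readiness: accept new work only before shutdown

    def begin_shutdown(self) -> None:
        self.shutting_down = True

    def try_start_task(self) -> bool:
        if self.shutting_down:
            return False  # Refuse new deployments once shutdown has begun
        self._in_flight += 1
        self._idle.clear()
        return True

    def finish_task(self) -> None:
        self._in_flight -= 1
        if self._in_flight == 0:
            self._idle.set()

    async def drain(self, timeout: float) -> bool:
        """Wait for in-flight tasks; return False if the budget ran out first."""
        try:
            await asyncio.wait_for(self._idle.wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False


def install_sigterm_handler(loop: asyncio.AbstractEventLoop, state: HealthState) -> None:
    # Unix only: flip readiness as soon as SIGTERM arrives
    loop.add_signal_handler(signal.SIGTERM, state.begin_shutdown)
```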
# helm/harness-agent/templates/hpa.yaml
{{- if .Values.orchestrator.autoscaling.enabled }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: {{ include "harness-agent.fullname" . }}-orchestrator
  namespace: {{ .Release.Namespace }}
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: {{ include "harness-agent.fullname" . }}-orchestrator
  minReplicas: {{ .Values.orchestrator.autoscaling.minReplicas }}
  maxReplicas: {{ .Values.orchestrator.autoscaling.maxReplicas }}
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: {{ .Values.orchestrator.autoscaling.targetCPUUtilizationPercentage }}
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: {{ .Values.orchestrator.autoscaling.targetMemoryUtilizationPercentage }}
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Pods
        value: 2
        periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300  # 5-minute stabilization window to avoid frequent scale-down
      policies:
      - type: Pods
        value: 1
        periodSeconds: 120
{{- end }}
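To see what the `autoscaling` values and the `behavior` block do together, here is a simplified Python model of the calculation documented for the Kubernetes HPA. For each metric, desiredReplicas = ceil(currentReplicas × currentUtilization / targetUtilization), and no change is proposed when the ratio is within the default 0.1 tolerance. With several metrics, the largest proposal wins. The result is then limited by the scaling policies (here at most +2 pods per step up and −1 pod per step down) and clamped to minReplicas/maxReplicas. The model deliberately omits stabilization windows, pod readiness handling and missing metrics. It is an illustration, not a reimplementation of the controller.

```python
import math
from typing import Dict

TOLERANCE = 0.1  # Kubernetes default for --horizontal-pod-autoscaler-tolerance


def proposal_for_metric(current_replicas: int, current_util: float, target_util: float) -> int:
    ratio = current_util / target_util
    if abs(ratio - 1.0) <= TOLERANCE:
        return current_replicas  # Within tolerance: keep the current size
    return math.ceil(current_replicas * ratio)


def desired_replicas(
    current_replicas: int,
    utilization: Dict[str, float],   # e.g. {"cpu": 140.0, "memory": 60.0}, percent of requests
    targets: Dict[str, float],       # e.g. {"cpu": 70.0, "memory": 80.0}
    min_replicas: int = 2,
    max_replicas: int = 10,
    max_scale_up_pods: int = 2,      # behavior.scaleUp policy: +2 pods per 60s
    max_scale_down_pods: int = 1,    # behavior.scaleDown policy: -1 pod per 120s
) -> int:
    # With multiple metrics the HPA uses the largest proposal
    proposal = max(
        proposal_for_metric(current_replicas, utilization[name], target)
        for name, target in targets.items()
    )
    # Rate-limit the change according to the behavior policies
    if proposal > current_replicas:
        proposal = min(proposal, current_replicas + max_scale_up_pods)
    elif proposal < current_replicas:
        proposal = max(proposal, current_replicas - max_scale_down_pods)
    # Finally clamp to the configured replica range
    return max(min_replicas, min(max_replicas, proposal))
```

For example, at 2 replicas with CPU at 140% of requests against the 70% target, the formula proposes 4 replicas, which the +2-pod policy allows in one step. At 3 replicas and 210% CPU, it proposes 9, but the policy caps the step at 5.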

3.6.4 Local Debugging and Testing

Unit tests

# tests/unit/test_deployment_state_machine.py
"""
Unit tests for the deployment state machine.
Verifies every legal state transition and the error handling for illegal transitions.
"""

import pytest
from src.agents.core.state_machine import (
    DeploymentStateMachine, 
    DeploymentState, 
    DeploymentEvent,
    InvalidStateTransitionError
)


@pytest.fixture
def mock_deployment():
    """Create a deployment object for testing"""
    class MockDeployment:
        id = "test-deployment-001"
        status = "pending"
    return MockDeployment()


class TestDeploymentStateMachine:
    
    def test_initial_state_is_pending(self, mock_deployment):
        sm = DeploymentStateMachine(mock_deployment)
        assert sm.current_state == DeploymentState.PENDING
    
    def test_valid_transition_pending_to_risk_assessment(self, mock_deployment):
        sm = DeploymentStateMachine(mock_deployment)
        new_state = sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
        assert new_state == DeploymentState.RISK_ASSESSMENT
    
    def test_valid_full_happy_path(self, mock_deployment):
        """Test the complete successful deployment path"""
        sm = DeploymentStateMachine(mock_deployment)
        
        sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
        sm.transition(DeploymentEvent.RISK_ACCEPTABLE)
        sm.transition(DeploymentEvent.START_VERIFICATION)
        sm.transition(DeploymentEvent.COMPLETE)
        
        assert sm.current_state == DeploymentState.COMPLETED
        assert len(sm.history) == 4
    
    def test_valid_rollback_path(self, mock_deployment):
        """Test the rollback path"""
        sm = DeploymentStateMachine(mock_deployment)
        
        sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
        sm.transition(DeploymentEvent.RISK_ACCEPTABLE)
        sm.transition(DeploymentEvent.START_VERIFICATION)
        sm.transition(DeploymentEvent.VERIFICATION_FAILED)
        sm.transition(DeploymentEvent.ROLLBACK_COMPLETED)
        
        assert sm.current_state == DeploymentState.ROLLED_BACK
    
    def test_high_risk_requires_approval(self, mock_deployment):
        """Test that a high-risk deployment requires approval"""
        sm = DeploymentStateMachine(mock_deployment)
        
        sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
        sm.transition(DeploymentEvent.RISK_HIGH)
        
        assert sm.current_state == DeploymentState.AWAITING_APPROVAL
    
    def test_invalid_transition_raises_error(self, mock_deployment):
        """Test that an illegal state transition raises an exception"""
        sm = DeploymentStateMachine(mock_deployment)
        
        with pytest.raises(InvalidStateTransitionError):
            # The PENDING state cannot directly trigger the COMPLETE event
            sm.transition(DeploymentEvent.COMPLETE)
    
    def test_cancel_from_in_progress_triggers_rollback(self, mock_deployment):
        """Test that cancelling an in-progress deployment moves it into rollback"""
        sm = DeploymentStateMachine(mock_deployment)
        
        sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
        sm.transition(DeploymentEvent.RISK_ACCEPTABLE)
        sm.transition(DeploymentEvent.CANCEL)
        
        assert sm.current_state == DeploymentState.ROLLING_BACK
    
    def test_available_events_from_pending(self, mock_deployment):
        """Test the events available in the PENDING state"""
        sm = DeploymentStateMachine(mock_deployment)
        
        available = sm.available_events()
        
        assert DeploymentEvent.START_RISK_ASSESSMENT in available
        assert DeploymentEvent.CANCEL in available
        assert DeploymentEvent.COMPLETE not in available
    
    def test_history_records_all_transitions(self, mock_deployment):
        """Test that the state history records every transition"""
        sm = DeploymentStateMachine(mock_deployment)
        
        sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
        sm.transition(DeploymentEvent.RISK_HIGH)
        sm.transition(DeploymentEvent.APPROVE)
        
        assert len(sm.history) == 3
        assert sm.history[0]["from"] == "pending"
        assert sm.history[0]["to"] == "risk_assessment"
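The tests above fix the transition table without showing it. Below is one hypothetical table-driven implementation that satisfies them. It is not the `src.agents.core.state_machine` module. The `DEPLOYING`, `VERIFYING` and `CANCELLED` states, the lowercase enum values and the extra `CANCEL` transitions are assumptions chosen to be consistent with the assertions.

```python
from enum import Enum
from typing import Any, Dict, List, Set, Tuple


class DeploymentState(Enum):
    PENDING = "pending"
    RISK_ASSESSMENT = "risk_assessment"
    AWAITING_APPROVAL = "awaiting_approval"
    DEPLOYING = "deploying"      # hypothetical intermediate state
    VERIFYING = "verifying"      # hypothetical intermediate state
    COMPLETED = "completed"
    ROLLING_BACK = "rolling_back"
    ROLLED_BACK = "rolled_back"
    CANCELLED = "cancelled"      # hypothetical terminal state


class DeploymentEvent(Enum):
    START_RISK_ASSESSMENT = "start_risk_assessment"
    RISK_ACCEPTABLE = "risk_acceptable"
    RISK_HIGH = "risk_high"
    APPROVE = "approve"
    START_VERIFICATION = "start_verification"
    VERIFICATION_FAILED = "verification_failed"
    ROLLBACK_COMPLETED = "rollback_completed"
    COMPLETE = "complete"
    CANCEL = "cancel"


class InvalidStateTransitionError(Exception):
    pass


S, E = DeploymentState, DeploymentEvent

# (current state, event) -> next state; any pair not listed is illegal
TRANSITIONS: Dict[Tuple[DeploymentState, DeploymentEvent], DeploymentState] = {
    (S.PENDING, E.START_RISK_ASSESSMENT): S.RISK_ASSESSMENT,
    (S.PENDING, E.CANCEL): S.CANCELLED,
    (S.RISK_ASSESSMENT, E.RISK_ACCEPTABLE): S.DEPLOYING,
    (S.RISK_ASSESSMENT, E.RISK_HIGH): S.AWAITING_APPROVAL,
    (S.AWAITING_APPROVAL, E.APPROVE): S.DEPLOYING,
    (S.AWAITING_APPROVAL, E.CANCEL): S.CANCELLED,
    (S.DEPLOYING, E.START_VERIFICATION): S.VERIFYING,
    (S.DEPLOYING, E.CANCEL): S.ROLLING_BACK,  # Changes already applied must be undone
    (S.VERIFYING, E.COMPLETE): S.COMPLETED,
    (S.VERIFYING, E.VERIFICATION_FAILED): S.ROLLING_BACK,
    (S.VERIFYING, E.CANCEL): S.ROLLING_BACK,
    (S.ROLLING_BACK, E.ROLLBACK_COMPLETED): S.ROLLED_BACK,
}


class DeploymentStateMachine:
    def __init__(self, deployment: Any) -> None:
        self.deployment = deployment
        self.current_state = DeploymentState(deployment.status)
        self.history: List[Dict[str, str]] = []

    def available_events(self) -> Set[DeploymentEvent]:
        return {event for (state, event) in TRANSITIONS if state == self.current_state}

    def transition(self, event: DeploymentEvent) -> DeploymentState:
        key = (self.current_state, event)
        if key not in TRANSITIONS:
            raise InvalidStateTransitionError(
                f"{event.value} is not allowed in state {self.current_state.value}"
            )
        new_state = TRANSITIONS[key]
        self.history.append(
            {"from": self.current_state.value, "to": new_state.value, "event": event.value}
        )
        self.current_state = new_state
        self.deployment.status = new_state.value  # Keep the domain object in sync
        return new_state
```

Keeping the rules in one dictionary means `available_events()` and the validation in `transition()` can never disagree.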


# tests/unit/test_rule_engine.py
import pytest
from src.modules.decision.rule_engine import RuleEngine, EXAMPLE_RULES_YAML


@pytest.fixture
def rule_engine():
    engine = RuleEngine()
    engine.load_from_yaml(EXAMPLE_RULES_YAML)
    return engine


class TestRuleEngine:
    
    def test_auto_rollback_on_high_error_rate(self, rule_engine):
        """Verify that a high error rate triggers automatic rollback"""
        
        context = {
            "metrics": {"error_rate": 0.08},  # 8% > 5% threshold
            "deployment": {
                "status": "in_progress",
                "environment": "production"
            }
        }
        
        result = rule_engine.match(context)
        
        assert result.matched is True
        assert result.action == "rollback"
    
    def test_no_rollback_when_error_rate_normal(self, rule_engine):
        """Verify that a normal error rate does not trigger rollback"""
        
        context = {
            "metrics": {"error_rate": 0.001},  # 0.1% < 5% threshold
            "deployment": {
                "status": "in_progress",
                "environment": "production"
            }
        }
        
        result = rule_engine.match(context)
        
        # Another rule may match, but its action must not be rollback
        if result.matched:
            assert result.action != "rollback"
    
    def test_rule_priority_ordering(self, rule_engine):
        """Verify rule priority: higher-priority rules match first"""
        
        # A scenario that satisfies several rules at once
        context = {
            "metrics": {
                "error_rate": 0.1,  # Triggers auto_rollback (priority 1)
                "p99_latency_ms": 1500  # Triggers latency_rollback (priority 2)
            },
            "deployment": {
                "status": "in_progress",
                "environment": "production"
            }
        }
        
        result = rule_engine.match(context)
        
        assert result.matched is True
        # The priority-1 error_rate rule should match
        assert result.triggered_rule.name == "auto_rollback_on_high_error_rate"
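The priority test assumes first-match semantics. Rules are evaluated in ascending priority number, and the first rule whose conditions all hold decides the action. `EXAMPLE_RULES_YAML` is not reproduced here, so the sketch below encodes two rules as plain Python data instead of YAML. The 5% error-rate threshold comes from the test comments. The 1000 ms p99 threshold, the `(dotted path, operator, value)` condition format and the `Rule`/`MatchResult`/`match` names are assumptions for illustration only.

```python
import operator
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

OPS = {">": operator.gt, ">=": operator.ge, "<": operator.lt, "==": operator.eq}


@dataclass
class Rule:
    name: str
    priority: int  # Lower number = evaluated first
    action: str
    conditions: List[Tuple[str, str, Any]] = field(default_factory=list)


@dataclass
class MatchResult:
    matched: bool
    action: Optional[str] = None
    triggered_rule: Optional[Rule] = None


def lookup(context: Dict[str, Any], dotted_path: str) -> Any:
    """Resolve 'metrics.error_rate' against nested dicts; None if any key is absent."""
    node: Any = context
    for part in dotted_path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def condition_holds(context: Dict[str, Any], condition: Tuple[str, str, Any]) -> bool:
    path, op, expected = condition
    value = lookup(context, path)
    return value is not None and OPS[op](value, expected)


def match(rules: List[Rule], context: Dict[str, Any]) -> MatchResult:
    # First match wins, so sort by priority before evaluating
    for rule in sorted(rules, key=lambda r: r.priority):
        if all(condition_holds(context, c) for c in rule.conditions):
            return MatchResult(True, rule.action, rule)
    return MatchResult(False)


RULES = [  # Deliberately listed out of priority order
    Rule("latency_rollback", 2, "rollback", [
        ("metrics.p99_latency_ms", ">", 1000),  # hypothetical threshold
        ("deployment.status", "==", "in_progress"),
    ]),
    Rule("auto_rollback_on_high_error_rate", 1, "rollback", [
        ("metrics.error_rate", ">", 0.05),
        ("deployment.status", "==", "in_progress"),
        ("deployment.environment", "==", "production"),
    ]),
]
```

Because both rules map to `rollback`, only `triggered_rule` reveals which one fired. This is why the priority test asserts on the rule name rather than the action.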

Integration test environment setup

# tests/integration/conftest.py
"""
Integration test configuration: start real dependency services with testcontainers.
"""

import pytest
import pytest_asyncio
import asyncio
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer
from testcontainers.kafka import KafkaContainer

from src.infrastructure.postgres_client import PostgresClient
from src.infrastructure.redis_client import RedisClient
from src.infrastructure.kafka_client import HarnessMessageBus


@pytest.fixture(scope="session")
def event_loop():
    """Create a session-scoped event loop shared by the session-scoped async fixtures"""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(scope="session")
def postgres_container():
    """Start a PostgreSQL container for the test session (the testcontainers API is synchronous)"""
    with PostgresContainer("postgres:15") as postgres:
        yield postgres


@pytest.fixture(scope="session")
def redis_container():
    """Start a Redis container for the test session"""
    with RedisContainer("redis:7") as redis:
        yield redis


# Async fixtures are declared with pytest_asyncio.fixture so they also work in strict mode
@pytest_asyncio.fixture(scope="session")
async def db_client(postgres_container):
    """Create the database connection"""
    client = PostgresClient(
        dsn=postgres_container.get_connection_url()
    )
    await client.connect()

    # Run schema migrations
    await client.run_migrations("migrations/")

    yield client

    await client.disconnect()


@pytest_asyncio.fixture(autouse=True)
async def clean_db(db_client):
    """Clean the database after each test"""
    yield
    # Truncate the tables once the test finishes so the next test starts from empty tables
    await db_client.execute("TRUNCATE deployments, decisions, risk_assessments CASCADE")
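
# tests/integration/test_deployment_pipeline.py
"""
End-to-end integration tests: the complete deployment flow
"""

import asyncio
from dataclasses import dataclass, field
from typing import List

import pytest
from unittest.mock import AsyncMock, patch

from src.agents.orchestrator_agent import DeploymentOrchestrator, OrchestratorConfig
from src.models.deployment import DeploymentRequest, DeploymentStatus


@dataclass
class MockRiskReport:
    """Minimal stand-in for a risk report; only the field these tests read"""
    overall_score: float


@dataclass
class MockVerifyResult:
    """Minimal stand-in for a verification result"""
    passed: bool
    failed_checks: List[str] = field(default_factory=list)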


@pytest.mark.integration
class TestDeploymentPipeline:
    
    @pytest.mark.asyncio
    async def test_successful_low_risk_deployment(
        self,
        orchestrator: DeploymentOrchestrator
    ):
        """Test the complete successful flow of a low-risk deployment"""
        
        request = DeploymentRequest(
            service_name="test-service",
            target_version="v1.2.3",
            environment="production",
            triggered_by="test_user"
        )
        
        # Simulate a low-risk assessment (score 0.2)
        with patch.object(
            orchestrator.risk_assessor, 
            'assess',
            return_value=MockRiskReport(overall_score=0.2)
        ):
            with patch.object(
                orchestrator.monitor,
                'observe_and_validate',
                return_value=MockVerifyResult(passed=True)
            ):
                deployment = await orchestrator.submit_deployment(request)
                
                # Wait for the asynchronous workflow to finish (up to 30 seconds)
                for _ in range(30):
                    await asyncio.sleep(1)
                    updated = await orchestrator.deployment_repo.get(deployment.id)
                    if updated.status in [
                        DeploymentStatus.COMPLETED, 
                        DeploymentStatus.FAILED,
                        DeploymentStatus.ROLLED_BACK
                    ]:
                        break
                
                final_status = await orchestrator.deployment_repo.get(deployment.id)
                
                assert final_status.status == DeploymentStatus.COMPLETED
    
    @pytest.mark.asyncio
    async def test_auto_rollback_on_verification_failure(
        self,
        orchestrator: DeploymentOrchestrator
    ):
        """Test automatic rollback when verification fails"""
        
        request = DeploymentRequest(
            service_name="unstable-service",
            target_version="v2.0.0-broken",
            environment="production",
            triggered_by="ci_system"
        )
        
        with patch.object(
            orchestrator.risk_assessor, 
            'assess',
            return_value=MockRiskReport(overall_score=0.3)
        ):
            # Simulate a verification failure (degraded metrics)
            with patch.object(
                orchestrator.monitor,
                'observe_and_validate',
                return_value=MockVerifyResult(
                    passed=False,
                    failed_checks=["error_rate: 0.08 > 0.01"]
                )
            ):
                deployment = await orchestrator.submit_deployment(request)
                
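                # Poll until a terminal status is reached (up to 30 seconds) instead of a fixed sleep
                for _ in range(30):
                    await asyncio.sleep(1)
                    updated = await orchestrator.deployment_repo.get(deployment.id)
                    if updated.status in [
                        DeploymentStatus.COMPLETED,
                        DeploymentStatus.FAILED,
                        DeploymentStatus.ROLLED_BACK
                    ]:
                        break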
                
                final_status = await orchestrator.deployment_repo.get(deployment.id)
                
                assert final_status.status == DeploymentStatus.ROLLED_BACK
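The first integration test above polls the deployment status until it reaches a terminal state. That loop can be factored into a small helper with an explicit overall timeout, which avoids both flaky short sleeps and wasted long ones. The sketch below assumes the status lookup is an async callable. `wait_for_status` is a hypothetical name, not part of pytest or pytest-asyncio. In the tests, `fetch_status` could be a small coroutine that returns `(await orchestrator.deployment_repo.get(deployment.id)).status`.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")


async def wait_for_status(
    fetch_status: Callable[[], Awaitable[T]],
    terminal: Iterable[T],
    timeout: float = 30.0,
    interval: float = 1.0,
) -> T:
    """Poll fetch_status() until it returns a terminal value or the timeout expires."""
    terminal_set = set(terminal)
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await fetch_status()
        if status in terminal_set:
            return status
        if loop.time() >= deadline:
            # Fail loudly instead of asserting on a half-finished deployment
            raise TimeoutError(f"status still {status!r} after {timeout}s")
        await asyncio.sleep(interval)
```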

3.7 Chapter Summary

Starting from the nature of an Agent, this chapter built a complete architectural blueprint for a self-evolving super agent. The key takeaways:

Core architecture: a five-layer design

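Perception Layer
  ↓  collect and process multi-source telemetry
Cognition Layer
  ↓  four-tier memory management + knowledge graph
Decision Layer
  ↓  three-path decision routing (rules / ML / LLM)
Execution Layer
  ↓  idempotent execution + Saga transactions
Evolution Layer
     online learning + offline training + A/B testing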
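The Saga transactions of the execution layer can be illustrated with a minimal sketch. Each step pairs an action with a compensating action. When a step fails, the compensations of the steps that already succeeded run in reverse order. The step names in the usage example (`update_config`, `deploy_canary`, `shift_traffic`) are hypothetical. A production Saga would also persist its progress so that a crashed orchestrator can resume compensation, and it would need a policy for compensations that themselves fail. This sketch does neither.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class SagaStep:
    name: str
    action: Callable[[], None]
    compensate: Callable[[], None]


def run_saga(steps: List[SagaStep], log: List[str]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse. Returns success."""
    completed: List[SagaStep] = []
    for step in steps:
        try:
            step.action()
        except Exception as exc:
            log.append(f"failed: {step.name} ({exc})")
            # Undo in reverse order: the last successful step is compensated first
            for done in reversed(completed):
                done.compensate()
                log.append(f"compensated: {done.name}")
            return False
        completed.append(step)
        log.append(f"done: {step.name}")
    return True
```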

Key design principles

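┌──────────────────────────┬─────────────────────────────────────────┬────────────────────────────────────────┐
│ Principle                │ Mechanism                               │ Why it matters                         │
├──────────────────────────┼─────────────────────────────────────────┼────────────────────────────────────────┤
│ Perceive before deciding │ Data fusion + quality checks            │ Bad input leads to bad decisions       │
│ Layered decision routing │ Rules / ML / LLM paths                  │ Cases need different speed/precision   │
│ Idempotent execution     │ Unique operation ID + state store       │ Safe retries in distributed systems    │
│ Continuous learning      │ Experience buffer + incremental updates │ Agent must evolve with its environment │
│ Human in the loop        │ High-risk approval + feedback           │ AI needs human oversight and expertise │
└──────────────────────────┴─────────────────────────────────────────┴────────────────────────────────────────┘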
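The idempotent-execution principle (a unique operation ID plus a state store) means a retried operation replays its recorded result instead of acting twice. Below is a minimal in-memory sketch under that assumption. `IdempotentExecutor` is a hypothetical name. A real deployment would keep the results in a durable store with an atomic insert-if-absent instead of a process-local dict and lock. Holding the lock while the operation runs serializes all operations, which is acceptable only in a sketch. Failed operations are not recorded, so they can be retried.

```python
import threading
from typing import Any, Callable, Dict


class IdempotentExecutor:
    """Runs each operation ID at most once and replays the stored result on retries."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def execute(self, operation_id: str, operation: Callable[[], Any]) -> Any:
        with self._lock:
            if operation_id in self._results:
                return self._results[operation_id]  # Retry: replay, do not re-run
            result = operation()  # If this raises, nothing is stored and a retry runs it again
            self._results[operation_id] = result
            return result
```

A retry carrying the same ID, for example after a network timeout, therefore cannot scale a service twice. A new ID represents a genuinely new operation.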

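Key lessons from the three case studies

  • Airbnb: replacing a one-size-fits-all release strategy with an ML risk-scoring model cut the failure rate by 60%
  • Shopify: predictive scaling (3 minutes ahead) is more effective than reactive scaling
  • Stripe: formal verification plus progressive multi-region rollout is a prerequisite for financial-grade reliability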

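Next chapter

Chapter 4 takes a deep dive into how the perception layer is implemented, covering:

  • How to design a robust telemetry data pipeline
  • How to engineer anomaly detection algorithms for production
  • How to extract meaningful "signal" rather than "noise" from massive volumes of metrics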

Code for this chapter

All code in this chapter is available in the companion repository:

git clone https://github.com/harness-book/chapter3-agent-skeleton
cd chapter3-agent-skeleton
make setup     # Install dependencies
make test      # Run the tests
make dev       # Start the local development environment

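Discussion questions

  1. When designing the reward function, how would you set the weights if business goals conflict (for example, lowering cost vs. improving availability)?

  2. In some cases the Saga pattern cannot fully compensate (for example, an email notification that has already been sent cannot be "unsent"). How would you handle such irreversible operations?

  3. Suppose your AI Agent makes five wrong rollback decisions in a row. How would you design a circuit breaker to stop the Agent from continuing to act incorrectly?

  4. For Airbnb's risk-scoring model, if the model starts to drift (the production data distribution diverges from the training data), what monitoring would you design to detect it?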


End of chapter
