《从0到1实现一个企业级harness平台》 (Building an Enterprise-Grade Harness Platform from 0 to 1)
Chapter 3. The Self-Evolving Super Agent: System Architecture Design and Blueprint
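"Architecture is not about technology selection; it is about organizing complexity so that a system can keep evolving in an uncertain environment."
Attributed to Martin Fowler, Patterns of Enterprise Application Architecture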
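Chapter Guide
This is the core technical chapter of the book. Starting from what an Agent is, we work through the complete architectural blueprint of the self-evolving super agent, layer by layer. Rather than staying at the conceptual level, this chapter provides:
- Implementable architecture design: every design decision comes with a trade-off analysis and alternatives
- Complete code: more than 500 lines of core framework code in Python
- Real industrial case studies: hands-on experience from Airbnb, Shopify, and Stripe
- Ready-to-use infrastructure configuration: a complete Kubernetes Helm Chart
After reading this chapter, you will be able to:
- Understand the essential difference between an Agent and traditional automation
- Apply the five-layer architecture (perception, cognition, decision, execution, evolution)
- Build a runnable basic Agent framework
- Tailor the architecture to real business scenarios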
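3.1 The Nature of an Agent: From Program to Autonomous Entity
3.1.1 Definition and Classification of Agents
What is an Agent?
In artificial intelligence, the definition of an Agent has evolved over several decades. Russell and Norvig give the classic definition in Artificial Intelligence: A Modern Approach:
An agent is anything that can be viewed as perceiving its environment through sensors and acting upon that environment through actuators.
The definition looks simple. It names two elements explicitly, perceiving (Perceive) and acting (Act), and implies a third that connects them: deciding (Decide).
In software engineering we need a more concrete and practical reading. A software delivery Agent must be able to:
- Perceive the state of its environment: read Prometheus metrics, parse log streams, understand the system topology
- Form an internal representation: turn raw data into a structured understanding of the situation
- Make an action plan: choose the best sequence of actions given the current state and the goal
- Execute and observe: carry out the actions and monitor their effect
- Learn from experience: update its knowledge and strategy according to the results
Four basic Agent types
Agents can be divided into four basic types according to the complexity of their internal architecture: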
Agent type classification
| Type | Characteristics |
|---|---|
| Simple reflex Agent | Condition → action rules, no internal state |
| Model-based Agent | Maintains a world model; copes with partially observable environments |
| Goal-based Agent | Represents goals explicitly; can plan multi-step action sequences |
| Learning Agent | Improves its behavior through experience; adapts to new environments |
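Simple Reflex Agent
This is the most basic form of Agent. Its decision logic is driven entirely by a set of Condition-Action Rules:
# Simple reflex Agent example: threshold-based alert response
class SimpleReflexAgent:
    def __init__(self):
        # Rules are checked in order; the first match wins
        self.rules = [
            {"condition": lambda m: m["cpu_usage"] > 90,
             "action": "scale_up"},
            {"condition": lambda m: m["error_rate"] > 0.05,
             "action": "rollback"},
            {"condition": lambda m: m["latency_p99"] > 500,
             "action": "alert_oncall"},
        ]

    def act(self, metrics: dict) -> str:
        for rule in self.rules:
            if rule["condition"](metrics):
                return rule["action"]
        return "no_action"
Advantages: simple to implement, fast to respond, predictable behavior.
Disadvantages: cannot resolve rule conflicts, does not understand the consequences of its actions, and breaks down in complex scenarios.
In day-to-day operations, the simple reflex Agent is the familiar "PagerDuty alert rule": alert whenever CPU exceeds 90%. Because such rules ignore context, they tend to produce many false positives.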
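The rule-conflict weakness is easy to reproduce. The sketch below is a stand-alone illustration: `RULES`, the `priority` field, and both function names are assumptions introduced here, not part of the class above. It feeds one metrics snapshot that matches two rules to a first-match evaluator and to a variant that picks the highest-priority match.

```python
# Hypothetical rule table; the "priority" field is an assumption added for this sketch.
RULES = [
    {"name": "high_cpu", "condition": lambda m: m["cpu_usage"] > 90,
     "action": "scale_up", "priority": 1},
    {"name": "high_errors", "condition": lambda m: m["error_rate"] > 0.05,
     "action": "rollback", "priority": 3},
    {"name": "slow_p99", "condition": lambda m: m["latency_p99"] > 500,
     "action": "alert_oncall", "priority": 2},
]


def first_match(metrics: dict) -> str:
    """Return the action of the first rule that fires (list order decides)."""
    for rule in RULES:
        if rule["condition"](metrics):
            return rule["action"]
    return "no_action"


def highest_priority_match(metrics: dict) -> str:
    """Return the action of the firing rule with the largest priority value."""
    matched = [rule for rule in RULES if rule["condition"](metrics)]
    if not matched:
        return "no_action"
    return max(matched, key=lambda rule: rule["priority"])["action"]


# Both the CPU rule and the error-rate rule fire on this snapshot.
snapshot = {"cpu_usage": 95, "error_rate": 0.08, "latency_p99": 300}
print(first_match(snapshot))
print(highest_priority_match(snapshot))
```

A priority field only moves the problem: someone still has to decide that a rollback outranks a scale-up, which is the kind of context the more capable Agent types try to capture.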
Model-Based Agent
A model-based Agent maintains an internal representation of the world (Internal State), which lets it cope with a partially observable environment:
import time

class ModelBasedAgent:
    def __init__(self):
        self.world_model = {
            "services": {},      # snapshot of each service's state
            "deployments": [],   # deployments in progress
            "dependencies": {},  # dependency graph: service -> upstream services that depend on it
        }

    def update_model(self, observation: dict):
        """Update the world model from a new observation."""
        service_id = observation["service_id"]
        self.world_model["services"][service_id] = {
            "health": observation["health_score"],
            "last_deploy": observation["last_deploy_time"],
            "error_rate": observation["error_rate"],
            "updated_at": time.time(),
        }

    def act(self, current_observation: dict) -> str:
        self.update_model(current_observation)
        # Decide from the whole world model, not only the current observation
        return self._decide_based_on_model()

    def _decide_based_on_model(self) -> str:
        # Use the dependency graph to avoid cascading failures
        for service_id, state in self.world_model["services"].items():
            if state["error_rate"] > 0.05:
                # Does any upstream service depend on this one?
                upstreams = self.world_model["dependencies"].get(service_id, [])
                if upstreams:
                    return f"quarantine_{service_id}"
        return "normal"
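Goal-Based Agent
A goal-based Agent knows not only the current state but also the desired state, and can plan a sequence of actions to reach the goal:
class GoalBasedAgent:
    def __init__(self, goal_slo: dict):
        self.goal_slo = goal_slo  # target service level objectives
        # e.g. {"error_rate": 0.001, "p99_latency_ms": 200, "availability": 0.9999}

    def plan(self, current_state: dict) -> list:
        """Generate the sequence of actions that moves the system toward the goal."""
        plan = []
        # Compute the gaps between the current state and the goal
        gaps = self._compute_gaps(current_state)
        # Handle the most severe gaps first
        prioritized_gaps = sorted(gaps, key=lambda x: x["severity"], reverse=True)
        for gap in prioritized_gaps:
            actions = self._generate_actions_for_gap(gap)
            plan.extend(actions)
        return plan

    def _compute_gaps(self, state: dict) -> list:
        gaps = []
        for metric, target in self.goal_slo.items():
            current = state.get(metric, 0)
            # Only error_rate is compared in this example; higher-is-better
            # metrics such as availability need the opposite comparison.
            if metric == "error_rate" and current > target:
                gaps.append({
                    "metric": metric,
                    "current": current,
                    "target": target,
                    # Relative overshoot; guard against a zero target
                    "severity": (current - target) / target if target else float("inf"),
                })
        return gaps

    def _generate_actions_for_gap(self, gap: dict) -> list:
        # Placeholder, not in the original listing: a real planner would map
        # each gap to candidate actions from the action space.
        return [f"mitigate_{gap['metric']}"]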
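Learning Agent
The learning Agent is the most complex and most powerful form. It extracts patterns from past experience and keeps improving the quality of its decisions:
import random
import time

class LearningAgent:
    def __init__(self, policy_model, batch_size: int = 32):
        self.experience_buffer = []        # experience replay buffer
        self.policy_model = policy_model   # policy model (an ML model or an LLM) with predict() and fit()
        self.batch_size = batch_size       # experiences sampled per update (32 is an arbitrary default)
        self.performance_metrics = []      # performance history

    def act(self, observation: dict) -> str:
        action = self.policy_model.predict(observation)
        return action

    def learn(self, observation: dict, action: str, reward: float, next_obs: dict):
        """Record an experience and update the policy."""
        experience = {
            "observation": observation,
            "action": action,
            "reward": reward,
            "next_observation": next_obs,
            "timestamp": time.time(),
        }
        self.experience_buffer.append(experience)
        # Trigger a model update once enough experience has accumulated
        if len(self.experience_buffer) >= self.batch_size:
            self._update_policy()

    def _update_policy(self):
        """Learn from the experience buffer and update the policy model."""
        batch = random.sample(self.experience_buffer, self.batch_size)
        # fit() stands in for a reinforcement learning update (e.g. PPO or SAC)
        self.policy_model.fit(batch)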
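Core characteristics of a software delivery Agent
In software delivery, an Agent needs the following characteristics, which set it apart from a plain automation script:
| Dimension | Traditional automation script | Software delivery Agent |
|---|---|---|
| Decision making | Predefined rules, no exception handling | Context-aware, dynamic decisions |
| Failure handling | Stops on failure | Adaptive retries, fallback options |
| Learning | None; every run is independent | Learns from history, keeps optimizing |
| Goal understanding | Executes steps without understanding their purpose | Understands the business goal, flexible about the path |
| Environment awareness | Sees only the trigger condition | Observes the environment along many dimensions |
| Collaboration | Runs alone | Can cooperate with other Agents |
| Explainability | Logs the steps it executed | Can explain why it made a decision |
| Boundary recognition | Executes as configured | Recognizes situations beyond its ability and escalates |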
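Agent maturity model (Level 0-5)
Following the SAE levels of driving automation, we define maturity levels for software delivery Agents:
Level 0 - Full Manual
• Every operation is performed by a person
• Tools assist but do not automate
• Example: Confluence runbooks plus manual kubectl commands
Level 1 - Assisted Automation
• Script automation for a single scenario
• Triggered by a person, executed by a machine
• Example: Jenkins Pipeline, GitHub Actions
Level 2 - Partial Automation
• Rule-driven automation across multiple scenarios
• Human supervision, with confirmation at key points
• Example: a CD pipeline with approval gates
Level 3 - Conditional Automation
• Environment-aware, dynamic decisions
• Fully automatic in some scenarios; people step in for complex ones
• Example: a Canary release system with intelligent rollback
Level 4 - High Automation
• Fully automatic in most scenarios, including exception handling
• People step in only outside defined boundaries
• Example: an adaptive release system that learns
Level 5 - Full Automation
• Fully automatic in all scenarios
• Self-optimizing and continuously evolving
• Example: the self-evolving super agent described in this book
Most enterprises today are at Level 1-2, leading companies (Netflix, Google, Airbnb) are exploring Level 3-4, and Level 5 is the target architecture of this book.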
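3.1.2 Modeling the Agent for Software Delivery
To build an effective software delivery Agent, we first need a precise model of the environment it runs in and of its task space. This step underpins the whole architecture and directly determines the limits of what the system can do.
Environment modeling: defining the Agent's world
Reinforcement learning theory describes the interaction between an Agent and its environment as a Markov decision process (MDP):
MDP = (S, A, P, R, γ)
S - State Space
A - Action Space
P - Transition Probability
R - Reward Function
γ - Discount Factor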
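The discount factor γ determines how much future rewards count relative to immediate ones: the return of a trajectory is G = r_0 + γ·r_1 + γ²·r_2 + ... The short sketch below computes it for a made-up reward sequence; `discounted_return` and the reward values are introduced here for illustration only.

```python
from typing import Sequence


def discounted_return(rewards: Sequence[float], gamma: float) -> float:
    """Sum of gamma**t * rewards[t], accumulated from the last step backwards."""
    if not 0.0 <= gamma <= 1.0:
        raise ValueError("gamma must be in [0, 1]")
    total = 0.0
    for reward in reversed(rewards):
        total = reward + gamma * total
    return total


# Hypothetical episode: two uneventful steps, then a successful deployment.
rewards = [0.0, 0.0, 10.0]
print(discounted_return(rewards, gamma=0.5))  # the late reward is discounted twice
print(discounted_return(rewards, gamma=1.0))  # no discounting
```

A small γ makes the Agent short-sighted, while a γ close to 1 makes it weigh the long-term effect of a deployment almost as heavily as the immediate one.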
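For a software delivery Agent, this abstract framework becomes concrete as follows:
Defining the state space (S)
The state space of software delivery can be described along four dimensions: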
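from __future__ import annotations  # element types such as Deployment are defined elsewhere

from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional

@dataclass
class PipelineState:
    """Current state of the pipeline"""
    active_deployments: List[Deployment] = field(default_factory=list)      # deployments in progress
    pending_approvals: List[ApprovalGate] = field(default_factory=list)     # gates awaiting approval
    test_results: Dict[str, TestResult] = field(default_factory=dict)       # cached test results
    artifact_registry: Dict[str, Artifact] = field(default_factory=dict)    # artifact registry

@dataclass
class ProductionState:
    """State of the production environment"""
    services: Dict[str, ServiceHealth] = field(default_factory=dict)             # service health
    error_rates: Dict[str, float] = field(default_factory=dict)                  # error rate per service
    latency_percentiles: Dict[str, LatencyStats] = field(default_factory=dict)   # latency distribution
    traffic_patterns: Optional[TrafficPattern] = None                            # current traffic pattern
    active_incidents: List[Incident] = field(default_factory=list)               # ongoing incidents

@dataclass
class ResourceState:
    """Resource and cost state"""
    cpu_utilization: Dict[str, float] = field(default_factory=dict)     # CPU utilization
    memory_utilization: Dict[str, float] = field(default_factory=dict)  # memory utilization
    pod_counts: Dict[str, int] = field(default_factory=dict)            # number of Pods
    hourly_cost: float = 0.0                                            # current cost per hour
    cost_budget_remaining: float = 0.0                                  # remaining budget

@dataclass
class EnvironmentContext:
    """Environment context"""
    timestamp: datetime = field(default_factory=datetime.now)              # current time
    is_peak_hours: bool = False                                            # peak hours or not
    active_marketing_campaigns: List[str] = field(default_factory=list)    # active marketing campaigns
    dependency_health: Dict[str, bool] = field(default_factory=dict)       # health of external dependencies
    on_call_engineer: str = ""                                             # engineer currently on call

# Defined last: default_factory needs the component classes to exist already,
# and each component needs defaults so that it can be built without arguments.
@dataclass
class DeliveryEnvironmentState:
    """Complete state representation of the software delivery environment"""
    # Dimension 1: CI/CD pipeline state
    pipeline_state: PipelineState = field(default_factory=PipelineState)
    # Dimension 2: production environment state
    production_state: ProductionState = field(default_factory=ProductionState)
    # Dimension 3: cost and resource state
    resource_state: ResourceState = field(default_factory=ResourceState)
    # Dimension 4: external context (time, campaigns, dependent services)
    context: EnvironmentContext = field(default_factory=EnvironmentContext)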
Perception interface: the data collection pipeline
The Agent's "senses" are the interfaces through which it perceives its environment. In software delivery these cover four core kinds of data:
Classification of perception data
| Data type | Metrics / events |
|---|---|
| Metrics | RED metrics (request rate / error rate / latency); USE metrics (utilization / saturation / errors); business metrics (conversion rate / GMV / active users) |
| Logs | Application error logs; access logs (Nginx/ALB); audit logs |
| Traces | Distributed tracing (Jaeger/Zipkin); service call chains; database query performance |
| Events | Kubernetes events; deployment state changes; alerts firing / resolving |
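Defining the action space (A)
The action space defines which interventions the Agent can apply to its environment. For a software delivery Agent, actions fall into five categories: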
class ActionSpace:
"""软件交付Agent的完整行动空间"""
# 部署操作
DEPLOY_ACTIONS = [
"start_canary_deployment", # 开始金丝雀发布(初始5%流量)
"advance_canary", # 推进金丝雀(5%→20%→50%→100%)
"pause_canary", # 暂停金丝雀发布
"abort_canary", # 中止金丝雀,全量回滚
"blue_green_cutover", # 蓝绿切换
"rolling_update", # 滚动更新
]
# 回滚操作
ROLLBACK_ACTIONS = [
"rollback_to_previous", # 回滚到上一版本
"rollback_to_stable", # 回滚到最近稳定版本
"partial_rollback", # 部分回滚(特定区域/集群)
"emergency_rollback", # 紧急完全回滚
]
# 扩缩容操作
SCALING_ACTIONS = [
"scale_up_pods", # 增加Pod数量
"scale_down_pods", # 减少Pod数量
"scale_up_nodes", # 增加节点
"enable_hpa", # 启用自动扩缩容
"adjust_resource_limits", # 调整资源限制
]
# 流量管理
TRAFFIC_ACTIONS = [
"shift_traffic_percentage", # 调整流量分配
"enable_circuit_breaker", # 启用熔断器
"add_rate_limiting", # 添加限流
"enable_bulkhead", # 隔离故障服务
]
# 通知与升级
NOTIFICATION_ACTIONS = [
"send_slack_alert", # 发送Slack告警
"page_on_call", # 触发PagerDuty
"create_incident", # 创建事故工单
"request_human_approval", # 请求人工审批
"escalate_to_manager", # 升级给管理者
]
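Designing the reward signal
The reward function is the hardest part of reinforcement learning to design. A poorly designed reward function leads the Agent to behave in ways we did not intend (Reward Hacking).
For software delivery we design a multi-objective reward function: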
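class RewardFunction:
    """
    Reward function for the software delivery Agent.

    Core principles:
    1. Align with business goals (meeting SLOs)
    2. Penalize negative impact (incidents, latency, wasted cost)
    3. Encourage proactive optimization rather than passive response
    """

    def __init__(self):
        # Weights (adjustable to reflect business priorities)
        self.weights = {
            "slo_achievement": 0.4,      # SLO achievement: highest weight
            "deployment_success": 0.3,   # deployment success rate
            "cost_efficiency": 0.15,     # cost efficiency
            "mttr_improvement": 0.15,    # MTTR improvement
        }

    def compute_reward(
        self,
        pre_action_state: dict,
        action: str,
        post_action_state: dict,
        outcome: str
    ) -> float:
        """Compute the reward for a single step."""
        reward = 0.0
        # 1. SLO achievement
        slo_score = self._compute_slo_reward(pre_action_state, post_action_state)
        reward += self.weights["slo_achievement"] * slo_score
        # 2. Deployment success
        deploy_score = self._compute_deployment_reward(outcome, action)
        reward += self.weights["deployment_success"] * deploy_score
        # 3. Cost efficiency
        cost_score = self._compute_cost_reward(pre_action_state, post_action_state)
        reward += self.weights["cost_efficiency"] * cost_score
        # 4. MTTR improvement (the weight was declared but never applied in the original listing)
        mttr_score = self._compute_mttr_reward(pre_action_state, post_action_state)
        reward += self.weights["mttr_improvement"] * mttr_score
        # 5. Severe penalties. With helper scores in [-1, 1] the weighted terms
        #    sum to at most 4.6, so they cannot offset either penalty.
        if outcome == "production_incident":
            reward -= 10.0   # production incident: severe penalty
        if outcome == "data_loss":
            reward -= 100.0  # data loss: extreme penalty
        return reward

    def _compute_slo_reward(self, pre: dict, post: dict) -> float:
        """Reward for SLO improvement, scaled to [-10, 10]."""
        pre_slo = self._slo_compliance_score(pre)
        post_slo = self._slo_compliance_score(post)
        return (post_slo - pre_slo) * 10

    def _slo_compliance_score(self, state: dict) -> float:
        """SLO compliance score in [0, 1]."""
        scores = []
        # Error rate: target < 0.1%
        error_rate = state.get("error_rate", 0)
        scores.append(max(0.0, 1 - error_rate / 0.001))
        # P99 latency: target < 500ms
        p99 = state.get("p99_latency_ms", 0)
        scores.append(max(0.0, 1 - p99 / 500))
        # Availability: target > 99.9% (capped so the score stays within [0, 1])
        availability = state.get("availability", 1.0)
        scores.append(min(1.0, availability / 0.999))
        return sum(scores) / len(scores)

    # The helpers below are minimal placeholders (assumptions, not from the
    # original listing, including the state keys they read); each returns a score in [-1, 1].
    def _compute_deployment_reward(self, outcome: str, action: str) -> float:
        return 1.0 if outcome == "success" else -1.0

    def _compute_cost_reward(self, pre: dict, post: dict) -> float:
        return self._relative_drop(pre.get("hourly_cost", 0.0), post.get("hourly_cost", 0.0))

    def _compute_mttr_reward(self, pre: dict, post: dict) -> float:
        return self._relative_drop(pre.get("mttr_minutes", 0.0), post.get("mttr_minutes", 0.0))

    @staticmethod
    def _relative_drop(before: float, after: float) -> float:
        if before <= 0:
            return 0.0
        return max(-1.0, min(1.0, (before - after) / before))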
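3.1.3 Multi-Agent Collaboration Architecture
A single Agent can hardly handle the full complexity of software delivery. In large systems we usually adopt a multi-Agent collaboration architecture that splits the problem among several specialized Agents.
Specialized Agents vs. general-purpose Agents: the trade-off
Trade-offs in Agent specialization
| Specialized Agent | General-purpose Agent |
|---|---|
| Best performance within its domain | Handles cross-domain tasks flexibly |
| Low inference cost | High inference cost |
| Clear boundaries, easy to test | Variable behavior, hard to predict |
| Weak knowledge transfer | Strong knowledge transfer |
| Maintaining many Agents is a burden | Single entry point, simple to maintain |
| Suited to high-frequency, deterministic tasks | Suited to low-frequency, complex tasks |
Our architecture uses a hybrid strategy: core operations go to specialized Agents (for efficiency and safety), and cross-domain coordination goes to an Orchestrator Agent (for flexibility).
Designing the inter-Agent communication protocol
Inter-Agent communication has three core requirements: asynchronous decoupling, reliable message delivery, and type safety. We define a unified Agent Communication Protocol (ACP):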
from dataclasses import dataclass, field
from typing import Any, Optional, Union
from enum import Enum
import uuid
import time
class MessageType(Enum):
"""Agent消息类型"""
REQUEST = "request" # 请求执行某动作
RESPONSE = "response" # 响应请求结果
EVENT = "event" # 状态变更事件
QUERY = "query" # 查询信息
BROADCAST = "broadcast" # 广播消息给所有Agent
class Priority(Enum):
"""消息优先级"""
CRITICAL = 0 # 生产事故:立即处理
HIGH = 1 # 部署失败:30秒内处理
NORMAL = 2 # 常规操作:5分钟内处理
LOW = 3 # 优化建议:1小时内处理
@dataclass
class AgentMessage:
"""Agent间通信的标准消息格式"""
# 消息元数据
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
correlation_id: Optional[str] = None # 关联请求的ID
timestamp: float = field(default_factory=time.time)
# 路由信息
sender_id: str = "" # 发送方Agent ID
receiver_id: str = "" # 接收方Agent ID(可以是广播)
message_type: MessageType = MessageType.EVENT
priority: Priority = Priority.NORMAL
# 消息内容
action: str = "" # 请求的操作
payload: dict = field(default_factory=dict) # 操作参数
context: dict = field(default_factory=dict) # 上下文信息
# 结果(用于Response类型)
success: Optional[bool] = None
result: Optional[Any] = None
error: Optional[str] = None
# 可靠性
ttl_seconds: int = 300 # 消息存活时间
retry_count: int = 0 # 当前重试次数
max_retries: int = 3 # 最大重试次数
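One way a receiver might honor `priority` and `ttl_seconds` is sketched below. This is an illustration under assumptions, not part of the ACP definition: `QueuedMessage` is a trimmed stand-in for `AgentMessage`, and `PriorityInbox` is a name introduced here. Lower priority numbers are served first, messages of equal priority keep their arrival order, and expired messages are dropped on read.

```python
import heapq
import itertools
from dataclasses import dataclass
from typing import Optional


@dataclass
class QueuedMessage:
    """Trimmed stand-in for AgentMessage (only the fields this sketch needs)."""
    action: str
    priority: int          # 0 = CRITICAL ... 3 = LOW, as in the Priority enum
    timestamp: float       # creation time in seconds
    ttl_seconds: int = 300


class PriorityInbox:
    def __init__(self) -> None:
        self._heap = []
        self._arrival = itertools.count()  # tie-breaker: FIFO within one priority

    def put(self, msg: QueuedMessage) -> None:
        heapq.heappush(self._heap, (msg.priority, next(self._arrival), msg))

    def get(self, now: float) -> Optional[QueuedMessage]:
        """Return the most urgent unexpired message, or None if nothing is left."""
        while self._heap:
            _, _, msg = heapq.heappop(self._heap)
            if now - msg.timestamp <= msg.ttl_seconds:
                return msg
            # Expired: drop it and look at the next message
        return None


inbox = PriorityInbox()
inbox.put(QueuedMessage("optimize_cost", priority=3, timestamp=0.0))
inbox.put(QueuedMessage("scale_up_pods", priority=2, timestamp=0.0, ttl_seconds=10))
inbox.put(QueuedMessage("emergency_rollback", priority=0, timestamp=50.0))

first = inbox.get(now=60.0)
second = inbox.get(now=60.0)   # scale_up_pods has expired by now and is skipped
third = inbox.get(now=60.0)
print(first.action, second.action, third)
```

In a real deployment the queue would live in the message bus rather than in process memory; the sketch only shows the ordering and expiry rules.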
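The Orchestrator + Worker pattern
This is the core collaboration pattern of our architecture. The Orchestrator decomposes and coordinates tasks; Worker Agents carry out the actual work: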
┌──────────────────┐
│ Orchestrator │
│ Agent (主控器) │
└────────┬─────────┘
│ 分解任务
┌────────┼─────────┐
▼ ▼ ▼
┌─────────────┐ ┌──────┐ ┌──────────────┐
│ Deployment │ │ Risk │ │ Rollback │
│ Agent │ │Agent │ │ Agent │
└─────────────┘ └──────┘ └──────────────┘
Worker Agents(专业执行者)
class OrchestratorAgent:
"""
主控Agent:负责高层任务分解和多Agent协调
"""
def __init__(self, message_bus: MessageBus):
self.message_bus = message_bus
self.worker_registry: Dict[str, WorkerCapability] = {}
self.active_tasks: Dict[str, TaskExecution] = {}
async def handle_deployment_request(
self,
request: DeploymentRequest
) -> DeploymentPlan:
"""
处理部署请求:分解任务并协调多个Worker Agent
"""
# 步骤1:风险评估(委托给Risk Agent)
risk_report = await self._delegate(
worker="risk_agent",
action="assess_deployment_risk",
payload={
"service": request.service_name,
"version": request.target_version,
"environment": request.environment
}
)
# 步骤2:基于风险选择部署策略
strategy = self._select_deployment_strategy(
risk_score=risk_report.overall_score,
service_criticality=request.criticality
)
# 步骤3:并行执行前置检查
pre_checks = await asyncio.gather(
self._delegate("health_agent", "check_cluster_health", {}),
self._delegate("capacity_agent", "verify_capacity", {
"replicas": strategy.target_replicas
}),
self._delegate("dependency_agent", "check_dependencies", {
"service": request.service_name
})
)
# 步骤4:如果前置检查通过,开始部署
if all(check.passed for check in pre_checks):
deployment_task = await self._delegate(
"deployment_agent",
"execute_deployment",
{"strategy": strategy, "request": request}
)
# 步骤5:持续监控部署进度
return await self._monitor_deployment(deployment_task.id)
else:
# 前置检查失败,请求人工介入
await self._delegate(
"notification_agent",
"request_human_review",
{"reason": "pre_checks_failed", "details": pre_checks}
)
def _select_deployment_strategy(
self,
risk_score: float,
service_criticality: str
) -> DeploymentStrategy:
"""基于风险分数选择部署策略"""
if service_criticality == "critical" or risk_score > 0.7:
return CanaryStrategy(
initial_weight=1, # 从1%流量开始
increment_step=5, # 每步增加5%
observation_duration=300 # 每步观察5分钟
)
elif risk_score > 0.4:
return CanaryStrategy(
initial_weight=5,
increment_step=20,
observation_duration=120
)
else:
return RollingUpdateStrategy(
max_surge=25,
max_unavailable=0
)
async def _delegate(
self,
worker: str,
action: str,
payload: dict
) -> AgentResponse:
"""向Worker Agent委派任务"""
msg = AgentMessage(
sender_id="orchestrator",
receiver_id=worker,
message_type=MessageType.REQUEST,
action=action,
payload=payload,
priority=Priority.HIGH
)
# 发送请求并等待响应(带超时)
response = await self.message_bus.send_and_wait(
msg,
timeout_seconds=60
)
if not response.success:
raise AgentDelegationError(
f"Worker {worker} failed to execute {action}: {response.error}"
)
return response
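To get a feel for what the strategy parameters imply, the sketch below expands a canary configuration into a traffic schedule. It assumes that `increment_step` is added to the current weight at each step, that the rollout is capped at 100%, and that every step, including the last, is observed for `observation_duration` seconds. How `CanaryStrategy` actually interprets these fields is not shown in this chapter, and `canary_schedule` is a name introduced here.

```python
from typing import List, Tuple


def canary_schedule(initial_weight: int, increment_step: int,
                    observation_duration: int) -> Tuple[List[int], int]:
    """Return (traffic weights in percent, total observation time in seconds)."""
    if increment_step <= 0:
        raise ValueError("increment_step must be positive")
    weights = []
    weight = initial_weight
    while weight < 100:
        weights.append(weight)
        weight += increment_step
    weights.append(100)  # final step: full traffic
    return weights, len(weights) * observation_duration


# Parameters taken from the two CanaryStrategy branches in _select_deployment_strategy.
high_risk = canary_schedule(initial_weight=1, increment_step=5, observation_duration=300)
medium_risk = canary_schedule(initial_weight=5, increment_step=20, observation_duration=120)
print(len(high_risk[0]), high_risk[1])
print(medium_risk)
```

Under these assumptions the high-risk path takes 21 steps and 105 minutes of observation, against 6 steps and 12 minutes for the medium-risk path, which makes the cost of caution explicit when tuning the risk thresholds.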
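Consensus: resolving decision conflicts between Agents
When several Agents reach different judgments on the same question, we need a reliable consensus mechanism: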
class ConsensusEngine:
"""
多Agent共识引擎
用于解决Agent间的决策冲突
"""
def __init__(self, quorum_threshold: float = 0.6):
self.quorum_threshold = quorum_threshold # 60%多数同意即达成共识
async def reach_consensus(
self,
question: str,
participating_agents: List[str],
context: dict
) -> ConsensusResult:
"""
向多个Agent发起投票,达成共识
应用场景:
- 是否需要回滚当前部署?
- 是否需要触发PagerDuty?
- 是否允许在当前高风险时间窗口发布?
"""
# 并行收集所有Agent的意见
votes = await asyncio.gather(*[
self._get_agent_vote(agent_id, question, context)
for agent_id in participating_agents
])
# 统计投票结果
positive_votes = [v for v in votes if v.decision == "yes"]
confidence_weighted_score = sum(
v.confidence for v in positive_votes
) / len(votes)
# 判断是否达成共识
simple_majority = len(positive_votes) / len(votes)
if simple_majority >= self.quorum_threshold:
decision = "approved"
elif simple_majority < (1 - self.quorum_threshold):
decision = "rejected"
else:
decision = "escalate_to_human" # 无法达成共识,升级
return ConsensusResult(
question=question,
decision=decision,
votes=votes,
confidence=confidence_weighted_score,
requires_human=decision == "escalate_to_human"
)
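The threshold logic in `reach_consensus` splits the share of "yes" votes into three zones. The helper below restates just that rule as a pure function so the zones can be checked in isolation; `classify_vote` is a name introduced here, and the handling of an empty vote list is an added assumption (the listing above would divide by zero in that case).

```python
def classify_vote(yes_votes: int, total_votes: int,
                  quorum_threshold: float = 0.6) -> str:
    """Map a vote count to approved / rejected / escalate_to_human."""
    if total_votes == 0:
        # Assumption: with no voters there is nothing to decide automatically.
        return "escalate_to_human"
    ratio = yes_votes / total_votes
    if ratio >= quorum_threshold:
        return "approved"
    if ratio < 1 - quorum_threshold:
        return "rejected"
    return "escalate_to_human"  # the middle band: no clear majority either way


for yes, total in [(4, 5), (2, 4), (1, 5), (0, 0)]:
    print(yes, total, classify_vote(yes, total))
```

With the default threshold of 0.6, a yes-share between 0.4 and 0.6 falls in the middle band, so a wider band (a higher threshold) sends more decisions to a person.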
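3.2 Core Design Principles of the Self-Evolving Architecture
This section describes the five-layer architecture of the self-evolving super agent in detail. Each layer has clear responsibilities and interface contracts, and together they form a complete system capable of continuous learning.
3.2.1 Perception Layer Design
The perception layer is the "sensory system" through which the Agent interacts with the outside world. Its core goal is to turn scattered, heterogeneous, noisy raw data into structured, high-quality situational information.
A unified telemetry standard: OpenTelemetry in detail
OpenTelemetry (OTel) is the industry standard for cloud-native observability. Its biggest advantage is vendor neutrality: you can use Jaeger today and switch to Zipkin tomorrow by changing the exporter or Collector configuration, without touching the application's instrumentation code.
Core concepts of OpenTelemetry: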
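OpenTelemetry data model
Traces
└── Span (a unit of work)
    ├── TraceID (globally unique trace ID)
    ├── SpanID (ID of the current Span)
    ├── ParentSpanID (parent Span; forms the call chain)
    ├── Name (operation name)
    ├── StartTime / EndTime
    ├── Attributes (key-value pairs; at most 128 by default)
    ├── Events (ordered list of timestamped messages)
    └── Status (Unset/Ok/Error)
Metrics
└── Five kinds of instrument:
    ├── Counter (monotonically increasing counter)
    ├── Gauge (a value that can go up and down)
    ├── Histogram (statistics over a distribution of values)
    ├── UpDownCounter (a counter that can increase and decrease)
    └── ObservableXxx (asynchronous, callback-based versions)
Logs
└── LogRecord
    ├── Timestamp
    ├── SeverityNumber (1-24, mapping to TRACE/DEBUG/INFO/WARN/ERROR/FATAL)
    ├── Body (the log message)
    └── Attributes (structured fields)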
Integrating OpenTelemetry into a Python application:
# requirements.txt
# opentelemetry-api==1.20.0
# opentelemetry-sdk==1.20.0
# opentelemetry-exporter-otlp==1.20.0
# opentelemetry-instrumentation-fastapi==0.41b0
import os

from opentelemetry import trace, metrics
from opentelemetry.metrics import CallbackOptions, Observation
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

def setup_telemetry(service_name: str, otlp_endpoint: str):
    """Initialize OpenTelemetry tracing and metrics."""
    # Resource attributes shared by traces and metrics
    resource = Resource.create({
        "service.name": service_name,
        "service.version": os.getenv("APP_VERSION", "unknown"),
        "deployment.environment": os.getenv("ENVIRONMENT", "production"),
    })
    # Configure the Trace Provider
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(endpoint=otlp_endpoint)
        )
    )
    trace.set_tracer_provider(trace_provider)
    # Configure the Metrics Provider
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=otlp_endpoint),
        export_interval_millis=10000  # export every 10 seconds
    )
    metrics.set_meter_provider(
        MeterProvider(resource=resource, metric_readers=[metric_reader])
    )
    return trace.get_tracer(service_name), metrics.get_meter(service_name)

# Usage in business code
tracer, meter = setup_telemetry("payment-service", "http://otel-collector:4317")

# Custom business metrics
deployment_counter = meter.create_counter(
    "deployments_total",
    description="Total number of deployments",
    unit="1"
)
deployment_duration = meter.create_histogram(
    "deployment_duration_seconds",
    description="Deployment duration in seconds",
    unit="s"
)

# Latest 7-day rolling success rate; assumed to be refreshed elsewhere in the service
deployment_success_rate_value = 1.0

def observe_success_rate(options: CallbackOptions):
    """Gauge callback: receives CallbackOptions and returns an iterable of Observation."""
    return [Observation(deployment_success_rate_value)]

deployment_success_rate = meter.create_observable_gauge(
    "deployment_success_rate",
    callbacks=[observe_success_rate],
    description="Deployment success rate (7 day rolling)",
    unit="1"
)

# Record when a deployment completes
async def record_deployment_completion(
    service: str,
    version: str,
    duration: float,
    success: bool
):
    deployment_counter.add(1, {
        "service": service,
        "version": version,
        "status": "success" if success else "failure",
        "environment": "production"
    })
    deployment_duration.record(duration, {"service": service})
Multi-source data fusion architecture
No single data source gives a complete view of the environment. The perception layer has to fuse four kinds of data sources:
Multi-source data fusion pipeline
Data source layer
  Prometheus (Metrics) | Loki/ELK (Logs) | Jaeger (Traces) | Business metrics API (GMV / user counts)
        │
        ▼
Kafka message bus (unified data channel)
  topics: metrics.raw | logs.error | traces.spans | biz.events
        │
        ▼
Flink stream processing engine (real-time computation)
  • Time alignment (Event Time + Watermarks)
  • Data correlation (cross-source JOIN of Traces and Metrics)
  • Feature computation (sliding-window aggregation)
  • Anomaly detection (online learning models)
        │
        ▼
Structured environment state (consumed by the Agent)
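Data quality: denoising, filling, normalization
Raw telemetry is full of noise, and feeding it to the Agent directly leads to frequent wrong actions. Data quality processing is an indispensable module of the perception layer:
from dataclasses import dataclass

import numpy as np
import pandas as pd
from scipy import stats

@dataclass
class ProcessedMetric:
    """Result of the quality pipeline (fields taken from process_pipeline below)."""
    values: pd.Series
    anomaly_flags: pd.Series
    data_quality_score: float
    has_significant_gaps: bool

class DataQualityProcessor:
    """
    Telemetry data quality processor.

    Processing order:
    1. Denoise (filter spikes)
    2. Fill missing values
    3. Normalize (bring metrics to a common scale)
    4. Detect anomalies (flag suspicious data points)
    """

    def __init__(self, config: "DataQualityConfig"):
        self.config = config
        self.z_score_threshold = 3.0  # more than 3σ counts as an anomaly
        self.ewma_alpha = 0.3         # EWMA smoothing coefficient

    def denoise(self, series: pd.Series) -> pd.Series:
        """
        Denoise: smooth the series with an exponentially weighted moving average (EWMA).

        Why EWMA rather than a simple moving average:
        - recent data gets a higher weight
        - it reacts faster to real state changes
        - it still filters random noise
        """
        smoothed = series.ewm(alpha=self.ewma_alpha, adjust=False).mean()
        # ewm() carries the last average forward across NaN inputs; mask those
        # positions again so that fill_missing still sees the gaps.
        return smoothed.where(series.notna())

    def fill_missing(self, series: pd.Series) -> pd.Series:
        """
        Missing-value strategy (expects a sorted, unique DatetimeIndex):
        - short gaps (<5 minutes): linear interpolation in time
        - medium gaps (5-30 minutes): forward fill
        - long gaps (>30 minutes): left as NaN and treated as "data unavailable"
        """
        gap_threshold_short = pd.Timedelta(minutes=5)
        gap_threshold_medium = pd.Timedelta(minutes=30)
        is_null = series.isnull()
        # Give every run of consecutive NaNs its own id
        gap_ids = (is_null != is_null.shift()).cumsum()[is_null]
        interpolated = series.interpolate(method="time", limit_area="inside")
        forward_filled = series.ffill()
        result = series.copy()
        for _, gap_index in gap_ids.groupby(gap_ids).groups.items():
            first_pos = series.index.get_loc(gap_index[0])
            # Measure the gap from the last valid sample before it
            anchor = series.index[first_pos - 1] if first_pos > 0 else gap_index[0]
            gap_duration = gap_index[-1] - anchor
            if gap_duration <= gap_threshold_short:
                # Short gap: fill only this gap with interpolated values
                result.loc[gap_index] = interpolated.loc[gap_index]
            elif gap_duration <= gap_threshold_medium:
                # Medium gap: forward fill
                result.loc[gap_index] = forward_filled.loc[gap_index]
            # Long gap: keep NaN; downstream Agents must handle unavailable data
        return result

    def normalize(
        self,
        series: pd.Series,
        method: str = "min_max"
    ) -> pd.Series:
        """
        Normalize metrics with different units: "min_max" maps to [0, 1],
        "z_score" rescales to zero mean and unit variance.
        """
        if method == "min_max":
            min_val = series.min()
            max_val = series.max()
            if max_val == min_val:
                return pd.Series([0.5] * len(series), index=series.index)
            return (series - min_val) / (max_val - min_val)
        elif method == "z_score":
            mean = series.mean()
            std = series.std()
            if std == 0:
                return pd.Series([0.0] * len(series), index=series.index)
            return (series - mean) / std
        raise ValueError(f"unknown normalization method: {method}")

    def detect_anomalies(self, series: pd.Series) -> pd.Series:
        """
        Anomaly detection: flag outliers by Z-score.
        Returns a boolean series; True marks an anomaly.
        """
        z_scores = np.abs(stats.zscore(series.dropna()))
        anomaly_mask = pd.Series(False, index=series.index)
        anomaly_mask[series.dropna().index] = z_scores > self.z_score_threshold
        return anomaly_mask

    def process_pipeline(self, raw_series: pd.Series) -> ProcessedMetric:
        """The complete data quality pipeline."""
        # 1. Denoise
        denoised = self.denoise(raw_series)
        # 2. Fill missing values
        filled = self.fill_missing(denoised)
        # 3. Normalize
        normalized = self.normalize(filled)
        # 4. Detect anomalies on the raw data, before smoothing hides them
        anomalies = self.detect_anomalies(raw_series)
        return ProcessedMetric(
            values=normalized,
            anomaly_flags=anomalies,
            data_quality_score=1 - (raw_series.isnull().sum() / len(raw_series)),
            has_significant_gaps=raw_series.isnull().sum() > len(raw_series) * 0.1
        )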
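With `adjust=False`, the smoothing used in `denoise` follows the recurrence y_0 = x_0, y_t = (1 - α)·y_{t-1} + α·x_t. The dependency-free sketch below applies that recurrence with α = 0.3 to a series containing a one-sample spike. The sample values are invented for illustration and `ewma` is a name introduced here.

```python
from typing import List


def ewma(values: List[float], alpha: float) -> List[float]:
    """Exponentially weighted moving average in its recursive form."""
    if not 0 < alpha <= 1:
        raise ValueError("alpha must be in (0, 1]")
    smoothed: List[float] = []
    for x in values:
        if not smoothed:
            smoothed.append(x)  # the first output equals the first input
        else:
            smoothed.append((1 - alpha) * smoothed[-1] + alpha * x)
    return smoothed


# CPU utilization in percent with a single-sample spike to 90.
cpu = [40.0, 40.0, 90.0, 40.0, 40.0]
smoothed = ewma(cpu, alpha=0.3)
print([round(v, 2) for v in smoothed])
```

The spike of 90 is damped to 55 and then decays over the following samples, so a rule with a 90% threshold would not fire on it, while a sustained change would still pull the average up within a few samples.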
3.2.2 Cognition Layer Design
The cognition layer is the Agent's "brain". It combines perceived information with historical experience to build the full situational understanding that decisions require. Its core is the memory management system.
Four-tier memory architecture
Inspired by human cognitive psychology, we designed a four-tier memory system:
| Tier | Capacity | Storage | Retention | Content |
|---|---|---|---|---|
| Working Memory | Full context of the current session | In-memory (Redis) | Current deployment session (hours) | Everything relevant to the current decision |
| Short-Term Memory | Deployment history of the last 30 days | PostgreSQL (hot data) | 30 days | Recent decision-outcome pairs, for fast pattern matching |
| Long-Term Memory | Complete deployment history | PostgreSQL + time-series DB | Permanent (compressed periodically) | Service profiles, historical patterns, anomaly records |
| Knowledge Graph | System-wide knowledge | Neo4j / vector database | Continuously updated | Service dependencies, risk history, best practices |
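Knowledge graph: modeling service dependencies
Service dependencies are among the most important knowledge for risk assessment. We model them as a graph:
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Dict, Optional

import networkx as nx

@dataclass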
class ServiceNode:
"""服务节点"""
service_id: str
service_name: str
tier: str # "frontend" | "backend" | "data"
criticality: str # "critical" | "high" | "medium" | "low"
slo_targets: Dict[str, float]
# 历史数据
avg_deployment_duration: float # 平均部署时长(秒)
deployment_success_rate: float # 近30天部署成功率
last_incident_date: Optional[datetime]
incident_count_30d: int
@dataclass
class ServiceDependency:
"""服务依赖边"""
upstream: str # 依赖方(调用者)
downstream: str # 被依赖方(提供者)
dependency_type: str # "hard" | "soft" | "async"
# hard: 同步调用,下游不可用则上游请求失败
# soft: 有降级逻辑,下游不可用时有备选
# async: 异步消费,延迟可接受
traffic_percentage: float # 流量中有多少比例依赖此路径
fallback_strategy: Optional[str]
class ServiceDependencyGraph:
"""
服务依赖图:用于分析部署影响范围
"""
def __init__(self):
self.graph = nx.DiGraph() # 有向图
self.nodes: Dict[str, ServiceNode] = {}
def add_service(self, node: ServiceNode):
self.nodes[node.service_id] = node
self.graph.add_node(node.service_id, **asdict(node))
def add_dependency(self, dep: ServiceDependency):
self.graph.add_edge(
dep.upstream,
dep.downstream,
**asdict(dep)
)
def get_blast_radius(
self,
service_id: str,
failure_type: str = "total"
) -> BlastRadiusReport:
"""
计算爆炸半径:service_id 发生故障后,哪些服务会受影响?
这是部署风险评估的核心算法。
"""
# 找到所有依赖此服务的上游服务(可能受影响的服务)
affected_services = []
for upstream in self.graph.predecessors(service_id):
edge_data = self.graph.edges[upstream, service_id]
dep_type = edge_data["dependency_type"]
if failure_type == "total" and dep_type == "hard":
# 硬依赖:上游服务会完全中断
impact_level = "critical"
elif failure_type == "degraded" and dep_type == "hard":
# 性能下降:上游受影响但不中断
impact_level = "high"
elif dep_type == "soft":
# 软依赖:上游会降级服务
impact_level = "medium"
else:
impact_level = "low"
affected_services.append({
"service_id": upstream,
"impact_level": impact_level,
"dependency_type": dep_type,
"traffic_affected": edge_data["traffic_percentage"]
})
# 递归计算传播链(二阶影响)
secondary_affected = []
for service in affected_services:
if service["impact_level"] in ["critical", "high"]:
secondary = self.get_blast_radius(
service["service_id"],
failure_type="degraded"
)
secondary_affected.extend(secondary.direct_impact)
return BlastRadiusReport(
epicenter=service_id,
direct_impact=affected_services,
secondary_impact=secondary_affected,
total_affected_count=len(affected_services) + len(secondary_affected),
highest_criticality=max(
[s["impact_level"] for s in affected_services],
default="none",
key=lambda x: ["none", "low", "medium", "high", "critical"].index(x)
)
)
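`get_blast_radius` calls itself for every critical or high-impact caller, so a dependency cycle (A calls B, B calls A) would recurse without end, and the same service can be reported more than once. A breadth-first traversal with a visited set and a depth limit avoids both problems. The sketch below uses a plain dict in place of the `nx.DiGraph` to stay dependency-free; `callers_of`, `affected_callers`, and the service names are assumptions for illustration.

```python
from collections import deque
from typing import Dict, List


def affected_callers(callers_of: Dict[str, List[str]], service_id: str,
                     max_depth: int = 2) -> Dict[str, int]:
    """Return {caller: hop distance} for every service within max_depth hops.

    callers_of maps a service to the services that call it, which is the
    same direction as graph.predecessors() in the class above.
    """
    seen = {service_id}
    distance: Dict[str, int] = {}
    queue = deque([(service_id, 0)])
    while queue:
        current, depth = queue.popleft()
        if depth == max_depth:
            continue  # do not expand beyond the depth limit
        for caller in callers_of.get(current, []):
            if caller not in seen:  # the visited set breaks cycles
                seen.add(caller)
                distance[caller] = depth + 1
                queue.append((caller, depth + 1))
    return distance


# Hypothetical topology containing a cycle: db <- orders <- checkout <- db.
callers_of = {
    "db": ["orders", "inventory"],
    "orders": ["checkout"],
    "inventory": ["orders"],
    "checkout": ["web", "db"],
    "web": [],
}
print(affected_callers(callers_of, "db"))
print(affected_callers(callers_of, "db", max_depth=10))
```

With `max_depth=2` the result matches the "direct plus second-order" scope described in the listing; raising the limit walks the whole reachable set and still terminates on the cycle.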
3.2.3 Decision Layer Design
The decision layer is the core of the whole system. It turns situational information into a concrete action plan. We use a 3-Path Decision Architecture that chooses a decision mechanism according to the complexity and urgency of the problem.
Layered decision architecture
| Layer | Time horizon | Decisions | Decision maker |
|---|---|---|---|
| Strategic Layer | Weeks / months | Release strategy adjustments, architecture optimization proposals, team process improvements | LLM reasoning + final human approval |
| Tactical Layer | Hours / days | Release planning, risk scoring, deployment window selection | ML models + rule engine |
| Operational Layer | Seconds / minutes | Scaling, traffic shifting, alert response, automatic rollback | Rule engine (fast, predictable) |
Three-path decision routing
class ThreePathDecisionRouter:
"""
三路径决策路由器
核心问题:给定一个决策请求,应该使用哪种决策机制?
路径1 - 规则引擎(Fast Path):已知场景,<100ms响应
路径2 - ML模型(Learn Path):复杂但有历史数据,<1s响应
路径3 - LLM推理(Complex Path):语义理解,<30s响应
"""
def __init__(
self,
rule_engine: RuleEngine,
ml_model: DeploymentMLModel,
llm_reasoner: LLMReasoner
):
self.rule_engine = rule_engine
self.ml_model = ml_model
self.llm_reasoner = llm_reasoner
async def route_decision(
self,
context: DecisionContext
) -> Decision:
"""智能路由:选择最合适的决策路径"""
# 路由条件1:是否命中规则库(已知明确场景)
rule_match = self.rule_engine.match(context)
if rule_match.confidence >= 0.95:
return await self._fast_path(context, rule_match)
# 路由条件2:是否有足够的历史数据支撑ML模型
has_sufficient_data = await self._check_ml_eligibility(context)
if has_sufficient_data and not context.requires_explanation:
return await self._learn_path(context)
# 路由条件3:复杂场景或需要自然语言解释时使用LLM
return await self._complex_path(context)
async def _fast_path(
self,
context: DecisionContext,
rule_match: RuleMatch
) -> Decision:
"""
快速路径:规则引擎直接决策
适用场景:
- CPU > 90% → 扩容
- 错误率 > 5% → 回滚
- 部署时间 > 30分钟 → 告警
"""
start = time.time()
action = rule_match.triggered_rule.action
return Decision(
action=action,
confidence=rule_match.confidence,
reasoning=f"Rule triggered: {rule_match.triggered_rule.name}",
path_used="rule_engine",
latency_ms=(time.time() - start) * 1000
)
async def _learn_path(self, context: DecisionContext) -> Decision:
"""
学习路径:ML模型决策
适用场景:
- 风险评分计算(需要多因素建模)
- 发布窗口预测(时间序列模型)
- 异常根因分类
"""
start = time.time()
features = self._extract_features(context)
prediction = await self.ml_model.predict(features)
return Decision(
action=prediction.action,
confidence=prediction.confidence,
reasoning=f"ML model prediction (confidence: {prediction.confidence:.2f})",
feature_importance=prediction.feature_importance,
path_used="ml_model",
latency_ms=(time.time() - start) * 1000
)
async def _complex_path(self, context: DecisionContext) -> Decision:
"""
复杂路径:LLM推理
适用场景:
- 全新的故障模式(无历史案例)
- 需要跨域知识推理
- 需要自然语言解释
- 边缘场景处理
"""
start = time.time()
# 构造结构化提示词
prompt = self._build_decision_prompt(context)
# 调用LLM
llm_response = await self.llm_reasoner.reason(
prompt=prompt,
temperature=0.1, # 低温度:更确定性的输出
max_tokens=1000
)
# 解析LLM响应
decision = self._parse_llm_response(llm_response)
decision.path_used = "llm_reasoning"
decision.latency_ms = (time.time() - start) * 1000
return decision
def _build_decision_prompt(self, context: DecisionContext) -> str:
"""构造决策提示词"""
return f"""你是一个软件交付系统的决策专家。请分析以下部署场景并给出决策。
## 当前系统状态
- 服务:{context.service_name}
- 部署版本:{context.target_version}
- 环境:{context.environment}
- 当前错误率:{context.current_error_rate:.2%}
- P99延迟:{context.p99_latency_ms}ms
- 可用性:{context.availability:.4%}
## 部署历史(最近5次)
{self._format_history(context.deployment_history)}
## 当前异常信号
{self._format_anomalies(context.anomaly_signals)}
## 服务依赖影响
{self._format_blast_radius(context.blast_radius)}
请按以下JSON格式输出你的决策:
{{
"action": "continue_deployment|pause_deployment|rollback|request_human_review",
"confidence": 0.0-1.0,
"reasoning": "决策理由(1-3句话)",
"risk_factors": ["风险因素1", "风险因素2"],
"monitoring_focus": ["需要重点监控的指标"]
}}"""
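`_parse_llm_response` is called above but not shown. A defensive version might look like the sketch below, which is an assumption rather than the book's implementation: it returns a plain dict instead of a `Decision`, extracts the outermost JSON object from the reply, validates `action` and `confidence` against the format requested in the prompt, and falls back to `request_human_review` on any problem.

```python
import json

ALLOWED_ACTIONS = {
    "continue_deployment",
    "pause_deployment",
    "rollback",
    "request_human_review",
}


def _fallback(reason: str) -> dict:
    """Safe default: hand the decision to a person."""
    return {
        "action": "request_human_review",
        "confidence": 0.0,
        "reasoning": f"LLM response rejected: {reason}",
        "risk_factors": [],
        "monitoring_focus": [],
    }


def _as_list(value) -> list:
    return value if isinstance(value, list) else []


def parse_llm_decision(raw: str) -> dict:
    """Parse and validate the JSON decision embedded in an LLM reply."""
    # Models often wrap the JSON in prose, so take the outermost {...} span.
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        return _fallback("no JSON object found")
    try:
        data = json.loads(raw[start:end + 1])
    except json.JSONDecodeError:
        return _fallback("invalid JSON")
    action = data.get("action")
    confidence = data.get("confidence")
    if not isinstance(action, str) or action not in ALLOWED_ACTIONS:
        return _fallback("unknown action")
    if isinstance(confidence, bool) or not isinstance(confidence, (int, float)):
        return _fallback("confidence is not a number")
    if not 0.0 <= confidence <= 1.0:
        return _fallback("confidence out of range")
    return {
        "action": action,
        "confidence": float(confidence),
        "reasoning": str(data.get("reasoning", "")),
        "risk_factors": _as_list(data.get("risk_factors")),
        "monitoring_focus": _as_list(data.get("monitoring_focus")),
    }


reply = 'Here is my decision:\n{"action": "rollback", "confidence": 0.9, "reasoning": "error rate doubled"}'
print(parse_llm_decision(reply)["action"])
print(parse_llm_decision("I am not sure what to do.")["action"])
```

Failing closed matters here: an action name the executor does not know, or a confidence outside [0, 1], should never reach the execution layer.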
3.2.4 执行层设计(Execution Layer)
执行层负责将决策层的指令安全、可靠地转化为实际的系统操作。在分布式环境中,执行层需要处理网络故障、部分失败、并发冲突等复杂情况。
幂等性保证
所有执行操作必须是幂等的——相同的操作执行多次,结果与执行一次相同。这是保证分布式系统可靠性的基础:
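class IdempotentExecutor:
    """
    Idempotent executor.
    Core mechanism: every operation carries a unique ID (idempotency key),
    and its state is tracked in Redis under that key.
    """

    def __init__(self, state_store: Redis):
        self.state_store = state_store
        self.ttl_seconds = 86400  # state expires after 24 hours

    async def execute(
        self,
        operation: DeliveryOperation,
        idempotency_key: str
    ) -> OperationResult:
        """
        Execute an operation idempotently.
        Flow:
        1. Look up the operation state in Redis.
        2. If it completed, return the stored result.
        3. If it is in progress, wait for it to finish.
        4. If it failed and retries remain, clear the failed record and retry.
        5. Otherwise claim the key atomically, execute, and store the result.
        """
        state_key = f"operation:{idempotency_key}"
        retry_count = 0

        # Check for existing state
        existing = await self.state_store.get(state_key)
        if existing:
            state = OperationState.from_json(existing)
            if state.status == "completed":
                # Idempotent return: hand back the stored result
                return state.result
            elif state.status == "in_progress":
                return await self._wait_for_completion(state_key)
            elif state.status == "failed":
                if state.retry_count >= state.max_retries:
                    raise MaxRetriesExceeded(idempotency_key)
                # The retry count lives on the parsed state, not on the raw
                # JSON string returned by Redis.
                retry_count = state.retry_count + 1
                # Remove the failed record, otherwise the SET NX below can
                # never succeed. Note: DELETE followed by SET NX is not atomic;
                # production code should do this compare-and-swap in a Lua
                # script or a WATCH/MULTI transaction.
                await self.state_store.delete(state_key)

        # Claim the key (atomic, guards against concurrent executors)
        acquired = await self.state_store.set(
            state_key,
            OperationState(
                status="in_progress",
                started_at=time.time(),
                retry_count=retry_count
            ).to_json(),
            nx=True,  # only set when the key does not exist
            ex=self.ttl_seconds
        )
        if not acquired:
            # Another process holds the execution right
            return await self._wait_for_completion(state_key)

        try:
            result = await operation.execute()
            await self.state_store.set(
                state_key,
                OperationState(
                    status="completed",
                    result=result,
                    completed_at=time.time(),
                    retry_count=retry_count
                ).to_json(),
                ex=self.ttl_seconds
            )
            return result
        except Exception as e:
            # Keep retry_count on the failed record so the limit is enforced
            await self.state_store.set(
                state_key,
                OperationState(
                    status="failed",
                    error=str(e),
                    failed_at=time.time(),
                    retry_count=retry_count
                ).to_json(),
                ex=self.ttl_seconds
            )
            raise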
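The executor relies on callers supplying a stable idempotency key, but the chapter does not show how one is built. One common approach, sketched here under assumptions, is to hash the fields that define the operation so that a retried request maps to the same key. The function name `make_idempotency_key` and the choice of fields are hypothetical.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def make_idempotency_key(
    service: str,
    version: str,
    environment: str,
    operation: str,
    params: Optional[Dict[str, Any]] = None,
) -> str:
    """Derive a deterministic key from the fields that define an operation."""
    payload = json.dumps(
        {
            "service": service,
            "version": version,
            "environment": environment,
            "operation": operation,
            "params": params or {},
        },
        sort_keys=True,            # key order must not change the hash
        separators=(",", ":"),     # no whitespace differences either
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

A key built this way is identical across retries of the same logical operation. If the same operation must legitimately run twice (for example, a second deployment of the same version), the caller would need to include a distinguishing field such as a deployment ID.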
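Saga pattern: handling distributed transactions
A deployment coordinates changes across several services, which makes it a distributed transaction. We handle it with the Saga pattern: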
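class DeploymentSaga:
    """
    Deployment saga: wraps a multi-step deployment in a transaction that can be rolled back.
    Saga = a sequence of local transactions + a compensating transaction for each step.
    Example: the saga for a canary release
    Step 1: deploy canary pods → compensation: delete the canary pods
    Step 2: shift 5% of traffic to canary → compensation: route 100% back to stable
    Step 3: observe for 5 minutes → no compensation (observation step)
    Step 4: shift 50% of traffic → compensation: route 100% back to stable
    Step 5: shift 100% of traffic → compensation: roll back to the old version
    Step 6: delete the old-version pods → compensation: restore them (last resort)
    """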
def __init__(self, saga_id: str, state_store: PostgreSQL):
self.saga_id = saga_id
self.state_store = state_store
self.steps: List[SagaStep] = []
self.completed_steps: List[int] = []
def add_step(
self,
name: str,
transaction: Callable,
compensation: Callable
) -> "DeploymentSaga":
"""添加Saga步骤"""
self.steps.append(SagaStep(
index=len(self.steps),
name=name,
transaction=transaction,
compensation=compensation
))
return self # 支持链式调用
async def execute(self) -> SagaResult:
"""执行Saga,失败时自动触发补偿"""
for step in self.steps:
try:
# 执行步骤
result = await step.transaction()
self.completed_steps.append(step.index)
# 持久化进度
await self._save_progress(step.index, "completed", result)
except Exception as e:
# 步骤失败,触发补偿(逆序执行)
await self._compensate()
raise SagaExecutionError(
saga_id=self.saga_id,
failed_step=step.name,
cause=e,
compensated_steps=self.completed_steps
)
return SagaResult(saga_id=self.saga_id, status="completed")
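    async def _compensate(self):
        """Run the compensations of completed steps in reverse order."""
        for step_index in reversed(self.completed_steps):
            step = self.steps[step_index]
            try:
                outcome = step.compensation()
                # A step with nothing to undo may register a plain callable
                # that returns None; awaiting None would raise TypeError.
                if outcome is not None:
                    await outcome
                await self._save_progress(step_index, "compensated")
            except Exception as e:
                # Compensation failed: log it and keep going so the remaining
                # steps are still undone; a human has to resolve this one.
                logger.critical(
                    f"Saga compensation failed for step {step.name}",
                    extra={"saga_id": self.saga_id, "error": str(e)}
                )
                # Send an urgent alert
                await self._alert_human(step, e)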
# Usage example: building a canary release saga.
# k8s_client, istio_client and monitor are assumed to be module-level clients;
# state_store is passed in rather than read from an undefined global.
async def _no_compensation() -> None:
    """Observation steps change nothing, so there is nothing to undo."""


async def create_canary_deployment_saga(
    service: str,
    version: str,
    saga_id: str,
    state_store: PostgreSQL
) -> DeploymentSaga:
    saga = DeploymentSaga(saga_id, state_store)
    saga.add_step(
        name="deploy_canary_pods",
        transaction=lambda: k8s_client.deploy_canary(service, version, replicas=1),
        compensation=lambda: k8s_client.delete_canary(service)
    ).add_step(
        name="shift_5_percent_traffic",
        transaction=lambda: istio_client.set_traffic_split(service, canary=5, stable=95),
        compensation=lambda: istio_client.set_traffic_split(service, canary=0, stable=100)
    ).add_step(
        name="observe_5_minutes",
        transaction=lambda: monitor.observe_and_validate(service, duration_minutes=5),
        compensation=_no_compensation  # awaitable no-op; `lambda: None` cannot be awaited
    ).add_step(
        name="shift_50_percent_traffic",
        transaction=lambda: istio_client.set_traffic_split(service, canary=50, stable=50),
        compensation=lambda: istio_client.set_traffic_split(service, canary=0, stable=100)
    ).add_step(
        name="shift_100_percent_traffic",
        transaction=lambda: istio_client.set_traffic_split(service, canary=100, stable=0),
        compensation=lambda: istio_client.set_traffic_split(service, canary=0, stable=100)
    )
    return saga
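To make the compensation order concrete, here is a small self-contained sketch of the same idea with no Kubernetes or Istio dependencies. It is an illustration only: `run_saga`, `ok`, and `boom` are hypothetical names, and the steps are stubs that succeed or fail on purpose.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

# (name, transaction, compensation)
Step = Tuple[str, Callable[[], Awaitable[None]], Callable[[], Awaitable[None]]]


async def run_saga(steps: List[Step], log: List[str]) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse."""
    completed: List[Step] = []
    for step in steps:
        name, transaction, _ = step
        try:
            await transaction()
            log.append(f"done:{name}")
            completed.append(step)
        except Exception:
            log.append(f"failed:{name}")
            for done_name, _, compensation in reversed(completed):
                await compensation()
                log.append(f"undo:{done_name}")
            return False
    return True


async def ok() -> None:
    return None


async def boom() -> None:
    raise RuntimeError("canary error rate above threshold")


def demo() -> List[str]:
    log: List[str] = []
    steps: List[Step] = [
        ("deploy_canary", ok, ok),
        ("shift_5_percent", ok, ok),
        ("observe", boom, ok),  # the observation step fails
    ]
    asyncio.run(run_saga(steps, log))
    return log
```

Calling `demo()` records the two successful steps, the failure of `observe`, and then the undo of `shift_5_percent` followed by `deploy_canary`. The failed step itself is not compensated, because it never completed.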
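3.2.5 Self-Evolution Mechanism (Evolution Layer)
Self-evolution is the defining feature of this book's architecture and the fundamental difference from traditional automation systems. The evolution layer lets the agent learn from every deployment it handles and steadily improve the quality of its decisions.
Online learning: real-time policy updates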
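class OnlineLearningEngine:
    """
    Online learning engine.
    Core idea: every deployment is a learning opportunity. Each
    decision → execution → observed outcome cycle is used to update the model.
    """

    def __init__(self, model: RiskScoringModel):
        self.model = model
        self.experience_buffer = ExperienceReplayBuffer(max_size=10000)
        self.update_frequency = 100  # trigger an update every 100 new experiences
        self._experiences_seen = 0   # total recorded, independent of buffer size

    async def record_experience(
        self,
        state: dict,
        action: str,
        reward: float,
        next_state: dict,
        metadata: dict
    ):
        """Record the experience from one deployment."""
        experience = Experience(
            state=state,
            action=action,
            reward=reward,
            next_state=next_state,
            timestamp=time.time(),
            metadata=metadata
        )
        self.experience_buffer.add(experience)
        # Count additions instead of checking len(buffer): if the buffer caps
        # its length at max_size, len() stays at 10000 once it is full and
        # `len % 100 == 0` would trigger an update on every call.
        self._experiences_seen += 1
        if self._experiences_seen % self.update_frequency == 0:
            await self._incremental_update()

    async def _incremental_update(self):
        """
        Incremental model update.
        Samples at random from the replay buffer (experience replay) so that a
        batch is not dominated by the most recent, highly correlated deployments.
        """
        # Sample a batch of experiences from the buffer
        batch = self.experience_buffer.sample(batch_size=64)
        # Compute target Q-values with the Bellman equation
        target_values = []
        for exp in batch:
            if exp.next_state is None:  # terminal state
                target = exp.reward
            else:
                # Q-learning target: r + γ · max_a' Q(s', a')
                next_q = self.model.predict_q_value(exp.next_state)
                target = exp.reward + 0.95 * max(next_q)  # γ=0.95
            target_values.append(target)
        # Run the gradient update
        states = [exp.state for exp in batch]
        actions = [exp.action for exp in batch]
        loss = await self.model.update(
            states=states,
            actions=actions,
            target_values=target_values
        )
        logger.info(f"Online learning update: loss={loss:.4f}")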
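The target computation in `_incremental_update` is the standard Q-learning rule: the target equals the immediate reward for a terminal transition, and otherwise the reward plus the discounted best Q-value of the next state. A minimal worked sketch follows. The function name `td_target` and the example Q-values are illustrative assumptions, not values from the book.

```python
from typing import Dict, Optional

GAMMA = 0.95  # discount factor, same value as in the engine above


def td_target(
    reward: float,
    next_q: Optional[Dict[str, float]],
    gamma: float = GAMMA,
) -> float:
    """Q-learning target: r for terminal states, else r + gamma * max Q(s', a')."""
    if next_q is None:  # terminal transition: nothing follows
        return reward
    return reward + gamma * max(next_q.values())


# Hypothetical Q-values for the state that follows a successful canary step.
example_next_q = {"continue_deployment": 2.0, "rollback": 0.5}

non_terminal = td_target(reward=1.0, next_q=example_next_q)  # 1.0 + 0.95 * 2.0
terminal = td_target(reward=-5.0, next_q=None)               # just the reward
```

With these numbers the non-terminal target is about 2.9, while a deployment that ends in failure contributes only its own (negative) reward.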
A/B testing framework: comparing strategies
Before a newly learned strategy is rolled out, its effect must be validated with an A/B test:
class DeploymentStrategyABTest:
"""
部署策略A/B测试框架
用途:验证新的决策策略是否优于当前策略
方法:将部署随机分配到控制组(旧策略)和实验组(新策略)
评估:基于成功率、MTTR、部署时长等指标比较
"""
def __init__(self, experiment_id: str, traffic_split: float = 0.1):
self.experiment_id = experiment_id
self.traffic_split = traffic_split # 10%流量使用新策略
self.control_results = []
self.treatment_results = []
def assign_group(self, deployment_id: str) -> str:
"""基于哈希的确定性分组"""
hash_val = int(hashlib.md5(
f"{self.experiment_id}:{deployment_id}".encode()
).hexdigest(), 16)
if (hash_val % 100) < (self.traffic_split * 100):
return "treatment" # 实验组(新策略)
else:
return "control" # 控制组(旧策略)
def record_result(
self,
group: str,
deployment_id: str,
outcome: dict
):
"""记录实验结果"""
result = {
"deployment_id": deployment_id,
"success": outcome["success"],
"duration_seconds": outcome["duration"],
"error_rate_delta": outcome["error_rate_delta"],
"timestamp": time.time()
}
if group == "control":
self.control_results.append(result)
else:
self.treatment_results.append(result)
    def compute_significance(self) -> ABTestResult:
        """Compute statistical significance with a two-proportion z-test."""
        if len(self.control_results) < 30 or len(self.treatment_results) < 30:
            return ABTestResult(
                status="insufficient_data",
                control_size=len(self.control_results),
                treatment_size=len(self.treatment_results)
            )
        # Success rates
        control_successes = sum(1 for r in self.control_results if r["success"])
        treatment_successes = sum(1 for r in self.treatment_results if r["success"])
        n1 = len(self.control_results)
        n2 = len(self.treatment_results)
        p1 = control_successes / n1
        p2 = treatment_successes / n2
        # z-test with the pooled proportion
        pooled_p = (control_successes + treatment_successes) / (n1 + n2)
        se = math.sqrt(pooled_p * (1 - pooled_p) * (1/n1 + 1/n2))
        if se == 0:
            z_score = 0
        else:
            z_score = (p2 - p1) / se
        p_value = 2 * (1 - stats.norm.cdf(abs(z_score)))  # stats is scipy.stats
        # Effect size (Cohen's h); positive when the treatment is better
        effect_size = 2 * math.asin(math.sqrt(p2)) - 2 * math.asin(math.sqrt(p1))
        # Relative improvement is undefined when the control group never
        # succeeded, so avoid dividing by zero.
        relative_improvement = (p2 - p1) / p1 if p1 > 0 else None
        return ABTestResult(
            status="completed",
            control_success_rate=p1,
            treatment_success_rate=p2,
            relative_improvement=relative_improvement,
            p_value=p_value,
            effect_size=effect_size,
            # One-sided on purpose: only improvements count as significant
            is_significant=p_value < 0.05 and effect_size > 0.2,
            recommendation="promote" if (p_value < 0.05 and p2 > p1) else "reject"
        )
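The z-test above depends on `scipy.stats`. As a sketch for readers who want to check the arithmetic without SciPy, the same two-sided test can be written with the standard library only, using the identity 2·(1 − Φ(|z|)) = erfc(|z|/√2). The function name and the sample counts are illustrative assumptions.

```python
import math
from typing import Tuple


def two_proportion_z_test(
    control_successes: int,
    control_total: int,
    treatment_successes: int,
    treatment_total: int,
) -> Tuple[float, float]:
    """Return (z_score, two_sided_p_value) for the difference in success rates."""
    p1 = control_successes / control_total
    p2 = treatment_successes / treatment_total
    pooled = (control_successes + treatment_successes) / (
        control_total + treatment_total
    )
    se = math.sqrt(
        pooled * (1 - pooled) * (1 / control_total + 1 / treatment_total)
    )
    if se == 0:
        # Both groups all-success or all-failure: no evidence of a difference.
        return 0.0, 1.0
    z = (p2 - p1) / se
    # Two-sided p-value for a standard normal statistic.
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


# Hypothetical experiment: 160/200 successes under the old strategy,
# 180/200 under the new one.
z_score, p_value = two_proportion_z_test(160, 200, 180, 200)
```

For these made-up counts the z-score is roughly 2.8 and the p-value roughly 0.005, so the improvement from 80% to 90% would pass the 0.05 threshold used above.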
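3.3 Detailed Technical Architecture
3.3.1 Overall Architecture Diagram
The following Mermaid diagram shows the complete architecture of the self-evolving super agent:
3.3.2 Core Technology Stack
Container orchestration: Kubernetes 1.28+
Kubernetes is chosen not only because it is the industry standard, but because its API-driven design suits agent control: every action the agent takes is an API call that can be authorized, audited, and watched.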
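# Kubernetes version requirements
# Core features the platform relies on:
# - batch/v1 CronJob (stable scheduling; generally available since Kubernetes 1.21)
# - KEP-3157: Allow informers for getting a stream of data instead of chunking
#   (feature-gated, so treat it as an optional optimization)
# - Server-Side Apply (avoids conflicts between multiple controllers)
# - TopologySpreadConstraints (high availability across availability zones)
# Minimum version: 1.26, recommended version: 1.28+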
---
# Agent专用ServiceAccount(最小权限原则)
apiVersion: v1
kind: ServiceAccount
metadata:
name: harness-agent
namespace: harness-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: harness-agent-role
rules:
# 部署控制
- apiGroups: ["apps"]
resources: ["deployments", "replicasets", "statefulsets", "daemonsets"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
# Pod管理
- apiGroups: [""]
resources: ["pods", "pods/log", "pods/status", "pods/exec"]
verbs: ["get", "list", "watch", "delete"]
# 服务与配置
- apiGroups: [""]
resources: ["services", "configmaps", "endpoints"]
verbs: ["get", "list", "watch"]
# HPA自动扩缩容
- apiGroups: ["autoscaling"]
resources: ["horizontalpodautoscalers"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
# 事件读取(感知层需要)
- apiGroups: [""]
resources: ["events"]
verbs: ["get", "list", "watch"]
Message queue: Apache Kafka topic design
Kafka topic design determines how data flows through the system and how far it can scale:
Kafka cluster configuration
brokers: 3(高可用最小配置)
replication_factor: 3
min_insync_replicas: 2
Topic列表:
┌───────────────────────────────────────────────────────────────┐
│ Topic名称 分区数 副本数 保留时间 │
├───────────────────────────────────────────────────────────────┤
│ harness.deployments.events 12 3 7天 │
│ harness.metrics.raw 24 3 1天 │
│ harness.metrics.processed 6 3 7天 │
│ harness.logs.errors 6 3 3天 │
│ harness.alerts.triggered 3 3 30天 │
│ harness.decisions.made 3 3 90天 │
│ harness.decisions.feedback 3 3 365天 │
│ harness.rollback.triggered 3 3 90天 │
│ harness.learning.experiences 6 3 365天 │
│ harness.human.feedback 3 3 无限 │
└───────────────────────────────────────────────────────────────┘
分区策略:
- deployments.events: 按service_name分区(保证同一服务有序)
- metrics.raw: 按service_name哈希分区
- decisions.*: 按deployment_id分区(保证决策序列有序)
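The partitioning strategy above relies on one property: records with the same key always land in the same partition, and ordering is only guaranteed within a partition. The sketch below illustrates that property. It is an assumption-laden illustration: Kafka's default partitioner hashes the key bytes with murmur2, whereas this example uses SHA-256 from the standard library, so the partition numbers it produces will not match a real cluster.

```python
import hashlib
from collections import defaultdict
from typing import Dict, List, Tuple


def partition_for(key: str, num_partitions: int) -> int:
    """Map a record key to a partition (illustrative hash, not Kafka's murmur2)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


def route(events: List[Tuple[str, str]], num_partitions: int) -> Dict[int, List[str]]:
    """Group (service_name, event) pairs by the partition their key maps to."""
    partitions: Dict[int, List[str]] = defaultdict(list)
    for service_name, event in events:
        partitions[partition_for(service_name, num_partitions)].append(
            f"{service_name}:{event}"
        )
    return dict(partitions)


# harness.deployments.events has 12 partitions in the table above.
routed = route(
    [
        ("checkout", "started"),
        ("search", "started"),
        ("checkout", "canary_5"),
        ("checkout", "completed"),
    ],
    num_partitions=12,
)
```

All three `checkout` events end up in one partition in the order they were produced, which is what lets a consumer rebuild the deployment timeline for a service. Events for different services may interleave arbitrarily across partitions.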
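Time-series database: VictoriaMetrics configuration
# VictoriaMetrics cluster configuration
# Intended data retention tiers:
# - raw data: 15 days (fine-grained, one sample every 15 seconds)
# - downsampled to 5 minutes: 90 days
# - downsampled to 1 hour: 2 years
# Caveat: retentionPeriod below keeps only 15 days in total. Serving the 90-day
# and 2-year tiers needs a longer retentionPeriod together with downsampling
# (an Enterprise feature, -downsampling.period), or a separate long-term store.
# victoriametrics-cluster values.yaml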
vmcluster:
spec:
# 数据保留
retentionPeriod: "15d"
vminsert:
replicaCount: 2
resources:
requests:
cpu: "500m"
memory: "512Mi"
vmstorage:
replicaCount: 3
storageDataPath: /vm-data
storage:
volumeClaimTemplate:
spec:
resources:
requests:
storage: 200Gi
storageClassName: fast-ssd
vmselect:
replicaCount: 2
cacheMountPath: /select-cache
# 预聚合规则(减少查询延迟)
recording_rules:
- record: harness:deployment:success_rate:5m
expr: |
sum(rate(harness_deployment_total{status="success"}[5m])) by (service)
/
sum(rate(harness_deployment_total[5m])) by (service)
- record: harness:service:error_rate:5m
expr: |
sum(rate(http_requests_total{status=~"5.."}[5m])) by (service)
/
sum(rate(http_requests_total[5m])) by (service)
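The two recording rules above share one shape: the rate of "good" (or "bad") events divided by the rate of all events over the same window. The arithmetic can be sketched in a few lines. This is a simplification under assumptions: PromQL's `rate()` also handles counter resets and extrapolates to the window edges, which this sketch does not, and the function names are hypothetical.

```python
from typing import Optional


def counter_rate(first: float, last: float, window_seconds: float) -> float:
    """Per-second increase of a counter between two samples (no reset handling)."""
    return max(last - first, 0.0) / window_seconds


def ratio_over_window(
    part_first: float,
    part_last: float,
    total_first: float,
    total_last: float,
    window_seconds: float = 300.0,  # 5m, as in the rules above
) -> Optional[float]:
    """rate(part) / rate(total); None when there was no traffic in the window."""
    total = counter_rate(total_first, total_last, window_seconds)
    if total == 0:
        return None  # PromQL would yield NaN here
    return counter_rate(part_first, part_last, window_seconds) / total


# Hypothetical counter samples taken five minutes apart:
# 20 deployments finished in the window, 18 of them with status="success".
deployment_success_rate = ratio_over_window(90, 108, 100, 120)
```

With these sample values the ratio comes out at about 0.9. Because both rates use the same window, the window length cancels, and the result is simply successes divided by total within those five minutes.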
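3.3.3 Microservice Architecture Design
Orchestrator service: the core coordinator
Orchestrator responsibilities:
1. Accept deployment requests (REST API)
2. Coordinate multiple worker agents
3. Maintain the deployment state machine
4. Route decision requests to the decision layer
5. Handle exceptions and escalation
Technology choices:
- Framework: FastAPI (asynchronous, high performance)
- Task queue: Celery + Redis (distributed task scheduling)
- State persistence: PostgreSQL (ACID guarantees)
- Service discovery: Kubernetes DNS
- Configuration management: ConfigMap + Secrets
Key targets (SLOs):
- API response time: P99 < 200ms
- Task scheduling latency: P99 < 1s
- Availability: 99.9%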
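Responsibility 3, maintaining the deployment state machine, can be sketched as a transition table plus a guard function. The status names below are the ones used for `deployments.status` in the data model of section 3.3.4. The allowed transitions themselves are an assumption made for illustration, since the chapter's `DeploymentStateMachine` is not shown.

```python
from typing import Dict, FrozenSet

# Assumed transition table; only the status names come from the data model.
TRANSITIONS: Dict[str, FrozenSet[str]] = {
    "pending": frozenset({"risk_assessment", "cancelled"}),
    "risk_assessment": frozenset(
        {"awaiting_approval", "in_progress", "failed", "cancelled"}
    ),
    "awaiting_approval": frozenset({"in_progress", "cancelled"}),
    "in_progress": frozenset({"verifying", "failed", "rolled_back"}),
    "verifying": frozenset({"completed", "failed", "rolled_back"}),
    "failed": frozenset({"rolled_back"}),
    "completed": frozenset(),
    "rolled_back": frozenset(),
    "cancelled": frozenset(),
}


class InvalidTransition(ValueError):
    """Raised when a status change is not allowed by the table."""


def advance(current: str, target: str) -> str:
    """Return the new status, or raise if the transition is not allowed."""
    if current not in TRANSITIONS:
        raise InvalidTransition(f"unknown status: {current}")
    if target not in TRANSITIONS[current]:
        raise InvalidTransition(f"{current} -> {target} is not allowed")
    return target


def is_terminal(status: str) -> bool:
    """A status is terminal when no transition leads out of it."""
    return not TRANSITIONS[status]
```

Keeping the table as data rather than scattered `if` statements makes it easy to review, and it lets the orchestrator reject an out-of-order event (for example, a late "verifying" message for a deployment that was already rolled back) before it touches the database.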
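# orchestrator/main.py - Orchestrator service entry point
import asyncio  # needed for asyncio.create_task() in lifespan()
from contextlib import asynccontextmanager
import structlog
from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from .routers import deployments, rollbacks, decisions, health
from .dependencies import get_db, get_message_bus, get_agent_registry
from .config import settings
from .telemetry import setup_telemetry
# run_health_monitor is used in lifespan() below. The module it lives in is not
# shown in this chapter, so this import path is an assumption.
from .background import run_health_monitor
logger = structlog.get_logger()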
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
logger.info("Orchestrator starting up", version=settings.version)
# 初始化组件
await setup_telemetry(settings.service_name, settings.otlp_endpoint)
await get_message_bus().connect()
# 启动后台任务
app.state.health_monitor = asyncio.create_task(
run_health_monitor()
)
yield
# 优雅关闭
logger.info("Orchestrator shutting down")
app.state.health_monitor.cancel()
await get_message_bus().disconnect()
app = FastAPI(
title="Harness Orchestrator API",
version="1.0.0",
lifespan=lifespan
)
# 中间件
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins,
allow_methods=["*"],
allow_headers=["*"],
)
# 路由注册
app.include_router(deployments.router, prefix="/api/v1/deployments", tags=["deployments"])
app.include_router(rollbacks.router, prefix="/api/v1/rollbacks", tags=["rollbacks"])
app.include_router(decisions.router, prefix="/api/v1/decisions", tags=["decisions"])
app.include_router(health.router, prefix="/api/v1/health", tags=["health"])
3.3.4 Data Model Design
The complete PostgreSQL schema that backs all core business functions:
-- ============================================================
-- Harness Platform 数据库 Schema
-- 版本:1.0.0
-- 包含:10张核心业务表 + 索引 + 触发器
-- ============================================================
-- 启用必要扩展
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pg_trgm"; -- 模糊文本搜索
CREATE EXTENSION IF NOT EXISTS "btree_gin"; -- 复合索引优化
CREATE EXTENSION IF NOT EXISTS "pg_stat_statements"; -- 慢查询分析
-- ============================================================
-- 表1:服务档案(service_profiles)
-- 存储每个服务的静态信息和历史特征
-- ============================================================
CREATE TABLE service_profiles (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
service_name VARCHAR(255) NOT NULL UNIQUE,
service_alias VARCHAR(255),
team_owner VARCHAR(255) NOT NULL,
tier VARCHAR(50) NOT NULL CHECK (tier IN ('frontend', 'backend', 'data', 'infra')),
criticality VARCHAR(50) NOT NULL CHECK (criticality IN ('critical', 'high', 'medium', 'low')),
-- SLO目标
slo_error_rate_target DECIMAL(10, 6) NOT NULL DEFAULT 0.001, -- 0.1%
slo_p99_latency_ms INTEGER NOT NULL DEFAULT 500,
slo_availability_target DECIMAL(10, 6) NOT NULL DEFAULT 0.9999,
-- 技术栈信息
language VARCHAR(50),
framework VARCHAR(100),
container_image VARCHAR(500),
kubernetes_namespace VARCHAR(100),
kubernetes_deployment VARCHAR(100),
-- 统计数据(定期更新)
avg_deployment_duration_sec INTEGER DEFAULT 0,
deployment_success_rate_30d DECIMAL(5, 4) DEFAULT 1.0,
incident_count_30d INTEGER DEFAULT 0,
last_incident_at TIMESTAMP WITH TIME ZONE,
-- 元数据
tags JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP WITH TIME ZONE -- 软删除
);
CREATE INDEX idx_service_profiles_criticality ON service_profiles(criticality);
CREATE INDEX idx_service_profiles_team ON service_profiles(team_owner);
CREATE INDEX idx_service_profiles_namespace ON service_profiles(kubernetes_namespace);
-- ============================================================
-- 表2:部署记录(deployments)
-- 每次部署的完整生命周期记录
-- ============================================================
CREATE TABLE deployments (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
service_id UUID NOT NULL REFERENCES service_profiles(id),
-- 版本信息
current_version VARCHAR(255) NOT NULL, -- 当前运行版本
target_version VARCHAR(255) NOT NULL, -- 目标部署版本
git_commit_sha CHAR(40),
docker_image_tag VARCHAR(500),
-- 触发信息
triggered_by VARCHAR(100) NOT NULL, -- user_id或system
trigger_type VARCHAR(50) NOT NULL CHECK (
trigger_type IN ('manual', 'scheduled', 'auto_rollback', 'hotfix', 'ci_cd')
),
trigger_reason TEXT,
-- 部署配置
strategy VARCHAR(50) NOT NULL CHECK (
strategy IN ('canary', 'blue_green', 'rolling', 'recreate')
),
environment VARCHAR(50) NOT NULL CHECK (
environment IN ('dev', 'staging', 'production', 'canary')
),
-- 状态机
status VARCHAR(50) NOT NULL DEFAULT 'pending' CHECK (
status IN (
'pending', -- 等待执行
'risk_assessment', -- 风险评估中
'awaiting_approval', -- 等待人工审批
'in_progress', -- 部署执行中
'verifying', -- 验证观察中
'completed', -- 部署成功
'failed', -- 部署失败
'rolled_back', -- 已回滚
'cancelled' -- 已取消
)
),
-- 风险信息
risk_score DECIMAL(5, 4), -- 0-1风险评分
risk_factors JSONB DEFAULT '[]', -- 风险因素列表
-- 时间戳
scheduled_at TIMESTAMP WITH TIME ZONE,
started_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE,
duration_seconds INTEGER,
-- 结果
rollback_of UUID REFERENCES deployments(id), -- 如果这是回滚操作
rollback_reason TEXT,
outcome_metrics JSONB DEFAULT '{}', -- 部署后的关键指标快照
-- 元数据
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_deployments_service ON deployments(service_id);
CREATE INDEX idx_deployments_status ON deployments(status);
CREATE INDEX idx_deployments_environment ON deployments(environment);
CREATE INDEX idx_deployments_created_at ON deployments(created_at DESC);
CREATE INDEX idx_deployments_git_commit ON deployments(git_commit_sha) WHERE git_commit_sha IS NOT NULL;
-- 复合索引:按服务查最近N次部署(高频查询)
CREATE INDEX idx_deployments_service_created ON deployments(service_id, created_at DESC);
-- ============================================================
-- 表3:部署步骤(deployment_steps)
-- 记录每个部署的详细执行步骤
-- ============================================================
CREATE TABLE deployment_steps (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
deployment_id UUID NOT NULL REFERENCES deployments(id) ON DELETE CASCADE,
step_name VARCHAR(100) NOT NULL,
step_order INTEGER NOT NULL,
step_type VARCHAR(50) NOT NULL CHECK (
step_type IN ('pre_check', 'deploy', 'verify', 'promote', 'rollback', 'notify')
),
status VARCHAR(50) NOT NULL DEFAULT 'pending' CHECK (
status IN ('pending', 'running', 'completed', 'failed', 'skipped', 'compensated')
),
-- 执行详情
input_params JSONB DEFAULT '{}',
output JSONB DEFAULT '{}',
error_message TEXT,
-- 时间
started_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE,
duration_ms INTEGER,
-- 执行者
executor_type VARCHAR(50), -- rule_engine, ml_model, llm, human
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_deployment_steps_deployment ON deployment_steps(deployment_id, step_order);
CREATE INDEX idx_deployment_steps_status ON deployment_steps(status) WHERE status IN ('running', 'failed');
-- ============================================================
-- 表4:决策记录(decisions)
-- Agent做出的每个决策的完整记录
-- ============================================================
CREATE TABLE decisions (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
deployment_id UUID REFERENCES deployments(id),
-- 决策上下文
decision_type VARCHAR(100) NOT NULL, -- deploy/rollback/scale/alert
context_snapshot JSONB NOT NULL, -- 做决策时的完整环境快照
-- 决策过程
path_used VARCHAR(50) NOT NULL CHECK (
path_used IN ('rule_engine', 'ml_model', 'llm_reasoning', 'human')
),
rule_triggered VARCHAR(200), -- 如果是规则路径,触发的规则名
model_version VARCHAR(100), -- 如果是ML路径,使用的模型版本
llm_prompt_hash VARCHAR(64), -- 如果是LLM路径,提示词哈希
-- 决策结果
action VARCHAR(100) NOT NULL,
confidence DECIMAL(5, 4), -- 置信度 0-1
reasoning TEXT, -- 决策理由
alternatives JSONB DEFAULT '[]', -- 其他备选方案
-- 执行结果
was_executed BOOLEAN DEFAULT FALSE,
actual_outcome VARCHAR(50) CHECK (
actual_outcome IN ('success', 'failure', 'partial', 'overridden', 'pending')
),
outcome_metrics JSONB DEFAULT '{}',
-- 人工反馈
human_feedback VARCHAR(50) CHECK (
human_feedback IN ('approved', 'rejected', 'modified', 'no_feedback')
),
human_comment TEXT,
feedback_by VARCHAR(100),
feedback_at TIMESTAMP WITH TIME ZONE,
-- 时间
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
executed_at TIMESTAMP WITH TIME ZONE
);
CREATE INDEX idx_decisions_deployment ON decisions(deployment_id);
CREATE INDEX idx_decisions_type ON decisions(decision_type);
CREATE INDEX idx_decisions_path ON decisions(path_used);
CREATE INDEX idx_decisions_created ON decisions(created_at DESC);
-- 用于学习管道:查询有真实结果反馈的决策
CREATE INDEX idx_decisions_with_outcome ON decisions(actual_outcome, created_at DESC)
WHERE actual_outcome IS NOT NULL;
-- ============================================================
-- 表5:规则库(rules)
-- 规则引擎的规则定义(支持动态更新)
-- ============================================================
CREATE TABLE rules (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
rule_name VARCHAR(200) NOT NULL UNIQUE,
rule_type VARCHAR(50) NOT NULL CHECK (
rule_type IN ('deployment_gate', 'auto_rollback', 'scaling', 'alert', 'approval')
),
-- 规则定义(DSL)
conditions JSONB NOT NULL, -- 触发条件
actions JSONB NOT NULL, -- 触发动作
priority INTEGER NOT NULL DEFAULT 100, -- 越小优先级越高
-- 范围
applies_to_services TEXT[] DEFAULT ARRAY[]::TEXT[], -- 空=全局
applies_to_envs TEXT[] DEFAULT ARRAY['production']::TEXT[],
-- 状态
is_active BOOLEAN NOT NULL DEFAULT TRUE,
is_dry_run BOOLEAN NOT NULL DEFAULT FALSE, -- 不执行,只记录
-- 统计
trigger_count INTEGER DEFAULT 0,
last_triggered TIMESTAMP WITH TIME ZONE,
success_rate DECIMAL(5, 4),
-- 元数据
description TEXT,
author VARCHAR(100),
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_rules_active ON rules(is_active, priority) WHERE is_active = TRUE;
CREATE INDEX idx_rules_type ON rules(rule_type);
-- ============================================================
-- 表6:风险评估历史(risk_assessments)
-- 每次部署的风险评估结果
-- ============================================================
CREATE TABLE risk_assessments (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
deployment_id UUID NOT NULL REFERENCES deployments(id),
-- 评估结果
overall_score DECIMAL(5, 4) NOT NULL, -- 综合风险分 0-1
-- 分项评分
code_change_risk DECIMAL(5, 4), -- 代码变更风险
service_health_risk DECIMAL(5, 4), -- 服务健康风险
dependency_risk DECIMAL(5, 4), -- 依赖服务风险
traffic_risk DECIMAL(5, 4), -- 流量模式风险
timing_risk DECIMAL(5, 4), -- 时机风险(是否高峰期)
history_risk DECIMAL(5, 4), -- 历史失败模式风险
-- 风险因素详情
risk_factors JSONB NOT NULL DEFAULT '[]',
blast_radius JSONB DEFAULT '{}', -- 爆炸半径分析结果
-- 建议
recommended_strategy VARCHAR(50),
recommended_window VARCHAR(100), -- 建议的部署时间窗口
-- 评估元数据
model_version VARCHAR(100),
assessment_duration_ms INTEGER,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_risk_assessments_deployment ON risk_assessments(deployment_id);
CREATE INDEX idx_risk_assessments_score ON risk_assessments(overall_score);
-- ============================================================
-- 表7:服务依赖关系(service_dependencies)
-- 服务间的依赖关系图
-- ============================================================
CREATE TABLE service_dependencies (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
upstream_id UUID NOT NULL REFERENCES service_profiles(id),
downstream_id UUID NOT NULL REFERENCES service_profiles(id),
dependency_type VARCHAR(20) NOT NULL CHECK (
dependency_type IN ('hard', 'soft', 'async', 'optional')
),
traffic_percentage DECIMAL(5, 2) DEFAULT 100.0, -- 多少%流量依赖此路径
fallback_strategy VARCHAR(100),
-- 运行时观测数据
avg_call_rate_per_min INTEGER,
avg_latency_ms INTEGER,
error_rate_last_7d DECIMAL(5, 4),
-- 发现来源
discovery_method VARCHAR(50) CHECK (
discovery_method IN ('manual', 'service_mesh', 'code_analysis', 'network_analysis')
),
last_verified_at TIMESTAMP WITH TIME ZONE,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE(upstream_id, downstream_id)
);
CREATE INDEX idx_service_deps_upstream ON service_dependencies(upstream_id);
CREATE INDEX idx_service_deps_downstream ON service_dependencies(downstream_id);
-- ============================================================
-- 表8:事故记录(incidents)
-- 生产事故与部署的关联关系
-- ============================================================
CREATE TABLE incidents (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
incident_number VARCHAR(50) UNIQUE NOT NULL, -- INC-2024-001格式
-- 关联部署
triggered_by_deployment UUID REFERENCES deployments(id),
resolved_by_deployment UUID REFERENCES deployments(id),
-- 事故信息
severity VARCHAR(20) NOT NULL CHECK (
severity IN ('P0', 'P1', 'P2', 'P3', 'P4')
),
title VARCHAR(500) NOT NULL,
description TEXT,
-- 影响范围
affected_services TEXT[] NOT NULL,
user_impact_count INTEGER DEFAULT 0,
revenue_impact_usd DECIMAL(15, 2),
-- 时间线
detected_at TIMESTAMP WITH TIME ZONE NOT NULL,
acknowledged_at TIMESTAMP WITH TIME ZONE,
mitigated_at TIMESTAMP WITH TIME ZONE,
resolved_at TIMESTAMP WITH TIME ZONE,
-- 关键指标
ttd_minutes INTEGER, -- Time to Detect
tta_minutes INTEGER, -- Time to Acknowledge
ttm_minutes INTEGER, -- Time to Mitigate
ttr_minutes INTEGER, -- Time to Resolve
-- RCA
root_cause TEXT,
root_cause_category VARCHAR(50) CHECK (
root_cause_category IN (
'bad_deploy', 'config_change', 'capacity', 'dependency',
'security', 'data_corruption', 'unknown'
)
),
prevention_actions TEXT,
-- 状态
status VARCHAR(20) DEFAULT 'active' CHECK (
status IN ('active', 'mitigated', 'resolved', 'postmortem_done')
),
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_incidents_deployment ON incidents(triggered_by_deployment);
CREATE INDEX idx_incidents_severity ON incidents(severity);
CREATE INDEX idx_incidents_detected ON incidents(detected_at DESC);
CREATE INDEX idx_incidents_status ON incidents(status) WHERE status NOT IN ('resolved', 'postmortem_done');
-- ============================================================
-- 表9:模型版本(model_versions)
-- 机器学习模型的版本管理
-- ============================================================
CREATE TABLE model_versions (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
model_name VARCHAR(200) NOT NULL,
version VARCHAR(50) NOT NULL,
-- 模型信息
model_type VARCHAR(100) NOT NULL, -- risk_scoring, anomaly_detection等
algorithm VARCHAR(100),
hyperparameters JSONB DEFAULT '{}',
-- 训练信息
training_data_start TIMESTAMP WITH TIME ZONE,
training_data_end TIMESTAMP WITH TIME ZONE,
training_samples INTEGER,
-- 性能指标
validation_metrics JSONB DEFAULT '{}', -- AUC, F1, Precision, Recall等
-- 生命周期
status VARCHAR(50) DEFAULT 'training' CHECK (
status IN ('training', 'evaluating', 'shadow', 'canary', 'production', 'retired')
),
-- 存储路径
model_artifact_path VARCHAR(500),
feature_schema JSONB DEFAULT '{}',
-- A/B测试结果
ab_test_id VARCHAR(100),
ab_test_result JSONB DEFAULT '{}',
deployed_at TIMESTAMP WITH TIME ZONE,
retired_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_model_versions_name_status ON model_versions(model_name, status);
CREATE INDEX idx_model_versions_production ON model_versions(model_name, deployed_at DESC)
WHERE status = 'production';
-- ============================================================
-- 表10:人工反馈(human_feedback)
-- 收集人类专家对Agent决策的评估
-- ============================================================
CREATE TABLE human_feedback (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
decision_id UUID NOT NULL REFERENCES decisions(id),
-- 反馈者信息
reviewer_id VARCHAR(100) NOT NULL,
reviewer_role VARCHAR(50), -- engineer, sre, manager
-- 反馈内容
rating INTEGER CHECK (rating BETWEEN 1 AND 5),
verdict VARCHAR(20) NOT NULL CHECK (
verdict IN ('correct', 'mostly_correct', 'incorrect', 'dangerous')
),
-- 如果Agent决策有误,正确答案是什么
correct_action VARCHAR(100),
correction_reason TEXT,
-- 改进建议
improvement_suggestion TEXT,
additional_context TEXT,
-- 是否用于训练
approved_for_training BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_human_feedback_decision ON human_feedback(decision_id);
CREATE INDEX idx_human_feedback_training ON human_feedback(approved_for_training, created_at)
WHERE approved_for_training = TRUE;
CREATE INDEX idx_human_feedback_reviewer ON human_feedback(reviewer_id);
-- ============================================================
-- 自动更新 updated_at 触发器
-- ============================================================
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_service_profiles_updated_at
BEFORE UPDATE ON service_profiles
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
CREATE TRIGGER update_deployments_updated_at
BEFORE UPDATE ON deployments
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
CREATE TRIGGER update_rules_updated_at
BEFORE UPDATE ON rules
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
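-- ============================================================
-- Materialized view: deployment health dashboard
-- ============================================================
CREATE MATERIALIZED VIEW deployment_health_dashboard AS
WITH per_deployment AS (
    -- One row per production deployment in the last 30 days. Incidents and
    -- risk assessments are aggregated per deployment here, so that joining
    -- them cannot multiply rows and inflate the deployment counts.
    SELECT
        d.id,
        d.service_id,
        d.status,
        d.duration_seconds,
        d.created_at,
        (SELECT COUNT(*) FROM incidents i
          WHERE i.triggered_by_deployment = d.id) AS incident_count,
        (SELECT AVG(ra.overall_score) FROM risk_assessments ra
          WHERE ra.deployment_id = d.id) AS risk_score
    FROM deployments d
    WHERE d.created_at >= CURRENT_TIMESTAMP - INTERVAL '30 days'
      AND d.environment = 'production'
)
SELECT
    sp.service_name,
    sp.criticality,
    sp.team_owner,
    COUNT(pd.id) AS total_deployments_30d,
    COUNT(pd.id) FILTER (WHERE pd.status = 'completed') AS successful_deployments_30d,
    ROUND(
        (COUNT(pd.id) FILTER (WHERE pd.status = 'completed'))::DECIMAL /
        NULLIF(COUNT(pd.id), 0) * 100, 2
    ) AS success_rate_pct,
    AVG(pd.duration_seconds) AS avg_duration_sec,
    MAX(pd.created_at) AS last_deployment_at,
    COALESCE(SUM(pd.incident_count), 0) AS incidents_30d,
    -- Deployments without a risk assessment are skipped, not counted as 0
    AVG(pd.risk_score) AS avg_risk_score
FROM service_profiles sp
LEFT JOIN per_deployment pd ON pd.service_id = sp.id
WHERE sp.deleted_at IS NULL
GROUP BY sp.id, sp.service_name, sp.criticality, sp.team_owner;
-- Sort in the dashboard query itself; ordering criticality as text would be
-- alphabetical (critical, high, low, medium), not by severity.
-- A unique index is required for REFRESH MATERIALIZED VIEW CONCURRENTLY.
CREATE UNIQUE INDEX ON deployment_health_dashboard(service_name);
-- Refresh hourly from an external scheduler:
-- REFRESH MATERIALIZED VIEW CONCURRENTLY deployment_health_dashboard;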
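The `service_dependencies` table stores the dependency graph as edges, and `risk_assessments.blast_radius` holds the result of analysing it. As a sketch of how that analysis could work, the function below walks the graph breadth-first from the service being deployed. Two assumptions apply: a row `(upstream, downstream)` is read as "downstream calls upstream", so a fault in the upstream service can propagate to the downstream one, and the service names are made up.

```python
from collections import defaultdict, deque
from typing import Dict, Iterable, List, Tuple


def blast_radius(
    edges: Iterable[Tuple[str, str]],
    changed_service: str,
) -> Dict[str, int]:
    """Return every service reachable from the changed one, with hop distance.

    edges: (upstream, downstream) pairs, as in service_dependencies.
    """
    dependents: Dict[str, List[str]] = defaultdict(list)
    for upstream, downstream in edges:
        dependents[upstream].append(downstream)

    distance: Dict[str, int] = {}
    seen = {changed_service}
    queue = deque([(changed_service, 0)])
    while queue:
        service, depth = queue.popleft()
        for dependent in dependents.get(service, []):
            if dependent not in seen:  # also protects against cycles
                seen.add(dependent)
                distance[dependent] = depth + 1
                queue.append((dependent, depth + 1))
    return distance


# Hypothetical dependency edges, including one cycle (web -> orders).
EDGES = [
    ("db-proxy", "orders"),
    ("orders", "checkout"),
    ("orders", "email"),
    ("checkout", "web"),
    ("web", "orders"),
]

impacted = blast_radius(EDGES, "db-proxy")
```

For these edges, a change to `db-proxy` reaches `orders` in one hop, `checkout` and `email` in two, and `web` in three. A fuller version would weight each hop by `dependency_type` and `traffic_percentage`, since a soft or optional dependency carries less risk than a hard one.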
3.4 Hands-On: Building a Basic Agent Framework
3.4.1 Environment Setup
Setting up a local Kubernetes cluster (Kind)
# Install Kind
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.20.0/kind-linux-amd64
chmod +x ./kind
sudo mv ./kind /usr/local/bin/kind
# 创建多节点集群(1 control-plane + 2 workers)
cat << 'EOF' > kind-cluster.yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
name: harness-dev
nodes:
- role: control-plane
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
extraPortMappings:
- containerPort: 80
hostPort: 80
protocol: TCP
- containerPort: 443
hostPort: 443
protocol: TCP
- role: worker
extraMounts:
- hostPath: /tmp/harness-data
containerPath: /data
- role: worker
extraMounts:
- hostPath: /tmp/harness-data
containerPath: /data
EOF
kind create cluster --config kind-cluster.yaml
# 验证集群
kubectl cluster-info --context kind-harness-dev
kubectl get nodes
Installing the base components with Helm
# Add the Helm repositories used below
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo add grafana https://grafana.github.io/helm-charts
helm repo add strimzi https://strimzi.io/charts/
helm repo update
# 创建命名空间
kubectl create namespace harness-system
kubectl create namespace harness-data
kubectl create namespace harness-monitoring
kubectl create namespace kafka
# 1. 安装PostgreSQL
helm install postgres bitnami/postgresql \
--namespace harness-data \
--set auth.postgresPassword=harness_dev_pwd \
--set auth.database=harness \
--set primary.persistence.size=20Gi
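# 2. Install Redis (one primary plus one replica; the bitnami/redis chart is
#    not Redis Cluster, which ships as the separate bitnami/redis-cluster chart)
helm install redis bitnami/redis \
--namespace harness-data \
--set auth.password=redis_dev_pwd \
--set architecture=replication \
--set replica.replicaCount=1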
# 3. 安装Kafka (使用Strimzi Operator)
helm install strimzi-operator strimzi/strimzi-kafka-operator \
--namespace kafka
kubectl apply -n kafka -f - << 'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: harness-kafka
spec:
kafka:
version: 3.6.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
storage:
type: ephemeral
zookeeper:
replicas: 3
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
EOF
# 4. 安装Prometheus + Grafana监控栈
helm install monitoring prometheus-community/kube-prometheus-stack \
--namespace harness-monitoring \
--set grafana.adminPassword=admin_dev \
--set prometheus.prometheusSpec.retention=15d \
--set prometheus.prometheusSpec.storageSpec.volumeClaimTemplate.spec.resources.requests.storage=50Gi
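RBAC configuration
# Create the Harness ServiceAccount and bind it to the harness-agent-role
# ClusterRole defined in section 3.3.2. Apply that manifest first: a binding
# to a role that does not exist grants no permissions.
kubectl apply -f - << 'EOF'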
apiVersion: v1
kind: ServiceAccount
metadata:
name: harness-agent
namespace: harness-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: harness-agent-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: harness-agent-role
subjects:
- kind: ServiceAccount
name: harness-agent
namespace: harness-system
EOF
3.4.2 Core Service Implementation
The following is the full implementation of the core Orchestrator logic (including the state machine design):
# orchestrator/core/orchestrator.py
"""
Harness Orchestrator 核心实现
负责整个部署生命周期的协调管理
"""
import asyncio
import uuid
from enum import Enum
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
import structlog
from datetime import datetime, timezone
from ..models.deployment import (
Deployment, DeploymentStatus, DeploymentStrategy,
DeploymentRequest, DeploymentResult
)
from ..services.risk_assessor import RiskAssessor
from ..services.strategy_selector import StrategySelector
from ..services.health_checker import HealthChecker
from ..services.executor import DeploymentExecutor
from ..services.monitor import DeploymentMonitor
from ..services.notifier import NotificationService
from ..repositories.deployment_repo import DeploymentRepository
from ..repositories.decision_repo import DecisionRepository
from .state_machine import DeploymentStateMachine, DeploymentEvent
logger = structlog.get_logger(__name__)
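class OrchestratorError(Exception):
    """Base exception for the Orchestrator."""
    pass


@dataclass
class MockExecutionResult:
    """Stand-in execution result returned in dry-run mode (see dry_run_mode)."""
    success: bool
    error: Optional[str] = None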
@dataclass
class OrchestratorConfig:
"""Orchestrator配置"""
max_concurrent_deployments: int = 10
risk_score_threshold_for_approval: float = 0.7
canary_observation_minutes: int = 5
rollback_timeout_seconds: int = 300
enable_auto_rollback: bool = True
dry_run_mode: bool = False
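class DeploymentOrchestrator:
    """
    Deployment orchestrator: the central coordination component of the
    self-evolving agent.

    Responsibilities:
    1. Accept and validate deployment requests
    2. Coordinate risk assessment
    3. Select the deployment strategy
    4. Drive the deployment state machine
    5. Coordinate rollback decisions
    6. Record decisions for later learning
    """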
def __init__(
self,
config: OrchestratorConfig,
risk_assessor: RiskAssessor,
strategy_selector: StrategySelector,
health_checker: HealthChecker,
executor: DeploymentExecutor,
monitor: DeploymentMonitor,
notifier: NotificationService,
deployment_repo: DeploymentRepository,
decision_repo: DecisionRepository,
):
self.config = config
self.risk_assessor = risk_assessor
self.strategy_selector = strategy_selector
self.health_checker = health_checker
self.executor = executor
self.monitor = monitor
self.notifier = notifier
self.deployment_repo = deployment_repo
self.decision_repo = decision_repo
# 并发控制
self._semaphore = asyncio.Semaphore(config.max_concurrent_deployments)
self._active_deployments: Dict[str, asyncio.Task] = {}
self.log = logger.bind(component="orchestrator")
async def submit_deployment(
self,
request: DeploymentRequest
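    ) -> Deployment:
        """
        Submit a deployment request.

        This is the only entry point for external callers. It:
        1. Creates the deployment record
        2. Starts the deployment pipeline asynchronously
        3. Returns the deployment object (clients poll it for status)
        """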
self.log.info(
"Deployment request received",
service=request.service_name,
version=request.target_version,
environment=request.environment,
triggered_by=request.triggered_by
)
# 创建部署记录
deployment = await self.deployment_repo.create({
"id": str(uuid.uuid4()),
"service_name": request.service_name,
"target_version": request.target_version,
"environment": request.environment,
"triggered_by": request.triggered_by,
"trigger_type": request.trigger_type,
"status": DeploymentStatus.PENDING,
"created_at": datetime.now(timezone.utc)
})
# 异步执行部署流程(不阻塞调用者)
task = asyncio.create_task(
self._execute_deployment_pipeline(deployment)
)
self._active_deployments[deployment.id] = task
# 任务完成后清理
task.add_done_callback(
lambda t: self._active_deployments.pop(deployment.id, None)
)
return deployment
async def _execute_deployment_pipeline(
self,
deployment: Deployment
    ) -> DeploymentResult:
        """
        Main flow of the deployment pipeline.

        Implements the core state machine, running each stage in order:
        pending → risk_assessment → [awaiting_approval] → in_progress
        → verifying → completed/rolled_back
        """
async with self._semaphore:
try:
return await self._run_pipeline_steps(deployment)
except asyncio.CancelledError:
self.log.warning("Deployment cancelled", deployment_id=deployment.id)
await self._handle_cancellation(deployment)
raise
except Exception as e:
self.log.error(
"Deployment pipeline failed",
deployment_id=deployment.id,
error=str(e),
exc_info=True
)
await self._handle_pipeline_failure(deployment, e)
raise

    async def _handle_cancellation(self, deployment: Deployment) -> None:
        """Mark a cancelled deployment.

        Minimal sketch: the original listing calls this helper without
        defining it. Undoing partially applied changes is not handled here.
        """
        await self._update_status(deployment, DeploymentStatus.CANCELLED,
                                  reason="Cancelled by user")

    async def _handle_pipeline_failure(
        self,
        deployment: Deployment,
        error: Exception
    ) -> None:
        """Mark a deployment as failed after an unexpected error (minimal sketch)."""
        await self._update_status(deployment, DeploymentStatus.FAILED,
                                  reason=f"Pipeline error: {error}")
        await self.notifier.notify_failure(deployment, "pipeline_error", error)
async def _run_pipeline_steps(
self,
deployment: Deployment
) -> DeploymentResult:
"""执行完整的部署流水线步骤"""
sm = DeploymentStateMachine(deployment)
# ─────────────────────────────────────────────────────
# 步骤1:前置健康检查
# ─────────────────────────────────────────────────────
self.log.info("Running pre-deployment health checks",
deployment_id=deployment.id)
health_result = await self.health_checker.check_all(
service=deployment.service_name,
environment=deployment.environment
)
if not health_result.is_healthy:
await self._update_status(deployment, DeploymentStatus.FAILED,
reason=f"Pre-check failed: {health_result.reason}")
await self.notifier.notify_failure(deployment, "pre_check_failed", health_result)
return DeploymentResult.failure(deployment.id, "pre_check_failed")
# ─────────────────────────────────────────────────────
# 步骤2:风险评估
# ─────────────────────────────────────────────────────
await self._update_status(deployment, DeploymentStatus.RISK_ASSESSMENT)
sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
risk_report = await self.risk_assessor.assess(deployment)
self.log.info(
"Risk assessment completed",
deployment_id=deployment.id,
risk_score=risk_report.overall_score,
risk_factors=risk_report.risk_factors
)
# 记录风险评估决策
await self.decision_repo.record({
"deployment_id": deployment.id,
"decision_type": "risk_assessment",
"context_snapshot": deployment.to_dict(),
"action": f"risk_score:{risk_report.overall_score:.3f}",
"confidence": 1 - risk_report.overall_score,
"reasoning": f"Risk factors: {', '.join(risk_report.risk_factors[:3])}",
"path_used": "ml_model"
})
# ─────────────────────────────────────────────────────
# 步骤3:高风险场景请求人工审批
# ─────────────────────────────────────────────────────
if risk_report.overall_score >= self.config.risk_score_threshold_for_approval:
self.log.info(
"High risk deployment requires approval",
deployment_id=deployment.id,
risk_score=risk_report.overall_score
)
await self._update_status(deployment, DeploymentStatus.AWAITING_APPROVAL)
# 发送审批请求
approval_result = await self.notifier.request_approval(
deployment=deployment,
risk_report=risk_report,
timeout_seconds=3600 # 1小时超时
)
if not approval_result.approved:
await self._update_status(deployment, DeploymentStatus.CANCELLED,
reason="Rejected by reviewer")
return DeploymentResult.cancelled(deployment.id, approval_result.reviewer)
# ─────────────────────────────────────────────────────
# 步骤4:选择部署策略
# ─────────────────────────────────────────────────────
strategy = await self.strategy_selector.select(
deployment=deployment,
risk_report=risk_report
)
self.log.info(
"Deployment strategy selected",
deployment_id=deployment.id,
strategy=strategy.type,
config=strategy.config
)
# ─────────────────────────────────────────────────────
# 步骤5:执行部署
# ─────────────────────────────────────────────────────
await self._update_status(deployment, DeploymentStatus.IN_PROGRESS)
sm.transition(DeploymentEvent.START_DEPLOYMENT)
if self.config.dry_run_mode:
self.log.info("DRY RUN: Skipping actual deployment")
execution_result = MockExecutionResult(success=True)
else:
execution_result = await self.executor.execute(deployment, strategy)
if not execution_result.success:
self.log.error("Deployment execution failed",
deployment_id=deployment.id,
error=execution_result.error)
await self._trigger_rollback(deployment, reason=execution_result.error)
return DeploymentResult.failure(deployment.id, execution_result.error)
# ─────────────────────────────────────────────────────
# 步骤6:验证观察期
# ─────────────────────────────────────────────────────
await self._update_status(deployment, DeploymentStatus.VERIFYING)
sm.transition(DeploymentEvent.START_VERIFICATION)
verify_result = await self.monitor.observe_and_validate(
deployment=deployment,
duration_minutes=self.config.canary_observation_minutes,
thresholds=deployment.service_profile.slo_targets
)
if not verify_result.passed:
self.log.warning(
"Deployment verification failed",
deployment_id=deployment.id,
failed_checks=verify_result.failed_checks
)
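            if self.config.enable_auto_rollback:
                await self._trigger_rollback(
                    deployment,
                    reason=f"Verification failed: {verify_result.summary}"
                )
                return DeploymentResult.rolled_back(deployment.id, verify_result.summary)
            # Auto-rollback is disabled: stop here and hand over to a human
            # instead of falling through to the success path below.
            await self._update_status(deployment, DeploymentStatus.FAILED,
                                      reason=f"Verification failed: {verify_result.summary}")
            await self.notifier.notify_failure(deployment, "verification_failed", verify_result)
            return DeploymentResult.failure(deployment.id, "verification_failed")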
# ─────────────────────────────────────────────────────
# 步骤7:部署成功
# ─────────────────────────────────────────────────────
await self._update_status(deployment, DeploymentStatus.COMPLETED)
sm.transition(DeploymentEvent.COMPLETE)
# 发布成功通知
await self.notifier.notify_success(deployment, verify_result)
self.log.info(
"Deployment completed successfully",
deployment_id=deployment.id,
duration_seconds=deployment.duration_seconds
)
return DeploymentResult.success(deployment.id, verify_result.metrics)
async def _trigger_rollback(
self,
deployment: Deployment,
reason: str
) -> None:
"""触发回滚流程"""
self.log.warning(
"Triggering rollback",
deployment_id=deployment.id,
reason=reason
)
await self._update_status(deployment, DeploymentStatus.ROLLING_BACK)
try:
rollback_result = await self.executor.rollback(
deployment=deployment,
timeout_seconds=self.config.rollback_timeout_seconds
)
if rollback_result.success:
await self._update_status(deployment, DeploymentStatus.ROLLED_BACK,
reason=reason)
await self.notifier.notify_rollback(deployment, reason)
else:
# 回滚也失败了!这是严重问题
await self._update_status(deployment, DeploymentStatus.FAILED,
reason=f"Rollback also failed: {rollback_result.error}")
await self.notifier.notify_critical(
deployment,
f"CRITICAL: Rollback failed! Manual intervention required. "
f"Original error: {reason}. Rollback error: {rollback_result.error}"
)
except Exception as e:
self.log.critical(
"Rollback encountered exception",
deployment_id=deployment.id,
error=str(e),
exc_info=True
)
raise
async def _update_status(
self,
deployment: Deployment,
status: DeploymentStatus,
reason: Optional[str] = None
) -> None:
"""更新部署状态并持久化"""
deployment.status = status
deployment.updated_at = datetime.now(timezone.utc)
if reason:
deployment.status_reason = reason
await self.deployment_repo.update_status(
deployment_id=deployment.id,
status=status,
reason=reason
)
async def cancel_deployment(self, deployment_id: str, cancelled_by: str) -> bool:
"""取消正在进行的部署"""
task = self._active_deployments.get(deployment_id)
if task and not task.done():
task.cancel()
self.log.info("Deployment cancelled",
deployment_id=deployment_id,
cancelled_by=cancelled_by)
return True
return False
def get_active_deployments(self) -> List[str]:
"""获取当前活跃部署列表"""
return list(self._active_deployments.keys())
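The concurrency control used by the orchestrator (a semaphore that caps parallel pipelines, plus a registry of running tasks that cleans itself up) can be tried in isolation. The following is a minimal sketch under stated assumptions: `run_bounded`, `guarded`, and the `peak` counter are hypothetical names introduced for illustration and are not part of the Harness code.

```python
import asyncio


async def run_bounded(jobs, limit):
    """Run coroutine factories with at most `limit` of them active at once.

    `jobs` maps a job id to a zero-argument callable returning a coroutine.
    Returns the results in insertion order and the peak concurrency observed.
    """
    semaphore = asyncio.Semaphore(limit)
    active = {}   # job id -> Task, plays the role of _active_deployments
    running = 0   # jobs currently holding the semaphore
    peak = 0      # highest value `running` ever reached

    async def guarded(work):
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            try:
                return await work()
            finally:
                running -= 1

    for job_id, work in jobs.items():
        task = asyncio.create_task(guarded(work))
        active[job_id] = task
        # Bind job_id as a default argument so each callback removes its own entry.
        task.add_done_callback(lambda _t, jid=job_id: active.pop(jid, None))

    results = await asyncio.gather(*list(active.values()))
    return results, peak
```

Binding `jid=job_id` in the callback matters when tasks are created in a loop; `submit_deployment` avoids the issue because each call has its own `deployment` variable.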
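Deployment State Machine Implementation
# orchestrator/core/state_machine.py
"""
Deployment state machine: manages state transitions across the deployment lifecycle.
A finite state machine guarantees that every state change is legal.
"""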
from enum import Enum, auto
from typing import Set, Dict, Tuple
import structlog
logger = structlog.get_logger(__name__)
class DeploymentState(Enum):
"""部署状态枚举"""
PENDING = "pending"
RISK_ASSESSMENT = "risk_assessment"
AWAITING_APPROVAL = "awaiting_approval"
IN_PROGRESS = "in_progress"
VERIFYING = "verifying"
ROLLING_BACK = "rolling_back"
COMPLETED = "completed"
ROLLED_BACK = "rolled_back"
FAILED = "failed"
CANCELLED = "cancelled"
class DeploymentEvent(Enum):
"""状态转换事件"""
START_RISK_ASSESSMENT = auto()
RISK_HIGH = auto() # 高风险,需要审批
RISK_ACCEPTABLE = auto() # 风险可接受,直接部署
APPROVE = auto() # 人工审批通过
REJECT = auto() # 人工审批拒绝
START_DEPLOYMENT = auto()
DEPLOYMENT_FAILED = auto()
START_VERIFICATION = auto()
VERIFICATION_PASSED = auto()
VERIFICATION_FAILED = auto()
COMPLETE = auto()
START_ROLLBACK = auto()
ROLLBACK_COMPLETED = auto()
ROLLBACK_FAILED = auto()
CANCEL = auto()
FATAL_ERROR = auto()
class InvalidStateTransitionError(Exception):
"""非法状态转换异常"""
pass
class DeploymentStateMachine:
"""
部署状态机
确保状态变更是合法的,防止出现不一致状态
"""
# 合法状态转换表:{(当前状态, 事件): 目标状态}
TRANSITIONS: Dict[Tuple[DeploymentState, DeploymentEvent], DeploymentState] = {
# 启动风险评估
(DeploymentState.PENDING, DeploymentEvent.START_RISK_ASSESSMENT):
DeploymentState.RISK_ASSESSMENT,
# 风险评估完成
(DeploymentState.RISK_ASSESSMENT, DeploymentEvent.RISK_HIGH):
DeploymentState.AWAITING_APPROVAL,
(DeploymentState.RISK_ASSESSMENT, DeploymentEvent.RISK_ACCEPTABLE):
DeploymentState.IN_PROGRESS,
# 审批处理
(DeploymentState.AWAITING_APPROVAL, DeploymentEvent.APPROVE):
DeploymentState.IN_PROGRESS,
(DeploymentState.AWAITING_APPROVAL, DeploymentEvent.REJECT):
DeploymentState.CANCELLED,
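        # Start of deployment. The orchestrator fires START_DEPLOYMENT once the
        # risk gate (and any approval) has passed, so the event must be legal here;
        # without these entries transition() raises InvalidStateTransitionError.
        (DeploymentState.RISK_ASSESSMENT, DeploymentEvent.START_DEPLOYMENT):
            DeploymentState.IN_PROGRESS,
        (DeploymentState.AWAITING_APPROVAL, DeploymentEvent.START_DEPLOYMENT):
            DeploymentState.IN_PROGRESS,
        # Deployment execution
        (DeploymentState.IN_PROGRESS, DeploymentEvent.START_VERIFICATION):
            DeploymentState.VERIFYING,
        (DeploymentState.IN_PROGRESS, DeploymentEvent.DEPLOYMENT_FAILED):
            DeploymentState.ROLLING_BACK,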
# 验证观察
(DeploymentState.VERIFYING, DeploymentEvent.VERIFICATION_PASSED):
DeploymentState.COMPLETED,
(DeploymentState.VERIFYING, DeploymentEvent.VERIFICATION_FAILED):
DeploymentState.ROLLING_BACK,
(DeploymentState.VERIFYING, DeploymentEvent.COMPLETE):
DeploymentState.COMPLETED,
# 回滚流程
(DeploymentState.ROLLING_BACK, DeploymentEvent.ROLLBACK_COMPLETED):
DeploymentState.ROLLED_BACK,
(DeploymentState.ROLLING_BACK, DeploymentEvent.ROLLBACK_FAILED):
DeploymentState.FAILED,
# 取消(可在多个阶段触发)
(DeploymentState.PENDING, DeploymentEvent.CANCEL):
DeploymentState.CANCELLED,
(DeploymentState.RISK_ASSESSMENT, DeploymentEvent.CANCEL):
DeploymentState.CANCELLED,
(DeploymentState.AWAITING_APPROVAL, DeploymentEvent.CANCEL):
DeploymentState.CANCELLED,
(DeploymentState.IN_PROGRESS, DeploymentEvent.CANCEL):
DeploymentState.ROLLING_BACK,
# 致命错误
(DeploymentState.RISK_ASSESSMENT, DeploymentEvent.FATAL_ERROR):
DeploymentState.FAILED,
(DeploymentState.IN_PROGRESS, DeploymentEvent.FATAL_ERROR):
DeploymentState.FAILED,
(DeploymentState.VERIFYING, DeploymentEvent.FATAL_ERROR):
DeploymentState.FAILED,
}
def __init__(self, deployment):
self.deployment = deployment
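        # deployment.status may be a DeploymentStatus enum member or a plain
        # string; normalise to the underlying value before the lookup.
        self.current_state = DeploymentState(
            getattr(deployment.status, "value", deployment.status)
        )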
self.history: list = []
self.log = logger.bind(deployment_id=deployment.id)
def transition(self, event: DeploymentEvent) -> DeploymentState:
"""
执行状态转换
Args:
event: 触发事件
Returns:
新状态
Raises:
InvalidStateTransitionError: 如果状态转换不合法
"""
transition_key = (self.current_state, event)
if transition_key not in self.TRANSITIONS:
raise InvalidStateTransitionError(
f"Invalid transition: {self.current_state.value} + {event.name} "
f"has no defined target state"
)
new_state = self.TRANSITIONS[transition_key]
self.log.info(
"State transition",
from_state=self.current_state.value,
event=event.name,
to_state=new_state.value
)
# 记录转换历史
self.history.append({
"from": self.current_state.value,
"event": event.name,
"to": new_state.value,
"timestamp": __import__('time').time()
})
self.current_state = new_state
return new_state
def can_transition(self, event: DeploymentEvent) -> bool:
"""检查是否可以执行指定事件的状态转换"""
return (self.current_state, event) in self.TRANSITIONS
def available_events(self) -> Set[DeploymentEvent]:
"""获取当前状态下可用的事件列表"""
return {
event for (state, event), _ in self.TRANSITIONS.items()
if state == self.current_state
}
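To see how a table-driven state machine of this kind behaves, the sketch below replays a sequence of events against a reduced copy of the transition table. It is an illustration only: `State`, `replay`, and the use of plain strings for events are simplifications assumed here, not the classes defined above.

```python
from enum import Enum


class State(Enum):
    PENDING = "pending"
    RISK_ASSESSMENT = "risk_assessment"
    AWAITING_APPROVAL = "awaiting_approval"
    IN_PROGRESS = "in_progress"
    VERIFYING = "verifying"
    COMPLETED = "completed"


# (current state, event name) -> next state; a subset of the full table
TRANSITIONS = {
    (State.PENDING, "START_RISK_ASSESSMENT"): State.RISK_ASSESSMENT,
    (State.RISK_ASSESSMENT, "RISK_HIGH"): State.AWAITING_APPROVAL,
    (State.RISK_ASSESSMENT, "RISK_ACCEPTABLE"): State.IN_PROGRESS,
    (State.AWAITING_APPROVAL, "APPROVE"): State.IN_PROGRESS,
    (State.IN_PROGRESS, "START_VERIFICATION"): State.VERIFYING,
    (State.VERIFYING, "VERIFICATION_PASSED"): State.COMPLETED,
}


def replay(events, start=State.PENDING):
    """Apply events in order and return every state visited, start included."""
    state = start
    visited = [state]
    for event in events:
        key = (state, event)
        if key not in TRANSITIONS:
            # Any pair missing from the table is illegal by construction.
            raise ValueError(f"illegal transition: {state.value} + {event}")
        state = TRANSITIONS[key]
        visited.append(state)
    return visited
```

Because legality is defined purely by membership in the table, adding a new lifecycle stage means adding rows, not branching logic.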
3.4.3 Message Bus Integration
# orchestrator/messaging/kafka_client.py
"""
Kafka message bus client.
Supports both producer and consumer modes.
"""
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.errors import KafkaConnectionError
import json
import asyncio
from typing import Callable, Any, Optional
import structlog
logger = structlog.get_logger(__name__)
class HarnessMessageBus:
"""
Harness平台消息总线
封装Kafka的生产/消费逻辑,提供高级别的事件发布/订阅接口
"""
# Topic常量
TOPIC_DEPLOYMENT_EVENTS = "harness.deployments.events"
TOPIC_DECISIONS = "harness.decisions.made"
TOPIC_ALERTS = "harness.alerts.triggered"
TOPIC_ROLLBACKS = "harness.rollback.triggered"
TOPIC_LEARNING = "harness.learning.experiences"
def __init__(self, bootstrap_servers: str):
self.bootstrap_servers = bootstrap_servers
self._producer: Optional[AIOKafkaProducer] = None
self._consumers: dict = {}
self._handlers: dict = {}
self.log = logger.bind(component="message_bus")
async def connect(self):
"""建立Kafka连接"""
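        self._producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode(),
            key_serializer=lambda k: k.encode() if isinstance(k, str) else k,
            # Reliability settings
            acks='all',                # wait for all in-sync replicas
            enable_idempotence=True,   # idempotent producer
            compression_type='gzip',   # compress to reduce network overhead
            # aiokafka retries retriable errors internally until
            # request_timeout_ms elapses. It does not accept `retries` or
            # `max_in_flight_requests_per_connection` (those belong to other
            # Kafka clients), so only the backoff is configured here.
            retry_backoff_ms=200,
        )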
await self._producer.start()
self.log.info("Kafka producer connected", servers=self.bootstrap_servers)
async def disconnect(self):
"""断开连接"""
if self._producer:
await self._producer.stop()
for consumer in self._consumers.values():
await consumer.stop()
self.log.info("Kafka connections closed")
async def publish_deployment_event(
self,
deployment_id: str,
event_type: str,
payload: dict
) -> None:
"""发布部署事件"""
message = {
"event_type": event_type,
"deployment_id": deployment_id,
"payload": payload,
"timestamp": __import__('time').time(),
"source": "orchestrator"
}
await self._publish(
topic=self.TOPIC_DEPLOYMENT_EVENTS,
key=deployment_id, # 按deployment_id分区,保证同一部署的事件有序
message=message
)
async def publish_learning_experience(
self,
deployment_id: str,
state_before: dict,
action: str,
reward: float,
state_after: dict,
metadata: dict = None
) -> None:
"""发布学习经验(供进化层消费)"""
message = {
"deployment_id": deployment_id,
"experience": {
"state": state_before,
"action": action,
"reward": reward,
"next_state": state_after,
},
"metadata": metadata or {},
"timestamp": __import__('time').time()
}
await self._publish(
topic=self.TOPIC_LEARNING,
key=deployment_id,
message=message
)
async def subscribe(
self,
topic: str,
handler: Callable,
group_id: str,
from_beginning: bool = False
) -> None:
"""订阅Topic并注册处理器"""
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=self.bootstrap_servers,
group_id=group_id,
value_deserializer=lambda v: json.loads(v.decode()),
# 消费者配置
auto_offset_reset='earliest' if from_beginning else 'latest',
enable_auto_commit=False, # 手动提交,确保处理后才确认
max_poll_interval_ms=300000, # 5分钟处理超时
)
await consumer.start()
self._consumers[f"{topic}:{group_id}"] = consumer
# 启动消费循环
asyncio.create_task(
self._consume_loop(consumer, handler, topic)
)
self.log.info("Subscribed to topic", topic=topic, group_id=group_id)
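    async def _consume_loop(
        self,
        consumer: AIOKafkaConsumer,
        handler: Callable,
        topic: str
    ) -> None:
        """Consume loop."""
        async for msg in consumer:
            try:
                await handler(msg.value)
            except Exception as e:
                self.log.error(
                    "Message handling failed",
                    topic=topic,
                    offset=msg.offset,
                    error=str(e),
                    exc_info=True
                )
                # Park the message on a dead-letter topic so that it does not
                # block the messages behind it
                await self._send_to_dlq(msg, str(e))
            # Commit once the message has been either handled or parked
            await consumer.commit()

    async def _send_to_dlq(self, msg, error: str) -> None:
        """Publish a failed message to a dead-letter topic.

        Minimal sketch: the original listing calls this helper without
        defining it. The "<topic>.dlq" naming is an assumption, and the topic
        must already exist or topic auto-creation must be enabled.
        """
        await self._publish(
            topic=f"{msg.topic}.dlq",
            key=msg.key,
            message={"original": msg.value, "error": error, "offset": msg.offset},
        )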
async def _publish(self, topic: str, key: str, message: dict) -> None:
"""内部发布方法"""
if not self._producer:
raise RuntimeError("Message bus not connected")
await self._producer.send_and_wait(
topic=topic,
key=key,
value=message
)
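The consume-then-commit pattern with a dead-letter queue can be illustrated without a Kafka broker. The sketch below is a simplified in-memory model, assuming a list in place of a partition and a list in place of the dead-letter topic; `consume` and `dead_letters` are hypothetical names, not aiokafka APIs.

```python
import asyncio


async def consume(messages, handler, dead_letters):
    """Process messages in order and return the last offset that would be committed.

    A message that raises is parked in `dead_letters` together with its error,
    so one poison message does not block the ones behind it.
    """
    committed = -1
    for offset, value in enumerate(messages):
        try:
            await handler(value)
        except Exception as exc:
            dead_letters.append(
                {"offset": offset, "value": value, "error": str(exc)}
            )
        # The offset advances only after the message was handled or parked,
        # which gives at-least-once processing for the handler.
        committed = offset
    return committed
```

At-least-once delivery means a handler can see the same message twice after a crash, so handlers on the real bus should be idempotent.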
3.4.4 Basic Perception Capabilities
# collector/perception/metrics_collector.py
"""
Metrics collector: gathers runtime metrics from Prometheus.
"""
from prometheus_client.parser import text_string_to_metric_families
import aiohttp
import asyncio
from typing import Dict, List, Optional
import structlog
from dataclasses import dataclass
logger = structlog.get_logger(__name__)
@dataclass
class ServiceMetrics:
"""服务当前指标快照"""
service_name: str
timestamp: float
# RED指标
request_rate: float = 0.0 # 请求/秒
error_rate: float = 0.0 # 错误率 (0-1)
p50_latency_ms: float = 0.0
p95_latency_ms: float = 0.0
p99_latency_ms: float = 0.0
# 资源指标
cpu_usage_pct: float = 0.0
memory_usage_pct: float = 0.0
# 可用性
availability: float = 1.0
# 原始数据(供特征工程使用)
raw_metrics: Dict = None
class PrometheusCollector:
"""
Prometheus指标采集器
使用PromQL查询关键指标,构建服务健康状态
"""
def __init__(self, prometheus_url: str, timeout_seconds: int = 10):
self.prometheus_url = prometheus_url.rstrip('/')
self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)
self.log = logger.bind(component="prometheus_collector")
async def collect_service_metrics(
self,
service_name: str,
time_window_minutes: int = 5
) -> ServiceMetrics:
"""采集指定服务的完整指标"""
async with aiohttp.ClientSession(timeout=self.timeout) as session:
# 并行查询所有指标(减少采集延迟)
results = await asyncio.gather(
self._query(session, f"""
sum(rate(http_requests_total{{service="{service_name}"}}[{time_window_minutes}m]))
"""),
self._query(session, f"""
sum(rate(http_requests_total{{service="{service_name}",status=~"5.."}}[{time_window_minutes}m]))
/ sum(rate(http_requests_total{{service="{service_name}"}}[{time_window_minutes}m]))
"""),
self._query(session, f"""
histogram_quantile(0.99, sum(rate(
http_request_duration_seconds_bucket{{service="{service_name}"}}[{time_window_minutes}m]
)) by (le)) * 1000
"""),
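                # container_cpu_usage_seconds_total is a cumulative counter, so
                # take rate() to get CPU cores in use (1.0 = one full core)
                self._query(session, f"""
                    avg(rate(container_cpu_usage_seconds_total{{pod=~"{service_name}-.*"}}[{time_window_minutes}m]))
                """),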
return_exceptions=True
)
request_rate, error_rate, p99_latency, cpu_usage = results
return ServiceMetrics(
service_name=service_name,
timestamp=__import__('time').time(),
request_rate=self._safe_float(request_rate),
error_rate=self._safe_float(error_rate),
p99_latency_ms=self._safe_float(p99_latency),
cpu_usage_pct=self._safe_float(cpu_usage) * 100,
)
async def _query(
self,
session: aiohttp.ClientSession,
promql: str
) -> Optional[float]:
"""执行PromQL查询"""
try:
async with session.get(
f"{self.prometheus_url}/api/v1/query",
params={"query": promql.strip()}
) as resp:
if resp.status != 200:
return None
data = await resp.json()
if data["status"] == "success" and data["data"]["result"]:
return float(data["data"]["result"][0]["value"][1])
return None
except Exception as e:
self.log.warning("Prometheus query failed",
error=str(e), query=promql[:100])
return None
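    def _safe_float(self, value) -> float:
        """Convert to float; return 0.0 for exceptions, None, or NaN."""
        if isinstance(value, Exception) or value is None:
            return 0.0
        try:
            result = float(value)
        except (TypeError, ValueError):
            return 0.0
        # The error-rate query divides 0 by 0 when there is no traffic, which
        # Prometheus reports as NaN; NaN is the only float not equal to itself.
        return 0.0 if result != result else result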
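The `_query` method relies on the shape of a Prometheus instant-query response, where each sample is a `[timestamp, "value-as-string"]` pair. The helper below is a small sketch of that parsing step on its own; `first_sample` is a hypothetical name, and treating NaN and infinity as missing data is an assumption made for this example.

```python
import math


def first_sample(payload, default=0.0):
    """Return the first sample value from an instant-query response.

    Falls back to `default` for error responses, empty results, malformed
    samples, and non-finite values such as NaN.
    """
    if payload.get("status") != "success":
        return default
    result = payload.get("data", {}).get("result", [])
    if not result:
        return default
    try:
        # Each sample is [unix_timestamp, "value"]; the value is a string.
        value = float(result[0]["value"][1])
    except (KeyError, IndexError, TypeError, ValueError):
        return default
    return default if math.isnan(value) or math.isinf(value) else value
```

Keeping this parsing in a pure function makes it easy to unit test against recorded responses without a running Prometheus server.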
3.4.5 A Simple Decision Engine
# decision/rule_engine.py
"""
Rule engine: makes fast decisions from rules defined in a DSL.
"""
import yaml
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass, field
import operator
import structlog
logger = structlog.get_logger(__name__)
@dataclass
class Rule:
"""规则定义"""
name: str
description: str
priority: int # 数字越小优先级越高
# 条件(AND关系)
conditions: List[dict]
# 触发动作
action: str
action_params: dict = field(default_factory=dict)
# 元数据
author: str = "system"
is_active: bool = True
tags: List[str] = field(default_factory=list)
@dataclass
class RuleMatchResult:
"""规则匹配结果"""
matched: bool
triggered_rule: Optional[Rule] = None
confidence: float = 0.0
action: Optional[str] = None
action_params: dict = field(default_factory=dict)
reasoning: str = ""
class ConditionEvaluator:
"""
条件表达式求值器
支持的操作符:
- gt, gte, lt, lte: 数值比较
- eq, neq: 相等判断
- contains, not_contains: 列表/字符串包含
- exists, not_exists: 字段存在性
- between: 范围检查
"""
OPERATORS: Dict[str, Callable] = {
"gt": operator.gt,
"gte": operator.ge,
"lt": operator.lt,
"lte": operator.le,
"eq": operator.eq,
"neq": operator.ne,
"contains": lambda a, b: b in a,
"not_contains": lambda a, b: b not in a,
"exists": lambda a, _: a is not None,
"not_exists": lambda a, _: a is None,
"between": lambda a, b: b[0] <= a <= b[1],
}
def evaluate(self, condition: dict, context: dict) -> bool:
"""
评估单个条件
条件格式:
{
"field": "metrics.error_rate", # 支持点号路径
"op": "gt",
"value": 0.05
}
"""
field_path = condition["field"]
op = condition["op"]
expected = condition.get("value")
# 支持点号路径访问(如 metrics.error_rate)
actual = self._get_nested(context, field_path)
if op not in self.OPERATORS:
raise ValueError(f"Unknown operator: {op}")
try:
return self.OPERATORS[op](actual, expected)
except (TypeError, KeyError):
return False
def _get_nested(self, data: dict, path: str) -> Any:
"""点号路径访问嵌套字典"""
keys = path.split(".")
current = data
for key in keys:
if isinstance(current, dict) and key in current:
current = current[key]
else:
return None
return current
class RuleEngine:
"""
规则引擎:加载YAML规则文件并进行条件匹配
"""
def __init__(self):
self.rules: List[Rule] = []
self.evaluator = ConditionEvaluator()
self.log = logger.bind(component="rule_engine")
def load_from_yaml(self, yaml_content: str) -> int:
"""从YAML加载规则集,返回加载的规则数量"""
        # safe_load returns None for an empty document; fall back to an empty dict
        data = yaml.safe_load(yaml_content) or {}
loaded_count = 0
for rule_data in data.get("rules", []):
try:
rule = Rule(
name=rule_data["name"],
description=rule_data.get("description", ""),
priority=rule_data.get("priority", 100),
conditions=rule_data["conditions"],
action=rule_data["action"],
action_params=rule_data.get("action_params", {}),
author=rule_data.get("author", "system"),
is_active=rule_data.get("is_active", True),
tags=rule_data.get("tags", [])
)
self.rules.append(rule)
loaded_count += 1
except KeyError as e:
self.log.warning(f"Invalid rule definition, missing field: {e}",
rule_name=rule_data.get("name", "unknown"))
# 按优先级排序
self.rules.sort(key=lambda r: r.priority)
self.log.info(f"Loaded {loaded_count} rules")
return loaded_count
def match(self, context: dict) -> RuleMatchResult:
"""
在给定上下文中匹配规则
返回第一个所有条件都满足的规则(按优先级排序)
"""
active_rules = [r for r in self.rules if r.is_active]
for rule in active_rules:
if self._evaluate_conditions(rule.conditions, context):
self.log.info(
"Rule matched",
rule_name=rule.name,
action=rule.action
)
return RuleMatchResult(
matched=True,
triggered_rule=rule,
confidence=0.95, # 规则匹配是确定性的
action=rule.action,
action_params=rule.action_params,
reasoning=f"Rule '{rule.name}' triggered: {rule.description}"
)
return RuleMatchResult(matched=False, reasoning="No rule matched")
def _evaluate_conditions(
self,
conditions: List[dict],
context: dict
) -> bool:
"""评估规则的所有条件(AND关系)"""
return all(
self.evaluator.evaluate(condition, context)
for condition in conditions
)
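The matching behaviour described above (rules sorted by priority, conditions combined with AND, first match wins) can be exercised without the rest of the platform. This is a reduced sketch, assuming plain dictionaries for rules and only four operators; `OPS`, `lookup`, and `first_match` are names introduced for illustration.

```python
import operator

# A small subset of the operators supported by ConditionEvaluator
OPS = {
    "gt": operator.gt,
    "lt": operator.lt,
    "eq": operator.eq,
    "neq": operator.ne,
}


def lookup(context, path):
    """Resolve a dotted path such as 'metrics.error_rate'; None if absent."""
    current = context
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def first_match(rules, context):
    """Return the action of the highest-priority rule whose conditions all hold."""
    for rule in sorted(rules, key=lambda r: r["priority"]):
        try:
            matched = all(
                OPS[c["op"]](lookup(context, c["field"]), c["value"])
                for c in rule["conditions"]
            )
        except TypeError:
            # Comparing a missing field (None) with a number: treat as no match.
            matched = False
        if matched:
            return rule["action"]
    return None
```

Given a rollback rule at priority 1 and a scale-up rule at priority 30, a context that satisfies both yields the rollback action, because lower numbers are evaluated first.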
# Example rule configuration (YAML)
EXAMPLE_RULES_YAML = """
rules:
  # P0: automatic rollback on production failure (highest priority)
  - name: auto_rollback_on_high_error_rate
    description: "Roll back automatically when the error rate exceeds 5%"
    priority: 1
    conditions:
      - field: "metrics.error_rate"
        op: "gt"
        value: 0.05
      - field: "deployment.status"
        op: "eq"
        value: "in_progress"
      - field: "deployment.environment"
        op: "eq"
        value: "production"
    action: "rollback"
    action_params:
      reason: "error_rate_exceeded_threshold"
      urgency: "immediate"
    tags: ["production", "safety", "auto_rollback"]

  # P1: rollback on P99 latency degradation
  - name: rollback_on_latency_degradation
    description: "Trigger a rollback when P99 latency exceeds 1 second"
    priority: 2
    conditions:
      - field: "metrics.p99_latency_ms"
        op: "gt"
        value: 1000
      - field: "deployment.status"
        op: "eq"
        value: "verifying"
    action: "rollback"
    action_params:
      reason: "latency_degradation"
      urgency: "high"

  # P2: alert on Pod startup failure
  - name: alert_on_pod_crash_loop
    description: "Pod has been in CrashLoopBackOff for more than 3 minutes"
    priority: 10
    conditions:
      - field: "k8s.pod_crash_loop_count"
        op: "gt"
        value: 3
      - field: "k8s.pod_crash_duration_minutes"
        op: "gt"
        value: 3
    action: "alert_oncall"
    action_params:
      severity: "P1"
      channel: "deployment-alerts"

  # P3: block deployments during peak hours
  - name: block_deployment_during_peak_hours
    description: "No deployments on weekdays between 10:00 and 14:00 (peak hours)"
    priority: 20
    conditions:
      - field: "context.is_peak_hours"
        op: "eq"
        value: true
      - field: "deployment.environment"
        op: "eq"
        value: "production"
      - field: "deployment.trigger_type"
        op: "neq"
        value: "hotfix"  # hotfixes are exempt
    action: "block_and_schedule"
    action_params:
      schedule_for: "off_peak"
      reason: "peak_hours_restriction"

  # P4: automatic scale-up
  - name: scale_up_on_high_cpu
    description: "Trigger a scale-up when CPU usage exceeds 80%"
    priority: 30
    conditions:
      - field: "metrics.cpu_usage_pct"
        op: "gt"
        value: 80
      - field: "k8s.current_replicas"
        op: "lt"
        value: 10  # stay within the maximum replica count
    action: "scale_up"
    action_params:
      increment: 2
      max_replicas: 10
"""
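3.5 Real-World Cases: Agent Architecture Practice at Large Internet Companies
3.5.1 Case 1: Airbnb's Intelligent Delivery System
Background and Challenges
In 2019 Airbnb faced a serious engineering challenge: more than 300 microservices and over 1,000 deployment requests per day, with a manually driven release process that had become the biggest bottleneck to engineering productivity.
The challenge in numbers:
- Each deployment required an engineer to monitor it by hand for 45 minutes on average
- 12 to 18 rollbacks per month were caused by human error
- Senior engineers spent 30% of their time on manual release interventions
- The "release sprint" before holiday freeze periods (such as Thanksgiving) piled up technical debt
Architecture Evolution
Airbnb's intelligent release system evolved through three phases:
Phase 1 (2019-2020): Rule-Based Automation
The first phase aimed to automate the most common manual decisions. Its core was a rule-based safety gate system: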
Airbnb Phase 1 architecture:
┌─────────────────────────────────────────────────────────┐
│               Deployment trigger (CI/CD)                │
└────────────────────────┬────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────┐
│                      Safety Gates                       │
│  • Unit test pass rate > 98%                            │
│  • Integration tests pass                               │
│  • Code coverage does not decrease                      │
│  • No known vulnerabilities (dependency scan)           │
└────────────────────────┬────────────────────────────────┘
                         │ passed
                         ▼
┌─────────────────────────────────────────────────────────┐
│               Canary release (rule-based)               │
│  • 5% → 25% → 50% → 100%                                │
│  • Observe each step for 10 minutes                     │
│  • Monitor error rate and latency automatically         │
│  • Roll back automatically when a threshold is breached │
└─────────────────────────────────────────────────────────┘
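Results:
- Engineer time per deployment: 45 minutes → 15 minutes (a 67% reduction)
- Automation rate: 40% (low-risk deployments fully automated; high-risk ones still required a human)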
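The rule-based canary flow from Phase 1 (step traffic up, observe, roll back on a breached threshold) can be sketched as a short loop. This is an illustration under stated assumptions: `run_canary` and `observe` are hypothetical names, `observe` stands in for the 10-minute observation window, and the default thresholds are example values that the source does not specify.

```python
def run_canary(steps, observe, max_error_rate=0.05, max_p99_ms=1000.0):
    """Walk the traffic steps and stop at the first breached threshold.

    `observe(percent)` returns (error_rate, p99_latency_ms) measured while
    `percent` of traffic is routed to the new version.
    """
    last_good = 0
    for percent in steps:
        error_rate, p99_ms = observe(percent)
        if error_rate > max_error_rate or p99_ms > max_p99_ms:
            # A breach at any step aborts the rollout and triggers a rollback.
            return {"outcome": "rolled_back", "failed_at": percent,
                    "last_good": last_good}
        last_good = percent
    return {"outcome": "promoted", "failed_at": None, "last_good": last_good}
```

Calling `run_canary([5, 25, 50, 100], observe)` mirrors the 5% → 25% → 50% → 100% progression in the diagram; the returned `failed_at` value shows how much traffic was exposed when the rollout stopped.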
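Phase 2 (2020-2021): Risk Scoring Model
The second phase introduced machine-learning risk scoring, the core innovation of Airbnb's Intelligent Delivery system:
Core idea: can we predict whether a given deployment will go wrong?
Data sources (feature engineering):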
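# Core features of Airbnb's risk scoring model (compiled from public material)
RISK_FEATURES = {
    # Code dimension
    "code_change_size": "Lines of code changed (rule of thumb: larger means riskier)",
    "file_change_count": "Number of files modified",
    "test_coverage_delta": "Change in test coverage (a drop means higher risk)",
    "previous_deployment_failure": "Whether any of the last 3 deployments failed",
    "days_since_last_deploy": "Days since the last successful deployment (too long means engineers are less familiar)",
    # Service dimension
    "service_criticality": "Service criticality tier",
    "service_age_days": "Days since the service was created (new services fail more often)",
    "dependency_count": "Number of directly dependent services",
    "historical_failure_rate": "Deployment failure rate of this service over the last 30 days",
    # Environment dimension
    "hour_of_day": "Release hour (0-23)",
    "day_of_week": "Day of the week (Friday afternoon is a high-risk window)",
    "active_incidents": "Whether there is an active incident right now",
    "traffic_multiplier": "Current traffic as a multiple of the normal level",
    # Engineer dimension (anonymised)
    "deployer_experience_score": "Historical success rate of the deployer",
    "time_since_last_deploy": "Time since the engineer's previous deployment",
}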
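Model architecture choice:
Airbnb chose gradient boosting (LightGBM) over deep learning for three reasons:
- Interpretability: the model can explain why a given deployment is high risk
- The training set (about 100,000 historical deployments) is too small for deep learning
- Low latency: scoring must finish within 200 ms
# Simplified implementation of the Airbnb risk scoring model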
import lightgbm as lgb
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, precision_recall_curve
class DeploymentRiskModel:
"""
基于LightGBM的部署风险评分模型
目标:预测"这次部署在24小时内发生回滚的概率"
"""
def __init__(self):
self.model = None
self.feature_names = list(RISK_FEATURES.keys())
self.threshold = 0.35 # 超过35%认为高风险

    def _extract_features(self, deployment) -> list:
        """Build the feature vector in feature_names order.

        Minimal sketch: the original listing calls this helper without
        defining it. It accepts a dict or an object with matching attributes,
        treats missing values as 0.0, and assumes every feature is already
        numeric (categorical features would need encoding first).
        """
        if isinstance(deployment, dict):
            return [float(deployment.get(name, 0.0)) for name in self.feature_names]
        return [float(getattr(deployment, name, 0.0)) for name in self.feature_names]
def train(self, historical_deployments: list):
"""从历史部署数据训练模型"""
# 提取特征和标签
X, y = [], []
for dep in historical_deployments:
features = self._extract_features(dep)
label = 1 if dep.outcome in ['rolled_back', 'failed'] else 0
X.append(features)
y.append(label)
X = np.array(X)
y = np.array(y)
# 数据集分割
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# LightGBM参数
params = {
'objective': 'binary',
'metric': ['auc', 'binary_logloss'],
'learning_rate': 0.05,
'num_leaves': 31,
'min_child_samples': 20,
'feature_fraction': 0.8,
'bagging_fraction': 0.8,
'bagging_freq': 5,
# 处理类别不平衡(大多数部署是成功的)
'scale_pos_weight': (y_train == 0).sum() / (y_train == 1).sum(),
'verbose': -1
}
# 训练
train_data = lgb.Dataset(X_train, label=y_train)
val_data = lgb.Dataset(X_val, label=y_val)
self.model = lgb.train(
params,
train_data,
num_boost_round=500,
valid_sets=[train_data, val_data],
callbacks=[lgb.early_stopping(50), lgb.log_evaluation(100)]
)
# 评估
val_pred = self.model.predict(X_val)
auc = roc_auc_score(y_val, val_pred)
print(f"Validation AUC: {auc:.4f}")
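        # Pick the F1-optimal threshold. precision and recall have one more
        # element than thresholds, so drop the final point before argmax to
        # keep the index in range.
        precision, recall, thresholds = precision_recall_curve(y_val, val_pred)
        f1_scores = 2 * precision * recall / (precision + recall + 1e-10)
        self.threshold = float(thresholds[np.argmax(f1_scores[:-1])])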
return {"auc": auc, "optimal_threshold": self.threshold}
def predict_risk(self, deployment_features: dict) -> dict:
"""
预测单次部署的风险
Returns:
{
"risk_score": 0.0-1.0,
"risk_level": "low|medium|high|critical",
"top_risk_factors": [{"factor": "...", "importance": 0.xx}, ...],
"recommendation": "proceed|review|block"
}
"""
features = self._extract_features(deployment_features)
risk_score = float(self.model.predict([features])[0])
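        # Per-prediction feature contributions (SHAP values) explain this
        # particular deployment; global gain importance would rank features
        # identically for every deployment. The last column holds the expected
        # value, so it is dropped.
        contributions = self.model.predict(
            np.array([features]), pred_contrib=True
        )[0][:-1]
        top_factors = sorted(
            zip(self.feature_names, features, contributions),
            key=lambda x: abs(x[2]),
            reverse=True
        )[:5]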
return {
"risk_score": risk_score,
"risk_level": self._categorize_risk(risk_score),
"top_risk_factors": [
{"factor": f, "value": v, "importance": i}
for f, v, i in top_factors
],
"recommendation": self._get_recommendation(risk_score)
}
def _categorize_risk(self, score: float) -> str:
if score < 0.2: return "low"
if score < 0.4: return "medium"
if score < 0.7: return "high"
return "critical"
def _get_recommendation(self, score: float) -> str:
if score < self.threshold: return "proceed"
if score < 0.7: return "review"
return "block"
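The threshold selection step in `train` picks the cut-off that maximises F1 on the validation set. The sketch below shows the same idea in plain Python so the arithmetic is easy to follow; `best_f1_threshold` is a hypothetical helper, and scanning every distinct score as a candidate threshold is a simplification of what `precision_recall_curve` does.

```python
def best_f1_threshold(labels, scores):
    """Return (threshold, f1) maximising F1 when predicting 1 for score >= threshold."""
    best_threshold, best_f1 = None, 0.0
    for threshold in sorted(set(scores)):
        tp = sum(1 for y, s in zip(labels, scores) if s >= threshold and y == 1)
        fp = sum(1 for y, s in zip(labels, scores) if s >= threshold and y == 0)
        fn = sum(1 for y, s in zip(labels, scores) if s < threshold and y == 1)
        if tp == 0:
            continue  # precision and recall are both zero here
        precision = tp / (tp + fp)
        recall = tp / (tp + fn)
        f1 = 2 * precision * recall / (precision + recall)
        if f1 > best_f1:
            best_threshold, best_f1 = threshold, f1
    return best_threshold, best_f1
```

For deployments, a missed failure usually costs more than an unnecessary review, so a team might prefer a recall-weighted objective over plain F1; that choice is outside what the source describes.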
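Results
Improvements at Airbnb after Phase 2:
- Failure rate: down 60% (the risk model blocked many high-risk deployments)
- MTTR (mean time to recovery): from 45 minutes to 9 minutes (an 80% reduction)
- Automation rate: from 40% to 85%
- Engineer satisfaction: from 3.2/5 to 4.1/5
Phase 3 (2021-2022): LLM-Augmented Decisions
The third phase brought in large language models to handle edge cases and to generate human-readable explanations of decisions:
Key use cases:
1. Anomaly root-cause analysis: when metrics look abnormal, the LLM analyses logs and events to produce root-cause hypotheses
2. Deployment note understanding: parse the deployment notes written by engineers and extract risk factors
3. Historical incident retrieval: semantic search over similar past incidents to provide reference cases
4. Decision explanation: generate a natural-language report for every automated decision
Example LLM decision report:
---
"Analysis of this deployment:
Risk level: HIGH (0.73)
Main risk factors:
1. Large code change (+1847 lines), which raises the probability of a regression
2. payment-service is a critical service; 3 of its 20 past deployments caused an incident
3. It is currently 6 p.m. on a weekday and traffic is 1.4x the usual level
4. Five services depend on payment-service, so a failure would have a wide impact
Recommendation:
Lower the initial canary traffic to 1% (instead of the default 5%) and extend the observation window from 5 minutes to 15 minutes, watching the payment success rate in particular.
To continue, click "Confirm high-risk deployment", or choose "Postpone to off-peak hours"."
---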
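3.5.2 Case 2: Shopify's Intelligent Release Platform
The Special Challenge of Peak Sales Events
Shopify's engineering challenge lies in extreme traffic peaks. During Black Friday, peak traffic can reach 10 times the normal level. In that environment every deployment decision can ripple through the whole system.
Shopify released an intelligent release platform named Shipit (partly open source), built around this design principle:
"Treat every deployment as a potential production incident to prevent, not one to clean up afterwards."
Predictive Scaling: Sensing Traffic Ahead of Time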
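import asyncio

import structlog

logger = structlog.get_logger(__name__)

# TrafficForecaster and K8sClient are assumed to be defined elsewhere in the project.


class PredictiveScalingAgent:
    """
    Predictive scaling agent.

    Core idea: scale out before the traffic arrives instead of waiting for a CPU alert.
    Response gap: a Pod typically needs 60-120 seconds to start, so if scaling only begins
    once CPU > 80%, the first two minutes of a surge hit under-provisioned capacity.
    """

    def __init__(
        self,
        traffic_forecaster: TrafficForecaster,
        kubernetes_client: K8sClient
    ):
        self.forecaster = traffic_forecaster
        self.k8s = kubernetes_client
        # Safety buffer: provision 20% more capacity than the forecast
        self.safety_buffer = 1.2
        # Lead time: how many minutes ahead to start scaling
        self.lead_time_minutes = 3

    async def run_prediction_loop(self):
        """Continuously running predictive scaling loop."""
        while True:
            try:
                # Forecast the traffic expected lead_time_minutes from now
                forecast = await self.forecaster.predict(
                    horizon_minutes=self.lead_time_minutes
                )
                for service, predicted_rps in forecast.items():
                    # Capacity the service can currently handle
                    current_capacity = await self._get_current_capacity(service)
                    # Target capacity, including the safety buffer
                    required_capacity = predicted_rps * self.safety_buffer
                    # required_capacity already includes the 20% buffer, so this fires once
                    # predicted traffic exceeds 0.8 / 1.2 ≈ 67% of current capacity
                    if required_capacity > current_capacity * 0.8:
                        target_replicas = self._calculate_replicas(
                            required_capacity,
                            service
                        )
                        await self.k8s.scale_deployment(
                            service=service,
                            replicas=target_replicas,
                            reason=f"Predictive scaling: expected {predicted_rps:.0f} RPS in {self.lead_time_minutes}m"
                        )
            except Exception as e:
                logger.error("Prediction loop error", error=str(e))
            await asyncio.sleep(60)  # check once a minute

    async def _get_current_capacity(self, service: str) -> float:
        """Compute the maximum throughput (RPS) the service can currently handle."""
        deployment = await self.k8s.get_deployment(service)
        replicas = deployment.spec.replicas
        # Per-Pod throughput baseline (learned from historical data)
        throughput_per_pod = await self._get_pod_throughput_baseline(service)
        return replicas * throughput_per_pod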
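The listing calls `_calculate_replicas` without showing it. One plausible shape is sketched below, assuming the per-Pod throughput baseline is already known. The function name, the `min_replicas`/`max_replicas` bounds, and the scale-out-only policy are illustrative assumptions, not Shopify's implementation.

```python
import math


def calculate_replicas(
    required_rps: float,
    throughput_per_pod: float,
    current_replicas: int,
    min_replicas: int = 2,
    max_replicas: int = 200,
) -> int:
    """Replica count needed to serve required_rps, clamped to [min_replicas, max_replicas]."""
    if throughput_per_pod <= 0:
        raise ValueError("throughput_per_pod must be positive")
    needed = math.ceil(required_rps / throughput_per_pod)
    # Predictive scaling only scales out here; scale-in is left to the regular autoscaler
    target = max(needed, current_replicas, min_replicas)
    return min(target, max_replicas)


# 5040 RPS required (forecast plus buffer) at 150 RPS per Pod
print(calculate_replicas(5040.0, 150.0, current_replicas=20))
```

Rounding up with `math.ceil` errs on the side of spare capacity, and the upper bound keeps a bad forecast from requesting an unbounded number of Pods.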
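Implementing real-time rollback decisions
One of Shopify's key innovations is a "smart rollback trigger" that distinguishes problems caused by a deployment from environmental noise: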
class SmartRollbackDecisionEngine:
"""
智能回滚决策引擎
核心问题:我们如何知道指标恶化是因为新部署,还是因为:
- 外部依赖(数据库、第三方API)出问题
- 季节性流量波动
- 其他服务的问题传播过来
Shopify的解法:因果归因分析(Causal Attribution)
"""
async def should_rollback(
self,
deployment: Deployment,
current_metrics: ServiceMetrics
) -> RollbackDecision:
# 1. 统计显著性检验:与基准对比是否显著恶化
baseline = await self._get_baseline_metrics(
service=deployment.service_name,
lookback_hours=24
)
degradation = self._compute_degradation(current_metrics, baseline)
if not degradation.is_statistically_significant():
return RollbackDecision(
should_rollback=False,
confidence=0.9,
reason="Metrics within normal variance"
)
# 2. 因果归因:恶化是否源自本次部署?
causal_score = await self._compute_causal_attribution(
deployment=deployment,
degradation=degradation
)
# 3. 外部因素检查
external_factors = await self._check_external_factors(
deployment.service_name
)
# 综合判断
if causal_score > 0.7 and not external_factors.has_issues:
return RollbackDecision(
should_rollback=True,
confidence=causal_score,
reason=f"Deployment likely caused degradation (causal score: {causal_score:.2f})"
)
elif external_factors.has_issues:
return RollbackDecision(
should_rollback=False,
confidence=0.6,
reason=f"External factors may explain degradation: {external_factors.description}"
)
else:
return RollbackDecision(
should_rollback=None, # 不确定,请求人工判断
confidence=0.3,
reason="Ambiguous signal, requesting human review"
)
async def _compute_causal_attribution(
self,
deployment: Deployment,
degradation: DegradationReport
) -> float:
"""
因果归因评分(0-1,越高表示部署越可能是原因)
核心逻辑:
1. 时间相关性:恶化开始时间是否与部署时间高度相关?
2. 服务局部性:只有这个服务恶化,还是系统性问题?
3. 变更历史:这类变更历史上是否经常导致此类问题?
"""
# 时间相关性分析
degradation_start = degradation.start_timestamp
deployment_complete = deployment.completed_at.timestamp()
time_correlation = max(0, 1 - abs(degradation_start - deployment_complete) / 300)
# The score is 1 when degradation starts exactly at deployment completion and decays linearly to 0 at a 5-minute (300 s) gap
# 局部性分析(其他服务是否正常)
peer_health = await self._check_peer_services(deployment.service_name)
locality_score = 1.0 if peer_health.all_healthy else 0.3
# 历史模式匹配
similar_incidents = await self._find_similar_historical_incidents(
service=deployment.service_name,
change_type=deployment.change_classification
)
history_score = min(1.0, len(similar_incidents) / 3)
# 加权综合
causal_score = (
time_correlation * 0.5 +
locality_score * 0.3 +
history_score * 0.2
)
return causal_score
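Step 1 of `should_rollback` depends on a significance test that the listing does not show. A minimal sketch of one option follows: a one-sided two-proportion z-test on error counts, using only the standard library. The function name, the choice of test, and `alpha` are assumptions made for illustration; the source does not say which test is used.

```python
from math import sqrt
from statistics import NormalDist


def error_rate_degraded(
    baseline_errors: int,
    baseline_requests: int,
    current_errors: int,
    current_requests: int,
    alpha: float = 0.01,
) -> bool:
    """One-sided two-proportion z-test: is the current error rate significantly higher?"""
    p_base = baseline_errors / baseline_requests
    p_cur = current_errors / current_requests
    pooled = (baseline_errors + current_errors) / (baseline_requests + current_requests)
    std_err = sqrt(pooled * (1 - pooled) * (1 / baseline_requests + 1 / current_requests))
    if std_err == 0:
        return False  # identical all-success or all-failure windows: nothing to compare
    z = (p_cur - p_base) / std_err
    p_value = 1 - NormalDist().cdf(z)
    return p_value < alpha


# Baseline window: 0.1% errors. Post-deploy window: 0.6% errors on 5,000 requests.
print(error_rate_degraded(100, 100_000, 30, 5_000))
# Post-deploy window at 0.12%: within normal variance for this sample size.
print(error_rate_degraded(100, 100_000, 6, 5_000))
```

Comparing counts rather than raw rates keeps a small post-deploy window from triggering a rollback on a handful of errors.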
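3.5.3 Case study 3: zero-downtime releases for Stripe's payment system
Architectural requirements for financial-grade reliability
Stripe's challenge is the most demanding of the three: the payment system cannot tolerate downtime, it carries a very high transaction volume, and any configuration error can translate directly into financial loss.
Two features set Stripe's release system apart:
Feature 1: formal verification on the critical path
Stripe uses TLA+ to formally verify critical state machines, so that deadlocks and race conditions are ruled out at the design level:
Core scenarios covered by Stripe's TLA+ verification:
1. Handling concurrent requests during a rolling deployment
2. Consistency guarantees during multi-region switchover
3. Atomicity of database migrations
What formal verification buys:
- The logic of a design is checked mathematically before any code runs
- It finds edge cases that manual code review misses
- It gives production incident investigations a precise tool for causal analysis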
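To give a feel for what a model checker does, the sketch below exhaustively explores a toy model of a multi-region rollout and checks one invariant: versions across regions never differ by more than one release. It is plain Python rather than TLA+, and the model (regions upgrade in order, with an optional guard on starting a new release) is a simplification invented for illustration, not Stripe's specification.

```python
from collections import deque
from typing import Iterator, Optional, Tuple

State = Tuple[int, ...]  # deployed version per region, in rollout order


def next_states(state: State, max_version: int, guarded: bool) -> Iterator[State]:
    """Every single-region upgrade the coordinator may perform from `state`."""
    for i, version in enumerate(state):
        if version >= max_version:
            continue
        if i == 0:
            # The first region starts a new release; the guard waits until the
            # previous release has reached every region
            if guarded and len(set(state)) > 1:
                continue
        elif state[i - 1] <= version:
            continue  # later regions only follow the region before them
        yield state[:i] + (version + 1,) + state[i + 1:]


def invariant(state: State) -> bool:
    """At most two versions coexist and they differ by at most one release."""
    return max(state) - min(state) <= 1


def check(regions: int, max_version: int, guarded: bool) -> Optional[State]:
    """Breadth-first search over all reachable states; returns a violating state or None."""
    start = (0,) * regions
    seen, queue = {start}, deque([start])
    while queue:
        state = queue.popleft()
        if not invariant(state):
            return state
        for nxt in next_states(state, max_version, guarded):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return None


print("guarded:", check(regions=4, max_version=2, guarded=True))
print("unguarded:", check(regions=4, max_version=2, guarded=False))
```

Without the guard, the search finds a state in which the first region is two releases ahead of the rest. This is the kind of interleaving that is easy to miss in review and cheap to find by exhaustive exploration.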
Feature 2: coordinated multi-region releases
import asyncio

import structlog

logger = structlog.get_logger(__name__)


class MultiRegionDeploymentCoordinator:
    """
    Multi-region deployment coordinator.

    Stripe's global deployment strategy:
    1. Deploy first to the region with the least traffic (e.g. Singapore)
    2. Observe for 30 minutes and verify the core metrics
    3. Expand step by step to Europe, US East and US West
    4. Keep the ability to roll back throughout

    Key constraints:
    - At most two versions coexist across regions at any time (avoids API compatibility problems)
    - Version skew between regions never exceeds one release
    """

    DEPLOYMENT_ORDER = [
        {"region": "ap-southeast-1", "weight": 0.05},  # Singapore: 5% of traffic
        {"region": "eu-west-1", "weight": 0.25},       # Ireland: 25%
        {"region": "us-east-1", "weight": 0.45},       # Virginia: 45%
        {"region": "us-west-2", "weight": 0.25},       # Oregon: 25%
    ]

    async def deploy_globally(self, release: ReleaseBundle) -> GlobalDeployResult:
        """
        Progressive global deployment.

        Args:
            release: deployment bundle for all regions (pre-built immutable artifacts)
        """
        deployed_regions = []
        for region_config in self.DEPLOYMENT_ORDER:
            region = region_config["region"]
            logger.info(f"Starting deployment to {region}")

            # 1. Deploy to the current region
            result = await self._deploy_to_region(region, release)
            if not result.success:
                # Roll back every region touched so far, including the one that just
                # failed (it may be partially updated)
                logger.error(f"Deployment failed in {region}, initiating global rollback")
                await self._global_rollback(deployed_regions + [region], release)
                return GlobalDeployResult.failure(region, result.error)

            # 2. Verify regional health
            health = await self._verify_region_health(
                region=region,
                duration_minutes=30,  # 30-minute observation window
                critical_metrics={
                    "payment_success_rate": ">= 0.9995",  # 99.95% success rate
                    "api_error_rate": "< 0.0001",         # 0.01% error rate
                    "p99_latency_ms": "< 300",            # P99 < 300 ms
                }
            )
            if not health.passed:
                # The current region already runs the new version, so it must be rolled back too
                logger.warning(f"Health check failed in {region}: {health.reason}")
                await self._global_rollback(deployed_regions + [region], release)
                return GlobalDeployResult.failure(region, health.reason)

            deployed_regions.append(region)
            logger.info(f"Successfully deployed to {region}, "
                        f"proceeding to next region ({len(deployed_regions)}/{len(self.DEPLOYMENT_ORDER)})")
        return GlobalDeployResult.success(deployed_regions)

    async def _verify_region_health(
        self,
        region: str,
        duration_minutes: int,
        critical_metrics: dict
    ) -> RegionHealthResult:
        """
        Continuous regional health verification.

        Samples metrics once a minute; the check passes only if every sample over
        duration_minutes satisfies its threshold. A violation of a core payment metric
        fails immediately; other violations are collected and fail the check at the end.
        """
        violations = []
        for minute in range(duration_minutes):
            await asyncio.sleep(60)
            metrics = await self.metrics_client.get_region_metrics(region)
            for metric, threshold_expr in critical_metrics.items():
                value = getattr(metrics, metric, None)
                if value is None:
                    # Missing samples are skipped here; a stricter policy would fail closed
                    continue
                if not self._evaluate_threshold(value, threshold_expr):
                    violations.append({
                        "minute": minute + 1,
                        "metric": metric,
                        "value": value,
                        "threshold": threshold_expr
                    })
                    # For core payment metrics, a single violation fails immediately
                    if metric in ["payment_success_rate", "api_error_rate"]:
                        return RegionHealthResult(
                            passed=False,
                            reason=f"Critical metric violation at minute {minute+1}: "
                                   f"{metric}={value} (threshold: {threshold_expr})",
                            violations=violations
                        )
        return RegionHealthResult(
            passed=len(violations) == 0,
            reason="All metrics within thresholds" if not violations else f"{len(violations)} violations detected",
            violations=violations
        )
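`_evaluate_threshold` is called on expressions such as ">= 0.9995" but is not shown. One way to implement it without `eval` is sketched below. The function name mirrors the listing, while the accepted grammar (one comparison operator followed by a number) is an assumption.

```python
import operator
import re

_OPS = {
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    ">": operator.gt,
    "<": operator.lt,
}
# Two-character operators are listed first so ">=" is not parsed as ">"
_EXPR = re.compile(r"^\s*(>=|<=|==|>|<)\s*(-?\d+(?:\.\d+)?)\s*$")


def evaluate_threshold(value: float, threshold_expr: str) -> bool:
    """Return True if `value` satisfies an expression such as '>= 0.9995' or '< 300'."""
    match = _EXPR.match(threshold_expr)
    if match is None:
        raise ValueError(f"Unsupported threshold expression: {threshold_expr!r}")
    op, limit = match.groups()
    return _OPS[op](value, float(limit))


print(evaluate_threshold(0.9997, ">= 0.9995"))  # success rate above the floor
print(evaluate_threshold(0.0001, "< 0.0001"))   # strictly-less-than: equality fails
```

Rejecting anything outside the small grammar means a typo in a threshold raises an error during the first health check instead of silently passing.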
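SLA guarantee mechanisms
Stripe publicly commits to 99.9999% availability ("six nines"). Achieving it relies on:
What six nines means numerically:
Allowed downtime per year = 365 days × 24 hours × 3600 seconds × 0.000001 ≈ 31.5 seconds
Guarantee mechanisms (in order of importance):
1. Multi-region active-active architecture: the failure of any single region does not affect the whole
2. Progressive rollout: keeps the blast radius of a failure as small as possible
3. Automatic rollback: P99 failure recovery time < 2 minutes
4. Circuit breakers: prevent cascading failures
5. Idempotent API design: requests can be retried safely
6. Database read/write splitting plus multiple replicas: high availability at the data layer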
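The downtime arithmetic generalizes to any number of nines. A small sketch, assuming a 365-day year as in the calculation above:

```python
SECONDS_PER_YEAR = 365 * 24 * 3600  # 31,536,000 seconds in a non-leap year


def downtime_budget_seconds(nines: int) -> float:
    """Allowed downtime per year for an availability of `nines` nines (3 means 99.9%)."""
    return SECONDS_PER_YEAR * 10 ** (-nines)


for n in range(2, 7):
    print(f"{n} nines: {downtime_budget_seconds(n):,.1f} seconds per year")
```

Each additional nine divides the budget by ten. At six nines the whole year's allowance is smaller than a single 60-second health-check interval, which is why recovery has to be automated.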
3.6 Hands-on: a complete Agent skeleton
3.6.1 Project structure
harness-agent/
├── README.md
├── pyproject.toml # 依赖管理(Poetry)
├── Makefile # 开发命令
├── docker-compose.dev.yml # 本地开发环境
│
├── src/
│ ├── agents/ # Agent核心实现
│ │ ├── __init__.py
│ │ ├── base_agent.py # BaseAgent基类
│ │ ├── orchestrator_agent.py
│ │ ├── risk_agent.py
│ │ ├── deployment_agent.py
│ │ ├── rollback_agent.py
│ │ └── learning_agent.py
│ │
│ ├── modules/ # 功能模块
│ │ ├── perception/
│ │ │ ├── __init__.py
│ │ │ ├── metrics_collector.py
│ │ │ ├── log_collector.py
│ │ │ └── event_collector.py
│ │ │
│ │ ├── cognition/
│ │ │ ├── __init__.py
│ │ │ ├── memory_manager.py
│ │ │ ├── knowledge_graph.py
│ │ │ └── context_builder.py
│ │ │
│ │ ├── decision/
│ │ │ ├── __init__.py
│ │ │ ├── rule_engine.py
│ │ │ ├── ml_model.py
│ │ │ ├── llm_reasoner.py
│ │ │ └── decision_router.py
│ │ │
│ │ ├── execution/
│ │ │ ├── __init__.py
│ │ │ ├── k8s_executor.py
│ │ │ ├── istio_executor.py
│ │ │ └── idempotent_executor.py
│ │ │
│ │ └── evolution/
│ │ ├── __init__.py
│ │ ├── online_learner.py
│ │ ├── offline_trainer.py
│ │ └── ab_tester.py
│ │
│ ├── models/ # 数据模型
│ │ ├── deployment.py
│ │ ├── decision.py
│ │ ├── service_profile.py
│ │ └── metrics.py
│ │
│ ├── infrastructure/ # 基础设施客户端
│ │ ├── kafka_client.py
│ │ ├── redis_client.py
│ │ ├── postgres_client.py
│ │ └── k8s_client.py
│ │
│ └── api/ # FastAPI入口
│ ├── main.py
│ ├── routers/
│ └── middleware/
│
├── tests/
│ ├── unit/
│ ├── integration/
│ └── e2e/
│
├── helm/ # Kubernetes Helm Chart
│ └── harness-agent/
│ ├── Chart.yaml
│ ├── values.yaml
│ ├── values-production.yaml
│ └── templates/
│ ├── deployment.yaml
│ ├── service.yaml
│ ├── configmap.yaml
│ ├── secret.yaml
│ ├── hpa.yaml
│ ├── serviceaccount.yaml
│ └── rbac.yaml
│
└── scripts/
├── setup_dev.sh
├── run_tests.sh
└── deploy.sh
3.6.2 核心Agent实现(完整代码)
# src/agents/base_agent.py
"""
BaseAgent:所有Agent的基类
实现了感知-决策-执行-学习的完整闭环
"""
import asyncio
import uuid
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timezone
import structlog
from ..modules.perception.metrics_collector import MetricsCollector
from ..modules.cognition.memory_manager import MemoryManager
from ..modules.decision.decision_router import DecisionRouter
from ..modules.execution.idempotent_executor import IdempotentExecutor
from ..modules.evolution.online_learner import OnlineLearner
from ..infrastructure.kafka_client import HarnessMessageBus
logger = structlog.get_logger(__name__)
@dataclass
class AgentConfig:
"""Agent通用配置"""
agent_id: str = field(default_factory=lambda: str(uuid.uuid4()))
agent_name: str = "unnamed_agent"
max_actions_per_minute: int = 60
decision_timeout_seconds: float = 30.0
execution_timeout_seconds: float = 300.0
enable_learning: bool = True
dry_run: bool = False
log_level: str = "INFO"
@dataclass
class Observation:
"""Agent观测到的环境状态"""
timestamp: float = field(default_factory=time.time)
metrics: Dict[str, Any] = field(default_factory=dict)
events: List[Dict] = field(default_factory=list)
context: Dict[str, Any] = field(default_factory=dict)
raw_data: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Action:
"""Agent决定执行的动作"""
action_type: str
params: Dict[str, Any] = field(default_factory=dict)
confidence: float = 1.0
reasoning: str = ""
idempotency_key: str = field(default_factory=lambda: str(uuid.uuid4()))
@dataclass
class ActionResult:
"""动作执行结果"""
success: bool
action: Action
outcome: Dict[str, Any] = field(default_factory=dict)
error: Optional[str] = None
duration_ms: float = 0.0
timestamp: float = field(default_factory=time.time)
class BaseAgent(ABC):
"""
Agent基类:定义所有Agent共同遵循的感知-决策-执行-学习闭环
子类需要实现:
- perceive(): 如何感知环境
- should_act(): 是否需要行动(避免空转)
- decide(): 决定执行什么动作
- execute(): 执行具体动作
- compute_reward(): 计算行动的奖励信号
基类提供:
- run_loop(): 主运行循环
- 决策记录与学习
- 并发控制与速率限制
- 遥测与可观测性
"""
def __init__(
self,
config: AgentConfig,
memory: MemoryManager,
message_bus: HarnessMessageBus,
decision_router: DecisionRouter,
executor: IdempotentExecutor,
learner: Optional[OnlineLearner] = None
):
self.config = config
self.memory = memory
self.message_bus = message_bus
self.decision_router = decision_router
self.executor = executor
self.learner = learner
# 速率限制
self._action_timestamps: List[float] = []
# 运行状态
self._running = False
self._loop_count = 0
self._total_actions = 0
self._total_errors = 0
self.log = logger.bind(
agent_id=config.agent_id,
agent_name=config.agent_name
)
# ─────────────────────────────────────────────────────────
# 抽象方法(子类必须实现)
# ─────────────────────────────────────────────────────────
@abstractmethod
async def perceive(self) -> Observation:
"""
感知当前环境状态
子类应从各种数据源(Prometheus、日志、Kubernetes API等)
采集数据并整合成Observation对象
"""
pass
@abstractmethod
async def should_act(self, observation: Observation) -> bool:
"""
判断是否需要行动
避免Agent在环境正常时无谓地运行决策引擎,节省计算资源
"""
pass
@abstractmethod
async def decide(self, observation: Observation) -> Optional[Action]:
"""
基于观测制定行动决策
调用decision_router选择合适的决策路径,
返回None表示决定不行动
"""
pass
@abstractmethod
async def execute(self, action: Action) -> ActionResult:
"""
执行决定的动作
通过executor确保幂等性,所有操作都应可安全重试
"""
pass
@abstractmethod
async def compute_reward(
self,
observation: Observation,
action: Action,
result: ActionResult,
next_observation: Observation
) -> float:
"""
计算本次行动的奖励信号
用于强化学习的在线更新
正值:好的行动(指标改善)
负值:坏的行动(指标恶化或引发故障)
"""
pass
# ─────────────────────────────────────────────────────────
# 核心运行循环(基类实现)
# ─────────────────────────────────────────────────────────
async def run_loop(
self,
interval_seconds: float = 10.0,
max_iterations: Optional[int] = None
) -> None:
"""
Agent主运行循环
循环结构:感知 → 判断 → 决策 → 执行 → 学习
Args:
interval_seconds: 循环间隔(秒)
max_iterations: 最大迭代次数(None=无限运行)
"""
self._running = True
self.log.info("Agent starting", interval_seconds=interval_seconds)
try:
while self._running:
if max_iterations is not None and self._loop_count >= max_iterations:
break
loop_start = time.time()
self._loop_count += 1
try:
await self._single_iteration()
except Exception as e:
self._total_errors += 1
self.log.error(
"Iteration failed",
loop=self._loop_count,
error=str(e),
exc_info=True
)
# 不停止循环,继续下一次迭代
# 保持固定间隔
elapsed = time.time() - loop_start
sleep_time = max(0, interval_seconds - elapsed)
if sleep_time > 0:
await asyncio.sleep(sleep_time)
except asyncio.CancelledError:
self.log.info("Agent loop cancelled")
finally:
self._running = False
self.log.info(
"Agent stopped",
total_loops=self._loop_count,
total_actions=self._total_actions,
total_errors=self._total_errors
)
async def _single_iteration(self) -> None:
"""单次感知-决策-执行-学习迭代"""
# 1. 感知
observation = await asyncio.wait_for(
self.perceive(),
timeout=self.config.decision_timeout_seconds
)
# 将观测存入工作记忆
await self.memory.store_observation(observation)
# 2. 判断是否需要行动
if not await self.should_act(observation):
return
# 速率限制检查
if not self._check_rate_limit():
self.log.warning("Rate limit reached, skipping action")
return
# 3. 决策
action = await asyncio.wait_for(
self.decide(observation),
timeout=self.config.decision_timeout_seconds
)
if action is None:
return # Agent决定不行动
self.log.info(
"Action decided",
action_type=action.action_type,
confidence=action.confidence,
reasoning=action.reasoning[:200]
)
# 4. 执行
if self.config.dry_run:
self.log.info("DRY RUN: Would execute", action=action.action_type)
return
result = await asyncio.wait_for(
self.execute(action),
timeout=self.config.execution_timeout_seconds
)
self._total_actions += 1
if not result.success:
self.log.error(
"Action execution failed",
action_type=action.action_type,
error=result.error
)
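# 5. Learn. Re-perceiving and computing the reward run inline; only recording the
# experience is handed to a background task so it does not block the main loop.
if self.config.enable_learning and self.learner:
next_observation = await self.perceive()
reward = await self.compute_reward(
observation, action, result, next_observation
)
task = asyncio.create_task(
self._async_learn(observation, action, reward, next_observation)
)
# The event loop keeps only weak references to tasks, so hold a strong one until it finishes
self._learning_tasks = getattr(self, "_learning_tasks", set())
self._learning_tasks.add(task)
task.add_done_callback(self._learning_tasks.discard)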
async def _async_learn(
self,
observation: Observation,
action: Action,
reward: float,
next_observation: Observation
) -> None:
"""异步学习:不阻塞主循环"""
try:
await self.learner.record_experience(
state=observation.metrics,
action=action.action_type,
reward=reward,
next_state=next_observation.metrics,
metadata={
"agent_id": self.config.agent_id,
"loop_count": self._loop_count
}
)
# 发布学习经验到Kafka,供离线训练使用
await self.message_bus.publish_learning_experience(
deployment_id=action.params.get("deployment_id", "unknown"),
state_before=observation.metrics,
action=action.action_type,
reward=reward,
state_after=next_observation.metrics
)
except Exception as e:
self.log.warning("Learning failed", error=str(e))
def _check_rate_limit(self) -> bool:
"""检查速率限制(每分钟最大行动次数)"""
now = time.time()
# 清除1分钟前的记录
self._action_timestamps = [
ts for ts in self._action_timestamps
if now - ts < 60
]
if len(self._action_timestamps) >= self.config.max_actions_per_minute:
return False
self._action_timestamps.append(now)
return True
def stop(self) -> None:
"""停止Agent运行"""
self._running = False
def get_stats(self) -> dict:
"""获取运行统计"""
return {
"agent_id": self.config.agent_id,
"agent_name": self.config.agent_name,
"is_running": self._running,
"total_loops": self._loop_count,
"total_actions": self._total_actions,
"total_errors": self._total_errors,
"error_rate": self._total_errors / max(1, self._loop_count)
}
# ─────────────────────────────────────────────────────────────────
# src/agents/deployment_agent.py
# 具体实现:部署监控Agent
# ─────────────────────────────────────────────────────────────────
class DeploymentMonitorAgent(BaseAgent):
"""
部署监控Agent
职责:实时监控进行中的部署,发现异常时触发回滚或告警
"""
def __init__(
self,
config: AgentConfig,
deployment_id: str,
service_name: str,
slo_targets: Dict[str, float],
k8s_client,
prometheus_collector: MetricsCollector,
**kwargs
):
super().__init__(config, **kwargs)
self.deployment_id = deployment_id
self.service_name = service_name
self.slo_targets = slo_targets
self.k8s_client = k8s_client
self.prometheus_collector = prometheus_collector
# 异常计数(连续N次异常才触发行动,避免误判)
self._anomaly_streak = 0
self._anomaly_threshold = 3
# 基准指标(部署前快照)
self._baseline_metrics: Optional[Dict] = None
async def perceive(self) -> Observation:
"""采集当前服务的所有指标"""
# 并行采集多源数据
metrics, k8s_state, events = await asyncio.gather(
self.prometheus_collector.collect_service_metrics(self.service_name),
self._get_k8s_state(),
self._get_recent_events(),
return_exceptions=True
)
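# gather(return_exceptions=True) may return an Exception in place of the metrics object;
# the getattr defaults below would then report all-zero, healthy-looking metrics, so log it
if isinstance(metrics, Exception): self.log.warning("Metrics collection failed", error=str(metrics))
return Observation(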
metrics={
"error_rate": getattr(metrics, 'error_rate', 0.0),
"p99_latency_ms": getattr(metrics, 'p99_latency_ms', 0.0),
"request_rate": getattr(metrics, 'request_rate', 0.0),
"cpu_usage_pct": getattr(metrics, 'cpu_usage_pct', 0.0),
"memory_usage_pct": getattr(metrics, 'memory_usage_pct', 0.0),
},
events=events if isinstance(events, list) else [],
context={
"deployment_id": self.deployment_id,
"service_name": self.service_name,
"k8s_state": k8s_state if not isinstance(k8s_state, Exception) else {},
"slo_targets": self.slo_targets,
"baseline": self._baseline_metrics or {}
}
)
async def should_act(self, observation: Observation) -> bool:
"""只有在发现潜在问题时才触发决策"""
metrics = observation.metrics
# 快速路径:检查明显的SLO违规
if metrics.get("error_rate", 0) > self.slo_targets.get("error_rate", 0.01) * 2:
return True # 错误率超过SLO目标2倍
if metrics.get("p99_latency_ms", 0) > self.slo_targets.get("p99_latency_ms", 500) * 1.5:
return True # 延迟超过SLO目标1.5倍
# 检查Kubernetes事件(如Pod崩溃)
for event in observation.events:
if event.get("reason") in ["CrashLoopBackOff", "OOMKilled", "BackOff"]:
return True
return False
async def decide(self, observation: Observation) -> Optional[Action]:
"""使用三路径决策路由器制定行动"""
context = {
"metrics": observation.metrics,
"deployment": {
"id": self.deployment_id,
"service": self.service_name,
"status": "in_progress"
},
"slo_targets": self.slo_targets,
"baseline": self._baseline_metrics,
"k8s": observation.context.get("k8s_state", {})
}
decision = await self.decision_router.route_decision(context)
if decision.action == "rollback":
return Action(
action_type="rollback",
params={
"deployment_id": self.deployment_id,
"reason": decision.reasoning,
"urgency": "high" if observation.metrics.get("error_rate", 0) > 0.05 else "normal"
},
confidence=decision.confidence,
reasoning=decision.reasoning
)
elif decision.action == "alert_oncall":
return Action(
action_type="alert",
params={
"deployment_id": self.deployment_id,
"severity": "P1",
"message": decision.reasoning,
"metrics": observation.metrics
},
confidence=decision.confidence,
reasoning=decision.reasoning
)
return None # 不行动
async def execute(self, action: Action) -> ActionResult:
"""执行回滚或告警操作"""
start_time = time.time()
try:
if action.action_type == "rollback":
result = await self.executor.execute(
operation=RollbackOperation(
deployment_id=action.params["deployment_id"],
k8s_client=self.k8s_client,
urgency=action.params.get("urgency", "normal")
),
idempotency_key=f"rollback:{action.params['deployment_id']}"
)
return ActionResult(
success=result.success,
action=action,
outcome={"rollback_result": result.to_dict()},
duration_ms=(time.time() - start_time) * 1000
)
elif action.action_type == "alert":
await self.message_bus.publish_deployment_event(
deployment_id=action.params["deployment_id"],
event_type="alert_triggered",
payload=action.params
)
return ActionResult(
success=True,
action=action,
outcome={"alert_sent": True},
duration_ms=(time.time() - start_time) * 1000
)
except Exception as e:
return ActionResult(
success=False,
action=action,
error=str(e),
duration_ms=(time.time() - start_time) * 1000
)
async def compute_reward(
self,
observation: Observation,
action: Action,
result: ActionResult,
next_observation: Observation
) -> float:
"""计算行动奖励"""
if not result.success:
return -1.0 # 执行失败:负奖励
# 对比行动前后的错误率
pre_error_rate = observation.metrics.get("error_rate", 0)
post_error_rate = next_observation.metrics.get("error_rate", 0)
if action.action_type == "rollback":
if post_error_rate < pre_error_rate:
# 回滚有效地降低了错误率
improvement = (pre_error_rate - post_error_rate) / pre_error_rate
return improvement * 5 # 放大奖励信号
else:
return -0.5 # 回滚没有改善,轻微惩罚
return 0.0
async def _get_k8s_state(self) -> dict:
"""获取Kubernetes部署状态"""
try:
deployment = await self.k8s_client.get_deployment(self.service_name)
pods = await self.k8s_client.get_pods(
label_selector=f"app={self.service_name}"
)
return {
"desired_replicas": deployment.spec.replicas,
"ready_replicas": deployment.status.ready_replicas or 0,
"pod_statuses": [
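{
"name": pod.metadata.name,
"phase": pod.status.phase,
# Readiness is reported per container status; pod conditions carry type/status, not .ready.
# Assumes objects shaped like the official Kubernetes Python client models.
"ready": bool(pod.status.container_statuses) and all(c.ready for c in pod.status.container_statuses)
}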
for pod in pods.items
]
}
except Exception as e:
self.log.warning("Failed to get k8s state", error=str(e))
return {}
async def _get_recent_events(self) -> list:
"""获取最近5分钟的Kubernetes事件"""
try:
events = await self.k8s_client.get_events(
field_selector=f"involvedObject.name={self.service_name}",
since_minutes=5
)
return [
{
"reason": e.reason,
"message": e.message,
"count": e.count,
"last_time": e.last_timestamp.isoformat() if e.last_timestamp else None
}
for e in events.items
]
except Exception:
return []
async def set_baseline(self) -> None:
"""设置部署前的基准指标"""
obs = await self.perceive()
self._baseline_metrics = obs.metrics.copy()
self.log.info("Baseline metrics captured", baseline=self._baseline_metrics)
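`DeploymentMonitorAgent` initializes `_anomaly_streak` and `_anomaly_threshold` ("act only after N consecutive anomalies"), but the `should_act` shown above returns True on the first violation. How such debouncing might be wired in is sketched below. `AnomalyDebouncer` and `slo_violated` are hypothetical helpers, not part of the original class.

```python
class AnomalyDebouncer:
    """Report True only after `threshold` consecutive anomalous samples."""

    def __init__(self, threshold: int = 3):
        if threshold < 1:
            raise ValueError("threshold must be >= 1")
        self.threshold = threshold
        self.streak = 0

    def update(self, is_anomalous: bool) -> bool:
        # A healthy sample resets the streak, so isolated spikes never trigger an action
        self.streak = self.streak + 1 if is_anomalous else 0
        return self.streak >= self.threshold


def slo_violated(metrics: dict, slo_targets: dict) -> bool:
    """Same fast-path checks as should_act: 2x the error-rate SLO or 1.5x the latency SLO."""
    return (
        metrics.get("error_rate", 0) > slo_targets.get("error_rate", 0.01) * 2
        or metrics.get("p99_latency_ms", 0) > slo_targets.get("p99_latency_ms", 500) * 1.5
    )


slo = {"error_rate": 0.01, "p99_latency_ms": 500}
debouncer = AnomalyDebouncer(threshold=3)
samples = [0.03, 0.03, 0.001, 0.03, 0.03, 0.03]  # error rate per polling interval
decisions = [debouncer.update(slo_violated({"error_rate": s}, slo)) for s in samples]
print(decisions)
```

With the default 10-second loop interval, a threshold of 3 delays action by roughly 30 seconds. Hard failures such as crash-looping Pods may warrant bypassing the debouncer.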
3.6.3 Kubernetes部署配置(完整Helm Chart)
# helm/harness-agent/Chart.yaml
apiVersion: v2
name: harness-agent
description: Harness AI Agent Platform
type: application
version: 0.1.0
appVersion: "1.0.0"
dependencies:
- name: postgresql
version: "12.x.x"
repository: "https://charts.bitnami.com/bitnami"
condition: postgresql.enabled
- name: redis
version: "17.x.x"
repository: "https://charts.bitnami.com/bitnami"
condition: redis.enabled
# helm/harness-agent/values.yaml
# Default values for harness-agent
global:
imageRegistry: ""
imagePullSecrets: []
# ─────────────────────────────────────────────
# Orchestrator服务
# ─────────────────────────────────────────────
orchestrator:
enabled: true
replicaCount: 2
image:
repository: harness/orchestrator
pullPolicy: IfNotPresent
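tag: "latest" # with IfNotPresent a mutable tag can leave nodes on stale images; pin an immutable version in production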
service:
type: ClusterIP
port: 8080
resources:
requests:
cpu: 500m
memory: 512Mi
limits:
cpu: 2000m
memory: 2Gi
autoscaling:
enabled: true
minReplicas: 2
maxReplicas: 10
targetCPUUtilizationPercentage: 70
targetMemoryUtilizationPercentage: 80
config:
maxConcurrentDeployments: 10
riskThresholdForApproval: 0.7
canaryObservationMinutes: 5
enableAutoRollback: true
logLevel: "INFO"
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: harness-secrets
key: database-url
- name: REDIS_URL
valueFrom:
secretKeyRef:
name: harness-secrets
key: redis-url
- name: KAFKA_BROKERS
value: "kafka:9092"
- name: ANTHROPIC_API_KEY
valueFrom:
secretKeyRef:
name: harness-secrets
key: anthropic-api-key
# ─────────────────────────────────────────────
# Analyzer服务
# ─────────────────────────────────────────────
analyzer:
enabled: true
replicaCount: 2
image:
repository: harness/analyzer
tag: "latest"
resources:
requests:
cpu: 1000m
memory: 1Gi
limits:
cpu: 4000m
memory: 4Gi
# ─────────────────────────────────────────────
# Learner服务(ML训练)
# ─────────────────────────────────────────────
learner:
enabled: true
replicaCount: 1 # 训练服务通常单实例
image:
repository: harness/learner
tag: "latest"
resources:
requests:
cpu: 2000m
memory: 4Gi
limits:
cpu: 8000m
memory: 16Gi
config:
trainingSchedule: "0 2 * * *" # 每天凌晨2点训练
batchSize: 256
learningRate: 0.001
modelOutputPath: "/models"
persistence:
enabled: true
size: 50Gi
storageClass: fast-ssd
# ─────────────────────────────────────────────
# 依赖服务配置
# ─────────────────────────────────────────────
postgresql:
enabled: true
auth:
postgresPassword: "" # 必须在values-production.yaml中覆盖
database: harness
primary:
persistence:
size: 100Gi
storageClass: fast-ssd
redis:
enabled: true
auth:
password: "" # 必须在values-production.yaml中覆盖
replica:
replicaCount: 2
# ─────────────────────────────────────────────
# 监控与可观测性
# ─────────────────────────────────────────────
monitoring:
enabled: true
serviceMonitor:
enabled: true
namespace: harness-monitoring
dashboards:
enabled: true
grafana:
namespace: harness-monitoring
# 网络策略
networkPolicy:
enabled: true
ingress:
- from:
- namespaceSelector:
matchLabels:
name: harness-system
# helm/harness-agent/templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "harness-agent.fullname" . }}-orchestrator
namespace: {{ .Release.Namespace }}
labels:
{{- include "harness-agent.labels" . | nindent 4 }}
app.kubernetes.io/component: orchestrator
spec:
replicas: {{ .Values.orchestrator.replicaCount }}
selector:
matchLabels:
{{- include "harness-agent.selectorLabels" . | nindent 6 }}
app.kubernetes.io/component: orchestrator
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0 # 零停机更新
template:
metadata:
labels:
{{- include "harness-agent.selectorLabels" . | nindent 8 }}
app.kubernetes.io/component: orchestrator
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9090"
prometheus.io/path: "/metrics"
spec:
serviceAccountName: {{ include "harness-agent.serviceAccountName" . }}
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
# 优雅关闭配置
terminationGracePeriodSeconds: 60
containers:
- name: orchestrator
image: "{{ .Values.orchestrator.image.repository }}:{{ .Values.orchestrator.image.tag }}"
imagePullPolicy: {{ .Values.orchestrator.image.pullPolicy }}
ports:
- name: http
containerPort: 8080
protocol: TCP
- name: metrics
containerPort: 9090
protocol: TCP
env:
{{- toYaml .Values.orchestrator.env | nindent 8 }}
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
envFrom:
- configMapRef:
name: {{ include "harness-agent.fullname" . }}-config
# 健康检查
livenessProbe:
httpGet:
path: /api/v1/health/live
port: http
initialDelaySeconds: 30
periodSeconds: 10
failureThreshold: 3
readinessProbe:
httpGet:
path: /api/v1/health/ready
port: http
initialDelaySeconds: 10
periodSeconds: 5
failureThreshold: 3
# 优雅关闭
lifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 10"]
resources:
{{- toYaml .Values.orchestrator.resources | nindent 10 }}
volumeMounts:
- name: config
mountPath: /app/config
readOnly: true
- name: tmp
mountPath: /tmp
volumes:
- name: config
configMap:
name: {{ include "harness-agent.fullname" . }}-config
- name: tmp
emptyDir: {}
# Pod anti-affinity: prefer spreading replicas across nodes (a soft preference, not a guarantee)
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app.kubernetes.io/component
operator: In
values:
- orchestrator
topologyKey: kubernetes.io/hostname
# Tolerations: allow scheduling onto nodes that carry the harness-workload taint
tolerations:
- key: "harness-workload"
operator: "Exists"
effect: "NoSchedule"
# helm/harness-agent/templates/hpa.yaml
{{- if .Values.orchestrator.autoscaling.enabled }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: {{ include "harness-agent.fullname" . }}-orchestrator
namespace: {{ .Release.Namespace }}
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: {{ include "harness-agent.fullname" . }}-orchestrator
minReplicas: {{ .Values.orchestrator.autoscaling.minReplicas }}
maxReplicas: {{ .Values.orchestrator.autoscaling.maxReplicas }}
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: {{ .Values.orchestrator.autoscaling.targetCPUUtilizationPercentage }}
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: {{ .Values.orchestrator.autoscaling.targetMemoryUtilizationPercentage }}
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Pods
value: 2
periodSeconds: 60
scaleDown:
stabilizationWindowSeconds: 300 # 5分钟稳定窗口,避免频繁缩容
policies:
- type: Pods
value: 1
periodSeconds: 120
{{- end }}
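How the two Resource metrics in this HPA combine can be worked through in a few lines. The sketch below follows the documented HPA rule, desired = ceil(current × currentMetric / targetMetric), taking the maximum across metrics and applying the default 10% tolerance. It ignores the `behavior` rate limits and Pod readiness handling, and the function name is illustrative.

```python
import math
from typing import Dict


def hpa_desired_replicas(
    current_replicas: int,
    current_utilization: Dict[str, float],
    target_utilization: Dict[str, float],
    min_replicas: int,
    max_replicas: int,
    tolerance: float = 0.1,
) -> int:
    """Replica count the HPA would aim for, before scaleUp/scaleDown policies are applied."""
    proposals = []
    for metric, target in target_utilization.items():
        ratio = current_utilization[metric] / target
        if abs(ratio - 1.0) <= tolerance:
            proposals.append(current_replicas)  # within tolerance: this metric asks for no change
        else:
            proposals.append(math.ceil(current_replicas * ratio))
    desired = max(proposals)  # the metric demanding the most replicas wins
    return max(min_replicas, min(max_replicas, desired))


targets = {"cpu": 70, "memory": 80}  # values.yaml targets above
print(hpa_desired_replicas(4, {"cpu": 91, "memory": 60}, targets, 2, 10))
```

With CPU at 91% against a 70% target, 4 replicas become 6 even though memory alone would allow scaling in. The `scaleUp` policy above then limits how quickly that count is reached (2 Pods per 60 seconds).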
3.6.4 Local debugging and testing
Unit test framework
# tests/unit/test_deployment_state_machine.py
"""
部署状态机单元测试
验证所有合法的状态转换和非法转换的异常处理
"""
import pytest
from src.agents.core.state_machine import (
DeploymentStateMachine,
DeploymentState,
DeploymentEvent,
InvalidStateTransitionError
)
@pytest.fixture
def mock_deployment():
"""创建测试用的部署对象"""
class MockDeployment:
id = "test-deployment-001"
status = "pending"
return MockDeployment()
class TestDeploymentStateMachine:
def test_initial_state_is_pending(self, mock_deployment):
sm = DeploymentStateMachine(mock_deployment)
assert sm.current_state == DeploymentState.PENDING
def test_valid_transition_pending_to_risk_assessment(self, mock_deployment):
sm = DeploymentStateMachine(mock_deployment)
new_state = sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
assert new_state == DeploymentState.RISK_ASSESSMENT
def test_valid_full_happy_path(self, mock_deployment):
"""测试完整的成功部署路径"""
sm = DeploymentStateMachine(mock_deployment)
sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
sm.transition(DeploymentEvent.RISK_ACCEPTABLE)
sm.transition(DeploymentEvent.START_VERIFICATION)
sm.transition(DeploymentEvent.COMPLETE)
        assert sm.current_state == DeploymentState.COMPLETED
        assert len(sm.history) == 4

    def test_valid_rollback_path(self, mock_deployment):
        """Test the rollback path."""
        sm = DeploymentStateMachine(mock_deployment)
        sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
        sm.transition(DeploymentEvent.RISK_ACCEPTABLE)
        sm.transition(DeploymentEvent.START_VERIFICATION)
        sm.transition(DeploymentEvent.VERIFICATION_FAILED)
        sm.transition(DeploymentEvent.ROLLBACK_COMPLETED)
        assert sm.current_state == DeploymentState.ROLLED_BACK

    def test_high_risk_requires_approval(self, mock_deployment):
        """Test that a high-risk deployment requires approval."""
        sm = DeploymentStateMachine(mock_deployment)
        sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
        sm.transition(DeploymentEvent.RISK_HIGH)
        assert sm.current_state == DeploymentState.AWAITING_APPROVAL

    def test_invalid_transition_raises_error(self, mock_deployment):
        """Test that an illegal state transition raises an exception."""
        sm = DeploymentStateMachine(mock_deployment)
        with pytest.raises(InvalidStateTransitionError):
            # The COMPLETE event cannot be triggered directly from PENDING
            sm.transition(DeploymentEvent.COMPLETE)

    def test_cancel_from_in_progress_triggers_rollback(self, mock_deployment):
        """Test that cancelling an in-progress deployment enters the rolling-back state."""
        sm = DeploymentStateMachine(mock_deployment)
        sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
        sm.transition(DeploymentEvent.RISK_ACCEPTABLE)
        sm.transition(DeploymentEvent.CANCEL)
        assert sm.current_state == DeploymentState.ROLLING_BACK

    def test_available_events_from_pending(self, mock_deployment):
        """Test which events are available in the PENDING state."""
        sm = DeploymentStateMachine(mock_deployment)
        available = sm.available_events()
        assert DeploymentEvent.START_RISK_ASSESSMENT in available
        assert DeploymentEvent.CANCEL in available
        assert DeploymentEvent.COMPLETE not in available

    def test_history_records_all_transitions(self, mock_deployment):
        """Test that the state history records every transition."""
        sm = DeploymentStateMachine(mock_deployment)
        sm.transition(DeploymentEvent.START_RISK_ASSESSMENT)
        sm.transition(DeploymentEvent.RISK_HIGH)
        sm.transition(DeploymentEvent.APPROVE)
        assert len(sm.history) == 3
        assert sm.history[0]["from"] == "pending"
        assert sm.history[0]["to"] == "risk_assessment"
# tests/unit/test_rule_engine.py
import pytest

from src.modules.decision.rule_engine import RuleEngine, EXAMPLE_RULES_YAML


@pytest.fixture
def rule_engine():
    engine = RuleEngine()
    engine.load_from_yaml(EXAMPLE_RULES_YAML)
    return engine


class TestRuleEngine:
    def test_auto_rollback_on_high_error_rate(self, rule_engine):
        """Verify that a high error rate triggers an automatic rollback."""
        context = {
            "metrics": {"error_rate": 0.08},  # 8% > 5% threshold
            "deployment": {
                "status": "in_progress",
                "environment": "production"
            }
        }
        result = rule_engine.match(context)
        assert result.matched is True
        assert result.action == "rollback"

    def test_no_rollback_when_error_rate_normal(self, rule_engine):
        """Verify that a normal error rate does not trigger a rollback."""
        context = {
            "metrics": {"error_rate": 0.001},  # 0.1% < 5% threshold
            "deployment": {
                "status": "in_progress",
                "environment": "production"
            }
        }
        result = rule_engine.match(context)
        # Another rule may match, but its action must not be rollback
        if result.matched:
            assert result.action != "rollback"

    def test_rule_priority_ordering(self, rule_engine):
        """Verify rule priority: the higher-priority rule matches first."""
        # A context that satisfies several rules at once
        context = {
            "metrics": {
                "error_rate": 0.1,  # triggers auto_rollback (priority 1)
                "p99_latency_ms": 1500  # triggers latency_rollback (priority 2)
            },
            "deployment": {
                "status": "in_progress",
                "environment": "production"
            }
        }
        result = rule_engine.match(context)
        assert result.matched is True
        # The priority-1 error_rate rule should be the one that fires
        assert result.triggered_rule.name == "auto_rollback_on_high_error_rate"
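The priority test above relies on one behaviour: when several rules match, the rule with the lowest priority number wins. A minimal sketch of that first-match-by-priority logic follows. `MiniRule`, `MiniRuleEngine`, and the thresholds are hypothetical stand-ins chosen to mirror the test comments; they are not the chapter's `RuleEngine`.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass(frozen=True)
class MiniRule:
    """Hypothetical rule: a lower `priority` number wins when several rules match."""
    name: str
    priority: int
    action: str
    condition: Callable[[dict], bool]


class MiniRuleEngine:
    """Illustrative first-match engine, not the chapter's RuleEngine."""

    def __init__(self, rules: List[MiniRule]):
        # Sort once so that match() can stop at the first hit.
        self._rules = sorted(rules, key=lambda r: r.priority)

    def match(self, context: dict) -> Optional[MiniRule]:
        for rule in self._rules:
            if rule.condition(context):
                return rule
        return None


# Thresholds are assumptions that mirror the comments in the tests above.
rules = [
    MiniRule("latency_rollback", 2, "rollback",
             lambda c: c["metrics"].get("p99_latency_ms", 0) > 1000),
    MiniRule("auto_rollback_on_high_error_rate", 1, "rollback",
             lambda c: c["metrics"].get("error_rate", 0.0) > 0.05),
]
engine = MiniRuleEngine(rules)
hit = engine.match({"metrics": {"error_rate": 0.1, "p99_latency_ms": 1500}})
```

Sorting at construction time keeps `match()` a simple linear scan, and it makes the outcome independent of the order in which rules were declared.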
Setting Up the Integration Test Environment
# tests/integration/conftest.py
"""
Integration test configuration: use testcontainers to start real dependency services.
"""
import asyncio

import pytest
import pytest_asyncio
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer
from testcontainers.kafka import KafkaContainer

from src.infrastructure.postgres_client import PostgresClient
from src.infrastructure.redis_client import RedisClient
from src.infrastructure.kafka_client import HarnessMessageBus


@pytest.fixture(scope="session")
def event_loop():
    """Create a session-scoped event loop.

    Note: newer pytest-asyncio releases deprecate overriding this fixture,
    so check the version you pin.
    """
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(scope="session")
def postgres_container():
    """Start a PostgreSQL container for the tests."""
    # The container context manager is synchronous, so a plain fixture is enough.
    with PostgresContainer("postgres:15") as postgres:
        yield postgres


@pytest.fixture(scope="session")
def redis_container():
    """Start a Redis container for the tests."""
    with RedisContainer("redis:7") as redis:
        yield redis


@pytest_asyncio.fixture(scope="session")
async def db_client(postgres_container):
    """Create the database connection."""
    # get_connection_url() may include a driver suffix such as "+psycopg2";
    # adjust it if PostgresClient expects a plain DSN.
    client = PostgresClient(
        dsn=postgres_container.get_connection_url()
    )
    await client.connect()
    # Run the schema migrations
    await client.run_migrations("migrations/")
    yield client
    await client.disconnect()


@pytest_asyncio.fixture(autouse=True)
async def clean_db(db_client):
    """Clean the database after each test."""
    yield
    # Truncate the tables once the test has finished
    await db_client.execute("TRUNCATE deployments, decisions, risk_assessments CASCADE")
# tests/integration/test_deployment_pipeline.py
"""
End-to-end integration test: the full deployment flow.
"""
import asyncio
from dataclasses import dataclass, field
from unittest.mock import AsyncMock, patch

import pytest

from src.agents.orchestrator_agent import DeploymentOrchestrator, OrchestratorConfig
from src.models.deployment import DeploymentRequest, DeploymentStatus


# Minimal stand-ins for the patched return values. Assumption: the
# orchestrator only reads the fields that the tests below set.
@dataclass
class MockRiskReport:
    overall_score: float


@dataclass
class MockVerifyResult:
    passed: bool
    failed_checks: list = field(default_factory=list)


# The `orchestrator` fixture is assumed to be provided by conftest.py.
@pytest.mark.integration
class TestDeploymentPipeline:
    @pytest.mark.asyncio
    async def test_successful_low_risk_deployment(
        self,
        orchestrator: DeploymentOrchestrator
    ):
        """Test the full success path of a low-risk deployment."""
        request = DeploymentRequest(
            service_name="test-service",
            target_version="v1.2.3",
            environment="production",
            triggered_by="test_user"
        )
        # Simulate a low-risk assessment (score 0.2)
        with patch.object(
            orchestrator.risk_assessor,
            'assess',
            return_value=MockRiskReport(overall_score=0.2)
        ):
            with patch.object(
                orchestrator.monitor,
                'observe_and_validate',
                return_value=MockVerifyResult(passed=True)
            ):
                deployment = await orchestrator.submit_deployment(request)
                # Wait for asynchronous completion (up to 30 seconds);
                # the patches stay active while the background work runs.
                for _ in range(30):
                    await asyncio.sleep(1)
                    updated = await orchestrator.deployment_repo.get(deployment.id)
                    if updated.status in [
                        DeploymentStatus.COMPLETED,
                        DeploymentStatus.FAILED,
                        DeploymentStatus.ROLLED_BACK
                    ]:
                        break
        final_status = await orchestrator.deployment_repo.get(deployment.id)
        assert final_status.status == DeploymentStatus.COMPLETED

    @pytest.mark.asyncio
    async def test_auto_rollback_on_verification_failure(
        self,
        orchestrator: DeploymentOrchestrator
    ):
        """Test the automatic rollback when verification fails."""
        request = DeploymentRequest(
            service_name="unstable-service",
            target_version="v2.0.0-broken",
            environment="production",
            triggered_by="ci_system"
        )
        with patch.object(
            orchestrator.risk_assessor,
            'assess',
            return_value=MockRiskReport(overall_score=0.3)
        ):
            # Simulate a failed verification (metrics got worse)
            with patch.object(
                orchestrator.monitor,
                'observe_and_validate',
                return_value=MockVerifyResult(
                    passed=False,
                    failed_checks=["error_rate: 0.08 > 0.01"]
                )
            ):
                deployment = await orchestrator.submit_deployment(request)
                # Wait for processing to finish. A fixed delay can be flaky;
                # polling as in the previous test is more robust.
                await asyncio.sleep(5)
        final_status = await orchestrator.deployment_repo.get(deployment.id)
        assert final_status.status == DeploymentStatus.ROLLED_BACK
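Both integration tests wait for an asynchronous result, one with a polling loop and one with a fixed sleep. A small helper can make the wait explicit and fail loudly on timeout. The sketch below is only an illustration under stated assumptions: `wait_for_terminal_status`, `FakeRepo`, and the string statuses are hypothetical, and they merely mimic the `deployment_repo.get(...)` call used in the tests (where the real status is a `DeploymentStatus` value).

```python
import asyncio
from types import SimpleNamespace

# Assumed terminal statuses, written as plain strings for this sketch.
TERMINAL = {"completed", "failed", "rolled_back"}


async def wait_for_terminal_status(repo, deployment_id, timeout=30.0, interval=0.01):
    """Poll repo.get() until the status is terminal, or raise TimeoutError."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        record = await repo.get(deployment_id)
        if record.status in TERMINAL:
            return record
        if loop.time() >= deadline:
            raise TimeoutError(f"deployment {deployment_id} still {record.status}")
        await asyncio.sleep(interval)


class FakeRepo:
    """Hypothetical in-memory repo that walks through a list of statuses."""

    def __init__(self, statuses):
        self._statuses = list(statuses)

    async def get(self, deployment_id):
        # Consume statuses until one is left, then keep returning that one.
        if len(self._statuses) > 1:
            status = self._statuses.pop(0)
        else:
            status = self._statuses[0]
        return SimpleNamespace(id=deployment_id, status=status)


result = asyncio.run(
    wait_for_terminal_status(FakeRepo(["pending", "in_progress", "rolled_back"]), "d-1")
)
```

In a test, a call such as `await wait_for_terminal_status(orchestrator.deployment_repo, deployment.id)` could replace the fixed `asyncio.sleep(5)`, provided the terminal set is adapted to the real status type.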
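3.7 Chapter Summary
Starting from the nature of an Agent, this chapter built a complete architecture blueprint for a self-evolving super agent. The key points are reviewed below: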
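Architecture Core: Five-Layer Design
Perception Layer
↓ collect + process multi-source telemetry data
Cognition Layer
↓ four-tier memory management + knowledge graph
Decision Layer
↓ three-path decision routing (rules/ML/LLM)
Execution Layer
↓ idempotent execution + Saga transactions
Evolution Layer
online learning + offline training + A/B testing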
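Key Design Principles
| Principle | Mechanism | Why it matters |
|---|---|---|
| Perception before decision | Multi-source data fusion + quality assurance | Bad input inevitably leads to bad decisions |
| Layered decision routing | Three paths: rules/ML/LLM | Different scenarios need different decision speed and accuracy |
| Idempotent execution | Unique operation ID + state store | Makes operations safe to retry in a distributed environment |
| Continuous learning | Experience buffer + incremental updates | The Agent must evolve as its environment changes |
| Human in the loop | High-risk approval + feedback collection | AI decisions need human oversight and knowledge injection |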
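The idempotent-execution row can be made concrete with a small sketch: each operation carries a unique ID, and a state store remembers the result so that a retry returns the stored outcome instead of repeating the side effect. `IdempotentExecutor`, the operation ID format, and the in-memory dict are hypothetical; a production system would presumably need a shared, durable store with an atomic check-and-set.

```python
from typing import Any, Callable, Dict, List


class IdempotentExecutor:
    """Illustrative only: each operation ID succeeds at most once."""

    def __init__(self) -> None:
        # Assumption: an in-memory dict stands in for a durable state store.
        self._results: Dict[str, Any] = {}

    def execute(self, operation_id: str, action: Callable[[], Any]) -> Any:
        if operation_id in self._results:
            # Retry of a finished operation: return the stored result.
            return self._results[operation_id]
        # If action() raises, nothing is stored, so a later retry runs it again.
        result = action()
        self._results[operation_id] = result
        return result


calls: List[str] = []


def scale_up() -> str:
    calls.append("scale_up")  # stands in for a real side effect
    return "scaled to 5 replicas"


executor = IdempotentExecutor()
first = executor.execute("deploy-42:scale_up", scale_up)
second = executor.execute("deploy-42:scale_up", scale_up)  # safe retry
```

The second call returns the stored result without invoking `scale_up` again, which is the property that makes retries safe after a timeout or a lost acknowledgement.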
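Key Lessons from the Three Case Studies
- Airbnb: replacing a one-size-fits-all release policy with an ML risk-scoring model cut the failure rate by 60%
- Shopify: predictive scaling (3 minutes ahead) is more effective than reactive response
- Stripe: formal verification plus progressive multi-region rollout is a necessary condition for financial-grade reliability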
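Next Chapter Preview
Chapter 4 dives into the implementation details of the perception layer, including:
- How to design a robust telemetry data pipeline
- How to engineer anomaly detection algorithms for production use
- How to extract meaningful "signal" rather than "noise" from a massive volume of metrics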
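Code Resources for This Chapter
All code from this chapter is collected in the companion repository:
git clone https://github.com/harness-book/chapter3-agent-skeleton
cd chapter3-agent-skeleton
make setup # install dependencies
make test # run the tests
make dev # start the local development environment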
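Questions for Reflection
- When designing the reward function, if business goals conflict (for example, reducing cost vs. improving availability), how would you set the weights?
- The Saga pattern cannot fully compensate in some cases (for example, you cannot "unsend" an email notification that has already gone out). How would you handle such irreversible operations?
- Suppose your AI Agent makes five wrong rollback decisions in a row. How would you design a circuit breaker that stops the Agent from continuing to act incorrectly?
- For Airbnb's risk-scoring model, if the model starts to drift (the production data distribution differs from the training data), what monitoring would you design to detect the problem?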
End of chapter