04 - AI Agents 生产部署与优化
AI Agents 生产部署与优化
·
一、生产环境部署准备
1.1 生产就绪检查清单
1.2 部署架构设计
二、部署方案选择
2.1 部署方式对比
| 方案 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|
| 云托管服务 | 开箱即用、运维成本低 | 定制化受限、成本较高 | 快速上线、企业应用 |
| 容器化部署 | 灵活可控、可移植 | 需要运维能力 | 中大型项目、混合云 |
| Serverless | 按需付费、自动扩展 | 冷启动延迟、资源限制 | 流量不稳定、成本敏感 |
| 边缘部署 | 低延迟、数据本地化 | 功能受限、复杂度高 | 实时响应、隐私要求高 |
2.2 基于 Docker + Kubernetes 的部署实践
🐳 Dockerfile 示例
# Base image: slim Python 3.11 keeps the final image small
FROM python:3.11-slim
# Working directory for all subsequent instructions
WORKDIR /app
# System dependencies: build-essential for native wheels, curl for the HEALTHCHECK;
# apt lists removed in the same layer to keep the image small
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Copy the dependency manifest first so this layer stays cached until requirements change
COPY requirements.txt .
# Install Python dependencies without keeping the pip cache
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code and configuration
COPY ./src /app/src
COPY ./config /app/config
# Runtime environment variables
ENV PYTHONPATH=/app
ENV AGENT_ENV=production
# Port the service listens on
EXPOSE 8000
# Container-level health check against the /health endpoint
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1
# Start the FastAPI app with uvicorn
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
☸️ Kubernetes 部署配置
# deployment.yaml -- 3-replica Deployment for the agent service
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-agent-service
  labels:
    app: ai-agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ai-agent
  template:
    metadata:
      labels:
        app: ai-agent
    spec:
      containers:
        - name: agent
          image: your-registry/ai-agent:latest
          ports:
            - containerPort: 8000
          env:
            # API key injected from a Kubernetes Secret (never baked into the image)
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: ai-secrets
                  key: openai-api-key
            - name: REDIS_HOST
              value: redis-service
            - name: VECTOR_DB_URL
              value: http://qdrant-service:6333
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          # Restart the container if /health stops responding
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          # Only route traffic to the pod once /ready reports success
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
# service.yaml -- LoadBalancer Service exposing port 80 -> container port 8000
apiVersion: v1
kind: Service
metadata:
  name: ai-agent-service
spec:
  selector:
    app: ai-agent
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
---
# hpa.yaml -- horizontal autoscaling between 3 and 10 replicas
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ai-agent-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ai-agent-service
  minReplicas: 3
  maxReplicas: 10
  metrics:
    # Scale out when average CPU utilization exceeds 70%
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    # ...or when average memory utilization exceeds 80%
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
部署流程
# 1. Build the Docker image
docker build -t ai-agent:v1.0 .
# 2. Tag and push to the image registry
docker tag ai-agent:v1.0 your-registry/ai-agent:v1.0
docker push your-registry/ai-agent:v1.0
# 3. Create the Kubernetes secret holding the API key
kubectl create secret generic ai-secrets \
    --from-literal=openai-api-key=YOUR_KEY
# 4. Deploy the manifests to K8s
kubectl apply -f deployment.yaml
kubectl apply -f service.yaml
kubectl apply -f hpa.yaml
# 5. Verify the deployment
kubectl get pods
kubectl get svc
kubectl logs -f deployment/ai-agent-service
三、性能优化策略
3.1 Context Engineering(上下文工程)
Context Engineering 是优化 Agent 性能的关键技术,通过精心设计上下文来提升效率和准确性。
🔧 Context 优化实现
from typing import List, Dict
import tiktoken
class ContextEngineer:
    """Context-engineering optimizer.

    Fits the system prompt, conversation history and retrieved documents
    into a model's context window by budgeting tokens per section.
    """

    def __init__(self, model_name: str = "gpt-4"):
        """
        Args:
            model_name: model whose tokenizer is used for token counting.
        """
        self.tokenizer = tiktoken.encoding_for_model(model_name)
        self.max_tokens = 8000  # tokens reserved for the model's output

    def optimize_context(
        self,
        system_prompt: str,
        conversation_history: List[Dict],
        retrieved_docs: List[str],
        max_context_tokens: int = 120000
    ) -> str:
        """Build a context that fits within ``max_context_tokens``.

        Budget split: the system prompt and output reservation are fixed
        costs; of the remainder, 30% goes to conversation history and 70%
        to retrieved documents.
        """
        # 1. Fixed cost: system prompt + reserved output tokens
        system_tokens = self.count_tokens(system_prompt)
        available_tokens = max_context_tokens - system_tokens - self.max_tokens
        # 2. Compress history (keep the most recent messages).
        #    int() because downstream comparisons expect a token *count*.
        compressed_history = self.compress_conversation(
            conversation_history,
            max_tokens=int(available_tokens * 0.3)  # 30% for history
        )
        # 3. Rerank + truncate retrieved documents
        optimized_docs = self.optimize_documents(
            retrieved_docs,
            max_tokens=int(available_tokens * 0.7)  # 70% for documents
        )
        # 4. Assemble the final context string
        return self.build_context(
            system_prompt,
            compressed_history,
            optimized_docs
        )

    def build_context(
        self,
        system_prompt: str,
        history: List[Dict],
        docs: List[str]
    ) -> str:
        """Join prompt, documents and history into the final context.

        The original called this method without defining it; this is a
        straightforward implementation of the assembly step.
        """
        doc_section = "\n\n".join(docs)
        history_section = "\n".join(
            f"{m.get('role', '')}: {m.get('content', '')}" for m in history
        )
        return f"{system_prompt}\n\n{doc_section}\n\n{history_section}"

    def compress_conversation(
        self,
        history: List[Dict],
        max_tokens: int
    ) -> List[Dict]:
        """Trim conversation history to a token budget, newest-first."""
        compressed = []
        current_tokens = 0
        # Walk backwards so the most recent messages are kept first
        for message in reversed(history):
            msg_tokens = self.count_tokens(str(message))
            if current_tokens + msg_tokens <= max_tokens:
                compressed.insert(0, message)
                current_tokens += msg_tokens
            else:
                # First message that doesn't fit ends the scan; if it is
                # critical (tool call / error / decision), keep a summary.
                if self.is_critical_message(message):
                    summary = self.summarize_message(message)
                    summary_tokens = self.count_tokens(summary)
                    if current_tokens + summary_tokens <= max_tokens:
                        compressed.insert(0, {"role": "system", "content": summary})
                        current_tokens += summary_tokens
                break
        return compressed

    def optimize_documents(
        self,
        docs: List[str],
        max_tokens: int
    ) -> List[str]:
        """Deduplicate, rank by relevance, and truncate docs to a budget."""
        # 1. Deduplicate while preserving first-seen order (the original
        #    used set(), which made the output order nondeterministic)
        unique_docs = list(dict.fromkeys(docs))
        # 2. Sort by relevance score, best first
        scored_docs = [
            (doc, self.calculate_relevance(doc))
            for doc in unique_docs
        ]
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        # 3. Greedily fill the token budget
        optimized = []
        current_tokens = 0
        for doc, score in scored_docs:
            doc_tokens = self.count_tokens(doc)
            if current_tokens + doc_tokens <= max_tokens:
                optimized.append(doc)
                current_tokens += doc_tokens
            else:
                # Partially include the first doc that doesn't fit, but only
                # if a meaningful amount of budget remains
                remaining_tokens = max_tokens - current_tokens
                if remaining_tokens > 100:  # keep at least 100 tokens
                    truncated = self.truncate_document(doc, remaining_tokens)
                    optimized.append(truncated)
                break
        return optimized

    def count_tokens(self, text: str) -> int:
        """Number of tokens in ``text`` under the configured tokenizer."""
        return len(self.tokenizer.encode(text))

    def truncate_document(self, doc: str, max_tokens: int) -> str:
        """Truncate a document, keeping its head and tail.

        Keeps the first 70% and last 30% of the budget, dropping the middle.
        """
        tokens = self.tokenizer.encode(doc)
        if len(tokens) <= max_tokens:
            return doc
        head_tokens = int(max_tokens * 0.7)
        tail_tokens = max_tokens - head_tokens
        truncated_tokens = tokens[:head_tokens] + tokens[-tail_tokens:]
        return self.tokenizer.decode(truncated_tokens)

    def is_critical_message(self, message: Dict) -> bool:
        """Whether the message mentions tool use, errors or decisions."""
        content = str(message.get("content", ""))
        # Tool calls, errors, or explicit decisions/conclusions
        return any(keyword in content.lower() for keyword in [
            "tool", "function", "error", "决策", "结论"
        ])

    def summarize_message(self, message: Dict) -> str:
        """Summarize a message (a real implementation could call an LLM).

        Simplified version: keep the first 100 characters.
        """
        content = message.get("content", "")
        return f"[摘要] {content[:100]}..."

    def calculate_relevance(self, doc: str) -> float:
        """Document relevance score (placeholder).

        A real implementation would score against the query; this stub
        scores by length only.
        """
        return len(doc) / 1000
使用示例:
# Initialize the optimizer
optimizer = ContextEngineer(model_name="gpt-4")
# Prepare the inputs
system_prompt = "你是一个专业的客服助手..."
conversation = [
    {"role": "user", "content": "我的订单在哪里?"},
    {"role": "assistant", "content": "请提供订单号"},
    # ... more history
]
retrieved_docs = [
    "订单查询指南: ...",
    "物流追踪说明: ...",
    # ... more documents
]
# Fit prompt + history + documents into the token budget
optimized_context = optimizer.optimize_context(
    system_prompt=system_prompt,
    conversation_history=conversation,
    retrieved_docs=retrieved_docs
)
# Run the agent on the optimized context
# NOTE(review): `agent` is assumed to be defined elsewhere in the article
response = agent.run(optimized_context)
3.2 缓存策略
多级缓存实现:
import redis
import hashlib
from functools import lru_cache
from typing import Optional, Any
class MultiLevelCache:
    """Multi-level cache: L1 in-process bounded LRU, L2 Redis, L3 persistent store.

    Reads fall through L1 -> L2 -> L3 and back-fill the faster levels on a hit.

    NOTE: the original decorated ``get_from_l1`` with ``@lru_cache`` while the
    method always returned ``None`` and ``set_to_l1`` was a no-op, so L1 never
    stored anything (it permanently cached ``None`` per key, and ``lru_cache``
    on a method also keeps instances alive). An explicit OrderedDict gives a
    working, bounded LRU with the same method interface.
    """

    def __init__(self, redis_client: "redis.Redis"):
        """
        Args:
            redis_client: connected Redis client used as the L2 cache.
        """
        from collections import OrderedDict  # local import: snippet header lacks it
        self.redis = redis_client
        self.l1_max_size = 1000  # max entries kept in the in-process L1 cache
        self._l1 = OrderedDict()  # key -> value, least-recently-used first

    def get_from_l1(self, key: str) -> Optional[Any]:
        """L1: in-process LRU cache; a hit refreshes the entry's recency."""
        if key in self._l1:
            self._l1.move_to_end(key)
            return self._l1[key]
        return None

    def get_from_l2(self, key: str) -> Optional[str]:
        """L2: Redis cache."""
        return self.redis.get(key)

    async def get_from_l3(self, key: str) -> Optional[str]:
        """L3: vector DB / persistent storage (not implemented here)."""
        pass

    async def get(self, key: str) -> Optional[Any]:
        """Look up ``key`` through L1 -> L2 -> L3, back-filling on the way out."""
        # Try L1
        result = self.get_from_l1(key)
        if result:
            return result
        # Try L2
        result = self.get_from_l2(key)
        if result:
            self.set_to_l1(key, result)  # back-fill L1
            return result
        # Try L3
        result = await self.get_from_l3(key)
        if result:
            # back-fill L2 and L1
            self.set_to_l2(key, result, ttl=3600)
            self.set_to_l1(key, result)
            return result
        return None

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Write-through to every cache level."""
        self.set_to_l1(key, value)
        self.set_to_l2(key, value, ttl)

    def set_to_l1(self, key: str, value: Any):
        """Store in L1, evicting least-recently-used entries when full."""
        self._l1[key] = value
        self._l1.move_to_end(key)
        while len(self._l1) > self.l1_max_size:
            self._l1.popitem(last=False)

    def set_to_l2(self, key: str, value: Any, ttl: int):
        """Store in Redis with a TTL (seconds)."""
        self.redis.setex(key, ttl, value)

    @staticmethod
    def generate_cache_key(*args, **kwargs) -> str:
        """Derive a deterministic cache key from the call arguments.

        md5 is acceptable here: key derivation, not security.
        """
        key_str = f"{args}_{kwargs}"
        return hashlib.md5(key_str.encode()).hexdigest()
# 使用示例
class CachedAgent:
    """Wraps an agent with read-through caching of its responses."""

    def __init__(self, cache: "MultiLevelCache", agent=None):
        """
        Args:
            cache: multi-level cache used to store responses.
            agent: underlying agent exposing an async ``arun(user_input)``.
                Added with a default of None for backward compatibility: the
                original never assigned ``self.agent``, so ``run`` always
                raised AttributeError on a cache miss.
        """
        self.cache = cache
        self.agent = agent

    async def run(self, user_input: str):
        """Return a cached response if present; otherwise run the agent
        and cache its result for 30 minutes."""
        cache_key = self.cache.generate_cache_key("agent_response", user_input)
        # Fast path: serve from cache
        cached_result = await self.cache.get(cache_key)
        if cached_result:
            print("🎯 从缓存返回结果")
            return cached_result
        # Slow path: run the agent, then populate the cache
        result = await self.agent.arun(user_input)
        self.cache.set(cache_key, result, ttl=1800)  # 30 minutes
        return result
3.3 并发处理优化
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List
class ConcurrentAgent:
    """Agent wrapper that fans work out concurrently, bounded by a semaphore."""

    def __init__(self, max_workers: int = 10):
        """
        Args:
            max_workers: maximum number of tasks allowed to run at once.
        """
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.semaphore = asyncio.Semaphore(max_workers)

    async def batch_process(
        self,
        inputs: List[str],
        process_func: Callable
    ) -> List[Any]:
        """Apply ``process_func`` to every input concurrently.

        Failures do not abort the batch: a failed task contributes ``None``
        to the result list, and positions are preserved.
        """
        pending = [
            self.process_with_semaphore(item, process_func) for item in inputs
        ]
        raw = await asyncio.gather(*pending, return_exceptions=True)
        # Map exceptions to None so callers get one slot per input
        outcomes = []
        for i, result in enumerate(raw):
            if isinstance(result, Exception):
                print(f"❌ 任务 {i} 失败: {result}")
                outcomes.append(None)
            else:
                outcomes.append(result)
        return outcomes

    async def process_with_semaphore(
        self,
        input_item: str,
        process_func: Callable
    ):
        """Run a single item under the concurrency-limiting semaphore."""
        async with self.semaphore:
            return await process_func(input_item)
# 使用示例
async def main():
    """Demo: fan a batch of order queries through a ConcurrentAgent."""
    agent = ConcurrentAgent(max_workers=5)
    queries = [
        "查询订单1",
        "查询订单2",
        # ... 100 queries in practice
    ]

    async def process_query(query: str):
        # Delegate a single query to the (externally defined) agent service
        return await agent_service.run(query)

    results = await agent.batch_process(queries, process_query)
    print(f"✅ 完成 {len(results)} 个任务")
四、Agentic Protocols(代理协议)
4.1 什么是 Agentic Protocols?
Agentic Protocols 是标准化的 Agent 通信和集成协议,主要包括 MCP(Model Context Protocol,用于模型与外部工具的交互)和 A2A(Agent-to-Agent,用于 Agent 之间的任务委托与通信)。
4.2 MCP (Model Context Protocol) 实践
MCP 是 Anthropic 提出的标准协议,用于 AI 模型与外部工具的交互。
MCP Server 实现示例:
from mcp import Server, Tool, Context
from typing import Dict, Any
class MCPToolServer:
    """MCP tool server: registers tools on an MCP Server and runs it.

    NOTE(review): relies on ``self.db`` and ``self.notification_service``
    being attached elsewhere -- they are never assigned in this class.
    """

    def __init__(self):
        self.server = Server(name="my-tools")
        self.register_tools()

    def register_tools(self):
        """Register all tools with the MCP server via its decorator API."""

        @self.server.tool(
            name="search_database",
            description="搜索数据库中的信息",
            parameters={
                "query": {"type": "string", "description": "搜索查询"},
                "limit": {"type": "integer", "description": "返回结果数量"}
            }
        )
        async def search_database(
            context: Context,
            query: str,
            limit: int = 10
        ) -> Dict[str, Any]:
            """Database search tool."""
            # Actual search logic -- assumes self.db exposes an async search()
            results = await self.db.search(query, limit)
            return {
                "results": results,
                "count": len(results)
            }

        @self.server.tool(
            name="send_notification",
            description="发送通知给用户",
            parameters={
                "user_id": {"type": "string"},
                "message": {"type": "string"}
            }
        )
        async def send_notification(
            context: Context,
            user_id: str,
            message: str
        ) -> Dict[str, Any]:
            """Notification tool."""
            success = await self.notification_service.send(user_id, message)
            return {"success": success}

    def run(self, port: int = 8080):
        """Start the MCP server, listening on all interfaces."""
        self.server.run(host="0.0.0.0", port=port)
# 客户端使用
from mcp import Client
async def use_mcp_tools():
    """Client-side demo: discover and invoke tools on the MCP server."""
    client = Client("http://localhost:8080")

    # Discover what the server exposes
    available_tools = await client.list_tools()
    print("可用工具:", available_tools)

    # Invoke one tool with keyword arguments
    search_result = await client.call_tool(
        "search_database",
        query="订单信息",
        limit=5,
    )
    print("搜索结果:", search_result)
4.3 Agent-to-Agent (A2A) 通信
class A2AProtocol:
    """Agent-to-Agent communication protocol.

    Holds a registry of peer agents and provides helpers to send them
    tasks directly or delegate subtasks by required capability.
    """

    def __init__(self, agent_id: str):
        """
        Args:
            agent_id: identifier of the local agent (sent with every task).
        """
        self.agent_id = agent_id
        self.registry = {}  # agent_id -> {"capabilities": [...], "endpoint": str}

    def register_agent(
        self,
        agent_id: str,
        capabilities: List[str],
        endpoint: str
    ):
        """Register a peer agent, what it can do, and where to reach it."""
        self.registry[agent_id] = {
            "capabilities": capabilities,
            "endpoint": endpoint
        }

    async def send_task(
        self,
        target_agent_id: str,
        task: Dict[str, Any]
    ) -> Dict[str, Any]:
        """POST a task to a registered peer's ``/execute`` endpoint.

        Raises:
            ValueError: if the target agent is not registered.
        """
        # Local import: the original used datetime without importing it
        from datetime import datetime

        if target_agent_id not in self.registry:
            raise ValueError(f"Agent {target_agent_id} 未注册")
        endpoint = self.registry[target_agent_id]["endpoint"]
        # NOTE(review): self.http_client is assumed to be attached elsewhere
        # (e.g. an httpx.AsyncClient); it is never created in this class.
        response = await self.http_client.post(
            f"{endpoint}/execute",
            json={
                "task": task,
                "from_agent": self.agent_id,
                "timestamp": datetime.now().isoformat()
            }
        )
        return response.json()

    async def select_best_agent(self, agent_ids: List[str]) -> str:
        """Pick one agent from the candidates.

        Placeholder policy: first candidate wins. The original called this
        method without defining it; a real implementation would compare
        per-agent load.
        """
        return agent_ids[0]

    async def delegate_subtask(
        self,
        task_description: str,
        required_capabilities: List[str]
    ):
        """Delegate a subtask to an agent having all required capabilities.

        Raises:
            ValueError: if no registered agent has every required capability.
        """
        # Agents that advertise every required capability
        suitable_agents = [
            agent_id
            for agent_id, info in self.registry.items()
            if all(cap in info["capabilities"] for cap in required_capabilities)
        ]
        if not suitable_agents:
            raise ValueError("没有合适的 Agent 处理此任务")
        # Pick the best (e.g. lowest-load) candidate
        target_agent = await self.select_best_agent(suitable_agents)
        # Hand the task over
        result = await self.send_task(
            target_agent,
            {"description": task_description}
        )
        return result
# 使用示例
async def multi_agent_collaboration():
    """Demo: a coordinator delegates analysis, then report writing."""
    # Agent A: the coordinator holding the registry
    coordinator = A2AProtocol("coordinator")

    # Agent B: data analyst
    coordinator.register_agent(
        "analyst",
        capabilities=["data_analysis", "visualization"],
        endpoint="http://analyst-service:8080",
    )
    # Agent C: report writer
    coordinator.register_agent(
        "writer",
        capabilities=["report_writing", "summarization"],
        endpoint="http://writer-service:8080",
    )

    # Collaboration flow
    # Step 1: delegate the data analysis
    analysis_result = await coordinator.delegate_subtask(
        "分析2024年销售数据",
        required_capabilities=["data_analysis"],
    )
    # Step 2: delegate report writing, feeding in the analysis output
    final_report = await coordinator.delegate_subtask(
        f"基于分析结果撰写报告: {analysis_result}",
        required_capabilities=["report_writing"],
    )
    return final_report
五、监控与可观测性
5.1 全链路追踪
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger import JaegerExporter
class ObservableAgent:
    """Agent instrumented with OpenTelemetry tracing exported to Jaeger.

    NOTE(review): ``analyze_intent``, ``execute_tools`` and
    ``generate_response`` are called but not defined here -- presumably
    supplied by a subclass or attached elsewhere.
    """

    def __init__(self, service_name: str):
        # Wire up tracing: provider -> Jaeger exporter -> batch span processor
        trace.set_tracer_provider(TracerProvider())
        tracer_provider = trace.get_tracer_provider()
        jaeger_exporter = JaegerExporter(
            agent_host_name="localhost",
            agent_port=6831,
        )
        tracer_provider.add_span_processor(
            BatchSpanProcessor(jaeger_exporter)
        )
        self.tracer = trace.get_tracer(service_name)

    async def run_with_tracing(self, user_input: str):
        """Execute the agent pipeline inside a root span, one child span
        per stage; errors are recorded on the root span and re-raised."""
        with self.tracer.start_as_current_span("agent_execution") as span:
            span.set_attribute("input", user_input)
            span.set_attribute("agent.version", "1.0")
            try:
                # 1. Query analysis
                with self.tracer.start_as_current_span("query_analysis"):
                    intent = await self.analyze_intent(user_input)
                    span.set_attribute("intent", intent)
                # 2. Tool execution
                with self.tracer.start_as_current_span("tool_execution"):
                    tool_result = await self.execute_tools(intent)
                    span.set_attribute("tools_used", len(tool_result))
                # 3. Response generation
                with self.tracer.start_as_current_span("response_generation"):
                    response = await self.generate_response(tool_result)
                    span.set_attribute("response_length", len(response))
                span.set_status(trace.Status(trace.StatusCode.OK))
                return response
            except Exception as e:
                # Mark the root span failed and attach the exception details
                span.set_status(
                    trace.Status(trace.StatusCode.ERROR, str(e))
                )
                span.record_exception(e)
                raise
5.2 性能监控大盘
六、生产环境最佳实践
✅ 部署检查清单
## 部署前检查
### 性能
- [ ] 响应时间 < 3秒(P95)
- [ ] 并发支持 > 100 QPS
- [ ] Token 优化(Context Engineering)
- [ ] 多级缓存配置
### 可靠性
- [ ] 健康检查端点 `/health`
- [ ] 优雅关机(Grace Shutdown)
- [ ] 重试机制(最多3次)
- [ ] 降级方案(Fallback)
- [ ] 熔断器(Circuit Breaker)
### 安全性
- [ ] API Key 加密存储
- [ ] HTTPS 强制
- [ ] 速率限制(Rate Limit)
- [ ] 输入验证和清洗
- [ ] 审计日志
### 可观测性
- [ ] 结构化日志
- [ ] 全链路追踪(OpenTelemetry)
- [ ] 性能监控(Prometheus + Grafana)
- [ ] 告警配置(PagerDuty / Slack)
### 成本
- [ ] Token 使用监控
- [ ] 缓存命中率 > 60%
- [ ] 自动扩缩容配置
- [ ] 成本预算告警
更多推荐



所有评论(0)