1. 项目概述:构建生产级Claude API智能体的三层架构

最近在几个企业级项目中深度使用了Anthropic的Claude API来构建智能体系统,踩了不少坑,也积累了一套行之有效的架构模式。我发现,很多开发者拿到Claude API后,直接就开始写简单的对话循环,结果在真实生产环境中遇到性能、稳定性、成本控制等一系列问题。今天我想分享的这套“三层架构”,是我从实际项目中提炼出来的,专门针对生产环境设计的Claude智能体开发框架。

简单来说,这个架构将智能体系统分为 交互层、逻辑层和基础设施层 。交互层负责处理用户输入和输出格式化;逻辑层是智能体的“大脑”,包含提示工程、思维链和工具调用;基础设施层则处理API调用、缓存、监控和容错。这种分层设计最大的好处是 解耦 ——每层可以独立演进,便于团队协作,也更容易进行单元测试和性能优化。

举个例子,上个月我们为一个电商客服系统部署Claude智能体,初期版本把所有代码堆在一个文件里。当需要增加对话历史管理功能时,改动一处就引发多处错误。采用三层架构重构后,我们只需要在基础设施层添加一个历史管理模块,其他层几乎不用动,开发效率提升明显,系统稳定性也大幅提高。

这套架构适合正在或计划将Claude API用于生产环境的开发者、技术负责人和架构师。无论你是构建客服机器人、内容创作助手、数据分析工具还是复杂的业务流程自动化系统,这个框架都能提供坚实的工程基础。接下来,我会逐层拆解,分享每层的设计思路、核心代码和避坑经验。

2. 架构全景与核心设计哲学

2.1 为什么需要三层架构?

在深入代码之前,我们先要理解为什么传统的“单文件脚本”模式在生产环境中行不通。Claude API虽然强大,但生产环境的要求完全不同:需要处理高并发、保证响应时间、控制API成本、维护对话一致性、提供监控告警等。三层架构正是为了解决这些问题而生。

生产环境的四大挑战

  1. 性能与扩展性 :用户量从几十到几千时,系统架构需要能水平扩展
  2. 稳定性与容错 :API可能超时、限流或返回错误,系统需要有降级方案
  3. 成本控制 :Claude API按token计费,不当使用可能导致巨额账单
  4. 可维护性 :随着功能增加,代码需要保持清晰的结构便于团队协作

三层架构的核心思想是 关注点分离 。交互层只关心“用户看到什么”,逻辑层只关心“智能体怎么思考”,基础设施层只关心“如何可靠地执行”。这种分离让每层都可以用最适合的技术实现,也便于单独优化。

2.2 各层职责与数据流

让我用具体的场景说明三层如何协作。假设用户问:“帮我分析上个月的销售数据,找出增长最快的三个产品类别。”

用户输入 → 交互层(接收、预处理) → 逻辑层(分析意图、构建提示) → 基础设施层(调用API、获取结果) → 逻辑层(解析响应、决策) → 交互层(格式化输出) → 用户

交互层 :接收用户原始输入,可能来自Web界面、API接口或消息队列。这一层负责输入验证、会话标识、上下文组装(如附加用户历史、系统提示)。在我们的电商案例中,交互层还会从数据库中提取用户的基本信息和历史订单,作为上下文的一部分。

逻辑层 :这是智能体的“决策中心”。它分析用户意图,决定是否需要调用工具(如查询数据库、调用外部API),构建给Claude的提示词。关键的是,逻辑层不直接调用Claude API——它只生成“指令”,由基础设施层执行。这种设计让逻辑层可以专注于业务逻辑,而不被网络请求、重试机制等基础设施问题干扰。

基础设施层 :最底层但最关键。它负责实际调用Claude API,处理认证、限流、重试、缓存、监控等。这一层通常以服务或库的形式存在,被上层透明地调用。好的基础设施层能让上层开发者像使用本地函数一样使用Claude API,而不用担心网络问题或配额限制。

3. 交互层:用户界面的智能网关

3.1 输入处理与上下文管理

交互层是用户与智能体的第一接触点,设计好坏直接影响用户体验。我建议将交互层设计为无状态的,这样便于水平扩展。每个请求都应包含完整的上下文,而不是依赖服务器端的状态。

核心组件设计

class InteractionLayer:
    def __init__(self, session_store, user_profile_service):
        self.session_store = session_store  # 会话存储(Redis等)
        self.user_profile = user_profile_service  # 用户信息服务
        
    async def process_input(self, user_id: str, raw_input: str, context: dict = None) -> dict:
        """
        处理用户输入,构建完整的请求上下文
        """
        # 1. 输入验证与清洗
        cleaned_input = self._sanitize_input(raw_input)
        
        # 2. 获取或创建会话
        session = await self._get_or_create_session(user_id)
        
        # 3. 构建上下文(用户信息、历史、系统提示等)
        full_context = await self._build_context(
            user_id, cleaned_input, session, context
        )
        
        # 4. 传递给逻辑层
        return {
            "processed_input": cleaned_input,
            "context": full_context,
            "session_id": session.id,
            "metadata": self._extract_metadata(cleaned_input)
        }
    
    def _sanitize_input(self, text: str) -> str:
        """清理用户输入,防止提示注入等安全问题"""
        # 移除或转义可能破坏提示结构的特殊字符
        # 但注意不要过度清理,以免影响语义
        import html
        text = html.escape(text)  # 基础HTML转义
        # 移除过长的连续空格、换行符等
        text = ' '.join(text.split())
        return text[:4000]  # 长度限制,防止滥用

上下文构建的实践经验 : 在实际项目中,上下文的质量直接决定Claude回复的质量。我总结了一套“上下文优先级”策略:

  1. 系统提示 (最高优先级):定义智能体的角色、能力和限制
  2. 对话历史 (最近3-5轮):保持对话连贯性
  3. 用户画像 :权限级别、偏好设置、历史行为
  4. 业务上下文 :当前操作涉及的数据、状态等
  5. 工具可用性 :当前可用的工具列表及其描述

一个常见的错误是把所有可用信息都塞进上下文。Claude有token限制(通常128K),需要精心选择。我们的经验是: 相关性优先于完整性 。只包含与当前查询直接相关的信息。

3.2 输出格式化与流式响应

Claude API支持流式响应,这对用户体验至关重要。想象一下,如果用户问一个复杂问题,等待10秒才看到完整回答,体验会很差。流式响应让用户看到思考过程,感觉更自然。

流式响应实现

import asyncio
from typing import AsyncGenerator

class StreamingFormatter:
    def __init__(self):
        self.buffer = ""
        self.sentence_endings = {'.', '!', '?', '。', '!', '?'}
        
    async def format_stream(
        self, stream: AsyncGenerator[str, None]
    ) -> AsyncGenerator[str, None]:
        """
        处理流式响应,按句子或合理段落切分
        """
        async for chunk in stream:
            self.buffer += chunk
            
            # 寻找合适的切分点(句子结束或达到一定长度)
            while True:
                # 优先在句子边界切分
                split_pos = -1
                for i, char in enumerate(self.buffer):
                    if char in self.sentence_endings:
                        # 确保后面是空格或结束
                        if i + 1 >= len(self.buffer) or self.buffer[i + 1].isspace():
                            split_pos = i + 1
                            break
                
                # 如果没有句子边界,但缓冲区过长,在空格处切分
                if split_pos == -1 and len(self.buffer) > 80:
                    # 查找最近的空格
                    for i in range(79, 0, -1):
                        if self.buffer[i].isspace():
                            split_pos = i + 1
                            break
                
                if split_pos > 0:
                    yield self.buffer[:split_pos]
                    self.buffer = self.buffer[split_pos:]
                else:
                    break
        
        # 发送剩余内容
        if self.buffer:
            yield self.buffer
            self.buffer = ""

输出格式化的注意事项

  1. 保持一致性 :无论响应内容如何,格式(如Markdown、纯文本、结构化数据)应该一致
  2. 错误处理 :当Claude返回错误或超时时,要有友好的降级消息
  3. 内容安全 :对输出内容进行必要的过滤,防止返回不当内容
  4. 性能考虑 :流式响应不要过于频繁地发送小片段,避免前端渲染压力

在我们的电商客服系统中,我们还为输出添加了 结构化数据标记 。例如,当Claude推荐产品时,除了自然语言描述,还会输出产品ID列表,方便前端直接生成可点击的卡片。

4. 逻辑层:智能体的思考引擎

4.1 提示工程与思维链设计

逻辑层的核心是 提示工程 。好的提示能让Claude发挥最大效能,差的提示则可能导致无关甚至错误的回答。经过多个项目实践,我总结了一套“结构化提示”方法。

基础提示模板

class PromptEngine:
    def __init__(self, system_prompt: str, few_shot_examples: list = None):
        self.system_prompt = system_prompt
        self.examples = few_shot_examples or []
        
    def build_conversation_prompt(
        self, 
        user_input: str, 
        context: dict,
        tools: list = None
    ) -> list:
        """
        构建完整的对话提示
        返回格式符合Claude消息API要求
        """
        messages = []
        
        # 1. 系统提示
        messages.append({
            "role": "system",
            "content": self._enhance_system_prompt(context)
        })
        
        # 2. 少量示例(few-shot learning)
        for example in self.examples[:3]:  # 限制示例数量
            messages.extend(example)
        
        # 3. 对话历史(最近的几轮)
        if "conversation_history" in context:
            for msg in context["conversation_history"][-6:]:  # 最近3轮
                messages.append(msg)
        
        # 4. 当前用户输入
        messages.append({
            "role": "user",
            "content": self._format_user_input(user_input, context, tools)
        })
        
        return messages
    
    def _enhance_system_prompt(self, context: dict) -> str:
        """根据上下文增强系统提示"""
        base_prompt = self.system_prompt
        
        # 添加用户特定信息
        if "user_role" in context:
            base_prompt += f"\n\n当前用户角色:{context['user_role']}"
        
        # 添加业务上下文
        if "business_context" in context:
            base_prompt += f"\n\n业务上下文:{context['business_context']}"
        
        # 添加当前日期时间(避免Claude使用过时信息)
        from datetime import datetime
        current_time = datetime.now().strftime("%Y年%m月%d日 %H:%M")
        base_prompt += f"\n\n当前时间:{current_time}(所有时间相关回答请基于此时间)"
        
        return base_prompt

思维链(Chain-of-Thought)的关键技巧 : 在复杂任务中,直接问Claude答案往往效果不好。更好的方法是引导Claude展示思考过程:

  1. 明确要求分步思考 :在提示中加入“请逐步思考”、“先分析问题,再给出答案”
  2. 提供思考模板 :对于特定类型问题,提供思考框架
  3. 验证中间步骤 :对于关键推理步骤,可以要求Claude自我验证

例如,在处理数据分析请求时,我们的提示会这样设计:

用户问题:分析上个月销售数据,找出问题并建议改进措施

系统提示:你是一个数据分析专家。请按以下步骤思考:
1. 首先明确用户需要什么数据(时间范围、指标、维度)
2. 然后假设数据可能反映的问题(需要什么数据验证)
3. 接着提出数据获取或计算的具体方法
4. 最后基于分析给出 actionable 的建议

请逐步展示你的思考过程,用【思考】标记思考步骤,用【结论】标记最终答案。

4.2 工具调用与函数设计

Claude支持工具调用(function calling),这是构建强大智能体的关键。但工具设计有讲究——不是所有功能都适合暴露为工具。

工具设计原则

  1. 原子性 :每个工具应该只做一件事,且做好
  2. 安全性 :工具调用必须有权限控制
  3. 可观测性 :工具调用应该被记录和监控
  4. 错误处理 :工具失败时要有清晰的错误信息

工具注册与调用框架

from typing import Dict, Any, Callable, Optional
import inspect
import json

class ToolRegistry:
    def __init__(self):
        self._tools = {}
        self._tool_descriptions = []
        
    def register_tool(self, func: Callable) -> Callable:
        """注册一个工具函数"""
        # 从函数签名和docstring提取描述
        sig = inspect.signature(func)
        docstring = func.__doc__ or ""
        
        # 解析参数说明(简化版,实际项目可以更复杂)
        params = {}
        for param_name, param in sig.parameters.items():
            params[param_name] = {
                "type": str(param.annotation) if param.annotation != inspect.Parameter.empty else "string",
                "description": self._extract_param_description(docstring, param_name),
                "required": param.default == inspect.Parameter.empty
            }
        
        tool_schema = {
            "name": func.__name__,
            "description": docstring.split('\n')[0] if docstring else "",
            "parameters": params
        }
        
        self._tools[func.__name__] = func
        self._tool_descriptions.append(tool_schema)
        
        return func
    
    async def execute_tool(self, tool_name: str, arguments: dict, context: dict) -> Any:
        """执行工具调用"""
        if tool_name not in self._tools:
            raise ValueError(f"工具未注册: {tool_name}")
        
        tool_func = self._tools[tool_name]
        
        # 权限检查
        if not self._check_permission(tool_name, context):
            return {"error": "权限不足", "tool": tool_name}
        
        # 参数验证
        try:
            validated_args = self._validate_arguments(tool_name, arguments)
        except ValueError as e:
            return {"error": f"参数验证失败: {str(e)}", "tool": tool_name}
        
        # 执行工具(支持同步和异步)
        try:
            if inspect.iscoroutinefunction(tool_func):
                result = await tool_func(**validated_args, context=context)
            else:
                result = tool_func(**validated_args, context=context)
            
            # 记录工具调用
            self._log_tool_call(tool_name, validated_args, result, context)
            
            return result
            
        except Exception as e:
            # 错误处理:返回结构化错误信息
            error_result = {
                "error": str(e),
                "error_type": type(e).__name__,
                "tool": tool_name
            }
            self._log_tool_error(tool_name, validated_args, error_result, context)
            return error_result
    
    def get_tools_for_prompt(self) -> list:
        """获取用于提示的工具描述"""
        return self._tool_descriptions

工具调用的最佳实践

  1. 工具粒度要适中 :太细(如 get_user_name )会导致频繁调用,增加延迟和成本;太粗(如 analyze_all_data )则灵活性差
  2. 提供足够的上下文 :工具函数应该能访问会话上下文,但不要传递整个上下文对象
  3. 实现工具链 :复杂任务可能需要多个工具协作,逻辑层应该能协调工具调用顺序
  4. 处理工具依赖 :某些工具需要其他工具的结果作为输入

在我们的内容审核系统中,我们设计了这样的工具链:

用户请求 → 内容分析工具 → 违规检测工具 → 风险评分工具 → 处置建议工具

每个工具独立且可测试,逻辑层负责传递数据和决策。

5. 基础设施层:生产环境的坚实底座

5.1 API客户端与连接管理

基础设施层直接与Claude API交互,这一层的稳定性决定整个系统的可靠性。我强烈建议不要直接使用官方的简单客户端,而是构建一个增强版客户端。

增强型API客户端设计

import asyncio
import time
import hashlib
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from anthropic import AsyncAnthropic
import backoff
import redis.asyncio as redis

@dataclass
class APIConfig:
    """API配置"""
    api_key: str
    base_url: str = "https://api.anthropic.com"
    timeout: int = 30
    max_retries: int = 3
    rate_limit_rpm: int = 1000  # 每分钟请求数限制
    
class EnhancedClaudeClient:
    def __init__(self, config: APIConfig, cache_client: Optional[redis.Redis] = None):
        self.config = config
        self.cache = cache_client
        self._client = AsyncAnthropic(
            api_key=config.api_key,
            base_url=config.base_url,
            timeout=config.timeout,
            max_retries=0  # 我们自己实现重试逻辑
        )
        
        # 限流器
        self._rate_limiter = RateLimiter(
            max_requests=config.rate_limit_rpm,
            period=60  # 60秒
        )
        
        # 统计信息
        self._stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_tokens": 0,
            "total_cost": 0.0
        }
    
    @backoff.on_exception(
        backoff.expo,
        (Exception,),
        max_tries=3,
        jitter=backoff.full_jitter
    )
    async def create_message(
        self,
        messages: List[Dict[str, Any]],
        model: str = "claude-3-opus-20240229",
        max_tokens: int = 4096,
        temperature: float = 0.7,
        stream: bool = False,
        use_cache: bool = True,
        **kwargs
    ) -> Dict[str, Any]:
        """
        增强的API调用方法,包含缓存、重试、限流、监控
        """
        # 1. 检查缓存
        cache_key = None
        if use_cache and self.cache:
            cache_key = self._generate_cache_key(messages, model, max_tokens, temperature)
            cached = await self.cache.get(cache_key)
            if cached:
                self._stats["total_requests"] += 1
                self._stats["successful_requests"] += 1
                return json.loads(cached)
        
        # 2. 限流
        await self._rate_limiter.acquire()
        
        # 3. 准备请求
        request_start = time.time()
        self._stats["total_requests"] += 1
        
        try:
            # 4. 实际API调用
            response = await self._client.messages.create(
                model=model,
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
                stream=stream,
                **kwargs
            )
            
            request_time = time.time() - request_start
            
            # 5. 处理响应
            result = self._process_response(response, stream)
            
            # 6. 更新统计
            self._stats["successful_requests"] += 1
            if hasattr(response, 'usage'):
                self._stats["total_tokens"] += response.usage.total_tokens
                self._stats["total_cost"] += self._calculate_cost(
                    model, response.usage
                )
            
            # 7. 记录监控指标
            await self._record_metrics({
                "model": model,
                "request_time": request_time,
                "tokens": response.usage.total_tokens if hasattr(response, 'usage') else 0,
                "success": True
            })
            
            # 8. 缓存结果(非流式响应)
            if use_cache and self.cache and cache_key and not stream:
                # 根据模型和temperature决定缓存时间
                cache_ttl = self._get_cache_ttl(model, temperature)
                await self.cache.setex(
                    cache_key,
                    cache_ttl,
                    json.dumps(result)
                )
            
            return result
            
        except Exception as e:
            # 错误处理
            self._stats["failed_requests"] += 1
            
            await self._record_metrics({
                "model": model,
                "request_time": time.time() - request_start,
                "success": False,
                "error": str(e)
            })
            
            # 根据错误类型决定是否重试
            if self._should_retry(e):
                raise  # 触发重试
            else:
                # 业务错误,不重试
                return self._create_error_response(e)
    
    def _generate_cache_key(self, messages, model, max_tokens, temperature) -> str:
        """生成缓存键"""
        content = json.dumps({
            "messages": messages,
            "model": model,
            "max_tokens": max_tokens,
            "temperature": temperature
        }, sort_keys=True)
        return f"claude:{hashlib.md5(content.encode()).hexdigest()}"
    
    def _get_cache_ttl(self, model: str, temperature: float) -> int:
        """根据模型和temperature确定缓存时间"""
        # 温度越低,结果越确定,缓存时间可以越长
        if temperature < 0.3:
            if "opus" in model:
                return 3600  # 1小时
            else:
                return 1800  # 30分钟
        elif temperature < 0.7:
            return 300  # 5分钟
        else:
            return 60  # 1分钟(高temperature结果随机性强)

连接管理的关键考虑

  1. 连接池 :对于高并发场景,需要管理HTTP连接池
  2. 超时设置 :区分连接超时、读取超时、总超时
  3. DNS缓存 :避免DNS查询成为性能瓶颈
  4. 代理支持 :企业环境可能需要配置代理

5.2 缓存策略与性能优化

缓存是降低API成本、提高响应速度的关键。但缓存AI响应需要特别小心——错误的缓存可能导致用户看到过时或不相关的信息。

多层缓存策略 : 我们采用三级缓存策略,针对不同场景优化:

  1. 内存缓存(L1) :存储高频、小体积的响应,如常用提示词的补全
  2. Redis缓存(L2) :存储中等频率的响应,缓存时间根据temperature和业务需求调整
  3. 数据库缓存(L3) :存储历史对话,用于用户查询历史或分析

缓存失效策略

class SmartCacheManager:
    def __init__(self, redis_client, model_metadata_provider):
        self.redis = redis_client
        self.model_metadata = model_metadata_provider
        
    async def get_cached_response(self, cache_key: str, context: dict) -> Optional[dict]:
        """智能缓存查询"""
        # 1. 检查缓存是否存在
        cached = await self.redis.get(cache_key)
        if not cached:
            return None
        
        data = json.loads(cached)
        
        # 2. 检查缓存是否仍然有效
        if not self._is_cache_valid(data, context):
            await self.redis.delete(cache_key)
            return None
        
        # 3. 检查业务上下文是否变化
        if self._context_changed(data.get("context_hash"), context):
            return None
        
        # 4. 返回缓存,更新访问时间
        await self._update_access_time(cache_key)
        return data["response"]
    
    def _is_cache_valid(self, cached_data: dict, context: dict) -> bool:
        """检查缓存有效性"""
        # 基于时间的失效
        if time.time() - cached_data["timestamp"] > cached_data["ttl"]:
            return False
        
        # 基于模型版本的失效
        model_version = self.model_metadata.get_current_version(
            cached_data["model"]
        )
        if model_version != cached_data["model_version"]:
            return False
        
        # 基于业务规则的失效
        if "user_preferences" in context:
            # 如果用户偏好改变,某些缓存可能失效
            if self._preferences_changed(
                cached_data["user_preferences"],
                context["user_preferences"]
            ):
                return False
        
        return True
    
    def _context_changed(self, old_context_hash: str, new_context: dict) -> bool:
        """检查上下文是否变化"""
        new_hash = self._hash_context(new_context)
        return old_context_hash != new_hash
    
    def _hash_context(self, context: dict) -> str:
        """计算上下文哈希,忽略不相关字段"""
        # 只哈希影响响应的关键字段
        relevant_fields = {
            "user_id": context.get("user_id"),
            "user_role": context.get("user_role"),
            "conversation_history": context.get("conversation_history", [])[-3:],  # 最近3条
            "business_rules": context.get("business_rules", {})
        }
        return hashlib.md5(
            json.dumps(relevant_fields, sort_keys=True).encode()
        ).hexdigest()

缓存的最佳实践

  1. 不要缓存高temperature的响应 :temperature > 0.9时,每次响应差异很大,缓存意义不大
  2. 考虑上下文变化 :用户角色、权限、对话历史变化时,相同问题可能需要不同回答
  3. 设置合理的TTL :技术问题可以缓存较久,时效性内容缓存时间要短
  4. 监控缓存命中率 :优化缓存策略的依据

5.3 监控、日志与告警系统

生产环境没有监控就是盲人摸象。对于Claude API智能体,我们需要监控多个维度:

关键监控指标

  1. 性能指标 :响应时间、Token使用量、请求成功率
  2. 业务指标 :用户满意度、任务完成率、错误类型分布
  3. 成本指标 :API调用成本、缓存命中率、Token使用效率
  4. 质量指标 :回答相关性、准确性、用户反馈

监控系统实现

import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
import logging
from datetime import datetime

class MonitoringSystem:
    def __init__(self, service_name: str):
        self.service_name = service_name
        
        # Prometheus指标
        self.requests_total = Counter(
            f'{service_name}_requests_total',
            'Total API requests',
            ['model', 'endpoint', 'status']
        )
        
        self.request_duration = Histogram(
            f'{service_name}_request_duration_seconds',
            'API request duration',
            ['model', 'endpoint'],
            buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0)
        )
        
        self.tokens_used = Counter(
            f'{service_name}_tokens_used_total',
            'Total tokens used',
            ['model', 'token_type']  # input, output
        )
        
        self.cache_hits = Counter(
            f'{service_name}_cache_hits_total',
            'Total cache hits',
            ['cache_level']
        )
        
        # 业务指标
        self.user_satisfaction = Gauge(
            f'{service_name}_user_satisfaction',
            'User satisfaction score',
            ['user_segment']
        )
        
        # 日志配置
        self.logger = self._setup_logger()
    
    async def record_api_call(self, model: str, endpoint: str, 
                            duration: float, success: bool,
                            input_tokens: int = 0, output_tokens: int = 0):
        """记录API调用"""
        status = "success" if success else "failure"
        
        # 更新指标
        self.requests_total.labels(
            model=model, endpoint=endpoint, status=status
        ).inc()
        
        self.request_duration.labels(
            model=model, endpoint=endpoint
        ).observe(duration)
        
        if input_tokens > 0:
            self.tokens_used.labels(
                model=model, token_type="input"
            ).inc(input_tokens)
        
        if output_tokens > 0:
            self.tokens_used.labels(
                model=model, token_type="output"
            ).inc(output_tokens)
        
        # 记录日志
        log_level = logging.INFO if success else logging.ERROR
        self.logger.log(
            log_level,
            f"API调用: model={model}, endpoint={endpoint}, "
            f"duration={duration:.2f}s, success={success}, "
            f"tokens={input_tokens+output_tokens}"
        )
        
        # 触发告警(如果满足条件)
        if not success:
            await self._trigger_alert(
                "api_error",
                f"API调用失败: {model}/{endpoint}",
                severity="warning"
            )
        elif duration > 10.0:  # 慢查询告警
            await self._trigger_alert(
                "slow_request",
                f"慢请求: {model}/{endpoint} 耗时{duration:.2f}s",
                severity="info"
            )
    
    async def record_cache_hit(self, cache_level: str, key: str):
        """记录缓存命中"""
        self.cache_hits.labels(cache_level=cache_level).inc()
        
        # 调试日志
        self.logger.debug(f"缓存命中: level={cache_level}, key={key[:50]}...")
    
    async def record_user_feedback(self, session_id: str, rating: int, 
                                 feedback: str = None):
        """记录用户反馈"""
        self.user_satisfaction.labels(
            user_segment=self._get_user_segment(session_id)
        ).set(rating)
        
        # 存储详细反馈
        feedback_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "session_id": session_id,
            "rating": rating,
            "feedback": feedback,
            "user_segment": self._get_user_segment(session_id)
        }
        
        # 可以存储到数据库或日志系统
        self.logger.info(
            f"用户反馈: {json.dumps(feedback_record, ensure_ascii=False)}"
        )
        
        # 低分反馈触发告警
        if rating <= 2:
            await self._trigger_alert(
                "low_rating",
                f"低分反馈: session={session_id}, rating={rating}",
                severity="warning"
            )

告警策略配置 : 在生产环境中,我们配置了分层告警:

  1. 紧急告警 (P0):API完全不可用、成本异常激增
  2. 警告告警 (P1):错误率升高、响应时间变慢
  3. 信息告警 (P2):缓存命中率下降、用户满意度下降

每个告警都包含:发生了什么、影响范围、可能原因、处理建议。例如,当API错误率超过5%时,告警信息会包括:

  • 哪些模型/端点受影响
  • 错误类型分布(限流、超时、认证失败等)
  • 最近1小时与之前24小时的对比
  • 建议操作:检查API密钥、调整限流设置、联系支持

6. 部署与运维实践

6.1 配置管理与环境隔离

生产环境需要严格的配置管理。硬编码的API密钥、模型参数是灾难的根源。我们采用多层配置系统:

配置管理架构

import os
from typing import Dict, Any
from dataclasses import dataclass
from enum import Enum

class Environment(Enum):
    LOCAL = "local"
    DEVELOPMENT = "dev"
    STAGING = "staging"
    PRODUCTION = "production"

@dataclass
class ClaudeConfig:
    """Claude配置"""
    api_key: str
    base_url: str = "https://api.anthropic.com"
    default_model: str = "claude-3-sonnet-20240229"
    timeout: int = 30
    max_retries: int = 3
    
    # 模型特定配置
    model_configs: Dict[str, Dict[str, Any]] = None
    
    def __post_init__(self):
        if self.model_configs is None:
            self.model_configs = {
                "claude-3-opus-20240229": {
                    "max_tokens": 4096,
                    "temperature": 0.7,
                    "cache_ttl": 3600
                },
                "claude-3-sonnet-20240229": {
                    "max_tokens": 8192,
                    "temperature": 0.7,
                    "cache_ttl": 1800
                },
                "claude-3-haiku-20240307": {
                    "max_tokens": 8192,
                    "temperature": 0.7,
                    "cache_ttl": 300
                }
            }

class ConfigManager:
    def __init__(self, env: Environment):
        self.env = env
        self._configs = {}
        self._load_configs()
    
    def _load_configs(self):
        """加载配置"""
        # 1. 从环境变量加载(最高优先级)
        self._load_from_env()
        
        # 2. 从配置文件加载
        self._load_from_file()
        
        # 3. 从配置服务加载(如Consul、AWS Parameter Store)
        if self.env != Environment.LOCAL:
            self._load_from_config_service()
    
    def _load_from_env(self):
        """从环境变量加载配置"""
        # API密钥
        api_key = os.getenv(f"CLAUDE_API_KEY_{self.env.value.upper()}")
        if not api_key:
            raise ValueError(f"未找到环境变量: CLAUDE_API_KEY_{self.env.value.upper()}")
        
        # 其他配置
        base_url = os.getenv("CLAUDE_BASE_URL", "https://api.anthropic.com")
        default_model = os.getenv("CLAUDE_DEFAULT_MODEL", "claude-3-sonnet-20240229")
        
        self._configs["claude"] = ClaudeConfig(
            api_key=api_key,
            base_url=base_url,
            default_model=default_model
        )
    
    def get_claude_config(self) -> ClaudeConfig:
        """获取Claude配置"""
        return self._configs["claude"]
    
    def get_model_config(self, model_name: str) -> Dict[str, Any]:
        """获取特定模型的配置"""
        claude_config = self.get_claude_config()
        return claude_config.model_configs.get(
            model_name,
            claude_config.model_configs[claude_config.default_model]
        )

环境隔离的最佳实践

  1. 使用不同的API密钥 :每个环境使用独立的API密钥,便于监控和成本分摊
  2. 配置差异化 :开发环境可以使用较小的模型、较短的超时时间
  3. 数据隔离 :测试环境不要访问生产数据
  4. 监控分离 :每个环境的监控指标要分开,避免干扰

6.2 成本控制与优化策略

Claude API按token计费,成本控制是生产部署的关键考虑。我们通过多策略组合控制成本:

成本控制策略

class CostController:
    def __init__(self, budget_daily: float, budget_monthly: float):
        self.budget_daily = budget_daily
        self.budget_monthly = budget_monthly
        self.daily_spent = 0.0
        self.monthly_spent = 0.0
        self.usage_history = []
        
        # 模型定价(示例,实际以官方为准)
        self.model_pricing = {
            "claude-3-opus-20240229": {"input": 15.00, "output": 75.00},  # 每百万tokens
            "claude-3-sonnet-20240229": {"input": 3.00, "output": 15.00},
            "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25}
        }
    
    def estimate_cost(self, model: str, input_tokens: int, 
                     output_tokens: int) -> float:
        """估算请求成本"""
        pricing = self.model_pricing.get(model)
        if not pricing:
            # 默认使用sonnet定价
            pricing = self.model_pricing["claude-3-sonnet-20240229"]
        
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        
        return input_cost + output_cost
    
    async def check_and_record(self, model: str, input_tokens: int, 
                              output_tokens: int) -> bool:
        """
        检查是否超出预算并记录使用
        返回True表示允许请求,False表示超出预算
        """
        cost = self.estimate_cost(model, input_tokens, output_tokens)
        
        # 检查日预算
        if self.daily_spent + cost > self.budget_daily:
            self._trigger_budget_alert("daily", self.daily_spent, cost)
            return False
        
        # 检查月预算
        if self.monthly_spent + cost > self.budget_monthly:
            self._trigger_budget_alert("monthly", self.monthly_spent, cost)
            return False
        
        # 记录使用
        self.daily_spent += cost
        self.monthly_spent += cost
        
        usage_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": cost,
            "daily_total": self.daily_spent,
            "monthly_total": self.monthly_spent
        }
        self.usage_history.append(usage_record)
        
        # 保留最近1000条记录
        if len(self.usage_history) > 1000:
            self.usage_history = self.usage_history[-1000:]
        
        return True
    
    def get_cost_optimization_suggestions(self) -> List[Dict]:
        """获取成本优化建议"""
        suggestions = []
        
        # 分析使用模式
        model_usage = {}
        for record in self.usage_history[-100:]:  # 最近100次
            model = record["model"]
            model_usage.setdefault(model, {
                "count": 0,
                "total_cost": 0.0,
                "avg_input_tokens": 0,
                "avg_output_tokens": 0
            })
            model_usage[model]["count"] += 1
            model_usage[model]["total_cost"] += record["cost"]
            model_usage[model]["avg_input_tokens"] = (
                (model_usage[model]["avg_input_tokens"] * (model_usage[model]["count"] - 1) +
                 record["input_tokens"]) / model_usage[model]["count"]
            )
            model_usage[model]["avg_output_tokens"] = (
                (model_usage[model]["avg_output_tokens"] * (model_usage[model]["count"] - 1) +
                 record["output_tokens"]) / model_usage[model]["count"]
            )
        
        # 生成建议
        for model, stats in model_usage.items():
            if model == "claude-3-opus-20240229" and stats["count"] > 10:
                # 如果大量使用Opus,考虑降级到Sonnet
                estimated_savings = stats["total_cost"] * 0.8  # 假设节省80%
                suggestions.append({
                    "type": "model_downgrade",
                    "model": model,
                    "suggestion": f"考虑将部分{model}请求降级到claude-3-sonnet",
                    "estimated_savings": estimated_savings,
                    "confidence": "high"
                })
            
            # 检查输入token是否过多
            if stats["avg_input_tokens"] > 8000:
                suggestions.append({
                    "type": "input_optimization",
                    "model": model,
                    "suggestion": f"{model}的平均输入token过高({stats['avg_input_tokens']:.0f}),考虑优化提示词或使用摘要",
                    "estimated_savings": stats["total_cost"] * 0.2,  # 假设节省20%
                    "confidence": "medium"
                })
        
        return suggestions

成本优化的具体措施

  1. 模型选择策略
  • 简单任务使用Haiku
  • 中等复杂度使用Sonnet
  • 只有关键任务使用Opus
  • 实现自动降级:当Sonnet连续失败时再尝试Opus
  1. 提示词优化
  • 移除不必要的上下文
  • 使用更简洁的表达
  • 实现上下文摘要:长对话历史先摘要再发送
  1. 缓存策略优化
  • 高频问题预生成回答
  • 根据temperature调整缓存时间
  • 实现语义缓存:相似问题返回相同答案
  1. 请求批处理
  • 多个相似请求合并处理
  • 异步处理非实时请求

6.3 安全与合规考虑

生产级智能体必须考虑安全和合规问题。Claude API智能体可能处理敏感数据,需要特别注意:

安全措施实现

class SecurityManager:
    def __init__(self, policy_config: Dict):
        self.policies = policy_config
        
        # 敏感数据检测模式
        self.sensitive_patterns = [
            # 身份证号
            r'\b[1-9]\d{5}(18|19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[\dXx]\b',
            # 手机号
            r'\b1[3-9]\d{9}\b',
            # 邮箱
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            # 银行卡号
            r'\b[1-9]\d{15,18}\b'
        ]
        
        self.compiled_patterns = [
            re.compile(pattern) for pattern in self.sensitive_patterns
        ]
    
    async def sanitize_input(self, text: str, context: Dict) -> Dict:
        """
        输入清洗与安全检查
        返回清洗后的文本和安全检查结果
        """
        result = {
            "original": text,
            "sanitized": text,
            "security_checks": {
                "has_sensitive_data": False,
                "sensitive_types": [],
                "is_malicious": False,
                "malicious_score": 0
            }
        }
        
        # 1. 敏感信息检测
        sensitive_matches = []
        for pattern in self.compiled_patterns:
            matches = pattern.findall(text)
            if matches:
                sensitive_matches.extend(matches)
                result["security_checks"]["has_sensitive_data"] = True
        
        if sensitive_matches:
            # 记录日志(脱敏后)
            self._log_sensitive_data(context, sensitive_matches)
            
            # 根据策略处理:脱敏、拒绝或继续
            if self.policies.get("mask_sensitive_data", True):
                result["sanitized"] = self._mask_sensitive_data(text, sensitive_matches)
                result["security_checks"]["sensitive_types"] = list(set(
                    self._classify_sensitive_type(match) 
                    for match in sensitive_matches
                ))
            elif self.policies.get("reject_on_sensitive_data", False):
                raise SecurityError("输入包含敏感信息,请求被拒绝")
        
        # 2. 恶意输入检测
        malicious_score = await self._check_malicious_input(text, context)
        result["security_checks"]["malicious_score"] = malicious_score
        
        if malicious_score > self.policies.get("malicious_threshold", 0.7):
            result["security_checks"]["is_malicious"] = True
            if self.policies.get("reject_on_malicious", True):
                raise SecurityError("输入被识别为恶意,请求被拒绝")
        
        # 3. 提示注入检测
        if self._detect_prompt_injection(text):
            result["security_checks"]["has_prompt_injection"] = True
            if self.policies.get("reject_on_prompt_injection", True):
                raise SecurityError("检测到提示注入尝试,请求被拒绝")
        
        return result
    
    def _mask_sensitive_data(self, text: str, matches: List[str]) -> str:
        """脱敏敏感数据"""
        masked_text = text
        for match in matches:
            # 保留部分信息用于上下文,但隐藏细节
            if self._classify_sensitive_type(match) == "phone":
                # 手机号:保留前3后4
                masked = match[:3] + "****" + match[-4:]
            elif self._classify_sensitive_type(match) == "id_card":
                # 身份证:保留前6后4
                masked = match[:6] + "********" + match[-4:]
            else:
                # 其他:通用脱敏
                masked = match[:2] + "***" + match[-2:] if len(match) > 4 else "***"
            
            masked_text = masked_text.replace(match, masked)
        
        return masked_text
    
    async def _check_malicious_input(self, text: str, context: Dict) -> float:
        """检查恶意输入"""
        score = 0.0
        
        # 1. 长度异常检测
        if len(text) > 10000:
            score += 0.3  # 超长文本可能是攻击
        
        # 2. 特殊字符比例
        special_chars = sum(1 for c in text if not c.isalnum() and not c.isspace())
        if special_chars / len(text) > 0.3:
            score += 0.2
        
        # 3. 已知攻击模式
        attack_patterns = [
            r"(?i)ignore.*previous|ignore.*above",
            r"(?i)system.*prompt|assistant.*prompt",
            r"(?i)disregard.*instructions",
            r"(?i)you are now.*",
            r"(?i)from now on.*",
            r"(?i)your new.*instructions",
            r"(?i)forget.*previous",
            r"(?i)do not.*listen.*to",
            r"(?i)this is a test.*ignore",
            r"(?i)output.*as.*json.*only",
            r"(?i)repeat.*word.*for.*word",
            r"(?i)say.*exactly.*this",
            r"<script.*>|javascript:|onload=|onerror=",
            r"SELECT.*FROM|INSERT.*INTO|DROP.*TABLE",
            r"rm -rf|del.*/.*|format.*c:",
            r"curl.*http|wget.*http",
            r"eval\(|exec\(|compile\(",
            r"\.\./\.\./|\.\.\\\.\\",  # 路径遍历
            r"<\?php|<\?=|<%|<\?",
        ]
        
        for pattern in attack_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                score += 0.1
        
        # 4. 频率限制检查
        user_id = context.get("user_id")
        if user_id:
            recent_requests = await self._get_recent_requests(user_id)
            if len(recent_requests) > self.policies.get("max_requests_per_minute", 60):
                score += 0.4
        
        return min(score, 1.0)  # 归一化到0-1
    
    def _detect_prompt_injection(self, text: str) -> bool:
        """检测提示注入尝试"""
        injection_indicators = [
            # 试图覆盖系统提示
            "忽略之前的指令",
            "忘记你之前的身份",
            "你现在是",
            "扮演",
            "假装你是",
            "你的新指令是",
            "系统提示:",
            "human:",
            "assistant:",
            # 试图获取内部信息
            "你的系统提示是什么",
            "你的初始指令",
            "你的配置",
            "你的设置",
            # 试图绕过限制
            "输出所有内容",
            "不要省略",
            "忽略安全限制",
            "绕过限制",
            # 特殊格式
            "```system",
            "```instruction",
            "---system---",
            "===system===",
        ]
        
        text_lower = text.lower()
        for indicator in injection_indicators:
            if indicator in text_lower:
                return True
        
        # 检查是否包含疑似转义序列
        if re.search(r'\\x[0-9a-f]{2}', text):
            return True
        
        return False

合规性考虑

  1. 数据保留策略 :根据法规要求设置对话日志保留时间
  2. 用户同意 :明确告知用户数据如何使用
  3. 数据访问控制 :确保只有授权人员能访问对话数据
  4. 审计日志 :记录所有数据访问和操作

7. 测试与质量保障

7.1 测试策略与实施

生产级智能体需要全面的测试覆盖。我们采用分层测试策略:

测试金字塔

  1. 单元测试 :测试每个独立组件
  2. 集成测试 :测试组件间交互
  3. 端到端测试 :测试完整流程
  4. 负载测试 :测试性能极限
  5. A/B测试 :测试不同配置的效果

单元测试示例

import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from your_module import EnhancedClaudeClient, ToolRegistry, InteractionLayer

class TestEnhancedClaudeClient:
    @pytest.mark.asyncio
    async def test_api_call_success(self):
        """测试成功的API调用"""
        # 模拟配置
        config = APIConfig(api_key="test_key")
        
        # 创建客户端
        client = EnhancedClaudeClient(config)
        
        # 模拟API响应
        mock_response = MagicMock()
        mock_response.usage = MagicMock(input_tokens=100, output_tokens=50)
        mock_response.content = [MagicMock(text="测试响应")]
        
        # 替换实际API调用
        with patch.object(client._client.messages, 'create', 
                         AsyncMock(return_value=mock_response)):
            result = await client.create_message(
                messages=[{"role": "user", "content": "你好"}],
                model="claude-3-sonnet-20240229"
            )
            
            assert result["content"] == "测试响应"
            assert client._stats["successful_requests"] == 1
            assert client._stats["total_tokens"] == 150
    
    @pytest.mark.asyncio
    async def test_api_call_retry(self):
        """测试失败重试"""
        config = APIConfig(api_key="test_key", max_retries=3)
        client = EnhancedClaudeClient(config)
        
        # 模拟第一次失败,第二次成功
        mock_response = MagicMock()
        mock_response.usage = MagicMock(input_tokens=100, output_tokens=50)
        mock_response.content = [MagicMock(text="重试成功")]
        
        call_count = 0
        async def mock_create(*args, **kwargs):
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise Exception("第一次失败")
            return mock_response
        
        with patch.object(client._client.messages, 'create', 
                         AsyncMock(side_effect=mock_create)):
            result = await client.create_message(
                messages=[{"role": "user", "content": "测试"}]
            )
            
            assert result["content"] == "重试成功"
            assert call_count == 2
    
    @pytest.mark.asyncio
    async def test_cache_behavior(self):
        """测试缓存行为"""
        config = APIConfig(api_key="test_key")
        
        # 模拟Redis
        mock_redis = AsyncMock()
        mock_redis.get.return_value = json.dumps({
            "response": {"content": "缓存响应"},
            "timestamp": time.time() - 100,  # 100秒前
            "ttl": 300,
            "model": "claude-3-sonnet-20240229",
            "model_version": "20240229"
        })
        
        client = EnhancedClaudeClient(config, cache_client=mock_redis)
        
        # 应该返回缓存,不调用API
        with patch.object(client._client.messages, 'create') as mock_api:
            result = await client.create_message(
                messages=[{"role": "user", "content": "缓存测试"}],
                use_cache=True
            )
            
            # 验证返回了缓存
            assert result["content"] == "缓存响应"
            # 验证没有调用API
            mock_api.assert_not_called()
            # 验证访问了缓存
            mock_redis.get.assert_called_once()

class TestToolRegistry:
    def test_tool_registration(self):
        """测试工具注册"""
        registry = ToolRegistry()
        
        @registry.register_tool
        def test_tool(param1: str, param2: int = 10) -> str:
            """测试工具的描述
            
            Args:
                param1: 参数1说明
                param2: 参数2说明,默认10
            """
            return f"结果: {param1}-{param2}"
        
        # 验证工具已注册
        assert "test_tool" in registry._tools
        assert len(registry._tool_descriptions) == 1
        
        tool_desc = registry._tool_descriptions[0]
        assert tool_desc["name"] == "test_tool"
        assert tool_desc["description"] == "测试工具的描述"
        assert "param1" in tool_desc["parameters"]
        assert "param2" in tool_desc["parameters"]
    
    @pytest.mark.asyncio
    async def test_tool_execution(self):
        """测试工具执行"""
        registry = ToolRegistry()
        
        @registry.register_tool
        async def async_tool(input_text: str) -> dict:
            """异步测试工具"""
            return {"processed": input_text.upper()}
        
        result = await registry.execute_tool(
            "async_tool",
            {"input_text": "hello"},
            {"user_id": "test_user"}
        )
        
        assert result == {"processed": "HELLO"}

class TestInteractionLayer:
    def test_input_sanitization(self):
        """测试输入清洗"""
        layer = InteractionLayer(None, None)
        
        # 测试HTML转义
        input_text = "<script>alert('xss')</script>"
        cleaned = layer._sanitize_input(input_text)
        assert "<" not in cleaned  # 应该被转义
        
        # 测试长度限制
        long_text = "a" * 5000
        cleaned = layer._sanitize_input(long_text)
        assert len(cleaned) <= 4000  # 应该被截断
        
        # 测试空格规范化
        messy_text = "太多  空格   和\n换行"
        cleaned = layer._sanitize_input(messy_text)
        # 应该被规范化,但保留一个空格
        assert "  " not in cleaned  # 没有连续空格

@pytest.mark.integration
class TestIntegration:
    """集成测试"""
    
    @pytest.mark.asyncio
    async def test_full_flow(self):
        """测试完整流程"""
        # 初始化所有组件
        config_manager = ConfigManager(Environment.LOCAL)
        claude_config = config_manager.get_claude_config()
        
        # 使用模拟客户端避免真实API调用
        mock_client = AsyncMock()
        mock_response = MagicMock()
        mock_response.usage = MagicMock(input_tokens=50, output_tokens=100)
        mock_response.content = [MagicMock(text="集成测试响应")]
        mock_client.create_message.return_value = {
            "content": "集成测试响应",
            "usage": {"input_tokens": 50, "output_tokens": 100}
        }
        
        # 创建交互层
        interaction = InteractionLayer(
            session_store=AsyncMock(),
            user_profile_service=AsyncMock()
        )
        
        # 创建逻辑层
        prompt_engine = PromptEngine(
            system_prompt="你是一个测试助手",
            few_shot_examples=[]
        )
        
        # 创建工具注册表
        tool_registry = ToolRegistry()
        
        # 模拟完整流程
        user_input = "你好,测试一下"
        context = {"user_id": "test_user", "conversation_history": []}
        
        # 1. 交互层处理输入
        processed = await interaction.process_input(
            user_id="test_user",
            raw_input=user_input,
            context=context
        )
        
        # 2. 逻辑层构建提示
        messages = prompt_engine.build_conversation_prompt(
            user_input=processed["processed_input"],
            context=processed["context"]
        )
        
        # 3. 基础设施层调用API
        response = await mock_client.create_message(messages=messages)
        
        # 验证结果
        assert response["content"] == "集成测试响应"
        assert processed["session_id"] is not None
        assert len(messages) > 0

端到端测试要点

  1. 使用测试专用API密钥 :避免影响生产环境
  2. 模拟外部依赖 :数据库、缓存、第三方服务
  3. 测试错误处理 :网络错误、API限制、无效输入
  4. 性能基准测试 :建立性能基准,监控回归

7.2 质量监控与持续改进

测试不是一次性的,需要持续监控和改进。我们建立的质量监控体系包括:

质量指标监控

class QualityMonitor:
    def __init__(self, metrics_storage):
        self.storage = metrics_storage
        self.quality_metrics = {
            "response_relevance": 0.0,  # 响应相关性(0-1)
            "response_accuracy": 0.0,    # 响应准确性(0-1)
            "user_satisfaction": 0.0,    # 用户满意度(0-5)
            "task_completion_rate": 0.0, # 任务完成率(0-1)
            "error_rate": 0.0,           # 错误率
            "avg_response_time": 0.0,    # 平均响应时间(秒)
        }
        
    async def evaluate_response_quality(self, session_id: str, 
                                      user_input: str, 
                                      ai_response: str,
                                      expected_output: str = None) -> Dict:
        """评估响应质量"""
        evaluation = {}
        
        # 1. 自动评估指标
        evaluation["relevance"] = await self._calculate_relevance(
            user_input, ai_response
        )
        
        evaluation["readability"] = self._calculate_readability(ai_response)
        
        evaluation["safety_score"] = await self._calculate_safety_score(
            ai_response
        )
        
        # 2. 如果有预期输出,计算准确性
        if expected_output:
            evaluation["accuracy"] = await self._calculate_accuracy(
                ai_response, expected_output
            )
        
        # 3. 记录评估结果
        await self.storage.record_evaluation(session_id, evaluation)
        
        # 4. 更新质量指标
        self._update_quality_metrics(evaluation)
        
        # 5. 检查是否需要告警
        await self._check_quality_alerts(evaluation)
        
        return evaluation
    
    async def _calculate_relevance(self, query: str, response: str) -> float:
        """计算响应相关性"""
        # 方法1:基于嵌入向量的相似度
        try:
            # 使用句子嵌入模型计算相似度
            query_embedding = await self._get_embedding(query)
            response_embedding = await self._get_embedding(response)
            
            similarity = self._cosine_similarity(
                query_embedding, response_embedding
            )
            return float(similarity)
        except:
            # 方法2:基于关键词的重叠(回退方案)
            query_words = set(query.lower().split())
            response_words = set(response.lower().split())
            
            if not query_words:
                return 0.0
            
            overlap = len(query_words.intersection(response_words))
            return overlap / len(query_words)
    
    def _calculate_readability(self, text: str) -> float:
        """计算可读性分数"""
        # 使用Flesch Reading Ease公式
        sentences = len(re.findall(r'[.!?]+', text))
        words = len(text.split())
        syllables = sum(self._count_syllables(word) for word in text.split())
        
        if sentences == 0 or words == 0:
            return 0.0
        
        # Flesch Reading Ease公式
        score = 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words)
        
        # 归一化到0-1
        return max(0.0, min(1.0, score / 100.0))
    
    async def _calculate_safety_score(self, text: str) -> float:
        """计算安全性分数"""
        # 检查是否有不安全内容
        unsafe_patterns = [
            r"(?i)hate speech|歧视|侮辱",
            r"(?i)violence|暴力|伤害",
            r"(?i)self-harm|自残|自杀",
            r"(?i)illegal|违法|犯罪",
            r"(?i)personal attack|人身攻击",
            r"(?i)harassment|骚扰",
            r"(?i)explicit content|色情|裸露",
        ]
        
        score = 1.0  # 初始分数
        
        for pattern in unsafe_patterns:
            if re.search(pattern, text):
                score -= 0.2  # 每匹配一个模式扣0.2分
        
        # 检查是否有泄露敏感信息
        security_mgr = SecurityManager({})
        sanitize_result = await security_mgr.sanitize_input(text, {})
        
        if sanitize_result["security_checks"]["has_sensitive_data"]:
            score -= 0.3
        
        if sanitize_result["security_checks"]["is_malicious"]:
            score -= 0.5
        
        return max(0.0, score)  # 确保不低于0
    
    async def record_user_feedback(self, session_id: str, rating: int, 
                                 feedback: str = None):
        """记录用户反馈"""
        # 存储反馈
        await self.storage.record_feedback(session_id, rating, feedback)
        
        # 更新满意度指标
        self.quality_metrics["user_satisfaction"] = await self._calculate_avg_satisfaction()
        
        # 如果评分低,触发详细分析
        if rating <= 2:
            await self._analyze_low_rating(session_id, rating, feedback)
    
    async def _analyze_low_rating(self, session_id: str, rating: int, 
                                feedback: str):
        """分析低分反馈"""
        # 获取会话详情
        session_data = await self.storage.get_session_data(session_id)
        
        analysis = {
            "session_id": session_id,
            "rating": rating,
            "feedback": feedback,
            "user_input": session_data.get("last_user_input"),
            "ai_response": session_data.get("last_ai_response"),
            "context": session_data.get("context"),
            "timestamp": datetime.utcnow().isoformat(),
            "automatic_analysis": {}
        }
        
        # 自动分析可能的原因
        if feedback:
            # 分析反馈文本
            analysis["automatic_analysis"]["feedback_keywords"] = self._extract_keywords(feedback)
        
        # 检查响应质量指标
        if session_data.get("quality_metrics"):
            qm = session_data["quality_metrics"]
            if qm.get("relevance", 1.0) < 0.5:
                analysis["automatic_analysis"]["low_relevance"] = True
            if qm.get("readability", 1.0) < 0.5:
                analysis["automatic_analysis"]["low_readability"] = True
        
        # 检查响应时间
        if session_data.get("response_time", 0) > 10.0:
            analysis["automatic_analysis"]["slow_response"] = True
        
        # 存储分析结果
        await self.storage.record_low_rating_analysis(analysis)
        
        # 触发告警
        if rating == 1:
            await self._trigger_alert(
                "critical_low_rating",
                f"严重低分反馈: session={session_id}, rating={rating}",
                details=analysis
            )

持续改进流程

  1. 定期质量评审 :每周审查低分反馈和错误案例
  2. 提示词优化 :基于用户反馈调整提示词
  3. A/B测试 :测试不同模型、参数、提示词的效果
  4. 性能优化 :监控慢查询,优化缓存策略
  5. 成本分析 :定期审查成本,优化使用模式

8. 实际案例:电商客服智能体

8.1 业务需求与架构设计

让我通过一个实际案例展示三层架构的应用。我们为一家中型电商平台构建客服智能体,需求如下:

业务需求

  1. 处理商品咨询、订单查询、售后问题
  2. 支持多轮对话,理解上下文
  3. 能够调用内部系统(订单系统、商品库、物流跟踪)
  4. 7x24小时服务,响应时间<3秒
  5. 支持中文和英文
  6. 月成本预算<5000元

架构设计

用户请求 → 负载均衡 → 交互层集群 → 逻辑层服务 → 基础设施层 → Claude API
                ↑           ↑           ↑           ↑
            监控告警     会话管理     业务逻辑    缓存/重试/限流

技术选型

  • 交互层 :FastAPI + WebSocket(实时对话)
  • 逻辑层 :Python + Redis(状态管理)
  • 基础设施层 :自定义客户端 + Redis缓存 + Prometheus监控
  • 部署 :Docker + Kube

更多推荐