最近在做一个智能客服系统的重构项目,之前的老系统是基于规则匹配的,一到业务高峰期就卡顿、响应慢,意图识别也经常出错,用户体验很不好。经过一番技术选型,我们最终决定采用 agents-flex 框架来构建新一代的高并发智能客服系统。这篇文章就来分享一下我们的实战经验,从架构设计到代码实现,再到性能调优和踩过的坑,希望能给有类似需求的开发者一些参考。

智能客服系统架构示意图

1. 为什么选择 agents-flex?先聊聊传统方案的痛点

我们之前的客服系统,问题主要集中在三个方面:

1.1 并发处理能力弱 规则引擎在处理大量并发请求时,由于大量字符串匹配和逻辑判断,CPU消耗巨大,响应时间(RT)会线性增长。当QPS超过500时,平均响应延迟就从几十毫秒飙升到秒级,用户体验急剧下降。

1.2 意图识别准确率低 基于关键词和正则的规则,无法理解用户问句的真实意图。比如用户问“我怎么退不了款?”,规则可能只匹配到“退款”关键词,但无法区分这是“咨询退款流程”还是“投诉退款失败”。这导致大量问题需要转人工,客服成本居高不下。

1.3 动态扩缩容困难 老系统是单体架构,状态(如用户会话上下文)存在本地内存。想要水平扩展,就得引入复杂的会话同步机制,或者改成无状态设计,改动成本非常高,几乎无法应对“双十一”这类流量突增的场景。

基于这些痛点,我们开始寻找新的解决方案。我们横向对比了几个主流框架:

  • Rasa: 开源,NLU和对话管理功能强大,但部署和运维相对复杂,分布式会话管理需要自己基于Redis等中间件实现,在高并发场景下的性能调优有一定门槛。
  • Dialogflow (Google): 云服务,开箱即用,意图识别准确率高,但定制化能力受限,数据隐私性要求高的场景不适合,且成本随调用量增长。
  • agents-flex: 一个新兴的、专注于高并发和分布式场景的智能体框架。它的核心优势在于原生的分布式设计高性能的异步推理管道。其上下文管理模块天生支持与Redis等分布式存储集成,NLU模块可以方便地集成BERT等预训练模型并进行异步批处理,非常适合我们构建高并发、可弹性伸缩的客服系统。

综合考虑定制化需求、性能、成本和部署灵活性,我们最终选择了 agents-flex

2. 核心实现:用 agents-flex 搭建智能客服骨架

我们的新系统主要分为两大模块:意图识别模块会话状态管理模块

2.1 意图识别模块实现

这是智能客服的“大脑”。我们利用 agents-flex 的 NLUProcessor 来构建一个异步的意图分类流水线。

import asyncio
from typing import Dict, List
import numpy as np
from agents_flex.nlu import NLUProcessor, Intent
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import torch.nn.functional as F

class CustomerServiceIntentRecognizer(NLUProcessor):
    """
    自定义的客服意图识别处理器。
    继承自 agents-flex 的 NLUProcessor,实现异步推理。
    """
    def __init__(self, model_path: str, intent_labels: List[str]):
        super().__init__()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # 加载预训练的BERT分类模型和分词器
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_path).to(self.device)
        self.model.eval()  # 设置为评估模式
        self.intent_labels = intent_labels  # 意图标签列表,如 ['查询订单', '申请退款', '投诉建议']

    async def preprocess(self, text: str) -> Dict:
        """异步文本预处理:分词、编码"""
        # 使用tokenizer将文本转换为模型输入
        inputs = self.tokenizer(text,
                                truncation=True,
                                padding='max_length',
                                max_length=128,
                                return_tensors="pt")
        # 将数据转移到指定设备(GPU/CPU)
        return {k: v.to(self.device) for k, v in inputs.items()}

    async def inference(self, preprocessed_data: Dict) -> np.ndarray:
        """异步模型推理"""
        with torch.no_grad():  # 禁用梯度计算,提升推理速度
            outputs = self.model(**preprocessed_data)
            logits = outputs.logits
            # 使用softmax获取概率分布
            probabilities = F.softmax(logits, dim=-1)
        return probabilities.cpu().numpy()

    async def postprocess(self, inference_result: np.ndarray) -> Intent:
        """后处理:将模型输出转换为Intent对象"""
        # 获取概率最高的意图索引
        predicted_idx = np.argmax(inference_result, axis=-1)[0]
        confidence = inference_result[0][predicted_idx]
        intent_name = self.intent_labels[predicted_idx]

        # 返回 agents-flex 定义的 Intent 对象
        return Intent(
            name=intent_name,
            confidence=float(confidence),
            entities={}  # 本例暂不处理实体抽取
        )

# 使用示例
async def main():
    recognizer = CustomerServiceIntentRecognizer(
        model_path="./models/bert_customer_service",
        intent_labels=["查询订单", "申请退款", "产品咨询", "投诉建议", "其他"]
    )
    user_query = "我昨天买的手机什么时候能发货?"
    intent = await recognizer.process(user_query)
    print(f"识别意图: {intent.name}, 置信度: {intent.confidence:.4f}")

if __name__ == "__main__":
    asyncio.run(main())

这段代码的核心是继承并实现了 NLUProcessor 的三个异步方法。preprocess 负责文本向量化,inference 调用模型,postprocess 将模型输出转化为框架能理解的 Intent 对象。这种设计使得模型加载、推理可以完全异步化,轻松融入 agents-flex 的异步处理管道,为高并发打下基础。

2.2 会话状态管理设计

对于高并发客服系统,会话状态(如对话历史、用户信息、当前处理进度)必须持久化到外部存储,并保证在多实例环境下的数据一致性。我们采用 Redis Cluster + 分布式锁 的方案。

  • 存储结构:每个用户会话用一个唯一的 session_id 作为 Redis Key,Value 是一个 Hash 结构,存储对话轮次、历史消息、提取的槽位(Slots)信息等。
  • 分布式锁:当需要更新某个会话的状态时(例如,填充一个用户提供的订单号),使用 Redlock 算法在 Redis 上获取一个针对该 session_id 的分布式锁,防止多个请求同时修改导致状态错乱。
import json
import aioredis
from redis.asyncio import RedisCluster
from redis.asyncio.lock import Lock

class DistributedSessionManager:
    def __init__(self, redis_nodes):
        # 连接 Redis 集群
        self.redis_client = RedisCluster.from_nodes(redis_nodes)

    async def get_session(self, session_id: str) -> Dict:
        """获取会话状态"""
        data = await self.redis_client.hgetall(f"cs_session:{session_id}")
        return {k.decode(): json.loads(v.decode()) for k, v in data.items()} if data else {}

    async def update_session(self, session_id: str, updates: Dict, ttl: int = 1800):
        """更新会话状态(使用分布式锁保证原子性)"""
        lock_key = f"cs_session_lock:{session_id}"
        # 获取分布式锁,超时时间5秒
        lock = Lock(self.redis_client, lock_key, timeout=5)
        try:
            if await lock.acquire():
                # 获取当前状态
                current_state = await self.get_session(session_id)
                # 合并更新
                current_state.update(updates)
                # 写回Redis,并设置TTL(30分钟无活动则过期)
                pipe = self.redis_client.pipeline()
                for key, value in current_state.items():
                    pipe.hset(f"cs_session:{session_id}", key, json.dumps(value))
                pipe.expire(f"cs_session:{session_id}", ttl)
                await pipe.execute()
            else:
                raise Exception(f"Acquire lock failed for session: {session_id}")
        finally:
            await lock.release()  # 释放锁

    async def add_message(self, session_id: str, role: str, content: str):
        """向会话中添加一条消息记录"""
        message = {"role": role, "content": content, "timestamp": time.time()}
        # 使用列表存储历史消息,只保留最近50条
        await self.redis_client.lpush(f"cs_session:{session_id}:messages", json.dumps(message))
        await self.redis_client.ltrim(f"cs_session:{session_id}:messages", 0, 49)

通过这套设计,我们的客服系统实现了无状态化,任何一个服务实例都可以处理任何用户的请求,只需从 Redis 集群中读取/写入对应的会话上下文即可,为水平扩容扫清了障碍。

3. 性能优化:从压测数据到冷启动

架构搭好了,性能如何呢?我们进行了详细的压力测试。

3.1 压测数据对比

我们使用 Locust 模拟了从 100 到 2000 的并发用户,对“意图识别+会话更新”这个核心链路进行压测。

并发用户数 平均响应时间 (ms) QPS (每秒查询率) 错误率
100 45 2200 0%
500 68 7350 0%
1000 120 8330 0.1%
2000 250 8000 0.5%

(测试环境:4台 8核16G 的云服务器,Redis Cluster 3主3从,模型为裁剪后的BERT-base)

可以看到,在并发1000以内时,系统表现非常稳定,QPS线性增长。超过1000后,响应时间有所上升,主要瓶颈出现在模型推理的GPU资源上。但整体QPS维持在8000+,完全满足我们日常万级并发的需求,峰值时通过快速扩容实例也能应对。

3.2 冷启动优化

NLU模型通常比较大(几百MB到几GB),如果每次服务启动或实例扩容时才加载,会导致前几分钟的请求全部超时失败。我们采用了 模型预热加载健康检查隔离 的策略。

  1. 预热加载:在服务启动的初始化阶段,在 __init__ 方法中同步加载模型和分词器到内存/显存。虽然这会稍微增加启动时间,但避免了第一个请求的“冷启动惩罚”。
  2. 就绪探针 (Readiness Probe):在 Kubernetes 的 Deployment 配置中,设置一个就绪探针。该探针会在服务初始化完成后(即模型加载完毕),才返回成功。K8s 只有在就绪探针通过后,才会将流量导入该 Pod。这样就确保了所有对外服务的实例都是“热”的。
# Kubernetes Deployment 配置片段示例
spec:
  containers:
  - name: cs-nlu-service
    image: my-registry/cs-nlu:v1.0
    readinessProbe:
      httpGet:
        path: /health/ready # 服务内部实现的就绪检查端点
        port: 8080
      initialDelaySeconds: 30  # 给予足够的模型加载时间
      periodSeconds: 5

4. 避坑指南:生产环境中的那些“坑”

在实际上线和运行过程中,我们遇到了几个典型问题,这里分享下解决方案。

4.1 对话超时处理

用户可能中途离开,导致会话长时间挂起。如果不处理,会浪费大量Redis存储和会话锁资源。 我们的最佳实践是双层超时机制

  1. 会话级TTL:如上文代码所示,每次更新会话时,刷新 Redis Key 的 TTL(例如30分钟)。超过30分钟无互动,会话自动过期被清理。
  2. 轮次级超时:在对话逻辑内部,对于每一个等待用户回复的“槽位填充”步骤,设置一个更短的超时(如3分钟)。如果超时,则触发超时处理流程,例如发送提示“您还在吗?”,或者将会话状态重置,并释放分布式锁。

4.2 敏感词过滤的合规实现

客服对话必须符合监管要求。我们并没有在NLU模型层面处理,而是在预处理和后处理之间插入了一个异步的过滤组件。

  • 异步过滤:在 preprocess 之后,inference 之前,调用一个高效的异步敏感词过滤服务(例如基于DFA算法)。如果命中,则直接返回一个特定的“内容违规”意图,并终止后续的模型推理流程。
  • 词库热更新:过滤词库存储在 Redis 中,后台管理页面更新词库后,通过 Pub/Sub 通知所有服务实例实时 reload,无需重启服务。
class SensitiveFilterMiddleware:
    async def filter(self, text: str) -> bool:
        """返回True表示包含敏感词"""
        # 这里调用内部的敏感词检测服务
        # 可以是基于本地DFA树的快速检查
        pass

# 在 IntentRecognizer 的 process 方法中集成
async def process(self, text: str) -> Intent:
    # 1. 敏感词检查
    if await self.filter_middleware.filter(text):
        return Intent(name="content_violation", confidence=1.0)
    # 2. 正常流程:预处理、推理、后处理
    preprocessed = await self.preprocess(text)
    # ... 后续流程

5. 互动体验:欢迎来测压我们的Demo

纸上得来终觉浅。我们部署了一个简化版的演示系统,并开放了测试 API 端点,你可以体验不同并发下的系统表现。

API 端点POST https://demo.custservice.com/v1/recognize 请求体

{
  "session_id": "your_test_session",
  "query": "我要投诉物流太慢了!"
}

你可以使用 wrkJMeter 等工具,对这个端点进行压测,观察响应时间和成功率的变化。我们配置了自动扩缩容策略,当 CPU 利用率持续超过70%时,会自动增加实例,你可以看到响应时间曲线如何随着实例数增加而变得平缓。

性能监控仪表盘截图

总结

通过这次基于 agents-flex 的智能客服系统重构,我们成功解决了老系统在并发、准确率和弹性方面的核心痛点。agents-flex 在分布式和异步处理上的原生支持,让我们能够更专注于业务逻辑的实现,而非底层基础设施的搭建。目前系统已稳定运行了数月,经历了多次营销活动的流量考验。

当然,没有完美的方案。agents-flex 作为一个较新的框架,其社区生态和工具链相比 Rasa 还有差距,有些高级功能需要自己动手实现。但它在性能和高并发场景下的表现,确实令人印象深刻。如果你也在为构建高性能、可扩展的对话系统而烦恼,agents-flex 绝对值得你花时间深入了解一下。

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐