本文以某店/某鸽客服 IM 场景为例,分享一个 Python WebSocket 客户端的封装思路:如何建立长连接、监听某鸽消息、设计回调、发送客服消息、处理重连以及关闭资源。

为了让示例更接近真实工程,文中会使用“买家消息、客服回复、某鸽会话字段”等概念来组织代码,但重点始终放在 WebSocket 客户端的工程设计上。

一、为什么某店/某鸽客服 IM 适合 WebSocket

某店/某鸽客服 IM 和普通后台接口不太一样。

普通接口大多是“我请求一次,你返回一次”。而客服消息是持续发生的:买家发消息、客服回复、系统事件、商品卡片、会话接入、客服转接等,都可能随时出现。

如果使用 HTTP 轮询,大概会遇到几个问题:

  • 实时性不够好,消息可能延迟几秒甚至更久。
  • 请求量大,大部分请求其实没有新消息。
  • 状态维护麻烦,需要不断记录最后一条消息的位置。
  • 发送和接收逻辑容易拆散,后期不好维护。

WebSocket 的优势在于建立一次连接后,客户端和服务端可以保持双向通信。服务端有新消息时可以主动推送,客户端也可以通过同一个通道发送消息。

在某店/某鸽客服 IM 场景里,一个比较理想的结构是:

建立 WebSocket 长连接
        |
监听服务端推送消息
        |
解析消息类型和某鸽会话字段
        |
触发业务回调
        |
按某鸽会话字段发送回复

这是某店/某鸽客服消息通道的基础结构,也适用于很多通知系统和协同工具。

二、一个可用的 WebSocket 客户端要解决什么

只写一个 websockets.connect() 并不难,真正麻烦的是工程化细节。

一个能在某店/某鸽客服场景里稳定使用的 WebSocket 封装,至少要考虑这些问题:

  • 连接建立后,怎么确认已经可发送消息。
  • 监听任务怎么放到后台持续运行。
  • 服务端推送消息后,怎么分发给不同回调。
  • 发送消息时,怎么复用当前连接。
  • 连接断开后,是否需要自动重连。
  • 程序退出时,怎么正确关闭连接和后台任务。

所以我更倾向于把 WebSocket 做成一层通用客户端,再在外面封装业务 API。

业务层不直接关心底层连接细节,只需要调用:

websocket, task = await create_pigeon_websocket(cookies)

用完后关闭:

await close_pigeon_websocket(websocket, task)

这样调用方的心智负担会低很多。

三、建立连接:把长连接交给后台任务

在 Python 异步程序里,WebSocket 监听通常是一个长期运行的协程。主流程不应该被它阻塞,否则后面就没法继续发送消息或处理其他逻辑。

因此可以把监听放到后台任务里运行:

websocket, task = await create_pigeon_websocket(
    cookies=cookies,
    on_buyer_message=on_buyer_message,
    reconnect_interval_s=5.0,
    connect_timeout_s=30.0,
)

这里几个参数的含义比较关键:

  • cookies:连接所需的鉴权上下文,实际项目中可以替换成登录态对象、会话凭证或其他鉴权载体。
  • on_buyer_message:买家消息回调。
  • reconnect_interval_s:断线后的重连间隔。
  • connect_timeout_s:等待连接成功的超时时间。

封装时有一个细节很重要:创建 WebSocket 对象之后,不要立刻发送消息,应该先等待连接真正建立成功。

否则在网络稍慢时,很容易出现“连接对象已经创建,但底层 socket 还没准备好”的问题。

这一点最好收敛在连接封装内部,外层调用者不需要重复判断连接状态。

四、某鸽消息监听:用回调隔离业务逻辑

某鸽客服消息类型很多,不建议把所有业务逻辑都写在 WebSocket 客户端里面。

更好的方式是:客户端只负责接收、解析和分发;业务代码通过回调函数接收事件。

例如监听买家消息:

async def on_buyer_message(event):
    print("收到买家消息:", event.get("content"))
    print("回复所需的某鸽会话字段:", event.get("reply_params"))

然后建立连接时传入:

websocket, task = await create_pigeon_websocket(
    cookies=cookies,
    on_buyer_message=on_buyer_message,
)

这样设计的好处是很明显的。

WebSocket 底层只关心“消息来了没有”,业务层只关心“消息来了之后我要做什么”。

后面如果要接入自动回复、关键词识别、订单查询、某鸽客服转接,都可以在回调里扩展,而不需要改动 WebSocket 客户端本身。

五、回复消息:用某鸽会话字段而不是重新查一遍

某店/某鸽客服 IM 里发送消息通常不是只传一个文本就够了,还需要知道要发给哪个买家、哪个店铺、哪个会话、哪个子会话。

所以监听消息时,我会把后续回复需要的字段整理成 reply_params

它大概长这样:

{
    "uid": "买家安全 uid",
    "mall_id": "店铺 id",
    "conversation_short_id": "...",
    "sub_conversation_short_id": "...",
    "reply_key": "..."
}

有了这些某鸽会话字段,回复时就不需要重新根据订单号查一遍会话,可以直接复用当前消息里的上下文。

示例:

async def on_buyer_message(event):
    reply_params = event["reply_params"]

    await send_text_message(
        websocket=websocket,
        text="您好,已经收到您的消息,稍后为您处理。",
        **reply_params,
    )

这种方式适合做自动回复、快捷回复、关键词触发回复等功能。

不过实际落地时,我一般不建议一上来就做全自动回复,而是先做“监听 + 字段提取 + 人工确认发送”。等业务规则稳定后,再逐步自动化。

六、完整调用示例

下面是一段简化后的调用层代码,重点看 WebSocket 连接、买家消息回调、文本回复和资源关闭这几个动作。

import asyncio

from dy_listening import load_cookies
from dy_send_api import (
    create_pigeon_websocket,
    close_pigeon_websocket,
    send_text_message,
)


async def main():
    cookies = load_cookies()
    websocket = None
    task = None

    async def on_buyer_message(event):
        content = event.get("content")
        reply_params = event.get("reply_params") or {}

        print("收到买家消息:", content)

        if "发货" in str(content):
            await send_text_message(
                websocket=websocket,
                text="您好,您的订单正在处理中,请耐心等待。",
                **reply_params,
            )

    try:
        websocket, task = await create_pigeon_websocket(
            cookies=cookies,
            on_buyer_message=on_buyer_message,
            reconnect_interval_s=5.0,
            connect_timeout_s=30.0,
        )

        print("WebSocket 已连接,开始监听某鸽客服消息")
        await asyncio.Event().wait()

    finally:
        if websocket is not None:
            await close_pigeon_websocket(websocket, task)


if __name__ == "__main__":
    asyncio.run(main())

这段代码主要体现几个关键点:

  • create_pigeon_websocket() 负责建立长连接。
  • on_buyer_message() 负责处理买家消息。
  • send_text_message() 复用当前 WebSocket 连接发送回复。
  • finally 里调用 close_pigeon_websocket() 释放资源。

真实工程里,还可以继续扩展某鸽图片消息、视频素材消息、订单维度会话定位等能力。

七、自动重连和资源关闭

WebSocket 长连接一定要考虑断线。

断线原因可能很多,比如网络波动、服务端主动关闭、登录态过期、客户端异常等。

因此封装时最好把重连逻辑放到底层客户端里,而不是让每个业务函数都自己写一遍。

通常可以提供一个参数:

reconnect_interval_s=5.0

表示断开后间隔 5 秒再尝试连接。

另外,关闭资源也很重要。尤其是在异步程序中,如果只关闭 socket,不取消后台任务,程序可能无法正常退出。

所以我会把关闭动作封装成统一方法:

await close_pigeon_websocket(websocket, task)

它负责关闭连接,并清理后台监听任务。

这种封装看起来不复杂,但能减少很多线上运行时的小问题。

八、消息分发与异常隔离

WebSocket 客户端接收到的原始消息,通常不能直接交给业务逻辑处理。

更稳妥的做法是先做一层标准化,把不同来源、不同类型的消息整理成统一事件对象,再分发给对应回调。

例如可以把事件抽象成下面几类:

raw_message      原始消息
buyer_message    买家消息
seller_message   客服消息
system_message   系统消息
access_event     接入事件

业务层只关心自己需要的事件类型。

比如自动回复逻辑只订阅 buyer_message,会话接入逻辑只订阅 access_event,调试日志可以订阅 raw_message

这样做有几个好处:

  • 不同业务逻辑互不影响。
  • 新增事件类型时,不需要重写连接层代码。
  • 回调内部报错时,可以单独捕获和记录。
  • 后续接入队列、规则引擎或人工工作台会更方便。

在实现时,我一般会让 WebSocket 客户端保持“轻业务”:

async def on_buyer_message(event):
    content = event.get("content")
    reply_params = event.get("reply_params")

    print("买家消息:", content)
    print("可用于回复的某鸽会话字段:", reply_params)

如果回调里需要调用外部接口,比如查询订单、匹配关键词、发送回复,也建议做好异常捕获。

async def on_buyer_message(event):
    try:
        content = event.get("content", "")

        if "物流" in content:
            await handle_logistics_question(event)

    except Exception as exc:
        print("处理买家消息失败:", exc)

长连接程序最怕一个业务异常把监听任务带崩。把消息分发、业务处理和异常隔离做好,后面运行起来会稳定很多。

九、可以继续扩展哪些能力

在 WebSocket 长连接打通之后,后续可以扩展的空间很大。

例如:

  • 买家消息监听
  • 关键词自动回复
  • 按某店订单号定位某鸽会话
  • 文本、图片、视频素材发送
  • 某店商品卡片或系统事件识别
  • 会话接入事件处理
  • 某鸽客服转接
  • 多账号或多店铺监听

这些能力并不是孤立的,它们都建立在同一套长连接和某鸽会话字段封装之上。

也就是说,只要 WebSocket 通道封装稳定,后面扩展业务功能会顺很多。

十、总结

本文用某店/某鸽客服 IM 场景举例,讲了一个 Python WebSocket 客户端在实际工程中应该如何封装。

核心思路可以总结为:

  • WebSocket 负责长连接和实时消息。
  • 回调函数负责承接业务逻辑。
  • 回复消息依赖某鸽会话字段,而不是每次重新查询。
  • 发送前要确保连接可用。
  • 断线重连和资源关闭要统一封装。

这套结构适合某店/某鸽客服自动化、实时消息监听、自动回复辅助、会话接入提醒等工程场景。只要是需要长连接、服务端推送、客户端回写和事件回调的系统,也可以沿用类似的封装方式。

最后再补一个工程建议:WebSocket 客户端不要只关注“能连上”,还要关注“断了怎么办、回调报错怎么办、程序退出怎么办、消息字段怎么标准化”。这些问题处理好之后,长连接服务才更接近可长期运行的状态。

更多推荐