Python WebSocket 实战:封装某店/某鸽客服消息监听与发送通道
本文以某店/某鸽客服 IM 场景为例,分享一个 Python WebSocket 客户端的封装思路:如何建立长连接、监听某鸽消息、设计回调、发送客服消息、处理重连以及关闭资源。
为了让示例更接近真实工程,文中会使用“买家消息、客服回复、某鸽会话字段”等概念来组织代码,但重点始终放在 WebSocket 客户端的工程设计上。
一、为什么某店/某鸽客服 IM 适合 WebSocket
某店/某鸽客服 IM 和普通后台接口不太一样。
普通接口大多是“我请求一次,你返回一次”。而客服消息是持续发生的:买家发消息、客服回复、系统事件、商品卡片、会话接入、客服转接等,都可能随时出现。
如果使用 HTTP 轮询,大概会遇到几个问题:
- 实时性不够好,消息可能延迟几秒甚至更久。
- 请求量大,大部分请求其实没有新消息。
- 状态维护麻烦,需要不断记录最后一条消息的位置。
- 发送和接收逻辑容易拆散,后期不好维护。
WebSocket 的优势在于建立一次连接后,客户端和服务端可以保持双向通信。服务端有新消息时可以主动推送,客户端也可以通过同一个通道发送消息。
在某店/某鸽客服 IM 场景里,一个比较理想的结构是:
建立 WebSocket 长连接
|
监听服务端推送消息
|
解析消息类型和某鸽会话字段
|
触发业务回调
|
按某鸽会话字段发送回复
这是某店/某鸽客服消息通道的基础结构,也适用于很多通知系统和协同工具。
二、一个可用的 WebSocket 客户端要解决什么
只写一个 websockets.connect() 并不难,真正麻烦的是工程化细节。
一个能在某店/某鸽客服场景里稳定使用的 WebSocket 封装,至少要考虑这些问题:
- 连接建立后,怎么确认已经可发送消息。
- 监听任务怎么放到后台持续运行。
- 服务端推送消息后,怎么分发给不同回调。
- 发送消息时,怎么复用当前连接。
- 连接断开后,是否需要自动重连。
- 程序退出时,怎么正确关闭连接和后台任务。
所以我更倾向于把 WebSocket 做成一层通用客户端,再在外面封装业务 API。
业务层不直接关心底层连接细节,只需要调用:
websocket, task = await create_pigeon_websocket(cookies)
用完后关闭:
await close_pigeon_websocket(websocket, task)
这样调用方的心智负担会低很多。
三、建立连接:把长连接交给后台任务
在 Python 异步程序里,WebSocket 监听通常是一个长期运行的协程。主流程不应该被它阻塞,否则后面就没法继续发送消息或处理其他逻辑。
因此可以把监听放到后台任务里运行:
websocket, task = await create_pigeon_websocket(
cookies=cookies,
on_buyer_message=on_buyer_message,
reconnect_interval_s=5.0,
connect_timeout_s=30.0,
)
这里几个参数的含义比较关键:
cookies:连接所需的鉴权上下文,实际项目中可以替换成登录态对象、会话凭证或其他鉴权载体。on_buyer_message:买家消息回调。reconnect_interval_s:断线后的重连间隔。connect_timeout_s:等待连接成功的超时时间。
封装时有一个细节很重要:创建 WebSocket 对象之后,不要立刻发送消息,应该先等待连接真正建立成功。
否则在网络稍慢时,很容易出现“连接对象已经创建,但底层 socket 还没准备好”的问题。
这一点最好收敛在连接封装内部,外层调用者不需要重复判断连接状态。
四、某鸽消息监听:用回调隔离业务逻辑
某鸽客服消息类型很多,不建议把所有业务逻辑都写在 WebSocket 客户端里面。
更好的方式是:客户端只负责接收、解析和分发;业务代码通过回调函数接收事件。
例如监听买家消息:
async def on_buyer_message(event):
print("收到买家消息:", event.get("content"))
print("回复所需的某鸽会话字段:", event.get("reply_params"))
然后建立连接时传入:
websocket, task = await create_pigeon_websocket(
cookies=cookies,
on_buyer_message=on_buyer_message,
)
这样设计的好处是很明显的。
WebSocket 底层只关心“消息来了没有”,业务层只关心“消息来了之后我要做什么”。
后面如果要接入自动回复、关键词识别、订单查询、某鸽客服转接,都可以在回调里扩展,而不需要改动 WebSocket 客户端本身。
五、回复消息:用某鸽会话字段而不是重新查一遍
某店/某鸽客服 IM 里发送消息通常不是只传一个文本就够了,还需要知道要发给哪个买家、哪个店铺、哪个会话、哪个子会话。
所以监听消息时,我会把后续回复需要的字段整理成 reply_params。
它大概长这样:
{
"uid": "买家安全 uid",
"mall_id": "店铺 id",
"conversation_short_id": "...",
"sub_conversation_short_id": "...",
"reply_key": "..."
}
有了这些某鸽会话字段,回复时就不需要重新根据订单号查一遍会话,可以直接复用当前消息里的上下文。
示例:
async def on_buyer_message(event):
reply_params = event["reply_params"]
await send_text_message(
websocket=websocket,
text="您好,已经收到您的消息,稍后为您处理。",
**reply_params,
)
这种方式适合做自动回复、快捷回复、关键词触发回复等功能。
不过实际落地时,我一般不建议一上来就做全自动回复,而是先做“监听 + 字段提取 + 人工确认发送”。等业务规则稳定后,再逐步自动化。
六、完整调用示例
下面是一段简化后的调用层代码,重点看 WebSocket 连接、买家消息回调、文本回复和资源关闭这几个动作。
import asyncio
from dy_listening import load_cookies
from dy_send_api import (
create_pigeon_websocket,
close_pigeon_websocket,
send_text_message,
)
async def main():
cookies = load_cookies()
websocket = None
task = None
async def on_buyer_message(event):
content = event.get("content")
reply_params = event.get("reply_params") or {}
print("收到买家消息:", content)
if "发货" in str(content):
await send_text_message(
websocket=websocket,
text="您好,您的订单正在处理中,请耐心等待。",
**reply_params,
)
try:
websocket, task = await create_pigeon_websocket(
cookies=cookies,
on_buyer_message=on_buyer_message,
reconnect_interval_s=5.0,
connect_timeout_s=30.0,
)
print("WebSocket 已连接,开始监听某鸽客服消息")
await asyncio.Event().wait()
finally:
if websocket is not None:
await close_pigeon_websocket(websocket, task)
if __name__ == "__main__":
asyncio.run(main())
这段代码主要体现几个关键点:
create_pigeon_websocket()负责建立长连接。on_buyer_message()负责处理买家消息。send_text_message()复用当前 WebSocket 连接发送回复。finally里调用close_pigeon_websocket()释放资源。
真实工程里,还可以继续扩展某鸽图片消息、视频素材消息、订单维度会话定位等能力。
七、自动重连和资源关闭
WebSocket 长连接一定要考虑断线。
断线原因可能很多,比如网络波动、服务端主动关闭、登录态过期、客户端异常等。
因此封装时最好把重连逻辑放到底层客户端里,而不是让每个业务函数都自己写一遍。
通常可以提供一个参数:
reconnect_interval_s=5.0
表示断开后间隔 5 秒再尝试连接。
另外,关闭资源也很重要。尤其是在异步程序中,如果只关闭 socket,不取消后台任务,程序可能无法正常退出。
所以我会把关闭动作封装成统一方法:
await close_pigeon_websocket(websocket, task)
它负责关闭连接,并清理后台监听任务。
这种封装看起来不复杂,但能减少很多线上运行时的小问题。
八、消息分发与异常隔离
WebSocket 客户端接收到的原始消息,通常不能直接交给业务逻辑处理。
更稳妥的做法是先做一层标准化,把不同来源、不同类型的消息整理成统一事件对象,再分发给对应回调。
例如可以把事件抽象成下面几类:
raw_message 原始消息
buyer_message 买家消息
seller_message 客服消息
system_message 系统消息
access_event 接入事件
业务层只关心自己需要的事件类型。
比如自动回复逻辑只订阅 buyer_message,会话接入逻辑只订阅 access_event,调试日志可以订阅 raw_message。
这样做有几个好处:
- 不同业务逻辑互不影响。
- 新增事件类型时,不需要重写连接层代码。
- 回调内部报错时,可以单独捕获和记录。
- 后续接入队列、规则引擎或人工工作台会更方便。
在实现时,我一般会让 WebSocket 客户端保持“轻业务”:
async def on_buyer_message(event):
content = event.get("content")
reply_params = event.get("reply_params")
print("买家消息:", content)
print("可用于回复的某鸽会话字段:", reply_params)
如果回调里需要调用外部接口,比如查询订单、匹配关键词、发送回复,也建议做好异常捕获。
async def on_buyer_message(event):
try:
content = event.get("content", "")
if "物流" in content:
await handle_logistics_question(event)
except Exception as exc:
print("处理买家消息失败:", exc)
长连接程序最怕一个业务异常把监听任务带崩。把消息分发、业务处理和异常隔离做好,后面运行起来会稳定很多。
九、可以继续扩展哪些能力
在 WebSocket 长连接打通之后,后续可以扩展的空间很大。
例如:
- 买家消息监听
- 关键词自动回复
- 按某店订单号定位某鸽会话
- 文本、图片、视频素材发送
- 某店商品卡片或系统事件识别
- 会话接入事件处理
- 某鸽客服转接
- 多账号或多店铺监听
这些能力并不是孤立的,它们都建立在同一套长连接和某鸽会话字段封装之上。
也就是说,只要 WebSocket 通道封装稳定,后面扩展业务功能会顺很多。
十、总结
本文用某店/某鸽客服 IM 场景举例,讲了一个 Python WebSocket 客户端在实际工程中应该如何封装。
核心思路可以总结为:
- WebSocket 负责长连接和实时消息。
- 回调函数负责承接业务逻辑。
- 回复消息依赖某鸽会话字段,而不是每次重新查询。
- 发送前要确保连接可用。
- 断线重连和资源关闭要统一封装。
这套结构适合某店/某鸽客服自动化、实时消息监听、自动回复辅助、会话接入提醒等工程场景。只要是需要长连接、服务端推送、客户端回写和事件回调的系统,也可以沿用类似的封装方式。
最后再补一个工程建议:WebSocket 客户端不要只关注“能连上”,还要关注“断了怎么办、回调报错怎么办、程序退出怎么办、消息字段怎么标准化”。这些问题处理好之后,长连接服务才更接近可长期运行的状态。
更多推荐



所有评论(0)