手把手封装一个 Python 微信 API SDK(带重试与异常处理)
调用第三方 HTTP API 时,最怕的不是接口不好用,而是你的代码一旦出错就满屏 traceback,既不知道是超时还是业务错误,重试逻辑也散落在各处。本文以微信 HTTP API 为例,从零封装一个带重试、带结构化异常、带日志的 Python SDK,让调用方只关心业务逻辑,不再为底层网络抖动焦虑。
一、为什么要封装而不是直接 requests.post?
很多人刚接入微信 API,第一版代码往往长这样:
import requests
resp = requests.post(
"https://xxx/postText",
headers={"VideosApi-token": "your_token"},
json={"appId": "xxx", "toWxid": "yyy", "content": "hello"}
)
print(resp.json())
能跑,但脆:
- 网络抖动:requests 默认没有重试,一次超时直接
ConnectionError。 - 业务错误混入:HTTP 200 但
ret != 200,调用方没有统一拦截,错误静默丢失。 - token/appId 到处复制:换个接口就要重新写 headers,维护成本高。
- 日志缺失:出了问题找不到是哪个接口、哪个时间点出错。
封装的目的就是把这四件事收拢到一个地方,让业务代码干净到只剩下"我要发消息"这一行。
随着业务接口越来越多——发文本、发图片、发文件、拉群列表、查好友——裸写 requests.post 的问题会被放大:每个接口都要手动拼 headers,错误处理逻辑散在各处,某天 token 换了还要全局搜索替换。这种代码在项目初期或许还能维护,一旦进入多人协作或接入 CI/CD,代价就会成倍放大。更重要的是,封装不只是减少重复,更是在强制规范团队的调用行为——超时多少秒、重试几次、哪些错误要告警,都收进一个地方统一管理。
二、项目结构
wechat_sdk/
├── __init__.py
├── client.py # 核心 HTTP 客户端(重试 + 异常)
├── exceptions.py # 自定义异常层次
├── api/
│ ├── __init__.py
│ ├── message.py # 消息相关接口
│ ├── contact.py # 联系人接口
│ └── group.py # 群组接口
└── utils/
├── logger.py # 统一日志
└── retry.py # 重试策略
目录不复杂,关键在于分层清晰:client 只管 HTTP,api/ 只管拼参数,调用方只管传业务数据*。
这个分层结构有一个实际好处:当官方文档更新了某个接口的字段,你只需要改 api/ 目录下对应的方法,不需要动 client.py 和异常体系。反过来,如果平台换了鉴权方式,只改 client.py 里的 headers 配置,业务层完全不感知。改动范围的可控性,是衡量封装质量的重要指标。
三、先定义异常体系
好的 SDK,异常要可区分、可捕获。把所有微信 API 异常统一挂在 WechatApiError 下:
# wechat_sdk/exceptions.py
class WechatApiError(Exception):
"""所有微信 API 异常的基类"""
pass
class NetworkError(WechatApiError):
"""网络层错误:超时、连接失败等"""
def __init__(self, message: str, original: Exception = None):
super().__init__(message)
self.original = original
class ApiError(WechatApiError):
"""业务层错误:HTTP 200 但 ret != 200"""
def __init__(self, ret: int, msg: str, data=None):
super().__init__(f"API Error [{ret}]: {msg}")
self.ret = ret
self.msg = msg
self.data = data
class AuthError(ApiError):
"""Token 无效或过期(ret == 401)"""
pass
class RateLimitError(ApiError):
"""频率超限(ret == 429 或业务提示频率过高)"""
pass
这样调用方可以按需捕获:只关心网络就捕 NetworkError,只关心频率就捕 RateLimitError,兜底用 WechatApiError。
异常体系的设计有几个需要特别注意的细节。第一,NetworkError 保留了 original 字段,用来存放原始的 requests.exceptions 异常对象,方便在日志里打出底层堆栈,定位是 DNS 解析失败还是 TCP 握手超时。第二,ApiError 把 ret、msg、data 都存下来,调用方可以在捕获异常后读取 e.ret 决定是否需要告警,而不是解析字符串。第三,AuthError 继承 ApiError 而不是 WechatApiError,这样捕获 ApiError 的代码天然也能捕获鉴权错误,不需要额外的 except AuthError 分支。
四、重试策略封装
微信 API 的网络抖动多发生在超时和 5xx 场景,业务错误(频率超限)则需要等待后重试,而鉴权失败永远不该重试。把这个逻辑独立出来:
# wechat_sdk/utils/retry.py
import time
import random
import logging
from functools import wraps
from wechat_sdk.exceptions import NetworkError, RateLimitError, AuthError
logger = logging.getLogger("wechat_sdk.retry")
def with_retry(max_attempts: int = 3, base_delay: float = 1.0, backoff: float = 2.0):
"""
指数退避重试装饰器。
- NetworkError / RateLimitError 会重试
- AuthError 及其他 ApiError 立即抛出,不重试
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
delay = base_delay
last_exc = None
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except AuthError:
raise # 鉴权错误不重试
except RateLimitError as e:
last_exc = e
wait = delay + random.uniform(0, 0.5) # 加随机抖动
logger.warning(f"[Attempt {attempt}/{max_attempts}] 频率超限,{wait:.1f}s 后重试")
time.sleep(wait)
delay *= backoff
except NetworkError as e:
last_exc = e
wait = delay + random.uniform(0, 0.3)
logger.warning(f"[Attempt {attempt}/{max_attempts}] 网络错误: {e},{wait:.1f}s 后重试")
time.sleep(wait)
delay *= backoff
raise last_exc
return wrapper
return decorator
backoff=2.0 意味着第一次等 1s,第二次等 2s,第三次等 4s,同时加了随机抖动防止多个进程同时重试"踩踏"。
关于重试次数的选择:max_attempts=3 是一个经验值。对于微信消息发送这类接口,3 次重试足以覆盖绝大多数短暂的网络抖动,同时不会在对端真正故障时让请求拖太久。如果你的业务对消息实时性要求极高(比如客服系统),可以把 max_attempts 调低到 2,base_delay 调到 0.5s,牺牲一点容错换取快速失败。反之,如果是低优先级的批量任务,可以调高到 5 次,配合更长的退避时间。
常见坑:装饰器里的 last_exc 在第一次就成功时是 None,需要确保正常返回路径不会走到 raise last_exc。这里用 return func(*args, **kwargs) 直接返回,所以不会有问题。但如果你改写成 while 循环,务必检查循环出口条件,否则全部重试完后 raise None 会产生一个令人困惑的 TypeError。
五、核心 HTTP 客户端
# wechat_sdk/client.py
import logging
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from wechat_sdk.exceptions import NetworkError, ApiError, AuthError, RateLimitError
from wechat_sdk.utils.retry import with_retry
logger = logging.getLogger("wechat_sdk.client")
# 占位符配置,注册后从官方文档 post.wechatapi.net 获取
BASE = "https://你的接口域名" # 注册后在官方文档获取
TOKEN = "你的Token"
APPID = "你的appId"
HEADERS = {"VideosApi-token": TOKEN}
class WechatClient:
def __init__(self, base_url: str = BASE, token: str = TOKEN, appid: str = APPID,
timeout: int = 10, max_retries: int = 3):
self.base_url = base_url.rstrip("/")
self.appid = appid
self.timeout = timeout
self.max_retries = max_retries
self._session = requests.Session()
self._session.headers.update({"VideosApi-token": token, "Content-Type": "application/json"})
# urllib3 层面处理 TCP 连接重试(非业务重试)
adapter = HTTPAdapter(max_retries=Retry(total=2, backoff_factor=0.3,
status_forcelist=[500, 502, 503]))
self._session.mount("https://", adapter)
self._session.mount("http://", adapter)
@with_retry(max_attempts=3, base_delay=1.0)
def post(self, path: str, payload: dict) -> dict:
"""
统一 POST 入口。自动注入 appId,统一异常转换。
代码为示例,具体接口/字段以官方文档 post.wechatapi.net 为准。
"""
url = f"{self.base_url}{path}"
body = {"appId": self.appid, **payload}
logger.debug(f"POST {url} | payload keys: {list(body.keys())}")
try:
resp = self._session.post(url, json=body, timeout=self.timeout)
resp.raise_for_status()
except requests.exceptions.Timeout as e:
raise NetworkError("请求超时", original=e)
except requests.exceptions.ConnectionError as e:
raise NetworkError("连接失败", original=e)
except requests.exceptions.HTTPError as e:
raise NetworkError(f"HTTP 错误: {e.response.status_code}", original=e)
data = resp.json()
ret = data.get("ret", -1)
if ret == 200:
logger.debug(f"POST {path} 成功")
return data.get("data", {})
msg = data.get("msg", "未知错误")
if ret == 401:
raise AuthError(ret=ret, msg=msg)
if ret == 429 or "频率" in msg:
raise RateLimitError(ret=ret, msg=msg)
raise ApiError(ret=ret, msg=msg, data=data.get("data"))
注意几个设计决策:
HTTPAdapter处理 TCP 级别抖动,@with_retry处理业务级抖动,两层各司其职。post()自动注入appId,调用方不需要每次传。- 返回值直接是
data字段内容,不需要调用方再.get("data")。
关于 timeout 参数:timeout=10 是单次请求超时时间(秒),传入 requests.Session.post 时是连接+读取的总时间上限。如果你的业务里有发送大文件的接口(比如发视频),应该针对该接口单独传更大的 timeout,而不是把全局超时调高,否则普通文本消息也会等很久才报超时。requests 支持传 timeout=(connect_timeout, read_timeout) 元组,可以把连接超时控制在 3 秒内,读取超时按需配置。
关于 Session 的复用:requests.Session 会复用底层 TCP 连接(HTTP keep-alive),对同一个域名的多次请求性能明显优于每次创建新的 requests 对象。在批量发消息场景下,这个差距会更明显。WechatClient 持有一个 _session 实例,因此整个 SDK 生命周期内都在复用同一个连接池,这是有意为之的设计。
六、业务接口层:消息模块
有了客户端,业务模块就只剩拼参数:
# wechat_sdk/api/message.py
from wechat_sdk.client import WechatClient
class MessageApi:
def __init__(self, client: WechatClient):
self.client = client
def send_text(self, to_wxid: str, content: str, ats: list = None) -> dict:
"""发送文本消息"""
payload = {"toWxid": to_wxid, "content": content}
if ats:
payload["ats"] = ",".join(ats)
return self.client.post("/postText", payload)
def send_image(self, to_wxid: str, image_url: str) -> dict:
"""发送图片(URL 方式)"""
return self.client.post("/postImage", {"toWxid": to_wxid, "imageUrl": image_url})
def send_link(self, to_wxid: str, title: str, desc: str, url: str, thumb_url: str = "") -> dict:
"""发送链接卡片"""
return self.client.post("/postLink", {
"toWxid": to_wxid, "title": title,
"desc": desc, "url": url, "thumbUrl": thumb_url
})
def revoke(self, to_wxid: str, msg_id: str) -> dict:
"""撤回消息"""
return self.client.post("/revokeMsg", {"toWxid": to_wxid, "msgId": msg_id})
参数说明:to_wxid 是接收方的微信 ID,群消息传群 ID(通常以 @chatroom 结尾),单聊传对方的 wxid。ats 是群 @ 列表,填 wxid 即可,SDK 内部会转成逗号分隔的字符串;如果要 @所有人,通常传 ["notify@all"],具体值以官方文档 post.wechatapi.net 为准。msg_id 是发送成功后平台返回的消息 ID,撤回时用,注意撤回有时间窗口限制,过期后接口会返回业务错误而不是抛异常,需要自行处理返回值。
常见坑:send_image 传的是图片 URL,不是本地路径。如果图片在本地,需要先上传到 CDN 或使用平台提供的上传接口拿到 URL,再调用该方法。直接传 file:// 路径不会报错,但对方收不到图片。
七、联系人与群组模块(精简版)
# wechat_sdk/api/contact.py
class ContactApi:
def __init__(self, client):
self.client = client
def search(self, keyword: str) -> dict:
return self.client.post("/search", {"keyword": keyword})
def add_friend(self, to_wxid: str, remark: str = "") -> dict:
return self.client.post("/addContacts", {"toWxid": to_wxid, "remark": remark})
def set_remark(self, to_wxid: str, remark: str) -> dict:
return self.client.post("/setFriendRemark", {"toWxid": to_wxid, "remark": remark})
def fetch_list(self) -> dict:
return self.client.post("/fetchContactsList", {})
# wechat_sdk/api/group.py
class GroupApi:
def __init__(self, client):
self.client = client
def create(self, member_list: list) -> dict:
return self.client.post("/createChatroom", {"memberList": member_list})
def invite(self, chatroom_id: str, member_list: list) -> dict:
return self.client.post("/inviteMember", {"chatroomId": chatroom_id, "memberList": member_list})
def kick(self, chatroom_id: str, member_list: list) -> dict:
return self.client.post("/removeMember", {"chatroomId": chatroom_id, "memberList": member_list})
def set_announcement(self, chatroom_id: str, content: str) -> dict:
return self.client.post("/setChatroomAnnouncement", {"chatroomId": chatroom_id, "content": content})
关于 fetch_list 的注意事项:拉取联系人列表是一个相对耗时的接口,返回数据量较大时响应时间可能超过默认的 10 秒。建议在调用前临时增大 client.timeout,或者直接在 SDK 初始化时给这个场景单独创建一个 timeout 更大的客户端实例。另外,联系人列表接口有调用频率限制,不要在循环里高频调用,应在本地缓存结果并设置合理的刷新周期。
八、统一入口:SDK 门面
# wechat_sdk/__init__.py
from wechat_sdk.client import WechatClient
from wechat_sdk.api.message import MessageApi
from wechat_sdk.api.contact import ContactApi
from wechat_sdk.api.group import GroupApi
from wechat_sdk.exceptions import WechatApiError, NetworkError, ApiError, AuthError, RateLimitError
class WechatSdk:
"""
微信 API SDK 门面类。
示例配置请参考官方文档 post.wechatapi.net,
base_url / token / appid 均为占位符,注册后替换。
"""
def __init__(self, base_url: str, token: str, appid: str, timeout: int = 10):
client = WechatClient(base_url=base_url, token=token, appid=appid, timeout=timeout)
self.message = MessageApi(client)
self.contact = ContactApi(client)
self.group = GroupApi(client)
__all__ = [
"WechatSdk",
"WechatApiError", "NetworkError", "ApiError", "AuthError", "RateLimitError"
]
门面类(Facade)的价值在于:调用方只需要 from wechat_sdk import WechatSdk 一行导入,不需要了解内部模块划分。sdk.message、sdk.contact、sdk.group 的命名空间也足够清晰,IDE 的自动补全能直接列出所有可用方法,降低上手成本。如果后续需要扩展新模块(比如朋友圈、支付),在 WechatSdk.__init__ 里加一行挂载即可,不需要修改任何外层接口。
九、调用示例
import logging
from wechat_sdk import WechatSdk, NetworkError, RateLimitError, AuthError
logging.basicConfig(level=logging.INFO)
# 三个占位符,注册后在官方文档 post.wechatapi.net 查看
sdk = WechatSdk(
base_url="https://你的接口域名",
token="你的Token",
appid="你的appId"
)
try:
result = sdk.message.send_text(to_wxid="filehelper", content="Hello from SDK!")
print("发送成功:", result)
except AuthError:
print("Token 无效,请检查配置")
except RateLimitError as e:
print(f"频率超限: {e.msg},已自动重试 3 次仍失败")
except NetworkError as e:
print(f"网络异常: {e}")
调用方的 try/except 结构清晰,每种错误有明确的处理路径,不再是一刀切的 except Exception。
生产环境建议:不要把 TOKEN、APPID 等凭证硬编码在源码里,应该从环境变量或配置文件读取。可以在 SDK 初始化前加一段读取逻辑:
import os
sdk = WechatSdk(
base_url=os.environ["WECHAT_BASE_URL"],
token=os.environ["WECHAT_TOKEN"],
appid=os.environ["WECHAT_APPID"]
)
这样既方便部署,也避免凭证随代码泄露到版本控制系统。
十、使用 WechatApi 平台的几点实践建议
实际跑这套 SDK 时,WechatApi 这类托管微信 HTTP API 平台已经把登录保活、设备管理这些脏活封装好了,你只需要关注业务逻辑。不过有几条操作规范值得写进你的 SDK 配置里:
| 操作类型 | 建议频率 |
|---|---|
| 主动加好友 | 每 2 小时 ≤ 5 个,每天 5–15 个 |
| 搜索用户 | 每天 10–20 次 |
| 建群 | 每天 ≤ 10 个,间隔 10 分钟以上 |
| 朋友圈点赞/评论 | 随机间隔 5–20 秒 |
| 批量下载(图/文件) | 每条间隔 3–10 秒,做队列 |
| 新设备首次调用 | 在线 3 天后再发批量操作 |
这些不是平台限制,是微信风控的行为特征识别逻辑——拟人化、控频才是核心。批量发图建议先上传一次拿到 CDN URL,后续用转发接口复用,避免反复上传触发异常检测。
从 SDK 设计角度来说,这个控频需求最好也写进代码里,而不是依赖调用方自觉。可以在 MessageApi 里维护一个上次调用时间戳,每次 send_* 前检查间隔,不足时 time.sleep() 补足。这样即使调用方在循环里猛发,SDK 层也能自动限速,而不是等到平台返回 RateLimitError 再触发重试等待。
十一、消息回调处理(Flask 示例)
SDK 负责发消息,收消息则依赖平台推送到你的回调地址(通过 setCallback 接口注册)。用 Flask 做一个最简回调服务:
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/wechat/callback", methods=["POST"])
def callback():
data = request.json
msg_type = data.get("type")
from_wxid = data.get("fromWxid")
content = data.get("content")
# 具体字段以官方文档 post.wechatapi.net 为准
if msg_type == 1: # 文本消息(示例类型值,实际以文档为准)
print(f"收到来自 {from_wxid} 的消息: {content}")
# 异步处理,避免同步阻塞超时
# process_async(data)
return jsonify({"code": 200}) # 必须返回 200,否则平台认为推送失败会重试
if __name__ == "__main__":
app.run(port=5000)
关键:回调接口必须公网可达,且必须返回 HTTP 200,否则平台会认为推送失败并重试,可能造成消息重复处理。在回调里不要做同步的耗时操作(如直接下载文件),应该把任务推入队列异步处理。
回调的几个常见坑:
第一,幂等性处理。平台在网络抖动时可能重复推送同一条消息,消息有唯一 ID 字段,建议在处理前用 Redis 或数据库记录已处理的消息 ID,收到重复消息直接返回 200 跳过,不重复执行业务逻辑。
第二,签名验证。部分平台支持在回调请求里附带签名,用于验证推送方身份,防止伪造请求。如果平台有此能力,务必启用,具体验证逻辑以官方文档 post.wechatapi.net 为准。
第三,本地开发调试。开发阶段服务在本地跑,没有公网 IP,可以用 frp、ngrok 等内网穿透工具临时暴露本地端口,在平台注册穿透后的公网地址作为回调 URL,调试完成再换成正式地址。
十二、单元测试:mock 掉网络
封装好分层之后,测试变得容易很多——只需要 mock WechatClient.post,不需要真实网络:
# tests/test_message.py
import pytest
from unittest.mock import MagicMock
from wechat_sdk.client import WechatClient
from wechat_sdk.api.message import MessageApi
from wechat_sdk.exceptions import RateLimitError
def make_api():
client = MagicMock(spec=WechatClient)
return MessageApi(client), client
def test_send_text_success():
api, client = make_api()
client.post.return_value = {"msgId": "fake_msg_id_001"}
result = api.send_text("filehelper", "hello")
assert result["msgId"] == "fake_msg_id_001"
client.post.assert_called_once_with("/postText", {"toWxid": "filehelper", "content": "hello"})
def test_send_text_rate_limit_propagates():
api, client = make_api()
client.post.side_effect = RateLimitError(ret=429, msg="频率超限")
with pytest.raises(RateLimitError):
api.send_text("filehelper", "hello")
def test_send_text_with_at():
api, client = make_api()
client.post.return_value = {}
api.send_text("room@chatroom", "大家好", ats=["wxid_aaa", "wxid_bbb"])
_, kwargs = client.post.call_args
# ats 被拼成逗号分隔字符串
assert client.post.call_args[0][1]["ats"] == "wxid_aaa,wxid_bbb"
这三个测试用例覆盖了正常路径、异常传播、参数拼装三个维度,全程不发一次真实 HTTP 请求,跑起来极快。对于重试逻辑的测试,可以用 unittest.mock.patch("time.sleep") 屏蔽掉等待时间,让测试在毫秒级完成。
总结
这套 SDK 的核心设计思路可以归纳为三层分离:
- 异常层:把所有错误分类,让调用方知道"出了什么问题"。区分网络错误与业务错误,区分可重试与不可重试,是异常体系设计的关键。
- 重试层:把"要不要重试、等多久"的策略集中管理,业务代码不感知。指数退避加随机抖动,是处理网络抖动和频率限制的标准做法。
- 接口层:把 HTTP 细节封装进去,调用方只传业务参数。分模块组织接口,方便扩展也方便测试。
三层各自独立,测试也方便——可以 mock WechatClient.post 来测任意业务接口,不需要真实网络。按上面的结构搭好项目骨架,把凭证收进环境变量,再逐步补全各模块接口,就能跑通一个生产可用的微信 API Python SDK。具体可用的接口列表和字段定义,以官方文档 post.wechatapi.net 为准。
更多推荐

所有评论(0)