调用第三方 HTTP API 时,最怕的不是接口不好用,而是你的代码一旦出错就满屏 traceback,既不知道是超时还是业务错误,重试逻辑也散落在各处。本文以微信 HTTP API 为例,从零封装一个带重试、带结构化异常、带日志的 Python SDK,让调用方只关心业务逻辑,不再为底层网络抖动焦虑。


一、为什么要封装而不是直接 requests.post?

很多人刚接入微信 API,第一版代码往往长这样:

import requests

resp = requests.post(
    "https://xxx/postText",
    headers={"VideosApi-token": "your_token"},
    json={"appId": "xxx", "toWxid": "yyy", "content": "hello"}
)
print(resp.json())

能跑,但脆:

  1. 网络抖动:requests 默认没有重试,一次超时直接 ConnectionError
  2. 业务错误混入:HTTP 200 但 ret != 200,调用方没有统一拦截,错误静默丢失。
  3. token/appId 到处复制:换个接口就要重新写 headers,维护成本高。
  4. 日志缺失:出了问题找不到是哪个接口、哪个时间点出错。

封装的目的就是把这四件事收拢到一个地方,让业务代码干净到只剩下"我要发消息"这一行。

随着业务接口越来越多——发文本、发图片、发文件、拉群列表、查好友——裸写 requests.post 的问题会被放大:每个接口都要手动拼 headers,错误处理逻辑散在各处,某天 token 换了还要全局搜索替换。这种代码在项目初期或许还能维护,一旦进入多人协作或接入 CI/CD,代价就会成倍放大。更重要的是,封装不只是减少重复,更是在强制规范团队的调用行为——超时多少秒、重试几次、哪些错误要告警,都收进一个地方统一管理。


二、项目结构

wechat_sdk/
├── __init__.py
├── client.py       # 核心 HTTP 客户端(重试 + 异常)
├── exceptions.py   # 自定义异常层次
├── api/
│   ├── __init__.py
│   ├── message.py  # 消息相关接口
│   ├── contact.py  # 联系人接口
│   └── group.py    # 群组接口
└── utils/
    ├── logger.py   # 统一日志
    └── retry.py    # 重试策略

目录不复杂,关键在于分层清晰:client 只管 HTTP,api/ 只管拼参数,调用方只管传业务数据*。

这个分层结构有一个实际好处:当官方文档更新了某个接口的字段,你只需要改 api/ 目录下对应的方法,不需要动 client.py 和异常体系。反过来,如果平台换了鉴权方式,只改 client.py 里的 headers 配置,业务层完全不感知。改动范围的可控性,是衡量封装质量的重要指标。


三、先定义异常体系

好的 SDK,异常要可区分、可捕获。把所有微信 API 异常统一挂在 WechatApiError 下:

# wechat_sdk/exceptions.py

class WechatApiError(Exception):
    """所有微信 API 异常的基类"""
    pass


class NetworkError(WechatApiError):
    """网络层错误:超时、连接失败等"""
    def __init__(self, message: str, original: Exception = None):
        super().__init__(message)
        self.original = original


class ApiError(WechatApiError):
    """业务层错误:HTTP 200 但 ret != 200"""
    def __init__(self, ret: int, msg: str, data=None):
        super().__init__(f"API Error [{ret}]: {msg}")
        self.ret = ret
        self.msg = msg
        self.data = data


class AuthError(ApiError):
    """Token 无效或过期(ret == 401)"""
    pass


class RateLimitError(ApiError):
    """频率超限(ret == 429 或业务提示频率过高)"""
    pass

这样调用方可以按需捕获:只关心网络就捕 NetworkError,只关心频率就捕 RateLimitError,兜底用 WechatApiError

异常体系的设计有几个需要特别注意的细节。第一,NetworkError 保留了 original 字段,用来存放原始的 requests.exceptions 异常对象,方便在日志里打出底层堆栈,定位是 DNS 解析失败还是 TCP 握手超时。第二,ApiErrorretmsgdata 都存下来,调用方可以在捕获异常后读取 e.ret 决定是否需要告警,而不是解析字符串。第三,AuthError 继承 ApiError 而不是 WechatApiError,这样捕获 ApiError 的代码天然也能捕获鉴权错误,不需要额外的 except AuthError 分支。


四、重试策略封装

微信 API 的网络抖动多发生在超时和 5xx 场景,业务错误(频率超限)则需要等待后重试,而鉴权失败永远不该重试。把这个逻辑独立出来:

# wechat_sdk/utils/retry.py

import time
import random
import logging
from functools import wraps
from wechat_sdk.exceptions import NetworkError, RateLimitError, AuthError

logger = logging.getLogger("wechat_sdk.retry")


def with_retry(max_attempts: int = 3, base_delay: float = 1.0, backoff: float = 2.0):
    """
    指数退避重试装饰器。
    - NetworkError / RateLimitError 会重试
    - AuthError 及其他 ApiError 立即抛出,不重试
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            last_exc = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except AuthError:
                    raise  # 鉴权错误不重试
                except RateLimitError as e:
                    last_exc = e
                    wait = delay + random.uniform(0, 0.5)  # 加随机抖动
                    logger.warning(f"[Attempt {attempt}/{max_attempts}] 频率超限,{wait:.1f}s 后重试")
                    time.sleep(wait)
                    delay *= backoff
                except NetworkError as e:
                    last_exc = e
                    wait = delay + random.uniform(0, 0.3)
                    logger.warning(f"[Attempt {attempt}/{max_attempts}] 网络错误: {e}{wait:.1f}s 后重试")
                    time.sleep(wait)
                    delay *= backoff
            raise last_exc
        return wrapper
    return decorator

backoff=2.0 意味着第一次等 1s,第二次等 2s,第三次等 4s,同时加了随机抖动防止多个进程同时重试"踩踏"。

关于重试次数的选择max_attempts=3 是一个经验值。对于微信消息发送这类接口,3 次重试足以覆盖绝大多数短暂的网络抖动,同时不会在对端真正故障时让请求拖太久。如果你的业务对消息实时性要求极高(比如客服系统),可以把 max_attempts 调低到 2,base_delay 调到 0.5s,牺牲一点容错换取快速失败。反之,如果是低优先级的批量任务,可以调高到 5 次,配合更长的退避时间。

常见坑:装饰器里的 last_exc 在第一次就成功时是 None,需要确保正常返回路径不会走到 raise last_exc。这里用 return func(*args, **kwargs) 直接返回,所以不会有问题。但如果你改写成 while 循环,务必检查循环出口条件,否则全部重试完后 raise None 会产生一个令人困惑的 TypeError


五、核心 HTTP 客户端

# wechat_sdk/client.py

import logging
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from wechat_sdk.exceptions import NetworkError, ApiError, AuthError, RateLimitError
from wechat_sdk.utils.retry import with_retry

logger = logging.getLogger("wechat_sdk.client")

# 占位符配置,注册后从官方文档 post.wechatapi.net 获取
BASE  = "https://你的接口域名"   # 注册后在官方文档获取
TOKEN = "你的Token"
APPID = "你的appId"
HEADERS = {"VideosApi-token": TOKEN}


class WechatClient:
    def __init__(self, base_url: str = BASE, token: str = TOKEN, appid: str = APPID,
                 timeout: int = 10, max_retries: int = 3):
        self.base_url = base_url.rstrip("/")
        self.appid = appid
        self.timeout = timeout
        self.max_retries = max_retries

        self._session = requests.Session()
        self._session.headers.update({"VideosApi-token": token, "Content-Type": "application/json"})

        # urllib3 层面处理 TCP 连接重试(非业务重试)
        adapter = HTTPAdapter(max_retries=Retry(total=2, backoff_factor=0.3,
                                                 status_forcelist=[500, 502, 503]))
        self._session.mount("https://", adapter)
        self._session.mount("http://", adapter)

    @with_retry(max_attempts=3, base_delay=1.0)
    def post(self, path: str, payload: dict) -> dict:
        """
        统一 POST 入口。自动注入 appId,统一异常转换。
        代码为示例,具体接口/字段以官方文档 post.wechatapi.net 为准。
        """
        url = f"{self.base_url}{path}"
        body = {"appId": self.appid, **payload}

        logger.debug(f"POST {url} | payload keys: {list(body.keys())}")

        try:
            resp = self._session.post(url, json=body, timeout=self.timeout)
            resp.raise_for_status()
        except requests.exceptions.Timeout as e:
            raise NetworkError("请求超时", original=e)
        except requests.exceptions.ConnectionError as e:
            raise NetworkError("连接失败", original=e)
        except requests.exceptions.HTTPError as e:
            raise NetworkError(f"HTTP 错误: {e.response.status_code}", original=e)

        data = resp.json()
        ret = data.get("ret", -1)

        if ret == 200:
            logger.debug(f"POST {path} 成功")
            return data.get("data", {})

        msg = data.get("msg", "未知错误")

        if ret == 401:
            raise AuthError(ret=ret, msg=msg)
        if ret == 429 or "频率" in msg:
            raise RateLimitError(ret=ret, msg=msg)

        raise ApiError(ret=ret, msg=msg, data=data.get("data"))

注意几个设计决策:

  • HTTPAdapter 处理 TCP 级别抖动,@with_retry 处理业务级抖动,两层各司其职。
  • post() 自动注入 appId,调用方不需要每次传。
  • 返回值直接是 data 字段内容,不需要调用方再 .get("data")

关于 timeout 参数timeout=10 是单次请求超时时间(秒),传入 requests.Session.post 时是连接+读取的总时间上限。如果你的业务里有发送大文件的接口(比如发视频),应该针对该接口单独传更大的 timeout,而不是把全局超时调高,否则普通文本消息也会等很久才报超时。requests 支持传 timeout=(connect_timeout, read_timeout) 元组,可以把连接超时控制在 3 秒内,读取超时按需配置。

关于 Session 的复用requests.Session 会复用底层 TCP 连接(HTTP keep-alive),对同一个域名的多次请求性能明显优于每次创建新的 requests 对象。在批量发消息场景下,这个差距会更明显。WechatClient 持有一个 _session 实例,因此整个 SDK 生命周期内都在复用同一个连接池,这是有意为之的设计。


六、业务接口层:消息模块

有了客户端,业务模块就只剩拼参数:

# wechat_sdk/api/message.py

from wechat_sdk.client import WechatClient


class MessageApi:
    def __init__(self, client: WechatClient):
        self.client = client

    def send_text(self, to_wxid: str, content: str, ats: list = None) -> dict:
        """发送文本消息"""
        payload = {"toWxid": to_wxid, "content": content}
        if ats:
            payload["ats"] = ",".join(ats)
        return self.client.post("/postText", payload)

    def send_image(self, to_wxid: str, image_url: str) -> dict:
        """发送图片(URL 方式)"""
        return self.client.post("/postImage", {"toWxid": to_wxid, "imageUrl": image_url})

    def send_link(self, to_wxid: str, title: str, desc: str, url: str, thumb_url: str = "") -> dict:
        """发送链接卡片"""
        return self.client.post("/postLink", {
            "toWxid": to_wxid, "title": title,
            "desc": desc, "url": url, "thumbUrl": thumb_url
        })

    def revoke(self, to_wxid: str, msg_id: str) -> dict:
        """撤回消息"""
        return self.client.post("/revokeMsg", {"toWxid": to_wxid, "msgId": msg_id})

参数说明to_wxid 是接收方的微信 ID,群消息传群 ID(通常以 @chatroom 结尾),单聊传对方的 wxid。ats 是群 @ 列表,填 wxid 即可,SDK 内部会转成逗号分隔的字符串;如果要 @所有人,通常传 ["notify@all"],具体值以官方文档 post.wechatapi.net 为准。msg_id 是发送成功后平台返回的消息 ID,撤回时用,注意撤回有时间窗口限制,过期后接口会返回业务错误而不是抛异常,需要自行处理返回值。

常见坑send_image 传的是图片 URL,不是本地路径。如果图片在本地,需要先上传到 CDN 或使用平台提供的上传接口拿到 URL,再调用该方法。直接传 file:// 路径不会报错,但对方收不到图片。


七、联系人与群组模块(精简版)

# wechat_sdk/api/contact.py

class ContactApi:
    def __init__(self, client):
        self.client = client

    def search(self, keyword: str) -> dict:
        return self.client.post("/search", {"keyword": keyword})

    def add_friend(self, to_wxid: str, remark: str = "") -> dict:
        return self.client.post("/addContacts", {"toWxid": to_wxid, "remark": remark})

    def set_remark(self, to_wxid: str, remark: str) -> dict:
        return self.client.post("/setFriendRemark", {"toWxid": to_wxid, "remark": remark})

    def fetch_list(self) -> dict:
        return self.client.post("/fetchContactsList", {})


# wechat_sdk/api/group.py

class GroupApi:
    def __init__(self, client):
        self.client = client

    def create(self, member_list: list) -> dict:
        return self.client.post("/createChatroom", {"memberList": member_list})

    def invite(self, chatroom_id: str, member_list: list) -> dict:
        return self.client.post("/inviteMember", {"chatroomId": chatroom_id, "memberList": member_list})

    def kick(self, chatroom_id: str, member_list: list) -> dict:
        return self.client.post("/removeMember", {"chatroomId": chatroom_id, "memberList": member_list})

    def set_announcement(self, chatroom_id: str, content: str) -> dict:
        return self.client.post("/setChatroomAnnouncement", {"chatroomId": chatroom_id, "content": content})

关于 fetch_list 的注意事项:拉取联系人列表是一个相对耗时的接口,返回数据量较大时响应时间可能超过默认的 10 秒。建议在调用前临时增大 client.timeout,或者直接在 SDK 初始化时给这个场景单独创建一个 timeout 更大的客户端实例。另外,联系人列表接口有调用频率限制,不要在循环里高频调用,应在本地缓存结果并设置合理的刷新周期。


八、统一入口:SDK 门面

# wechat_sdk/__init__.py

from wechat_sdk.client import WechatClient
from wechat_sdk.api.message import MessageApi
from wechat_sdk.api.contact import ContactApi
from wechat_sdk.api.group import GroupApi
from wechat_sdk.exceptions import WechatApiError, NetworkError, ApiError, AuthError, RateLimitError


class WechatSdk:
    """
    微信 API SDK 门面类。
    示例配置请参考官方文档 post.wechatapi.net,
    base_url / token / appid 均为占位符,注册后替换。
    """
    def __init__(self, base_url: str, token: str, appid: str, timeout: int = 10):
        client = WechatClient(base_url=base_url, token=token, appid=appid, timeout=timeout)
        self.message = MessageApi(client)
        self.contact = ContactApi(client)
        self.group   = GroupApi(client)


__all__ = [
    "WechatSdk",
    "WechatApiError", "NetworkError", "ApiError", "AuthError", "RateLimitError"
]

门面类(Facade)的价值在于:调用方只需要 from wechat_sdk import WechatSdk 一行导入,不需要了解内部模块划分。sdk.messagesdk.contactsdk.group 的命名空间也足够清晰,IDE 的自动补全能直接列出所有可用方法,降低上手成本。如果后续需要扩展新模块(比如朋友圈、支付),在 WechatSdk.__init__ 里加一行挂载即可,不需要修改任何外层接口。


九、调用示例

import logging
from wechat_sdk import WechatSdk, NetworkError, RateLimitError, AuthError

logging.basicConfig(level=logging.INFO)

# 三个占位符,注册后在官方文档 post.wechatapi.net 查看
sdk = WechatSdk(
    base_url="https://你的接口域名",
    token="你的Token",
    appid="你的appId"
)

try:
    result = sdk.message.send_text(to_wxid="filehelper", content="Hello from SDK!")
    print("发送成功:", result)

except AuthError:
    print("Token 无效,请检查配置")

except RateLimitError as e:
    print(f"频率超限: {e.msg},已自动重试 3 次仍失败")

except NetworkError as e:
    print(f"网络异常: {e}")

调用方的 try/except 结构清晰,每种错误有明确的处理路径,不再是一刀切的 except Exception

生产环境建议:不要把 TOKEN、APPID 等凭证硬编码在源码里,应该从环境变量或配置文件读取。可以在 SDK 初始化前加一段读取逻辑:

import os
sdk = WechatSdk(
    base_url=os.environ["WECHAT_BASE_URL"],
    token=os.environ["WECHAT_TOKEN"],
    appid=os.environ["WECHAT_APPID"]
)

这样既方便部署,也避免凭证随代码泄露到版本控制系统。


十、使用 WechatApi 平台的几点实践建议

实际跑这套 SDK 时,WechatApi 这类托管微信 HTTP API 平台已经把登录保活、设备管理这些脏活封装好了,你只需要关注业务逻辑。不过有几条操作规范值得写进你的 SDK 配置里:

操作类型 建议频率
主动加好友 每 2 小时 ≤ 5 个,每天 5–15 个
搜索用户 每天 10–20 次
建群 每天 ≤ 10 个,间隔 10 分钟以上
朋友圈点赞/评论 随机间隔 5–20 秒
批量下载(图/文件) 每条间隔 3–10 秒,做队列
新设备首次调用 在线 3 天后再发批量操作

这些不是平台限制,是微信风控的行为特征识别逻辑——拟人化、控频才是核心。批量发图建议先上传一次拿到 CDN URL,后续用转发接口复用,避免反复上传触发异常检测。

从 SDK 设计角度来说,这个控频需求最好也写进代码里,而不是依赖调用方自觉。可以在 MessageApi 里维护一个上次调用时间戳,每次 send_* 前检查间隔,不足时 time.sleep() 补足。这样即使调用方在循环里猛发,SDK 层也能自动限速,而不是等到平台返回 RateLimitError 再触发重试等待。


十一、消息回调处理(Flask 示例)

SDK 负责发消息,收消息则依赖平台推送到你的回调地址(通过 setCallback 接口注册)。用 Flask 做一个最简回调服务:

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/wechat/callback", methods=["POST"])
def callback():
    data = request.json
    msg_type = data.get("type")
    from_wxid = data.get("fromWxid")
    content = data.get("content")

    # 具体字段以官方文档 post.wechatapi.net 为准
    if msg_type == 1:  # 文本消息(示例类型值,实际以文档为准)
        print(f"收到来自 {from_wxid} 的消息: {content}")
        # 异步处理,避免同步阻塞超时
        # process_async(data)

    return jsonify({"code": 200})  # 必须返回 200,否则平台认为推送失败会重试

if __name__ == "__main__":
    app.run(port=5000)

关键:回调接口必须公网可达,且必须返回 HTTP 200,否则平台会认为推送失败并重试,可能造成消息重复处理。在回调里不要做同步的耗时操作(如直接下载文件),应该把任务推入队列异步处理。

回调的几个常见坑

第一,幂等性处理。平台在网络抖动时可能重复推送同一条消息,消息有唯一 ID 字段,建议在处理前用 Redis 或数据库记录已处理的消息 ID,收到重复消息直接返回 200 跳过,不重复执行业务逻辑。

第二,签名验证。部分平台支持在回调请求里附带签名,用于验证推送方身份,防止伪造请求。如果平台有此能力,务必启用,具体验证逻辑以官方文档 post.wechatapi.net 为准。

第三,本地开发调试。开发阶段服务在本地跑,没有公网 IP,可以用 frpngrok 等内网穿透工具临时暴露本地端口,在平台注册穿透后的公网地址作为回调 URL,调试完成再换成正式地址。


十二、单元测试:mock 掉网络

封装好分层之后,测试变得容易很多——只需要 mock WechatClient.post,不需要真实网络:

# tests/test_message.py

import pytest
from unittest.mock import MagicMock
from wechat_sdk.client import WechatClient
from wechat_sdk.api.message import MessageApi
from wechat_sdk.exceptions import RateLimitError


def make_api():
    client = MagicMock(spec=WechatClient)
    return MessageApi(client), client


def test_send_text_success():
    api, client = make_api()
    client.post.return_value = {"msgId": "fake_msg_id_001"}

    result = api.send_text("filehelper", "hello")
    assert result["msgId"] == "fake_msg_id_001"
    client.post.assert_called_once_with("/postText", {"toWxid": "filehelper", "content": "hello"})


def test_send_text_rate_limit_propagates():
    api, client = make_api()
    client.post.side_effect = RateLimitError(ret=429, msg="频率超限")

    with pytest.raises(RateLimitError):
        api.send_text("filehelper", "hello")


def test_send_text_with_at():
    api, client = make_api()
    client.post.return_value = {}

    api.send_text("room@chatroom", "大家好", ats=["wxid_aaa", "wxid_bbb"])
    _, kwargs = client.post.call_args
    # ats 被拼成逗号分隔字符串
    assert client.post.call_args[0][1]["ats"] == "wxid_aaa,wxid_bbb"

这三个测试用例覆盖了正常路径、异常传播、参数拼装三个维度,全程不发一次真实 HTTP 请求,跑起来极快。对于重试逻辑的测试,可以用 unittest.mock.patch("time.sleep") 屏蔽掉等待时间,让测试在毫秒级完成。


总结

这套 SDK 的核心设计思路可以归纳为三层分离:

  • 异常层:把所有错误分类,让调用方知道"出了什么问题"。区分网络错误与业务错误,区分可重试与不可重试,是异常体系设计的关键。
  • 重试层:把"要不要重试、等多久"的策略集中管理,业务代码不感知。指数退避加随机抖动,是处理网络抖动和频率限制的标准做法。
  • 接口层:把 HTTP 细节封装进去,调用方只传业务参数。分模块组织接口,方便扩展也方便测试。

三层各自独立,测试也方便——可以 mock WechatClient.post 来测任意业务接口,不需要真实网络。按上面的结构搭好项目骨架,把凭证收进环境变量,再逐步补全各模块接口,就能跑通一个生产可用的微信 API Python SDK。具体可用的接口列表和字段定义,以官方文档 post.wechatapi.net 为准。

更多推荐