影刀RPA店群自动化教程:Python驱动实时运营仪表板与手动干预架构实战

在这里插入图片描述


几十个店铺同时在跑自动化,运营却只能靠“猜”来判断系统是否正常。

在这里插入图片描述

拼多多店群自动化上架方案

一个店铺卡住了,可能要等到日报出来才知道——那时候已经晚了半天。

在这里插入图片描述

店群自动化的早期,我们的监控手段相当原始:看日志、查数据库、偶尔远程桌面到Worker上瞅一眼浏览器窗口。
运营同事想知道“现在有哪些店铺在跑什么任务”,唯一的途径是在企业微信群里问开发。

后来我们决定:不能再让运营“盲飞”了。
必须有一个所有人都能看懂的实时仪表板,展示每个店铺的自动化状态,并且允许授权人员直接进行干预。
在这里插入图片描述

这篇文章就完整展开这套实时运营仪表板与手动干预系统的架构设计和工程实现。
涉及Python后端、WebSocket实时推送、React前端和与现有调度系统的无缝集成。


TEMU店群如何管理运营?


在这里插入图片描述

一、实时数据从哪里来

仪表板要展示的核心信息,其实分散在几个已有的组件中:

  • Redis:任务状态、队列长度、Worker心跳、浏览器实例占用情况
    • PostgreSQL:历史任务记录、店铺配置
    • 调度引擎:当前正在执行的任务详情
  • 在这里插入图片描述

我们选择不做新的数据采集,而是构建一个只读的数据汇聚层,把这些信息以统一格式暴露给前端。

class DashboardDataAggregator:
    def __init__(self, redis, db_pool):
            self.redis = redis
                    self.db = db_pool
    async def get_all_shops_status(self) -> list:
            # 从Redis获取所有店铺的实时状态
                    shop_keys = await self.redis.keys("shop:status:*")
                            shops = []
                                    for key in shop_keys:
                                                shop_id = key.decode().split(":")[-1]
                                                            data = await self.redis.hgetall(key)
                                                                        shops.append({
                                                                                        "shop_id": shop_id,
                                                                                                        "platform": data.get(b"platform", b"").decode(),
                                                                                                                        "current_task": data.get(b"current_task", b"").decode(),
                                                                                                                                        "task_status": data.get(b"task_status", b"idle").decode(),
                                                                                                                                                        "progress": int(data.get(b"progress", 0)),
                                                                                                                                                                        "worker_id": data.get(b"worker_id", b"").decode(),
                                                                                                                                                                                        "last_active": data.get(b"last_active", b"").decode(),
                                                                                                                                                                                                    })
                                                                                                                                                                                                            return shops
    async def get_worker_loads(self) -> list:
            worker_keys = await self.redis.keys("workers:*")
                    workers = []
                            for key in worker_keys:
                                        info = await self.redis.hgetall(key)
                                                    workers.append({
                                                                    "worker_id": key.decode().split(":")[-1],
                                                                                    "load": json.loads(info.get(b"load", b"{}")),
                                                                                                    "status": info.get(b"status", b"online").decode(),
                                                                                                                })
                                                                                                                        return workers
    async def get_queue_depth(self) -> dict:
            depths = {}
                    for stream in ["task:stream:pdd", "task:stream:temu", "task:stream:tiktok"]:
                                depths[stream] = await self.redis.xlen(stream)
                                        return depths
                                        ```
这个聚合层不是服务,而是被仪表板后端直接调用。  
数据每2秒刷新一次(对于WebSocket推送,变化时立即推送)。

---

## 二、WebSocket实时推送:让数据主动找前端

如果前端每2秒轮询一次REST API,在网络波动时会产生明显的延迟和毛刺。  
我们采用WebSocket建立长连接,后端在检测到状态变化时主动推送,前端接收更新后局部刷新界面。

后端WebSocket服务使用FastAPI + `websockets` 实现。

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict
import asyncio
import json

app = FastAPI()

class ConnectionManager:
    def __init__(self):
            self.active_connections: Dict[str, WebSocket] = {}
    async def connect(self, websocket: WebSocket, user_id: str):
            await websocket.accept()
                    self.active_connections[user_id] = websocket
    def disconnect(self, user_id: str):
            self.active_connections.pop(user_id, None)
    async def broadcast(self, message: dict):
            disconnected = []
                    for user_id, ws in self.active_connections.items():
                                try:
                                                await ws.send_json(message)
                                                            except:
                                                                            disconnected.append(user_id)
                                                                                    for uid in disconnected:
                                                                                                self.disconnect(uid)
manager = ConnectionManager()

@app.websocket("/ws/dashboard")
async def dashboard_websocket(websocket: WebSocket):
    # 简单的token验证
        token = websocket.query_params.get("token")
            user_id = verify_token(token)
                if not user_id:
                        await websocket.close(code=4001)
                                return
    await manager.connect(websocket, user_id)
        try:
                while True:
                            # 保持连接,等待客户端消息(如心跳)
                                        data = await websocket.receive_text()
                                            except WebSocketDisconnect:
                                                    manager.disconnect(user_id)
                                                    ```
当调度引擎或Worker代理更新任务状态时,会调用一个通知函数,将状态变更消息发送到所有连接的WebSocket客户端。

```python
async def notify_status_change(shop_id: str, status_update: dict):
    await manager.broadcast({
            "type": "shop_status",
                    "shop_id": shop_id,
                            "data": status_update
                                })
                                ```
前端收到消息后,更新对应店铺卡片的状态和进度条,无需刷新整个页面。

---

## 三、前端设计:一眼看清所有店铺

前端使用React + Ant Design构建(本文侧重后端架构,仅描述前端思路)。

每个店铺显示为一个卡片,卡片上包含:
- 店铺名称与平台图标
- - 当前任务名称(如“上货中”“采集竞品”)
- - 进度条(已完成步骤 / 总步骤)
- - 状态标签:运行中(绿色)、暂停(黄色)、失败(红色)、空闲(灰色)
- - Worker节点和最近活跃时间
卡片按平台分组,支持拖拽排序和自定义视图。  
运营可以将自己负责的店铺固定在顶部,或者按状态筛选。

当某个店铺任务失败时,对应卡片变红并闪烁,点击可查看最近的错误信息。

**设计原则:不需要任何培训,运营一看就懂。**

---

## 四、手动干预:从“只能看”到“可以动”

如果仪表板只是展示,它依然是一个高级监控屏。  
真正释放运营能力的,是允许他们直接在仪表板上进行干预操作。

我们提供的干预能力包括:
- **暂停任务**:安全暂停当前正在执行的任务(上一篇文章详细描述了暂停机制)
- - **恢复任务**:从暂停点继续执行
- - **跳过当前步骤**:遇到已知的临时问题(如平台公告页弹窗),跳过非关键步骤
- - **修改任务参数**:如临时调整上架价格、更换回复话术模板
- - **终止任务**:强制终止并将任务标记为失败
每个操作都对应后端的一个REST API,需要权限校验和审计日志。

```python
from fastapi import HTTPException, Depends

@app.post("/api/tasks/{task_id}/pause")
async def api_pause_task(task_id: str, reason: str = "", user=Depends(get_current_user)):
    if not user.has_permission("task:pause"):
            raise HTTPException(status_code=403, detail="No permission")
                await task_manager.request_pause(task_id, reason)
                    await audit_logger.log(user.id, "pause_task", task_id, {"reason": reason})
                        return {"status": "ok"}
@app.post("/api/tasks/{task_id}/modify_step")
async def api_modify_step(task_id: str, step_index: int, new_params: dict, user=Depends(get_current_user)):
    if not user.has_permission("task:modify"):
            raise HTTPException(status_code=403)
                await task_manager.modify_step_params(task_id, step_index, new_params)
                    await audit_logger.log(user.id, "modify_step", task_id, {"step": step_index, "params": new_params})
                        return {"status": "ok"}
                        ```
权限模型基于RBAC,与之前的安全系统一致。  
运营人员默认只有“查看”和“暂停/恢复”权限,“修改参数”和“终止任务”需要更高角色。

操作完成后,系统立即通过WebSocket向所有连接的客户端广播状态更新,前端实时反映变化。

---

## 五、任务进度的精确展示

进度条是仪表板上最直观的指标。  
为了让进度条准确,我们需要知道每个任务的总步骤数和当前执行到第几步。

指令配置中的每个步骤都有明确的序列号。  
Worker在执行每一步前、后,都会更新Redis中的进度信息。

```python
async def update_task_progress(task_id: str, step_index: int, total_steps: int):
    await redis.hset(f"task:progress:{task_id}", mapping={
            "current_step": step_index,
                    "total_steps": total_steps,
                            "updated_at": time.time()
                                })
                                    # 触发WebSocket推送
                                        await notify_status_change(
                                                task_context["shop_id"],
                                                        {"task_id": task_id, "progress": int(step_index / total_steps * 100)}
                                                            )
                                                            ```
前端收到进度更新后,对应店铺卡片上的进度条平滑增长。  
这让运营能直观判断任务是否在正常进行——如果进度条长时间卡住不动,说明可能遇到了问题。

---

## 六、大规模店铺下的性能优化

当店铺数量达到60+时,每个前端都连接WebSocket并接收全量广播,会产生大量的前端渲染压力。  
我们做了以下优化:

- **频道订阅**:WebSocket连接时可以指定关注的平台或店铺组,后端只推送相关更新,而非全量广播。
- - **增量更新**:只推送变更的字段,前端做局部Patch,而不是整个店铺对象。
- - **前端虚拟滚动**:店铺卡片列表使用`react-window`只渲染可视区域内的卡片,大幅降低DOM节点数。
- - **后端数据缓存**:聚合数据在内存中缓存1秒,多个WebSocket客户端共享同一份数据,避免对Redis的高频查询。
```python
class FilteredBroadcaster:
    def __init__(self):
            self.subscriptions: Dict[str, set] = {}  # user_id -> {shop_id, ...}
    async def push_to_subscribers(self, shop_id, update):
            for user_id, shops in self.subscriptions.items():
                        if shop_id in shops or "*" in shops:
                                        ws = manager.active_connections.get(user_id)
                                                        if ws:
                                                                            await ws.send_json({"shop_id": shop_id, "update": update})
                                                                            ```
前端在建立WebSocket连接时可以传参 `subscribe_shops`,后端据此过滤推送内容。

---

## 七、与现有系统的集成

这套仪表板系统作为独立微服务部署,不侵入调度引擎和Worker代理的核心代码。

- 通过只读访问Redis获取实时状态
- - 通过REST API调用调度引擎的干预接口(pause/resume/modify)
- - 通过订阅Redis Keyspace Notifications或现有的事件流获取状态变更事件
Worker代理在更新任务状态时,已经习惯写入Redis Hash。  
我们只是在写入后增加了一步`notify_status_change`调用,不影响原有执行流程。

---

## 八、踩坑与教训

**WebSocket断线重连风暴。**  
前端在断线后设置了自动重连,但没有加退避延迟。一次后端短暂重启,导致数十个前端几乎同时重连,产生了瞬间的认证请求洪峰。  
我们加入了1~5秒的随机退避重连,并在后端增加了连接限速。

**进度条跳跃。**  
当任务包含并行步骤时,步骤序号和总步骤数的线性比例无法准确反映进度。  
我们改为在指令配置中定义每个步骤的“权重”,总进度按已完成步骤的权重累加计算。

**权限边界模糊。**  
运营有时会绕过仪表板,直接通过其他渠道找开发修改任务,导致审计日志缺失。  
我们强制所有干预操作必须通过仪表板的API入口,Worker不接受来自其他渠道的干预指令。

---

## 九、写在最后

自动化的价值,不仅体现在“机器替人干活”,更体现在“人能够清晰地看见机器在干什么”。

一套好的运营仪表板,加上安全的手动干预能力,让运营团队从“担心自动化跑出问题”变成了“信任自动化并参与其中”。

> 当运营可以一边喝咖啡,一边在手机上看到所有店铺在有条不紊地工作时,  
> > 自动化的工程价值才算真正落地。
---

*作者:林焱*

更多推荐