影刀RPA店群自动化可观测性实战:Python协同分布式链路追踪与全链路上下文传播
影刀RPA店群自动化可观测性实战:Python协同分布式链路追踪与全链路上下文传播
一个上货任务失败,追查原因要翻遍四个系统的日志。
调度引擎说“任务已分发”,Worker说“流程已启动”,影刀说“元素点击成功”,但最终商品就是没上架。

随着店群自动化系统的微服务化——调度引擎、Worker代理、影刀流程、浏览器CDP、Redis消息队列——一个业务操作横跨五六个组件。
当某个店铺的上货任务最终失败时,我们必须把散落在各处的日志片段像拼图一样拼起来,才能还原完整的故事线。
这已经不是“查日志”,而是“做刑侦”。
分布式链路追踪,正是为了解决这类跨服务、跨进程、跨中间件的排障难题而生。
这篇文章就完整展开我们如何将OpenTelemetry引入店群自动化系统,实现从调度请求到浏览器点击的全链路可观测性。
拼多多店群自动化上架方案

一、没有追踪的时候,我们在黑暗中摸索
先描述一个真实的排障场景。
某天早晨,运营反馈拼多多店铺“xx旗舰店”的上货任务连续失败。
我们打开Kibana,输入店铺ID,出来上百条日志。有调度引擎的、Worker A的、Worker B的(因为重试调度到了另一台)、Redis Streams的消费记录、影刀流程内部的步骤日志。
但问题在于:这些日志之间没有关联ID。
我们不知道Worker A上的那条“元素定位失败”日志,到底对应调度引擎的哪一次任务分发。
只能靠时间戳模糊对齐,人工拼接出大概的执行路径。浪费大量时间。
分布式追踪的核心价值,就是给每一次业务操作分配一个全局唯一的Trace ID,并在所有组件间传播这个ID,让所有日志和指标都能串起来。
二、OpenTelemetry引入:标准化的可观测性框架
经过选型,我们采用了OpenTelemetry(简称OTel)作为追踪标准。
它提供了Python SDK,支持自动埋点和手动埋点,且后端可以对接Jaeger、Zipkin或直接写入Elasticsearch。
我们首先在Python调度引擎和Worker代理中集成了OTel。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
def init_tracer(service_name: str):
resource = Resource(attributes={SERVICE_NAME: service_name})
provider = TracerProvider(resource=resource)

jaeger_exporter = JaegerExporter(
agent_host_name="jaeger-agent",
agent_port=6831,
)
provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
trace.set_tracer_provider(provider)
return trace.get_tracer(__name__)
tracer = init_tracer("scheduler-engine")
调度引擎在接收到一个Webhook事件或定时触发时,会创建一个根Span,代表这次业务操作的整体生命周期。
TEMU店群如何管理运营?
from opentelemetry.trace import SpanKind
async def handle_new_order(order_event):
with tracer.start_as_current_span(
"handle_new_order",
kind=SpanKind.SERVER,
attributes={
"shop_id": order_event["shop_id"],
"platform": order_event["platform"],
"order_id": order_event["order_id"],
}
) as span:
trace_id = span.get_span_context().trace_id
# 将trace_id注入任务消息,传播到下游
task_message = build_task(order_event)
task_message["trace_id"] = format(trace_id, '032x')
await task_queue.enqueue(task_message)
```
从这里开始,Trace ID就与这条任务消息绑定,准备跨越进程边界传播。
---
## 三、跨Redis传播:在消息中携带上下文
任务消息通过Redis Streams在调度引擎和Worker代理之间传递。
要保证Trace上下文不丢失,必须在消息体中显式携带`traceparent`或`trace_id`。
我们采用W3C Trace Context标准,在消息的Header中注入`traceparent`字段。
```python
from opentelemetry.propagate import inject
async def enqueue_task(task, span_context):
carrier = {}
inject(carrier) # 将当前span上下文注入到carrier字典
task["traceparent"] = carrier.get("traceparent")
await redis.xadd("task:stream", {"data": json.dumps(task)})
```
Worker代理从Redis拉取到任务后,提取`traceparent`并还原出一个远程Span,作为当前执行操作的父Span。
```python
from opentelemetry.propagate import extract
async def consume_task(message):
task_data = json.loads(message["data"])
carrier = {"traceparent": task_data.get("traceparent")}
ctx = extract(carrier)
with tracer.start_as_current_span(
"execute_task",
context=ctx,
kind=SpanKind.CONSUMER,
attributes={
"task_id": task_data["task_id"],
"flow_name": task_data["flow_name"],
"shop_id": task_data["shop_id"],
}
) as span:
result = await run_shadow_flow(task_data)
span.set_attribute("task.result", result)
```
这样一来,Jaeger中就能看到一条完整的Trace链路:`handle_new_order` → `enqueue_task` → `execute_task`。
之前断裂的日志,被Trace ID串联了起来。
---
## 四、向影刀RPA流程注入追踪
这步是最棘手的。影刀RPA流程是独立的Windows进程,无法直接使用Python的OTel SDK。
但影刀流程中允许调用Python脚本作为步骤节点。我们利用这一点来传播和记录Span。
任务启动时,Worker代理将`trace_id`和当前`span_id`通过命令行参数传入影刀流程。
ShadowBot.exe --flow=“pdd_upload” --params=“shop_id=1032&trace_id=abc123&span_id=def456”
在影刀流程的关键步骤(如“点击提交按钮”),我们插入一个“执行Python脚本”节点,脚本中读取参数,并创建一个子Span,上报到本地的OTel Collector。
```python
import sys
import json
from opentelemetry import trace
from opentelemetry.trace import SpanContext, TraceFlags, NonRecordingSpan
# 从命令行参数恢复父Span上下文
trace_id_hex = sys.argv[1] # 传入的trace_id
span_id_hex = sys.argv[2] # 传入的parent_span_id
parent_span_context = SpanContext(
trace_id=int(trace_id_hex, 16),
span_id=int(span_id_hex, 16),
is_remote=True,
trace_flags=TraceFlags(0x01),
)
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(
"shadow_flow_step:click_submit",
context=trace.set_span_in_context(NonRecordingSpan(parent_span_context)),
attributes={"step": "submit", "shop_id": sys.argv[3]}
) as span:
# 执行实际的点击操作
...
span.set_attribute("success", True)
```
虽然这些Span是由不同的Python子进程上报的,但共享同一个Trace ID,Jaeger会将它们拼接在一起。
这样,我们就实现了从调度引擎到影刀RPA内部步骤的完整链路。
---
## 五、浏览器端的追踪嵌入
自动化任务中最耗时的部分往往是页面加载和渲染。
我们利用CDP(Chrome DevTools Protocol)向浏览器注入内联的追踪逻辑,在页面关键生命周期事件(`DOMContentLoaded`、`load`)触发时,通过`fetch`向OTel Collector上报事件。
```python
async def inject_page_tracing(cdp, trace_id, span_id):
script = f"""
const traceId = '{trace_id}';
const parentSpanId = '{span_id}';
window.addEventListener('DOMContentLoaded', () => {{
const spanId = Math.random().toString(16).substr(2, 16);
fetch('http://otel-collector:4318/v1/traces', {{
method: 'POST',
headers: {{'Content-Type': 'application/json'}},
body: JSON.stringify({{
resourceSpans: [{{
scopeSpans: [{{
spans: [{{
traceId: traceId,
spanId: spanId,
parentSpanId: parentSpanId,
name: 'page.dom_ready',
startTimeUnixNano: Date.now() * 1000000,
endTimeUnixNano: Date.now() * 1000000,
}}]
}}]
}}]
}})
}});
}});
"""
await cdp.evaluate(script)
```
这些页面级别的Span,帮助我们精确量化每个页面的DOM构建耗时,并与后端任务Span形成父子关系。
当发现某个店铺页面的DOM耗时突然从2秒飙升到15秒时,就能快速定位到平台前端可能改版或该店铺模板存在问题。
---
## 六、可观测性的三个支柱:Traces + Logs + Metrics 的关联
仅有Traces还不够。我们通过OTel的日志桥接,将Trace ID自动注入到Python的结构化日志中。
```python
import logging
from opentelemetry.instrumentation.logging import LoggingInstrumentor
LoggingInstrumentor().instrument(set_logging_format=True)
logger = logging.getLogger(__name__)
logger.info("Task started", extra={"shop_id": "1032"})
# 输出日志中自动带上了 trace_id 和 span_id
同时,Prometheus指标中也记录了Trace相关的信息,可以在Grafana中从指标下钻到相关的Trace。

当我们从告警“某个店铺上货失败率突然升高”点击进入时,会直接打开Jaeger中该店铺近期的失败Trace,看到完整的调用链瀑布图,一眼发现是“运费模板API超时”导致的连锁失败。
七、采样策略与性能开销控制
全量追踪会产生海量的Span数据,我们采用了尾部采样策略:保留所有包含错误、或耗时超过P95阈值的Trace,正常快速完成的Trace以10%概率采样。
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased
sampler = ParentBased(
root=TraceIdRatioBased(0.1), # 10%采样
remote_parent_sampled=ALWAYS_ON,
remote_parent_not_sampled=ALWAYS_OFF,
local_parent_sampled=ALWAYS_ON,
local_parent_not_sampled=ALWAYS_OFF,
)
```
并且在OTel Collector中配置了`tailsamplingprocessor`,对包含`error=true`属性的Trace强制保留。
这样在保证问题可追溯的前提下,追踪数据量降低了80%,对系统性能的影响几乎可以忽略。
---
## 八、踩坑实录
**异步上下文丢失。**
在`asyncio`协程中,如果没有正确传递`context`,`start_as_current_span`会在错误的上下文中创建Span。
我们为关键的异步任务入口统一封装了`context`传递,并编写了静态检查脚本防止遗漏。
**影刀流程中Python子进程上报延迟。**

每个步骤都启动一个Python解释器,加载OTel SDK,导致步骤耗时增加了数百毫秒。
我们后来将OTel数据先写入本地文件,由Worker代理统一批量发送,避免重复初始化开销。
**Jaeger Span数量爆炸。**
初期我们给每个细小的CDP请求都创建了Span,导致每天数千万Span,Jaeger后端OOM。
经过梳理,只保留了有意义的业务节点Span,并将无异常的页面资源加载合并为一个Span。
---
## 九、写在最后
分布式追踪不是“锦上添花”,而是复杂系统排障的“刚需”。
当你的自动化系统组件数量超过一只手能数的范围,就必须让每一次调用、每一次传播都有迹可循。
OpenTelemetry提供了一套标准化的方案,让我们能够在Python、Redis、gRPC、影刀RPA、浏览器之间,构建一条完整的观测链。
> 看不见的调用链,就像没有图纸的电路板。
> > 一旦短路,只能一根根线去摸。
有了链路追踪,每一次任务失败的真相,都会被清晰地留在那里,等待你随时查阅。
---
*作者:林焱*
更多推荐


所有评论(0)