影刀RPA店群自动化可观测性实战:Python协同分布式链路追踪与全链路上下文传播


一个上货任务失败,追查原因要翻遍四个系统的日志。

调度引擎说“任务已分发”,Worker说“流程已启动”,影刀说“元素点击成功”,但最终商品就是没上架。

在这里插入图片描述

随着店群自动化系统的微服务化——调度引擎、Worker代理、影刀流程、浏览器CDP、Redis消息队列——一个业务操作横跨五六个组件。
当某个店铺的上货任务最终失败时,我们必须把散落在各处的日志片段像拼图一样拼起来,才能还原完整的故事线。

这已经不是“查日志”,而是“做刑侦”。

分布式链路追踪,正是为了解决这类跨服务、跨进程、跨中间件的排障难题而生。
这篇文章就完整展开我们如何将OpenTelemetry引入店群自动化系统,实现从调度请求到浏览器点击的全链路可观测性。

拼多多店群自动化上架方案


在这里插入图片描述

一、没有追踪的时候,我们在黑暗中摸索

先描述一个真实的排障场景。

某天早晨,运营反馈拼多多店铺“xx旗舰店”的上货任务连续失败。
我们打开Kibana,输入店铺ID,出来上百条日志。有调度引擎的、Worker A的、Worker B的(因为重试调度到了另一台)、Redis Streams的消费记录、影刀流程内部的步骤日志。

但问题在于:这些日志之间没有关联ID。

我们不知道Worker A上的那条“元素定位失败”日志,到底对应调度引擎的哪一次任务分发。
只能靠时间戳模糊对齐,人工拼接出大概的执行路径。浪费大量时间。
在这里插入图片描述

分布式追踪的核心价值,就是给每一次业务操作分配一个全局唯一的Trace ID,并在所有组件间传播这个ID,让所有日志和指标都能串起来。


二、OpenTelemetry引入:标准化的可观测性框架

经过选型,我们采用了OpenTelemetry(简称OTel)作为追踪标准。
它提供了Python SDK,支持自动埋点和手动埋点,且后端可以对接Jaeger、Zipkin或直接写入Elasticsearch。

我们首先在Python调度引擎和Worker代理中集成了OTel。

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def init_tracer(service_name: str):
    resource = Resource(attributes={SERVICE_NAME: service_name})
        provider = TracerProvider(resource=resource)
            
            ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/cfbab05dc7e849f5915255b8030dd27e.png#pic_center)

                jaeger_exporter = JaegerExporter(
                        agent_host_name="jaeger-agent",
                                agent_port=6831,
                                    )
                                        provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
                                            trace.set_tracer_provider(provider)
                                                return trace.get_tracer(__name__)
tracer = init_tracer("scheduler-engine")

调度引擎在接收到一个Webhook事件或定时触发时,会创建一个根Span,代表这次业务操作的整体生命周期。

TEMU店群如何管理运营?

from opentelemetry.trace import SpanKind

async def handle_new_order(order_event):
    with tracer.start_as_current_span(
            "handle_new_order",
                    kind=SpanKind.SERVER,
                            attributes={
                                        "shop_id": order_event["shop_id"],
                                                    "platform": order_event["platform"],
                                                                "order_id": order_event["order_id"],
                                                                        }
                                                                            ) as span:
                                                                                    trace_id = span.get_span_context().trace_id
                                                                                            # 将trace_id注入任务消息,传播到下游
                                                                                                    task_message = build_task(order_event)
                                                                                                            task_message["trace_id"] = format(trace_id, '032x')
                                                                                                                    await task_queue.enqueue(task_message)
                                                                                                                    ```
从这里开始,Trace ID就与这条任务消息绑定,准备跨越进程边界传播。

---

## 三、跨Redis传播:在消息中携带上下文

任务消息通过Redis Streams在调度引擎和Worker代理之间传递。  
要保证Trace上下文不丢失,必须在消息体中显式携带`traceparent`或`trace_id`。

我们采用W3C Trace Context标准,在消息的Header中注入`traceparent`字段。

```python
from opentelemetry.propagate import inject

async def enqueue_task(task, span_context):
    carrier = {}
        inject(carrier)  # 将当前span上下文注入到carrier字典
            task["traceparent"] = carrier.get("traceparent")
                await redis.xadd("task:stream", {"data": json.dumps(task)})
                ```
Worker代理从Redis拉取到任务后,提取`traceparent`并还原出一个远程Span,作为当前执行操作的父Span。

```python
from opentelemetry.propagate import extract

async def consume_task(message):
    task_data = json.loads(message["data"])
        carrier = {"traceparent": task_data.get("traceparent")}
            ctx = extract(carrier)
                
                    with tracer.start_as_current_span(
                            "execute_task",
                                    context=ctx,
                                            kind=SpanKind.CONSUMER,
                                                    attributes={
                                                                "task_id": task_data["task_id"],
                                                                            "flow_name": task_data["flow_name"],
                                                                                        "shop_id": task_data["shop_id"],
                                                                                                }
                                                                                                    ) as span:
                                                                                                            result = await run_shadow_flow(task_data)
                                                                                                                    span.set_attribute("task.result", result)
                                                                                                                    ```
这样一来,Jaeger中就能看到一条完整的Trace链路:`handle_new_order` → `enqueue_task` → `execute_task`。  
之前断裂的日志,被Trace ID串联了起来。

---

## 四、向影刀RPA流程注入追踪

这步是最棘手的。影刀RPA流程是独立的Windows进程,无法直接使用Python的OTel SDK。  
但影刀流程中允许调用Python脚本作为步骤节点。我们利用这一点来传播和记录Span。

任务启动时,Worker代理将`trace_id`和当前`span_id`通过命令行参数传入影刀流程。

ShadowBot.exe --flow=“pdd_upload” --params=“shop_id=1032&trace_id=abc123&span_id=def456”


在影刀流程的关键步骤(如“点击提交按钮”),我们插入一个“执行Python脚本”节点,脚本中读取参数,并创建一个子Span,上报到本地的OTel Collector。

```python
import sys
import json
from opentelemetry import trace
from opentelemetry.trace import SpanContext, TraceFlags, NonRecordingSpan

# 从命令行参数恢复父Span上下文
trace_id_hex = sys.argv[1]  # 传入的trace_id
span_id_hex = sys.argv[2]   # 传入的parent_span_id

parent_span_context = SpanContext(
    trace_id=int(trace_id_hex, 16),
        span_id=int(span_id_hex, 16),
            is_remote=True,
                trace_flags=TraceFlags(0x01),
                )
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(
    "shadow_flow_step:click_submit",
        context=trace.set_span_in_context(NonRecordingSpan(parent_span_context)),
            attributes={"step": "submit", "shop_id": sys.argv[3]}
            ) as span:
                # 执行实际的点击操作
                    ...
                        span.set_attribute("success", True)
                        ```
虽然这些Span是由不同的Python子进程上报的,但共享同一个Trace ID,Jaeger会将它们拼接在一起。  
这样,我们就实现了从调度引擎到影刀RPA内部步骤的完整链路。

---

## 五、浏览器端的追踪嵌入

自动化任务中最耗时的部分往往是页面加载和渲染。  
我们利用CDP(Chrome DevTools Protocol)向浏览器注入内联的追踪逻辑,在页面关键生命周期事件(`DOMContentLoaded`、`load`)触发时,通过`fetch`向OTel Collector上报事件。

```python
async def inject_page_tracing(cdp, trace_id, span_id):
    script = f"""
        const traceId = '{trace_id}';
            const parentSpanId = '{span_id}';
                window.addEventListener('DOMContentLoaded', () => {{
                        const spanId = Math.random().toString(16).substr(2, 16);
                                fetch('http://otel-collector:4318/v1/traces', {{
                                            method: 'POST',
                                                        headers: {{'Content-Type': 'application/json'}},
                                                                    body: JSON.stringify({{
                                                                                    resourceSpans: [{{
                                                                                                        scopeSpans: [{{
                                                                                                                                spans: [{{
                                                                                                                                                            traceId: traceId,
                                                                                                                                                                                        spanId: spanId,
                                                                                                                                                                                                                    parentSpanId: parentSpanId,
                                                                                                                                                                                                                                                name: 'page.dom_ready',
                                                                                                                                                                                                                                                                            startTimeUnixNano: Date.now() * 1000000,
                                                                                                                                                                                                                                                                                                        endTimeUnixNano: Date.now() * 1000000,
                                                                                                                                                                                                                                                                                                                                }}]
                                                                                                                                                                                                                                                                                                                                                    }}]
                                                                                                                                                                                                                                                                                                                                                                    }}]
                                                                                                                                                                                                                                                                                                                                                                                }})
                                                                                                                                                                                                                                                                                                                                                                                        }});
                                                                                                                                                                                                                                                                                                                                                                                            }});
                                                                                                                                                                                                                                                                                                                                                                                                """
                                                                                                                                                                                                                                                                                                                                                                                                    await cdp.evaluate(script)
                                                                                                                                                                                                                                                                                                                                                                                                    ```
这些页面级别的Span,帮助我们精确量化每个页面的DOM构建耗时,并与后端任务Span形成父子关系。  
当发现某个店铺页面的DOM耗时突然从2秒飙升到15秒时,就能快速定位到平台前端可能改版或该店铺模板存在问题。

---

## 六、可观测性的三个支柱:Traces + Logs + Metrics 的关联

仅有Traces还不够。我们通过OTel的日志桥接,将Trace ID自动注入到Python的结构化日志中。

```python
import logging
from opentelemetry.instrumentation.logging import LoggingInstrumentor

LoggingInstrumentor().instrument(set_logging_format=True)

logger = logging.getLogger(__name__)
logger.info("Task started", extra={"shop_id": "1032"})
# 输出日志中自动带上了 trace_id 和 span_id

同时,Prometheus指标中也记录了Trace相关的信息,可以在Grafana中从指标下钻到相关的Trace。

在这里插入图片描述

当我们从告警“某个店铺上货失败率突然升高”点击进入时,会直接打开Jaeger中该店铺近期的失败Trace,看到完整的调用链瀑布图,一眼发现是“运费模板API超时”导致的连锁失败。


七、采样策略与性能开销控制

全量追踪会产生海量的Span数据,我们采用了尾部采样策略:保留所有包含错误、或耗时超过P95阈值的Trace,正常快速完成的Trace以10%概率采样。

from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

sampler = ParentBased(
    root=TraceIdRatioBased(0.1),  # 10%采样
        remote_parent_sampled=ALWAYS_ON,
            remote_parent_not_sampled=ALWAYS_OFF,
                local_parent_sampled=ALWAYS_ON,
                    local_parent_not_sampled=ALWAYS_OFF,
                    )
                    ```
并且在OTel Collector中配置了`tailsamplingprocessor`,对包含`error=true`属性的Trace强制保留。

这样在保证问题可追溯的前提下,追踪数据量降低了80%,对系统性能的影响几乎可以忽略。

---

## 八、踩坑实录

**异步上下文丢失。**  
在`asyncio`协程中,如果没有正确传递`context`,`start_as_current_span`会在错误的上下文中创建Span。  
我们为关键的异步任务入口统一封装了`context`传递,并编写了静态检查脚本防止遗漏。

**影刀流程中Python子进程上报延迟。**  
![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/98c4229e970d45bf86d290f41f1119b3.png#pic_center)

每个步骤都启动一个Python解释器,加载OTel SDK,导致步骤耗时增加了数百毫秒。  
我们后来将OTel数据先写入本地文件,由Worker代理统一批量发送,避免重复初始化开销。

**Jaeger Span数量爆炸。**  
初期我们给每个细小的CDP请求都创建了Span,导致每天数千万Span,Jaeger后端OOM。  
经过梳理,只保留了有意义的业务节点Span,并将无异常的页面资源加载合并为一个Span。

---

## 九、写在最后

分布式追踪不是“锦上添花”,而是复杂系统排障的“刚需”。

当你的自动化系统组件数量超过一只手能数的范围,就必须让每一次调用、每一次传播都有迹可循。  
OpenTelemetry提供了一套标准化的方案,让我们能够在Python、Redis、gRPC、影刀RPA、浏览器之间,构建一条完整的观测链。

> 看不见的调用链,就像没有图纸的电路板。  
> > 一旦短路,只能一根根线去摸。
有了链路追踪,每一次任务失败的真相,都会被清晰地留在那里,等待你随时查阅。

---

*作者:林焱*

更多推荐