SkyWalking Agent数据采集和上报原理浅析
Skywalking是一款云原生的APM(应用性能管理)系统。包含了事件日志、链路追踪、指标监控能力,同时提供了数据的采集、数据存储以及数据展示三个先对独立的子系统。是一款“广义的”分布式链路追踪系统。上一篇文章《SkyWalking如何通过修改字节码让插件生效》讲了SkyWalking的插件工作原理,这篇文章就来探索一下SkyWalking到底是如何实现链路追踪和指标监控的功能的。整体工作原理采
Skywalking是一款云原生的APM(应用性能管理)系统。包含了事件日志、链路追踪、指标监控能力,同时提供了数据的采集、数据存储以及数据展示三个先对独立的子系统。是一款“广义的”分布式链路追踪系统。
上一篇文章《SkyWalking如何通过修改字节码让插件生效》讲了SkyWalking的插件工作原理,这篇文章就来探索一下SkyWalking到底是如何实现链路追踪和指标监控的功能的。
整体工作原理
采集数据过程分析
链路追踪核心概念:Trace、Span。
具体在这里不展开,可以自行参考《OpenTracing语义标准》。
大致如下:
- 一个Trace对应一次完整的调用链路。
- 一个线程内的调用对应一个TraceSegement。
- 同一个线程内方法每调用一次就生成一个Span,同时spanId + 1。
- 跨进程(如服务之间的调用)或跨线程(如异步调用)生成新的TraceSegement和Sapn,并通过指针指向上游链路信息。
领域建模
核心领域模型主要分为四个部分:ids、trace、tag、context。
ids
DistributedTraceId
org.skywalking.apm.agent.core.context.ids.DistributedTraceId
,分布式链路追踪编号抽象类。
NewDistributedTraceId
org.skywalking.apm.agent.core.context.ids.NewDistributedTraceId
,新建 的分布式链路追踪id。
PropagatedTraceId
org.skywalking.apm.agent.core.context.ids.PropagatedTraceId
,传播 的分布式链路追踪编号,该id并不是重新生成的,而是从上游服务传递过来的。
例如,A 服务调用 B 服务时,A 服务会将 DistributedTraceId 对象带给 B 服务,B 服务会调用 PropagatedTraceId(String id)
构造方法 ,创建 PropagatedTraceId 对象。
trace
AbstractSpan
org.skywalking.apm.agent.core.context.trace.AbstractSpan
,Span 接口。
NoopSpan
org.skywalking.apm.agent.core.context.trace.NoopSpan
,实现 AbstractSpan 接口,无操作的 Span,即不会采集数据 。
配合IgnoredTracerContext
使用用于尽可能降低内存和 gc 成本。
NoopExitSpan
org.skywalking.apm.agent.core.context.trace.NoopExitSpan
,继承 StackBasedTracingSpan 抽象类,无操作的出口 Span 。
AbstractTracingSpan
org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan
,实现 AbstractSpan 接口,链路追踪 Span 抽象类。
StackBasedTracingSpan
org.skywalking.apm.agent.core.context.trace.StackBasedTracingSpan
,实现 AbstractTracingSpan 抽象类,基于栈的链路追踪 Span 抽象类。
LocalSpan
org.skywalking.apm.agent.core.context.trace.LocalSpan
,继承 AbstractTracingSpan 抽象类,本地 Span ,用于一个普通方法的链路追踪,例如本地方法。
EntrySpan
org.skywalking.apm.agent.core.context.trace.EntrySpan
,实现 StackBasedTracingSpan 抽象类,入口 Span ,用于服务提供者( Service Provider ) ,例如 Tomcat 。
EntrySpan 是 TraceSegment 的第一个 Span ,这也是为什么称为"入口" Span 的原因。
ExitSpan
org.skywalking.apm.agent.core.context.trace.ExitSpan
,继承 StackBasedTracingSpan 抽象类,出口 Span ,用于服务消费者( Service Consumer ) ,例如 HttpClient 。
一个 TraceSegment 会有多个 ExitSpan 对象 ,例如【服务 A】远程调用【服务 B】,然后【服务 A】再次远程调用【服务 B】,或者然后【服务 A】远程调用【服务 C】。
TraceSegment
org.skywalking.apm.agent.core.context.trace.TraceSegment
,如上图所示,是一次分布式链路追踪( Distributed Trace ) 的一段。
tag
AbstractTag
org.skywalking.apm.agent.core.context.tag.AbstractTag<T>
,标签抽象类。注意,这个类的用途是将标签属性设置到 Span 上,或者说,它是设置 Span 的标签的工具类。
StringTag
org.skywalking.apm.agent.core.context.tag.StringTag
,值类型为 String 的标签实现类。
IntegerTag
org.skywalking.apm.agent.core.context.tag.IntegerTag
,值类型为 Integer 的标签实现类。
Tags
org.skywalking.apm.agent.core.context.tag.Tags
,常用 Tag 枚举类,内部定义了多个 HTTP 、DB 相关的 StringTag 的静态变量。
context
AbstractTracerContext
org.skywalking.apm.agent.core.context.AbstractTracerContext
,链路追踪上下文接口。
TracingContext
org.skywalking.apm.agent.core.context.TracingContext
,实现 AbstractTracerContext 接口,链路追踪上下文实现类。
与之搭配使用的还有ContextCarrier和ContextSnapshot
ContextCarrier:org.skywalking.apm.agent.core.context.ContextCarrier
,跨进程 Context 传输载体。
ContextSnapshot:org.skywalking.apm.agent.core.context.ContextSnapshot
,跨线程 Context 传递快照。
ContextManager
org.skywalking.apm.agent.core.context.ContextManager
,实现了 BootService 、TracingContextListener 、IgnoreTracerContextListener 接口,链路追踪上下文管理器。
ContextManager 封装了所有 AbstractTracerContext 提供的方法,从而实现,外部调用者,例如 SkyWalking 的插件,只调用 ContextManager 的方法,而不调用 AbstractTracerContext 的方法。
SK的链路追踪数据的采集过程其实是一个生产者-消费者模型。
数据生产
- MethodInterceptor,自定义的插件逻辑(在下面会说)。在执行完插件逻辑,调用后置方法关闭span。
- TracingContext,链路追踪上下文。从上下文中活跃的 span栈 弹出栈顶那个,将其加入到上下文持有的TraceSegment的已完成span列表中,如何此时活跃的span栈空了,就通知TraceSegment完成监听器IListener。
- TraceSegmentServiceClient,实现了IListener接口。监听到TraceSegment完成后,会去调用DataCarrier生产数据。
- DataCarrier,数据处理组件。持有QueueBuffer数组,核心逻辑包括生产数据和消费数据,将完成的TraceSegment保存到QueueBuffer里。
- QueueBuffer,数据队列。有多个,会根据算法计算TraceSegment对应的index,根据index找到要保存的目标队列。
数据消费
- TraceSegmentServiceClient,启动消费服务。
- DataCarrier,创建消费者驱动ConsumeDriver,通过消费者驱动来消费消息。
- ConsumeDriver,消费者驱。首先给消费者线程分配消费队列QueueBuffer,然后启动消费者线程消费采集到的数据。
- ConsumerThread,消费者线程。分配一个列表用来存放当前线程即将消费的数据,把QueueBuffer中的数据放到列表中,执行真正的数据处理逻辑。
- TraceSegmentServiceClient,执行真正的数据处理逻辑。将TraceSegment转换为GRPC定义的协议数据(traceId,traceSegmentId,spans等),通过GRPC调用远程接口将采集的数据传递给SK Collector。
- SK Collector,SkyWalking服务端的数据收集服务。
生产-消费全景图
具体插件分析
上面从抽象的组件角度分析了数据采集上报的过程,下面以一个插件为例看一下在什么时候触发 数据生产 的。
以常见的Controller方法的处理为例,分析插件RestControllerInstrumentation。
RestControllerInstrumentation.java
public class RestControllerInstrumentation extends AbstractControllerInstrumentation {
public static final String ENHANCE_ANNOTATION = "org.springframework.web.bind.annotation.RestController";
@Override
protected String[] getEnhanceAnnotations() {
return new String[] {ENHANCE_ANNOTATION};
}
}
看到自身并没有核心的逻辑,再看其父类AbstractControllerInstrumentation。
AbstractControllerInstrumentation.java
public abstract class AbstractControllerInstrumentation extends AbstractSpring5Instrumentation {
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any();
}
@Override
public String getConstructorInterceptor() {
return "org.apache.skywalking.apm.plugin.spring.mvc.v5.ControllerConstructorInterceptor";
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new DeclaredInstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return byMethodInheritanceAnnotationMatcher(named("org.springframework.web.bind.annotation.RequestMapping"));
}
@Override
public String getMethodsInterceptor() {
return Constants.REQUEST_MAPPING_METHOD_INTERCEPTOR;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new DeclaredInstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return byMethodInheritanceAnnotationMatcher(named("org.springframework.web.bind.annotation.GetMapping"))
.or(byMethodInheritanceAnnotationMatcher(named("org.springframework.web.bind.annotation.PostMapping")))
.or(byMethodInheritanceAnnotationMatcher(named("org.springframework.web.bind.annotation.PutMapping")))
.or(byMethodInheritanceAnnotationMatcher(named("org.springframework.web.bind.annotation.DeleteMapping")))
.or(byMethodInheritanceAnnotationMatcher(named("org.springframework.web.bind.annotation.PatchMapping")));
}
@Override
public String getMethodsInterceptor() {
return Constants.REST_MAPPING_METHOD_INTERCEPTOR;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return ClassAnnotationMatch.byClassAnnotationMatch(getEnhanceAnnotations());
}
protected abstract String[] getEnhanceAnnotations();
}
看到getInstanceMethodsInterceptPoints
方法是不是很熟悉,上一篇文章《SkyWalking如何通过修改字节码让插件生效》里面讲到自定义插件需要继承ClassInstanceMethodsEnhancePluginDefine
并重写其getInstanceMethodsInterceptPoints
方法。
事实上AbstractControllerInstrumentation也继承了ClassInstanceMethodsEnhancePluginDefine,说明他其实也是自定义的插件,只不过是SkyWalking内置的。
既然是插件那就重点看getInstanceMethodsInterceptPoints
指定的拦截器RestMappingMethodInterceptor。
RestMappingMethodInterceptor.java
public class RestMappingMethodInterceptor extends AbstractMethodInterceptor {
@Override
public String getRequestURL(Method method) {...}
}
发现本身也没有核心逻辑,继续看其父类AbstractMethodInterceptor。
AbstractMethodInterceptor.java
public abstract class AbstractMethodInterceptor implements InstanceMethodsAroundInterceptor {
...
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
...
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
try {
...
} finally {
ContextManager.stopSpan();
}
}
return ret;
}
...
}
注意到afterMethod
方法的finally语句块里出现了ContextManager.stopSpan();
而这正是上面分析数据生产的入口!
总结
通过这篇文章可以有以下收获:
Q:如何生产数据?
A:将采集到的TraceSegment,发送到内存数据队列。
Q:如何消费数据?
A:从消费线程内存数据队列拿数据,然后通过GRPC发送到SK Collector。
Q:什么时候生产数据?
A:在每个插件定义的逻辑最后会调用ContextManager.stopSpan()
来生产数据。
更多推荐
所有评论(0)