响应式微服务Spring Cloud与Spring WebFlux(二)之 链路跟踪
上篇文章我们讲到了Reactive Feign,它在响应式微服务中替换了阻塞模型的Open Feign。今天我们要讨论的是响应式的链路跟踪。在微服务架构中,原来的单体被划分为多个细小的模块部署,一个请求可能需要横跨多个小的服务才能完成它所要实现的功能。在传统阻塞模型中有很多优秀的框架可以供我们解决这个问题,比如Zipkin,SkyWalking等等。那在Spring Webflux中他们表现如何呢
上篇文章我们讲到了Reactive Feign,它在响应式微服务中替换了阻塞模型的Open Feign。今天我们要讨论的是响应式的链路跟踪。在微服务架构中,原来的单体被划分为多个细小的模块部署,一个请求可能需要横跨多个小的服务才能完成它所要实现的功能。在传统阻塞模型中有很多优秀的框架可以供我们解决这个问题,比如Zipkin,SkyWalking等等。那在Spring Webflux中他们表现如何呢?
Zipkin,SkyWalking对Spring WebFlux的支持
Zipkin
Zipkin是在Spring Cloud中是使用 Spring Cloud Sleuth进行链路跟踪的。使用Spring的组件当然会对Spring自己家的产品有很好的支持,所以Zipkin对于Spring WebFlux是支持的。
SkyWalking
SkyWalking是使用探针的方式进行链路跟踪。具体的原理还没有仔细了解过,所以就直接去GitHub的issue中看它的Spring WebFlux的支持吧。看下图确实在Spring WebFlux中使用SkyWalking有一些问题。作者对关于Spring WebFlux的问题的处理态度也是:
我想表达的意思只是SkyWalking对于Spring Webflux的支持可能不是那么完善,作者的精力也是有限的。如果你想让SkyWalking对Spring WebFlux支持更加友好那么你可以做这方面的工作。所以,如果现在先在Spring WebFlux中使用SkyWaking还是要观望一些。
那么看到这里是不是觉得Zipkin就是我们的唯一选择了?最后笔者也没有选用Zipkin,原因就是我为什么要使用Zipkin?链路跟踪的意义是知道一个请求在各个服务之间调用的情况,避免出现问题不知道是哪个服务的问题。仅此而已。在微服务架构中,除了这个问题还有像日志统一管理等各种各样的问题。不可能为了一个问题引入一个框架,导致系统的维护成本增加。所以最后决定自己去实现一个简单的链路跟踪。
传统模式Spring MVC下实现
我们可以在每次请求到达网关时生成一个唯一请求id,这个请求到每个服务时都会携带这个id,在日志打印时将这个id打印出来。这样我们就可以清楚的知道每个请求的链路了。这个方案中我们有几个问题要解决:
- 通过什么方式生成唯一请求id?
- 怎么将这个id携带到每一个服务中?
- 日志打印需要每次自己手动打印吗?
前两个问题我会在后面实现Spring WebFlux链路跟踪中进行解答。
第三个问题是不需要自己手动每次打印,设置好slf4j的打印格式就可以像打印线程名,日期一样自动打印了。具体实现需要使用slf4j的MDC,像下面那样设置。
MDC.put("mdc_trace_id", traceId);
这样在拦截器中将每次请求的id设置进MDC就完成了第一步。接着再日志格式配置的地方添加%X{trace_id}就可以了。
pattern>%d{HH:mm:ss.SSS} [%-5p] [Thread: %t] %X{trace_id} %c:%L - %m%n</pattern>
在传统模式下实现链路跟踪还是很容易的,最后你需要做的就是将这些日志进行集中管理,方便查看。
响应式Spring WebFlux下实现
在响应式中的实现方案和传统Spring MVC下一样。也是每次请求到达网关时生成一个唯一请求id,这个请求到每个服务时都会携带这个id,在日志打印时将这个id打印出来。所以它也会有如下的几个问题:
- 通过什么方式生成唯一请求id?
- 怎么将这个id携带到每一个服务中?
- 日志打印需要每次自己手动打印吗?
通过什么方式生成唯一的请求id?
在Spring WebFlux官方文档中有提到log id。
Log Id
In WebFlux, a single request can be run over multiple threads and the thread ID is not useful for correlating log messages that belong to a specific request. This is why WebFlux log messages are prefixed with a request-specific ID by default.
On the server side, the log ID is stored in the
ServerWebExchange
attribute (LOG_ID_ATTRIBUTE), while a fully formatted prefix based on that ID is available fromServerWebExchange#getLogPrefix()
. On theWebClient
side, the log ID is stored in theClientRequest
attribute (LOG_ID_ATTRIBUTE) ,while a fully formatted prefix is available fromClientRequest#logPrefix()
.
官方对于这个id功能的解释是由于WebFlux在处理一个请求时会涉及多次线程的切换,所以线程id对于一个请求它的日志信息关联的作用就不大了(在spring MVC中基本很少发生线程切换,线程id就可以关联到这个请求的所有日志)。这个log id保存在ServerWebExchange中,在WebClient中也有保存。
我们如果使用这个log id作为请求的唯一id,那它有没有唯一性的保证,不会是每次都是从1开始然后开始自增的吧?我们可以去源码中看看log id的生成规则。
在org.springframework.web.server.adapter.DefaultServerWebExchange的构造方法中对ServerWebExchange.LOG_ID_ATTRIBUTE设置初始值。
this.attributes.put(ServerWebExchange.LOG_ID_ATTRIBUTE, request.getId());
@Override
@Nullable
protected String initId() {
if (this.request instanceof Connection) {
return ((Connection) this.request).channel().id().asShortText() +
"-" + logPrefixIndex.incrementAndGet();
}
return null;
}
@Override
public String asShortText() {
String shortValue = this.shortValue;
if (shortValue == null) {
this.shortValue = shortValue = ByteBufUtil.hexDump(data, data.length - RANDOM_LEN, RANDOM_LEN);
}
return shortValue;
}
--data的生成规则,机器mac号-进程号-序列号-随机数
data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
int i = 0;
// machineId
System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
i += MACHINE_ID.length;
// processId
i = writeInt(i, PROCESS_ID);
// sequence
i = writeInt(i, nextSequence.getAndIncrement());
// timestamp (kind of)
i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
// random
int random = PlatformDependent.threadLocalRandom().nextInt();
i = writeInt(i, random);
assert i == data.length;
hashCode = Arrays.hashCode(data);
我看可以看一个大概,它确实是有使用自增,但是不是从1开始的。log id使用的request的id,request的id又使用的是channel id,这里的channel是netty中的channel。最后,我们可以看到channel id的生成需要使用到机器号,进程号,随机数,序列号,时间戳等。然后进行缩短再加一个自增数。可以说在一个微服务体系中重复的概率很低,在构成中有时间戳不会导致后面的序号和前面重复,机器号和进程号保证了不会和其他服务重复。所以log id完全可以用来作为请求的id(Spring WebFlux也确是使用它作为请求id)。
怎么将这个id携带到每一个服务中?
我们可以将log id放入Spring WebFlux上下文中(上下文怎么使用请看这篇),然后自动将它加入到Reactive Feign的头中(Reactive Feign请看这篇)。在被调用方的拦截器中再将log id放入到上下文中,并设置到ServerWebExchange的ServerWebExchange.LOG_ID_ATTRIBUTE属性中,将它自动生成的覆盖掉。这样就完成了log id的传递。
日志打印需要每次自己手动打印吗?
在Spring WebFlux中我们可以使用slf4j的MDC吗?可以,也不可以。在拦截器中我们可以将log id设置到MDC,但是由于MDC是使用ThreadLocal实现的,所以它只对当前线程有用,如果请求发生线程切换那就会打不出trace_id。同时也记得请求结束后清除MDC的trace_id,所以MDC在这里就显得有点鸡肋了。建议在重点的地方(比如异常)需要记录trace_id便于排除问题和用户行为分析的地方,手动的从上下文中或exchange.getLogPrefix()获取打印。
代码实现
上面说了这么多,可能看的有点头晕。下面展示一下具体的代码实现。
Spring cloud Gateway中的拦截器:
/**
* @description:
* @author: lc
* @createDate: 2021/2/14
*/
@Component
@Slf4j
public class OutLogFilter implements GlobalFilter, Ordered {
private static final String REQUEST_TIME_BEGIN = "requestTimeBegin";
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
exchange.getAttributes().put(REQUEST_TIME_BEGIN, System.currentTimeMillis());
--设置log id到请求头
Consumer<HttpHeaders> httpHeaders = httpHeader -> {
httpHeader.set(SystemConstant.TRACE_ID, exchange.getAttribute(ServerWebExchange.LOG_ID_ATTRIBUTE));
};
ServerHttpRequest serverHttpRequest = exchange.getRequest().mutate().headers(httpHeaders).build();
exchange.mutate().request(serverHttpRequest).build();
return chain.filter(exchange).subscriberContext(ctx -> {
log.info("设置log{}", exchange.getLogPrefix());
return ctx.put(SystemConstant.TRACE_ID, exchange.getAttribute(ServerWebExchange.LOG_ID_ATTRIBUTE));
}).onErrorResume(error -> {
return deal(exchange).then(Mono.error(error));
}).then(deal(exchange));
}
--打印请求日志
private Mono deal(ServerWebExchange exchange) {
return Mono.fromRunnable(() -> {
Long startTime = exchange.getAttribute(REQUEST_TIME_BEGIN);
if (startTime != null) {
StringBuilder sb = new StringBuilder("请求地址:")
.append(exchange.getRequest().getURI().getRawPath())
.append(" 耗时: ")
.append(System.currentTimeMillis() - startTime)
.append("ms");
DataBuffer dataBuffer = (DataBuffer) exchange.getAttributes()
.get(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR);
if (dataBuffer != null) {
String s = dataBuffer.toString(Charset.forName("UTF-8"));
sb.append(" body:").append(s);
}
sb.append(" params:").append(exchange.getRequest().getQueryParams());
exchange.getResponse();
log.info(exchange.getLogPrefix() + sb.toString());
}
});
}
@Override
public int getOrder() {
return -100000;
}
}
被调用方拦截器 :
@Component
@Slf4j
@RequiredArgsConstructor
@ConditionalOnMissingBean(name = "outAuthFilter")
public class LogFilter implements OrderedWebFilter {
private static final String REQUEST_TIME_BEGIN = "requestTimeBegin";
@Override
public int getOrder() {
return -200;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String headTraceId = exchange.getRequest().getHeaders().getFirst(SystemConstant.TRACE_ID);
log.info("获取头的log id .{}", headTraceId);
final String traceId;
if (headTraceId == null) {
traceId = exchange.getAttribute(ServerWebExchange.LOG_ID_ATTRIBUTE);
} else {
traceId = headTraceId;
}
exchange.getAttributes().put(REQUEST_TIME_BEGIN, System.currentTimeMillis());
exchange.getAttributes().put(LOG_ID_ATTRIBUTE, traceId);
return chain.filter(exchange).subscriberContext(ctx -> {
log.info("设置log:{}", traceId);
return ctx.put(SystemConstant.TRACE_ID, traceId);
});
}
}
总结与展望
本文描述了实现链路跟踪的方式,目前来说还没有很优雅的方式像阻塞模型MDC那样自动的打印log id。还是需要手动的方式去处理,相信后面一些日志框架会增加响应式这方面的特性。将log id打印出来后,我们可以使用日志收集器将日志汇集到一起,然后分析日志。这里笔者考虑后面使用logstash+clickhouse来实现。
感谢阅读,希望对你有帮助。
参考资料
更多推荐
所有评论(0)