k8s 上 SpringGateway 服务发现路由无法转发到新启动的 pod 上
SpringGateWay 路由总体可以划分为以下三种。
k8s 上 SpringGateway 服务发现路由无法转发到新启动的 pod 上
SpringGateWay 路由
SpringGateWay 路由总体可以划分为以下三种。
- 静态路由
- 动态路由
- 服务发现路由
静态路由
所谓静态路由是指网关在启动时根据配置文件或者代码编写的方式配置好 Api 和路由之间的关系。 配置如下 ↓↓↓
配置文件方式
spring:
cloud:
gateway:
routes:
- id: pi-data-manager
uri: lb://pi-data-manager
predicates:
- Path=/pi-data-manager/**
动态路由
动态路由即指网关在启动时可以根据外部环境变化而变化。 如通过注册中心 或 数据库中对应的关系进行改变。 (因涉及到过多源码、不做演示)
服务发现路由
服务发现路由从本质上也是动态路由中的一种。通过配置文件 spring.cloud.gateway.discovery.locator.enabled = true 方式开始。
问题排查思路
1. 网关路由表没有更新
第一反应:路由表为什么没有更新? k8s 中是否存在服务发现的心跳机制? k8s 中是存在服务发现的监听或服务端和客户端的通知机制?
是否心跳机制异常中断? 是否网络问题?
查阅官方资料、源码等过程忽略。。。。。。。。。。。
以上问题最终灵感来源于 spring-cloud-kubernetes 官方问题单的讨论 https://github.com/spring-cloud/spring-cloud-kubernetes/issues/977。
该地址大概意思就是需要通过手动触发路由更新去解决。 其中附有一条如果需要自动触发可以参考 Eureka(灵感来源处)
2.方案对比
对比了 Eureka nacos 以及 k8s 两个版本关于服务发现的解决方案如下 ↓↓↓↓
Eureka
可以看到 initScheduledTasks 方法会初始化两个线程 CacheRefreshThread 和 HeartbeatThread , 默认每 30 秒调用一次
CacheRefreshThread 会发布 HeartbeatEvent
HeartbeatThread 更新 **lastSuccessfulHeartbeatTimestamp((时间戳
private void initScheduledTasks() {
int renewalIntervalInSecs;
int expBackOffBound;
if (this.clientConfig.shouldFetchRegistry()) {
// 初始化 CacheRefreshThread() 线程 线程具体内容在下方
this.cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread());
this.scheduler.schedule(this.cacheRefreshTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS);
}
if (this.clientConfig.shouldRegisterWithEureka()) {
this.heartbeatTask = new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread());
this.scheduler.schedule(this.heartbeatTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS);
}
}
class CacheRefreshThread implements Runnable {
CacheRefreshThread() {
}
public void run() {
DiscoveryClient.this.refreshRegistry();
}
}
void refreshRegistry() {
// ....
boolean success = this.fetchRegistry(remoteRegionsModified);
}
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
// ......
this.onCacheRefreshed();
}
protected void onCacheRefreshed() {
super.onCacheRefreshed();
// 发送心跳事件
if (this.cacheRefreshedCount != null) { // might be called during construction and
// will be null
long newCount = this.cacheRefreshedCount.incrementAndGet();
log.trace("onCacheRefreshed called with count: " + newCount);
this.publisher.publishEvent(new HeartbeatEvent(this, newCount));
}
}
nacos
在 nacos 源码中有一个类为 NacosWatch 其中有一个 start() 方法中通过任务调度器 30s 内 执行一次 nacosServicesWatch方法,该方法中向客户端发送心跳时间。 代码如下所示。
private long watchDelay = 30000;
// 30s 执行一次nacosServicesWatch方法
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::nacosServicesWatch, this.properties.getWatchDelay());
public void nacosServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent(
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
k8s 2.1.0版本
根据下图可以发现 spring-cloud-kubenetes-client-discovery 在 2.1.0 版本时大概只有SpringCloud 标准的 DiscoveryClient 客户端以及一些配置类等。并不存在一些心跳机制和监听机制。
k8s 3.0.3版本
如下如所示: spring-cloud-kubernetes 在3.x版本后新增加了 catalog 包。其中 KubernetesCatalogWatch 类中有以下一段代码。 主要是每隔 30s 发送一次HeartbeatEvent 心跳事件。 但是本次并没有打算升级版本采用这种方式解决问题。 因为这里的 30s 是由 @Scheduled 注解实现的。 该注解默认情况下是会出现线程中断永不执行的情况。 除非重启。 根因在于 @Scheduled 是一个核心线程只有 1 且没有任何策略的线程池。 具体修改方式在 解决方案链接下
@Scheduled(fixedDelayString = "${spring.cloud.kubernetes.discovery.catalogServicesWatchDelay:30000}")
void catalogServicesWatch() {
try {
List<EndpointNameAndNamespace> currentState = stateGenerator.apply(context);
if (!currentState.equals(catalogEndpointsState)) {
LOG.debug(() -> "Received endpoints update from kubernetesClient: " + currentState);
publisher.publishEvent(new HeartbeatEvent(this, currentState));
}
catalogEndpointsState = currentState;
}
catch (Exception e) {
LOG.error(e, () -> "Error watching Kubernetes Services");
}
}
解决方案
采用xxl-job的分片机制在每台机器节点上添加心跳机制弥补 spring-cloud-kubenetes 的缺失。
private final AtomicLong cacheRefreshedCount = new AtomicLong(0);
@XxlJob(value = "refreshRoutes")
public void refreshRoutes() {
if (this.cacheRefreshedCount != null) {
long newCount = this.cacheRefreshedCount.incrementAndGet();
XxlJobHelper.log("onCacheRefreshed called with count: " + newCount);
this.publisher.publishEvent(new HeartbeatEvent(this, newCount));
}
}
更多推荐
所有评论(0)