前言

工作中使用 OpenFeign 进行跨服务调用,最近发现线上经常会遇到请求失败。

java.net.ConnectException: Connection refused: connect
复制代码

通过排查我们发现不是接口超时,而是有时候会请求到已经下线的服务导致报错。这多发生在服务提供者系统部署的时候,因为系统部署的时候会调用 Spring 容器 的 shutdown() 方法, Eureka Server 那里能够及时的剔除下线服务,但是我们上一篇文章中已经知道 readOnlyCacheMapreadWriteCacheMap 同步间隔是 30SClient 端拉取实例信息的间隔也是 30S,这就导致 Eureka Client 端存储的实例信息数据在一个临界时间范围内都是脏数据。

调整 Eureka 参数

既然由于 Eureka 本身的设计导致会存在服务实例信息延迟更新,那么我们尝试去修改几个参数来降低延迟

  • Client 端设置服务拉取间隔3S, eureka.client.registry-fetch-interval-seconds = 3
  • Server 端设置读写缓存同步间隔 3S,eureka.server.response-cache-update-interval-ms=3000

这样设置之后经过一段时间的观察发现情况有所改善,但还是存在这个问题,而且并没有改善多少。

LoadBalancer 如何获取实例信息

EurekaOpenFeign 的文章中都有提到,OpenFeign 进行远程调用的时候会通过负载均衡器选取一个实例发起 Http 请求。我们 SpringCloud 版本是 2020,已经移除了 ribbon,使用的是 LoadBalancer

通过 debug OpenFeign 调用的源码发现它是从 DiscoveryClientServiceInstanceListSupplier的构造方法获取实例信息集合 List<ServiceInstance> 的,内部调用到 CachingServiceInstanceListSupplier 构造方法,重点看 CacheFlux.lookup()

public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
   super(delegate);
   this.serviceInstances = CacheFlux.lookup(key -> {
      // TODO: configurable cache name
      Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
      if (cache == null) {
         if (log.isErrorEnabled()) {
            log.error("Unable to find cache: " + SERVICE_INSTANCE_CACHE_NAME);
         }
         return Mono.empty();
      }
      List<ServiceInstance> list = cache.get(key, List.class);
      if (list == null || list.isEmpty()) {
         return Mono.empty();
      }
      return Flux.just(list).materialize().collectList();
   }, delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
         .andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize().doOnNext(instances -> {
            Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
            if (cache == null) {
               if (log.isErrorEnabled()) {
                  log.error("Unable to find cache for writing: " + SERVICE_INSTANCE_CACHE_NAME);
               }
            }
            else {
               cache.put(key, instances);
            }
         }).then());
}
复制代码

这里先去查缓存,缓存有就直接返回,缓存没有就去 CompositeDiscoveryClient.getInstances() 查询。查询完毕之后会回调到 CacheFlux.lookup(param,param2) 第二个参数的代码块,将结果放进缓存。

@Override
public List<ServiceInstance> getInstances(String serviceId) {
   if (this.discoveryClients != null) {
      for (DiscoveryClient discoveryClient : this.discoveryClients) {
         List<ServiceInstance> instances = discoveryClient.getInstances(serviceId);
         if (instances != null && !instances.isEmpty()) {
            return instances;
         }
      }
   }
   return Collections.emptyList();
}
复制代码

重点看这个方法,由于我们使用的是 Eureka 作为注册中心。所以这里会调用 EurekaDiscoveryClientgetInstances(), 最终我们发现底层其实就是从 DiscoveryClient.localRegionApps 获取的服务实例信息。

现在我们清楚了,OpenFeign 调用时,负载均衡策略还不是从 DiscoveryClient.localRegionApps 直接拿的实例信息,是自己缓存了一份。这样一来,不仅要计算 Eureka 本身的延迟,还要算上缓存时间。

SpringCloud 中有很多内存缓存的实现,这里我们选择的是 Caffine

<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>3.0.5</version>
</dependency>
复制代码

引入依赖即可自动配置,从 LoadBalancerCacheProperties 中我们能够发现默认的缓存时间是 35S,所以要解决我们的问题还需要降低缓存时间,也可以直接不使用内存缓存,每次都从 EurekaClient 拉取过来的实例信息读取即可。

通过上面的分析我们可以发现使用 OpenFeign 内部调用是无法根治这个问题的,因为 Eureka 的延迟是无法根治的,只能说在维持机器性能等各方面的前提下尽可能的缩短数据同步定时任务的时间间隔。所以我们可以换个角度,让调用失败的请求进行重试。

LoadBalancer 的两种负载均衡策略

通过源码调试,发现它有两种负载均衡策略 RoundRobinLoadBalancer、RandomLoadBalancer,轮询和随机,默认的策略是轮询

LoadBalancerClientConfiguration

@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
      LoadBalancerClientFactory loadBalancerClientFactory) {
   String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
   return new RoundRobinLoadBalancer(
         loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
复制代码

这两种策略都比较简单,没什么好说的。

轮询策略存在的问题

我们可以观察下轮询策略的实现,它有一个原子类型的成员变量,用来记录下一次请求要落到哪一个实例

final AtomicInteger position;
复制代码

核心逻辑

private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
   if (instances.isEmpty()) {
      if (log.isWarnEnabled()) {
         log.warn("No servers available for service: " + serviceId);
      }
      return new EmptyResponse();
   }
   // TODO: enforce order?
   int pos = Math.abs(this.position.incrementAndGet());
   ServiceInstance instance = instances.get(pos % instances.size());
   return new DefaultResponse(instance);
}
复制代码

可以看到实现逻辑很简单,用 position 自增,然后实例数量进行求余,达到轮询的效果。乍一看好像没问题,但是它存在这样一种情况。现在我们有两个实例 192.168.1.121、192.168.1.122,这时候两个请求 A、B 过来,A 请求了 121 的,B 请求了 122 的,然后 A 请求失败了触发重试,由于轮询机制 A 重试的实例又回到了 121 ,这样就有问题了,因为还是失败,我们要让重试的请求一定能重试到其他的服务实例。

使用 TraceId 实现自定义负载均衡策略

因为重试的时候是在 OpenFeign 内部重新发起了一次 HTTP 请求,所以 traceId 并没有变,我们可以先从 MDC 上下文获取 traceId,再从缓存中获取 traceId 对应的值,如果没有就随机生成一个数字然后和 RoundRobinLoadBalancer 一样自增求余,如果缓存中已经有了就直接自增求余,这样就一定能重试到不同的实例。

这里我们缓存组件还是使用 Caffeine

private final LoadingCache<String, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
      .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
复制代码
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
   if (serviceInstances.isEmpty()) {
      log.warn("No servers available for service: " + serviceId);
      return new EmptyResponse();
   }

   String traceId = MDC.get("traceId");
   if (traceId == null) {
      traceId = UUID.randomUUID().toString();
   }

   AtomicInteger seed = positionCache.get(traceId);
   int s = seed.getAndIncrement();
   int pos = s % serviceInstances.size();
   return new DefaultResponse(serviceInstances.stream()
         .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
         .collect(Collectors.toList()).get(pos));
}
复制代码

完了之后声明我们自己的负载均衡器的 Bean

public class FeignLoadBalancerConfiguration {

    @Bean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSuppliers, Environment environment) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RoundRobinRetryDifferentInstanceLoadBalancer(serviceInstanceListSuppliers,name);
    }
}
复制代码

之后在主启动类上使用 @LoadBalancerClient 指定我们自定义的负载均衡器

@LoadBalancerClient(name = "feign-test-product", configuration = FeignLoadBalancerConfiguration.class)
复制代码

设置 LoadBalancer Zone

还记得之前 Eureka 我们为了解决本机调用的时候会通过负载均衡调用到开发环境的机器设置了 zoneSpringCloud LoadBalancer 也提供了这个配置,并且从源码中我们可以发现,最终会以 LoadBalancer 设置的为准,如果没有为它设置,那么会使用 Eureka 中的 zone 配置,如果设置了就会覆盖 Eurekazone 设置

EurekaLoadBalancerClientConfiguration.postprocess()

@PostConstruct
public void postprocess() {
   if (!StringUtils.isEmpty(zoneConfig.getZone())) {
      return;
   }
   String zone = getZoneFromEureka();
   if (!StringUtils.isEmpty(zone)) {
      if (LOG.isDebugEnabled()) {
         LOG.debug("Setting the value of '" + LOADBALANCER_ZONE + "' to " + zone);
      }
      zoneConfig.setZone(zone);
   }
}
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐