博客园Logo
首页
新闻
博问
专区
闪存
班级

代码改变世界
搜索
注册
登录
bojiangzhou

英雄修身齐家治国平天下

随笔 - 49 文章 - 12 评论 - 432 0
博客园 首页 新随笔 联系 管理
SpringCloud 源码系列(2)—— 注册中心 Eureka(中)

目录

五、服务注册
1、实例信息注册器初始化
2、客户端实例注册
3、Eureka Server 接收注册请求
4、Eureka Server 控制台
5、服务注册的整体流程图
六、抓取注册表
1、Eureka Client 启动时全量抓取注册表
2、Eureka Server 注册表多级缓存机制
3、Eureka Server 注册表多级缓存过期机制
4、Eureka Client 定时拉取增量注册表
5、Eureka Server 返回增量注册表
7、Eureka 抓取注册表总体流程图
七、服务续约
1、Eureka Client 定时发送心跳
2、Eureka Server 接收心跳续约
八、服务下线
1、Eureka Client 下线
2、Eureka Server 服务下线
九、服务故障
1、摘除实例的定时任务初始化
2、定时摘除过期的实例
3、如何判断实例是否过期
十、自我保护机制
1、摘除实例时的自我保护机制
2、自我保护机制导致实例未下线的情况
3、最近一分钟计数器的设计
4、服务故障摘除和自我保护机制图

回到顶部
五、服务注册
1、实例信息注册器初始化

服务注册的代码位置不容易发现,我们看 DiscoveryClient 初始化调度任务的这个方法,这段代码会去初始化一个实例信息复制器 InstanceInfoReplicator,这个复制器就包含了实例的注册(明明是注册却叫 Replicator 感觉怪怪的)。

① DiscoveryClient 初始化调度器的流程

先基于 DiscoveryClient、InstanceInfo 构造 InstanceInfoReplicator,然后还有两个参数为实例信息复制间隔时间(默认30秒)、并发的数量(默认为2)。
创建了一个实例状态变更监听器,并注册到 ApplicationInfoManager。当实例状态变更时,就会触发这个监听器,并调用 InstanceInfoReplicator 的 onDemandUpdate 方法。
启动 InstanceInfoReplicator,默认延迟40秒,也就是说服务启动可能40秒之后才会注册到注册中心。
复制代码
1 private void initScheduledTasks() {
2 // 省略定时刷新注册表的任务…
3
4 if (clientConfig.shouldRegisterWithEureka()) {
5 // 省略定时心跳的任务…
6
7 // 实例信息复制器,用于定时更新自己状态,并向注册中心注册
8 instanceInfoReplicator = new InstanceInfoReplicator(
9 this,
10 instanceInfo,
11 clientConfig.getInstanceInfoReplicationIntervalSeconds(),
12 2); // burstSize
13
14 // 实例状态变更的监听器
15 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
16 @Override
17 public String getId() {
18 return “statusChangeListener”;
19 }
20
21 @Override
22 public void notify(StatusChangeEvent statusChangeEvent) {
23 if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
24 logger.error(“Saw local status change event {}”, statusChangeEvent);
25 } else {
26 logger.info(“Saw local status change event {}”, statusChangeEvent);
27 }
28 instanceInfoReplicator.onDemandUpdate();
29 }
30 };
31
32 // 向 ApplicationInfoManager 注册状态变更监听器
33 if (clientConfig.shouldOnDemandUpdateStatusChange()) {
34 applicationInfoManager.registerStatusChangeListener(statusChangeListener);
35 }
36
37 // 启动实例信息复制器,默认延迟时间40秒
38 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
39 } else {
40 logger.info(“Not registering with Eureka server per configuration”);
41 }
42 }
复制代码
② InstanceInfoReplicator 的构造方法

创建了一个单线程的调度器
设置 started 为 false
创建了以分钟为单位的限流器,每分钟默认最多只能调度4次
复制代码
1 InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
2 this.discoveryClient = discoveryClient;
3 this.instanceInfo = instanceInfo;
4 // 单线程的调度器
5 this.scheduler = Executors.newScheduledThreadPool(1,
6 new ThreadFactoryBuilder()
7 .setNameFormat(“DiscoveryClient-InstanceInfoReplicator-%d”)
8 .setDaemon(true)
9 .build());
10
11 this.scheduledPeriodicRef = new AtomicReference();
12 // started 设置为 false
13 this.started = new AtomicBoolean(false);
14 // 以分钟为单位的限流器
15 this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
16 // 间隔时间,默认为30秒
17 this.replicationIntervalSeconds = replicationIntervalSeconds;
18 this.burstSize = burstSize;
19 // 允许每分钟更新的频率 60 * 2 / 30 = 4
20 this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
21 logger.info(“InstanceInfoReplicator onDemand update allowed rate per min is {}”, allowedRatePerMinute);
22 }
复制代码
③ 启动 InstanceInfoReplicator

将 started 设置为 true,代表已经启动了
调用 instanceInfo.setIsDirty() 方法,将实例设置为 dirty=true,并更新了最后一次设置 dirty 的时间戳
InstanceInfoReplicator 实现了 Runnable,它本身被当成任务来调度,然后延迟40秒开始调度当前任务,并将 Future 放到本地变量中
复制代码
1 public void start(int initialDelayMs) {
2 // 启动时 started 设置为 true
3 if (started.compareAndSet(false, true)) {
4 // 设置为 dirty,便于下一次心跳时同步到 eureka server
5 instanceInfo.setIsDirty();
6 // 延迟40秒后开始调度当前任务
7 Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
8 // 将 Future 放到本地变量中
9 scheduledPeriodicRef.set(next);
10 }
11 }
12
13 ///////
14
15 public synchronized void setIsDirty() {
16 isInstanceInfoDirty = true;
17 lastDirtyTimestamp = System.currentTimeMillis();
18 }
复制代码
2、客户端实例注册

① 实现注册的run方法

接着看 InstanceInfoReplicator 的 run 方法,这个方法就是完成注册的核心位置。

首先会更新实例的信息,如果有变更就会设置 dirty=true
如过是 dirty 的,就会调用 DiscoveryClient 的 register 方法注册实例
实例注册后,就把 dirty 设置为 false
最后在 finally 中继续下一次的调度,默认是每隔30秒调度一次,注意他这里是把调度结果 Future 放到本地变量中
复制代码
1 public void run() {
2 try {
3 // 更新本地实例信息,如果实例信息有变更,则 dirty=true
4 discoveryClient.refreshInstanceInfo();
5
6 // 设置为 dirty 时的时间戳
7 Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
8 if (dirtyTimestamp != null) {
9 // 注册实例
10 discoveryClient.register();
11 // 设置 dirty=false
12 instanceInfo.unsetIsDirty(dirtyTimestamp);
13 }
14 } catch (Throwable t) {
15 logger.warn(“There was a problem with the instance info replicator”, t);
16 } finally {
17 // 30秒之后再调度
18 Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
19 scheduledPeriodicRef.set(next);
20 }
21 }
复制代码
② 实例信息刷新

再来细看下 refreshInstanceInfo 刷新实例信息的方法:

首先刷新了数据中心的信息
然后刷新续约信息,主要就是将 EurekaClientConfig 的续约配置与本地的续约配置做对比,如果变更了就重新创建续约信息,并设置实例为dirty。这种情况一般就是运行期间动态更新实例的配置,然后重新注册实例信息。
接着使用健康检查器检查实例健康状况,从 getHealthCheckHandler 这段代码进去不难发现,我们可以自定义健康检查器,例如当本地的一些资源未创建成功、某些核心线程池down了就认为实例不可用,这个时候就可以自定义健康检查器。如果没有自定义健康检查器,那就直接返回实例当前的状态。我们可以实现 HealthCheckHandler 接口自定义健康检查器。
最后就会调用 ApplicationInfoManager 的 setInstanceStatus 设置实例状态,会判断如果状态发生变更,就会发出状态变更的通知,这样就会触发前面定义的状态变更监听器,然后调用 InstanceInfoReplicator 的 onDemandUpdate 方法。
View Code
③ 向 eureka server 注册

在 run 方法里调用了 discoveryClient.register() 方法实现了客户端实例向注册中心的注册,进入到 register 方法可以看到,他就是使用前面构造的 EurekaTransport 来发起远程调用。

一层层进去,很容易发现就是调用了 eureka-server 的 POST /apps/{appName} 接口,后面我们就从 eureka-core 中找这个接口就可以找到注册中心实现服务注册的入口了。

View Code
④ 注册中心设置实例状态为已启动

再回想下注册中心的初始化流程,在最后调用 openForTraffic 方法时,最后也会调用 ApplicationInfoManager 的 setInstanceStatus 方法,将实例状态设置为已启动,这个时候就会触发客户端注册到注册中心的动作。

applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
⑤ 完成监听实例变更的方法

状态变更器会调用 onDemandUpdate 方法来完成实例状态变更后的逻辑。

它这里一个是用到了限流器来限制每分钟这个方法只能被调用4次,即避免了频繁的注册行为
然后在调度时,它会从本地变量中取出上一次调度的 Future,如果任务还没执行完,它会直接取消掉
最后就是调用 run 方法,完成服务的注册
View Code
⑥ 限流器

最后简单看下限流器 RateLimiter 的设计:

从它的注释中可以看出,eureka 的 RateLimiter 是基于令牌桶算法实现的限流器
acquire 方法有两个参数:
burstSize:允许以突发方式进入系统的最大请求数
averageRate:设置的时间窗口内允许进入的请求数
View Code
3、Eureka Server 接收注册请求

① 找到实例注册的API入口

从前面的分析中,我们知道服务端注册的API是 POST /apps/{appName},由于 eureka 是基于 jersey 来通信的,想找到API入口还是有点费劲的,至少没有 springmvc 那么容易。

先看 ApplicationsResource 这个类,可以找到 getApplicationResource 这个方法的路径是符合 /apps/{appName} 这个规则的。然后可以看到它里面创建了 ApplicationResource,再进入到这个类里面,就可以找到 @Post 标注的 addInstance 方法,这就是注册的入口了。可以看到它是调用了注册表的 register 方法来注册实例的。

复制代码
1 @Path("/{version}/apps")
2 @Produces({“application/xml”, “application/json”})
3 public class ApplicationsResource {
4 private final EurekaServerConfig serverConfig;
5 private final PeerAwareInstanceRegistry registry;
6 private final ResponseCache responseCache;
7
8 // 符合规则 /apps/{appName}
9 @Path("{appId}")
10 public ApplicationResource getApplicationResource(
11 @PathParam(“version”) String version,
12 @PathParam(“appId”) String appId) {
13 CurrentRequestVersion.set(Version.toEnum(version));
14 try {
15 // 真正的入口
16 return new ApplicationResource(appId, serverConfig, registry);
17 } finally {
18 CurrentRequestVersion.remove();
19 }
20 }
21 }
22
23 /////////////////////////////////
24
25 @Produces({“application/xml”, “application/json”})
26 public class ApplicationResource {
27
28 private final PeerAwareInstanceRegistry registry;
29
30 @POST
31 @Consumes({“application/json”, “application/xml”})
32 public Response addInstance(InstanceInfo info,
33 @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
34 logger.debug(“Registering instance {} (replication={})”, info.getId(), isReplication);
35
36 registry.register(info, “true”.equals(isReplication));
37 return Response.status(204).build(); // 204 to be backwards compatible
38 }
39 }
复制代码
addInstance 接口有两个参数:

InstanceInfo:服务实例,主要有两块数据:
基本信息:主机名、IP地址、端口号、URL地址
租约信息:保持心跳的间隔时间、最近心跳的时间、服务注册的时间、服务启动的时间
isReplication:这个参数是从请求头中取的,表示是否是在同步 server 节点的实例。在集群模式下,因为客户端实例注册到注册中心后,会同步到其它 server节点,所以如果是eureka-server之间同步信息,这个参数就为 true,避免循环同步。
② 实例注册

进入到注册表的 register 方法,可以看到主要就是调用父类的 register 方法注册实例,然后同步到 eureka server 集群中的其它 server 节点。集群同步放到后面来看,现在只需要知道注册实例时会同步到其它server节点即可。

复制代码
1 @Override
2 public void register(final InstanceInfo info, final boolean isReplication) {
3 int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
4 // 如果实例中没有周期的配置,就设置为默认的 90 秒
5 if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
6 leaseDuration = info.getLeaseInfo().getDurationInSecs();
7 }
8 // 注册实例
9 super.register(info, leaseDuration, isReplication);
10 // 复制到集群其它 server 节点
11 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
12 }
复制代码
接着看父类的注册方法,它的主要流程如下:

首先可以看到eureka server保存注册表(registry)的数据结构是 ConcurrentHashMap<String, Map<String, Lease>>,key 就是服务名称,value 就是对应的实例,因为一个服务可能会部署多个实例。
根据服务名称从注册表拿到实例表,然后根据实例ID拿到实例的租约信息 Lease
如果租约信息存在,说明已经注册过相同的实例了,然后就对比已存在实例和新注册实例的最后更新时间,如果新注册的是旧的,就替换为已存在的实例来完成注册
如果租约信息不存在,说明是一个新注册的实例,这时会更新两个阈值:
期望续约的客户端数量 +1
每分钟续约次数的阈值,如果低于这个值,说明有很多客户端没有发送心跳,这时eureka就认为可能网络出问题了,就会有另一些机制,这个后面再说
然后就根据注册的实例信息和续约周期创建新的租约,并放入注册表中去
接着根据当前时间戳、服务名称、实例ID封装一个 Pair,然后放入到最近注册的队列中 recentRegisteredQueue,先记住这个队列就行了
根据实例的 overriddenStatus 判断,不为空的话,可能就只是要更新实例的状态,这个时候就会只变更实例的状态,而不会改变 dirty
然后是设置了实例的启动时间戳,设置了实例的 ActionType 为 ADDED
将租约加入到最近变更的队列 recentlyChangedQueue,先记住这个队列
最后一步失效缓存,一步步进去可以发现,主要就是将读写缓存 readWriteCacheMap 中与这个实例相关的缓存失效掉,这个缓存后面分析抓取注册表的时候再来细看
View Code
更新每分钟续约次数的阈值:

复制代码
1 protected void updateRenewsPerMinThreshold() {
2 // 每分钟续约阈值 = 期望续约的客户端数量 * (60 / 续约间隔时间) * 续约百分比
3 // 例如,一共注册了 10 个实例,那么期望续约的客户端数量为 10,间隔时间默认为 30秒,就是每个客户端应该每30秒发送一次心跳,续约百分比默认为 0.85
4 // 每分钟续约次数阈值 = 10 * (60.0 / 30) * 0.85 = 17,也就是说每分钟至少要接收到 17 此续约请求
5 this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
6 * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
7 * serverConfig.getRenewalPercentThreshold());
8 }
复制代码
这就是注册表 registry 缓存服务实例信息的结构,可以看出 eureka 是基于内存来组织注册表的,使用的是 ConcurrentHashMap 来保证多线程并发安全。

4、Eureka Server 控制台

前面已经将服务实例注册上去了,现在来看下 eureka server 的控制台页面是怎么获取这些数据的。

前面已经分析过 eureka-server 的 web.xml 中配置了欢迎页为 status.jsp ,这就是控制台的页面。

从 status.jsp 可以看出,其实就是从 EurekaServerContext 上下文获取注册表,然后读取注册表注册的服务实例,然后遍历展示到表格中。

View Code
5、服务注册的整体流程图

下面通过一张图来看看服务实例注册的整个流程。

回到顶部
六、抓取注册表
1、Eureka Client 启动时全量抓取注册表

客户端启动初始化 DiscoveryClient 时,其中有段代码如下:这一步调用 fetchRegistry 就是在启动时全量抓取注册表缓存到本地中。

复制代码
1 if (clientConfig.shouldFetchRegistry()) {
2 try {
3 // 拉取注册表:全量抓取和增量抓取
4 boolean primaryFetchRegistryResult = fetchRegistry(false);
5 if (!primaryFetchRegistryResult) {
6 logger.info(“Initial registry fetch from primary servers failed”);
7 }
8 boolean backupFetchRegistryResult = true;
9 if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
10 backupFetchRegistryResult = false;
11 logger.info(“Initial registry fetch from backup servers failed”);
12 }
13 if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
14 throw new IllegalStateException(“Fetch registry error at startup. Initial fetch failed.”);
15 }
16 } catch (Throwable th) {
17 logger.error(“Fetch registry error at startup: {}”, th.getMessage());
18 throw new IllegalStateException(th);
19 }
20 }
复制代码
进入 fetchRegistry 方法,可以看到,首先获取本地的 Applications,如果为空就会调用 getAndStoreFullRegistry 方法全量抓取注册表并缓存到本地。

View Code
进入 getAndStoreFullRegistry 方法可以发现,就是调用 GET /apps 接口抓取全量注册表,因此等会服务端就从这个入口进去看抓取全量注册表的逻辑。注册表抓取回来之后,就放到本地变量 localRegionApps 中。

View Code
2、Eureka Server 注册表多级缓存机制

① 全量抓取注册表的接口

全量抓取注册表的接口是 GET /apps,跟找注册接口是类似的,最终可以找到 ApplicationsResource 的 getContainers 方法就是全量抓取注册表的入口。

可以看出,我们可以通过请求头来指定返回 xml 格式还是 json 格式,可以指定是否要压缩返回等。
然后创建了全量缓存的 Key
接着根据缓存的 key 从 responseCache 中全量抓取注册表
复制代码
1 @GET
2 public Response getContainers(@PathParam(“version”) String version,
3 @HeaderParam(HEADER_ACCEPT) String acceptHeader,
4 @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
5 @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
6 @Context UriInfo uriInfo,
7 @Nullable @QueryParam(“regions”) String regionsStr) {
8 // 省略部分代码…
9
10 // JSON 类型
11 KeyType keyType = Key.KeyType.JSON;
12 String returnMediaType = MediaType.APPLICATION_JSON;
13 if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
14 keyType = Key.KeyType.XML;
15 returnMediaType = MediaType.APPLICATION_XML;
16 }
17
18 // 全量注册表的缓存key
19 Key cacheKey = new Key(Key.EntityType.Application,
20 ResponseCacheImpl.ALL_APPS,
21 keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
22 );
23
24 Response response;
25 if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
26 // 压缩返回
27 response = Response.ok(responseCache.getGZIP(cacheKey))
28 .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
29 .header(HEADER_CONTENT_TYPE, returnMediaType)
30 .build();
31 } else {
32 // 根据缓存 key 从 responseCache 获取全量注册表
33 response = Response.ok(responseCache.get(cacheKey))
34 .build();
35 }
36 CurrentRequestVersion.remove();
37 return response;
38 }
复制代码
② ResponseCache 多级缓存读取

ResponseCache 就是 eureka server 读取注册表的核心组件,它的内部采用了多级缓存的机制来快速响应客户端抓取注册表的请求,下面就来看看 ResponseCache。

缓存读取的流程:

如果设置了使用只读缓存(默认true),就先从只读缓存 readOnlyCacheMap 中读取;readOnlyCacheMap 使用 ConcurrentHashMap 实现,ConcurrentHashMap 支持并发访问,读取速度很快。
如果读写缓存中没有,就从读写缓存 readWriteCacheMap 中读取,读取出来后并写入到只读缓存中;readWriteCacheMap 使用 google guava 的 LoadingCache 实现,LoadingCache 支持在没有元素的时候使用 CacheLoader 加载元素。
如果没有开启使用只读缓存,就直接从读写缓存中获取。
复制代码
1 public String get(final Key key) {
2 return get(key, shouldUseReadOnlyResponseCache);
3 }
4
5 ////////////////////////////////////////////////////
6
7 String get(final Key key, boolean useReadOnlyCache) {
8 // => getValue
9 Value payload = getValue(key, useReadOnlyCache);
10 if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
11 return null;
12 } else {
13 return payload.getPayload();
14 }
15 }
16
17 ////////////////////////////////////////////////////
18
19 Value getValue(final Key key, boolean useReadOnlyCache) {
20 Value payload = null;
21 try {
22 if (useReadOnlyCache) {
23 // 开启使用只读缓存,则先从只读缓存读取
24 // readOnlyCacheMap => ConcurrentHashMap<Key, Value>
25 final Value currentPayload = readOnlyCacheMap.get(key);
26 if (currentPayload != null) {
27 payload = currentPayload;
28 } else {
29 // 只读缓存中没有,则从读写缓存中读取,然后放入只读缓存中
30 // readWriteCacheMap => LoadingCache<Key, Value>
31 payload = readWriteCacheMap.get(key);
32 readOnlyCacheMap.put(key, payload);
33 }
34 } else {
35 // 未开启只读缓存,就从读写缓存中读取
36 payload = readWriteCacheMap.get(key);
37 }
38 } catch (Throwable t) {
39 logger.error(“Cannot get value for key : {}”, key, t);
40 }
41 return payload;
42 }
复制代码
③ ResponseCache 初始化

分析 eureka server EurekaBootStrap 启动初始化时,最后有一步去初始化 eureka server 上下文,它里面就会去初始化注册表,初始化注册表的时候就会初始化 ResponseCache,这里就来分析下这个初始化干了什么。

主要就是使用 google guava cache 构造了一个读写缓存 readWriteCacheMap,初始容量为 1000。注意这个读写缓存的特性:每隔 180 秒定时过期,然后元素不存在的时候就会使用 CacheLoader 从注册表中读取。
接着如果配置了使用只读缓存,还会开启一个定时任务,每隔30秒将读写缓存 readWriteCacheMap 的数据同步到只读缓存 readOnlyCacheMap。
复制代码
1 ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
2 this.serverConfig = serverConfig;
3 this.serverCodecs = serverCodecs;
4 // 是否使用只读缓存,默认为 true
5 this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
6 // 保存注册表
7 this.registry = registry;
8 // 缓存更新间隔时间,默认30秒
9 long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
10 // 使用 google guava cache 构造一个读写缓存
11 this.readWriteCacheMap =
12 // 初始容量为1000
13 CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
14 // 缓存的数据在写入多久后过期,默认180秒,也就是说 readWriteCacheMap 会定时过期
15 .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
16 .removalListener(new RemovalListener<Key, Value>() {
17 @Override
18 public void onRemoval(RemovalNotification<Key, Value> notification) {
19 Key removedKey = notification.getKey();
20 if (removedKey.hasRegions()) {
21 Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
22 regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
23 }
24 }
25 })
26 // 当key对应的元素不存在时,使用定义 CacheLoader 加载元素
27 .build(new CacheLoader<Key, Value>() {
28 @Override
29 public Value load(Key key) throws Exception {
30 if (key.hasRegions()) {
31 Key cloneWithNoRegions = key.cloneWithoutRegions();
32 regionSpecificKeys.put(cloneWithNoRegions, key);
33 }
34 // 获取元素
35 Value value = generatePayload(key);
36 return value;
37 }
38 });
39
40 if (shouldUseReadOnlyResponseCache) {
41 // 如果配置了使用只读缓存,就开启一个定时任务,定期将 readWriteCacheMap 的数据同步到 readOnlyCacheMap 中
42 // 默认间隔时间是 30 秒
43 timer.schedule(getCacheUpdateTask(),
44 new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
45 + responseCacheUpdateIntervalMs),
46 responseCacheUpdateIntervalMs);
47 }
48
49 try {
50 Monitors.registerObject(this);
51 } catch (Throwable e) {
52 logger.warn(“Cannot register the JMX monitor for the InstanceRegistry”, e);
53 }
54 }
复制代码
generatePayload 方法:

View Code
3、Eureka Server 注册表多级缓存过期机制

这节来总结下 eureka server 注册表多级缓存的过期时机,其实前面都已经分析过了。

① 主动过期

分析服务注册时已经说过,服务注册完成后,调用了 invalidateCache 来失效缓存,进去可以看到就是将读写缓存 readWriteCacheMap 中的服务、所有服务、增量服务的缓存失效掉。

那这里就要注意了,如果服务注册、下线、故障之类的,这里只是失效了读写缓存,然后可能要间隔30秒才能同步到只读缓存 readOnlyCacheMap,那么其它客户端可能要隔30秒后才能感知到。

1 private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
2 // invalidate cache
3 responseCache.invalidate(appName, vipAddress, secureVipAddress);
4 }
缓存失效:

View Code
② 定时过期

读写缓存 readWriteCacheMap 在构建的时候,指定了一个自动过期的时间,默认值是180秒,所以往 readWriteCacheMap 中放入一个数据过后,等180秒过后,它就自动过期了。然后下次读取的时候发现缓存中没有这个 key,就会使用 CacheLoader 重新加载到这个缓存中。

这种定时过期机制就是每隔一段时间来同步注册表与缓存的数据。

③ 被动过期

初始化 ResponseCache 时,如果启用了只读缓存,就会创建一个定时任务(每隔30秒运行一次)来同步 readWriteCacheMap 与 readOnlyCacheMap 中的数据,对于 readOnlyCacheMap 来说这就是一种被动过期。

View Code
4、Eureka Client 定时拉取增量注册表

① 客户端注册表刷新定时任务

前面介绍 DiscoveryClient 初始化时,在初始化调度任务这一步,如果要抓取注册表,就会创建一个调度器每隔 30 秒执行一次 cacheRefreshTask,它对 CacheRefreshThread 做了封装,进去可以看到,它其实就是调用 refreshRegistry 方法刷新注册表。

复制代码
1 private void initScheduledTasks() {
2 if (clientConfig.shouldFetchRegistry()) {
3 // 抓取注册表的间隔时间,默认30秒
4 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
5 // 刷新缓存调度器延迟时间扩大倍数,在任务超时的时候,将扩大延迟时间
6 // 这在出现网络抖动、eureka-sever 不可用时,可以避免频繁发起无效的调度
7 int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
8 // 注册表刷新的定时任务
9 cacheRefreshTask = new TimedSupervisorTask(
10 “cacheRefresh”,
11 scheduler,
12 cacheRefreshExecutor,
13 registryFetchIntervalSeconds,
14 TimeUnit.SECONDS,
15 expBackOffBound,
16 new CacheRefreshThread() // 刷新注册表的任务
17 );
18 // 30秒后开始调度刷新注册表的任务
19 scheduler.schedule(
20 cacheRefreshTask,
21 registryFetchIntervalSeconds, TimeUnit.SECONDS);
22 }
23 }
复制代码
refreshRegistry 方法:

View Code
refreshRegistry 里面又调用了 fetchRegistry 抓取注册表,fetchRegistry 在前面分析全量抓取注册表时已经展示过了。全量抓取注册表之后,本地 applications 不为空了,这时就会走 getAndUpdateDelta 增量更新的方法。

View Code
② 增量更新本地注册表

接着看 getAndUpdateDelta 增量更新方法:

首先调用 eureka server GET /apps/delta 接口获取增量的注册表
如果增量的注册表为空,就会调用 getAndStoreFullRegistry 方法全量抓取注册表
增量注册表不为空,就将其合并到本地注册表中
然后根据本地注册表的 applications 重新计算一个 hash 值
eureka server 返回的 delta 中包含一个 appsHashCode,代表了 eureka server 端的注册表的 hash 值,如果与本地计算的 hash 值不同,则说明本地注册表与server端注册表不一致,那就会全量拉取注册表更新到本地缓存中
可以看到,eureka 增量抓取的思路来更新本地缓存,并使用了 hash 值来保证服务端与本地的数据一致性。在分布式系统里,要进行数据同步,采用 hash 值比对的思想,这是值得学习的一个思路。

复制代码
1 private void getAndUpdateDelta(Applications applications) throws Throwable {
2 long currentUpdateGeneration = fetchRegistryGeneration.get();
3
4 Applications delta = null;
5 // 调用远程接口增量抓取:GET apps/delta
6 EurekaHttpResponse httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
7 if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
8 delta = httpResponse.getEntity();
9 }
10
11 // 如果增量抓取的数据为空,就会进行一次全量抓取
12 if (delta == null) {
13 logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
14 + “Hence got the full registry.”);
15 getAndStoreFullRegistry();
16 }
17
18 else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
19 logger.debug(“Got delta update with apps hashcode {}”, delta.getAppsHashCode());
20 String reconcileHashCode = “”;
21 // 加锁更新本地注册表
22 if (fetchRegistryUpdateLock.tryLock()) {
23 try {
24 // 抓取到增量的注册表后,跟本地的注册表合并
25 updateDelta(delta);
26 // 注册表合并完成后,根据本地 applications 计算一个 hash 值
27 reconcileHashCode = getReconcileHashCode(applications);
28 } finally {
29 fetchRegistryUpdateLock.unlock();
30 }
31 } else {
32 logger.warn(“Cannot acquire update lock, aborting getAndUpdateDelta”);
33 }
34 // delta 中会返回 server 端注册表的 hash 值,如果和本地计算出来的 hash 值不一样,
35 // 说明本地注册表跟 server 端注册表不一样,就会从 server 全量拉取注册表更新到本地缓存
36 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
37 reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
38 }
39 } else {
40 logger.warn(“Not updating application delta as another thread is updating it already”);
41 logger.debug(“Ignoring delta update with apps hashcode {}, as another thread is updating it already”, delta.getAppsHashCode());
42 }
43 }
复制代码
③ 增量注册表合并到本地

再来看下增量注册表合并到本地发方法 updateDelta,其实就是遍历返回来的服务实例,然后根据实例的 ActionType 分别处理,比如前面分析实例注册时 ActionType 就设置了 ADDED,后面分析实例下线时还可以看到设置了 ActionType 为 DELETED。

View Code
5、Eureka Server 返回增量注册表

① 抓取增量注册表的入口

从前分析知道,增量抓取注册表单接口为 GET/apps/delta,可以很容易找到位于 ApplicationsResource 下的 getContainerDifferential 就是抓取增量注册表的入口。

可以看到,跟抓取注册表类似,也是先构建一个缓存的Key,然后从多级缓存 ResponseCache 中获取。这里的key是 ALL_APPS_DELTA。

复制代码
1 @Path(“delta”)
2 @GET
3 public Response getContainerDifferential(
4 @PathParam(“version”) String version,
5 @HeaderParam(HEADER_ACCEPT) String acceptHeader,
6 @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
7 @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
8 @Context UriInfo uriInfo, @Nullable @QueryParam(“regions”) String regionsStr) {
9
10 Key cacheKey = new Key(Key.EntityType.Application,
11 // 增量服务:ALL_APPS_DELTA
12 ResponseCacheImpl.ALL_APPS_DELTA,
13 keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
14 );
15
16 final Response response;
17
18 if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
19 response = Response.ok(responseCache.getGZIP(cacheKey))
20 .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
21 .header(HEADER_CONTENT_TYPE, returnMediaType)
22 .build();
23 } else {
24 // 从多级缓存中获取增量注册表
25 response = Response.ok(responseCache.get(cacheKey)).build();
26 }
27
28 CurrentRequestVersion.remove();
29 return response;
30 }
复制代码
与全量抓取注册表,读取多级缓存的流程都是类似的,唯一的区别就是 Key 不同,全量抓取时是 ALL_APPS,增量抓取时 ALL_APPS_DELTA,区别就在于 readWriteCacheMap 加载数据到缓存中时走的逻辑不一样,可以再看看下面的 generatePayload 方法就知道了。

View Code
② 增量注册表的设计

之后会调用 registry.getApplicationDeltas() 获取增量注册表,进去可以发现,增量的注册表其实就是 recentlyChangedQueue 这个最近变更队列里的数据,通过遍历 recentlyChangedQueue 生成 Applications。

在返回 apps 之前,先获取了本地所有应用,并计算了一个 hash 值,然后设置到 apps 中。这就和前一节对应起来了,抓取增量注册表时,服务端会返回一个全量注册表的 hash 值,然后客户端将增量注册表合并到本地后,再根据本地的全量注册表计算一个 hash 值,然后将两个 hash 值做对比,如果不一致,说明服务端和客户端的数据是不一致的,这时客户端就会重新向服务端全量拉取注册表到本地。

复制代码
1 public Applications getApplicationDeltas() {
2 GET_ALL_CACHE_MISS_DELTA.increment();
3 Applications apps = new Applications();
4 apps.setVersion(responseCache.getVersionDelta().get());
5 Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
6 write.lock();
7 try {
8 // 最近变更队列 recentlyChangedQueue,这就是增量的注册表
9 // recentlyChangedQueue 只保留了最近3分钟有变化的实例,如实例上线、下线、故障剔除
10 Iterator iter = this.recentlyChangedQueue.iterator();
11 logger.debug(“The number of elements in the delta queue is : {}”,
12 this.recentlyChangedQueue.size());
13 while (iter.hasNext()) {
14 Lease lease = iter.next().getLeaseInfo();
15 InstanceInfo instanceInfo = lease.getHolder();
16 logger.debug(
17 “The instance id {} is found with status {} and actiontype {}”,
18 instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
19 Application app = applicationInstancesMap.get(instanceInfo
20 .getAppName());
21 if (app == null) {
22 app = new Application(instanceInfo.getAppName());
23 applicationInstancesMap.put(instanceInfo.getAppName(), app);
24 apps.addApplication(app);
25 }
26 app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
27 }
28
29 // 省略部分代码…
30
31 // 获取所有应用实例
32 Applications allApps = getApplications(!disableTransparentFallback);
33 // 根据所有应用实例计算一个 hash 值,并设置到要返回的 apps 中
34 apps.setAppsHashCode(allApps.getReconcileHashCode());
35 return apps;
36 } finally {
37 write.unlock();
38 }
39 }
复制代码
再来看看 recentlyChangedQueue 是如何设计来保存增量信息的。

再看看前面提到过的注册表初始化的构造方法,最后创建了一个每隔30秒执行一次的定时调度任务。这个任务会遍历 recentlyChangedQueue 这个队列,判断每个元素的最后更新时间是否超过了 180 秒,如果超过了,就会从队列中移除这个元素。超过 180 秒的实例变更信息,就会认为这些变更信息都已经同步到客户端了,因为客户端是每隔30秒拉取一次增量注册表的。因此客户端多次拉取增量注册表可能拉取到同样的变更信息,不过最终合并到本地都是一样的。

因此可以看出,eureka 利用 recentlyChangedQueue 这个最近变更队列保存了最近3分钟以内实例的变更信息,如新服务注册、服务下线等,然后客户端每次就是拉取这个变更队列。

复制代码
1 protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
2 this.serverConfig = serverConfig;
3 this.clientConfig = clientConfig;
4 this.serverCodecs = serverCodecs;
5 // 最近下线的循环队列
6 this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
7 // 最近注册的循环队列
8 this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
9
10 // 最近一分钟续约的计数器
11 this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
12
13 // 一个定时调度任务,定时剔除最近改变队列中过期的实例
14 this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
15 // 调度任务延迟 30 秒开始执行
16 serverConfig.getDeltaRetentionTimerIntervalInMs(),
17 // 默认每隔 30 秒执行一次
18 serverConfig.getDeltaRetentionTimerIntervalInMs());
19 }
20
21 /////////////////////////////////////////
22
23 private TimerTask getDeltaRetentionTask() {
24 return new TimerTask() {
25
26 @Override
27 public void run() {
28 // 最近变更的队列
29 Iterator it = recentlyChangedQueue.iterator();
30 while (it.hasNext()) {
31 // 最近更新时间超过 180 秒就认为数据已经同步到各个客户端了,就从队列中移除
32 if (it.next().getLastUpdateTime() <
33 // retentionTimeInMSInDeltaQueue:delta队列数据保留时间,默认 180 秒
34 System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
35 it.remove();
36 } else {
37 break;
38 }
39 }
40 }
41
42 };
43 }
复制代码
7、Eureka 抓取注册表总体流程图

下面还是用一张图整体展示下服务抓取注册表的整理流程。

服务注册、服务下线、实例故障剔除都会将读写缓存 readWriteCacheMap 中对应的实例失效掉,然后加入到最近变更队列 recentlyChangedQueue 中,因此这三种情况下,增量抓取注册表的逻辑都是类似的。

回到顶部
七、服务续约
在分布式系统中,服务续约机制是非常重要的,这样能让中心系统(注册中心)知道客户端还存活着。接下来就来看看服务续约的机制。

1、Eureka Client 定时发送心跳

在初始化 DiscoveryClient 的调度任务时,下面这部分代码就是在创建定时发送心跳的任务,心跳每隔30秒发送一次。发送心跳的接口是 PUT /apps/{appName}/{instanceId}。

复制代码
1 private void initScheduledTasks() {
2 // 定时刷新本地缓存…
3
4 if (clientConfig.shouldRegisterWithEureka()) {
5 // 续约间隔时间,默认30秒
6 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
7 // 心跳调度器的延迟时间扩大倍数,默认10
8 int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
9 logger.info("Starting heartbeat executor: " + “renew interval is: {}”, renewalIntervalInSecs);
10
11 // 心跳的定时任务
12 heartbeatTask = new TimedSupervisorTask(
13 “heartbeat”,
14 scheduler,
15 heartbeatExecutor,
16 renewalIntervalInSecs,
17 TimeUnit.SECONDS,
18 expBackOffBound,
19 new HeartbeatThread()
20 );
21 // 30秒后开始调度心跳的任务
22 scheduler.schedule(
23 heartbeatTask,
24 renewalIntervalInSecs, TimeUnit.SECONDS);
25
26 // 服务注册…
27 } else {
28 logger.info(“Not registering with Eureka server per configuration”);
29 }
30 }
31
32 //////////////////////////////////////////////
33
34 private class HeartbeatThread implements Runnable {
35 public void run() {
36 if (renew()) {
37 lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
38 }
39 }
40 }
41
42 //////////////////////////////////////////////
43
44 boolean renew() {
45 EurekaHttpResponse httpResponse;
46 try {
47 // 发送心跳的接口:PUT /apps/{appName}/{instanceId}
48 httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
49 logger.debug(PREFIX + “{} - Heartbeat status: {}”, appPathIdentifier, httpResponse.getStatusCode());
50 if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
51 REREGISTER_COUNTER.increment();
52 logger.info(PREFIX + “{} - Re-registering apps/{}”, appPathIdentifier, instanceInfo.getAppName());
53 long timestamp = instanceInfo.setIsDirtyWithTime();
54 // 服务端未找到对应的实例,就重新注册
55 boolean success = register();
56 if (success) {
57 instanceInfo.unsetIsDirty(timestamp);
58 }
59 return success;
60 }
61 // 续约成功
62 return httpResponse.getStatusCode() == Status.OK.getStatusCode();
63 } catch (Throwable e) {
64 logger.error(PREFIX + “{} - was unable to send heartbeat!”, appPathIdentifier, e);
65 return false;
66 }
67 }
复制代码
2、Eureka Server 接收心跳续约

顺着 PUT /apps/{appName}/{instanceId} 找可以发现,服务端接收注册的入口在 InstanceResource 的 renewLease 方法中,它调用了注册表单 renew 方法进行服务续约。

View Code
进去可以看到,调用了父类的 renew 方法续约,然后会判断 isReplication ,如果是复制,说明是来自 eureka-server 集群中其它节点的同步请求,就复制到其它节点。复制到其它集群这块代码在前面已经提到过了,就不再展示。

复制代码
1 public boolean renew(final String appName, final String id, final boolean isReplication) {
2 // 调用父类(AbstractInstanceRegistry)的 renew 续约
3 if (super.renew(appName, id, isReplication)) {
4 // 续约完成后同步到集群其它节点
5 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
6 return true;
7 }
8 return false;
9 }
复制代码
接着看父类的 renew 续约方法:

首先根据服务名从注册表中取出租约信息
然后根据实例ID取出实例的租约信息
然后判断是否是覆盖实例状态
将最近一分钟续约次数计数器 renewsLastMin +1
最后调用实例租约对象的 renew 方法进行续约,其内部只是更新了租约的最后更新时间 lastUpdateTimestamp ,更新为当前时间+续约间隔时间。
复制代码
1 public boolean renew(String appName, String id, boolean isReplication) {
2 RENEW.increment(isReplication);
3 // 根据服务名从注册表取出租约信息
4 Map<String, Lease> gMap = registry.get(appName);
5 Lease leaseToRenew = null;
6 if (gMap != null) {
7 // 根据实例ID取出实例租约信息
8 leaseToRenew = gMap.get(id);
9 }
10 if (leaseToRenew == null) {
11 RENEW_NOT_FOUND.increment(isReplication);
12 logger.warn(“DS: Registry: lease doesn’t exist, registering resource: {} - {}”, appName, id);
13 return false;
14 } else {
15 InstanceInfo instanceInfo = leaseToRenew.getHolder();
16 if (instanceInfo != null) {
17 // touchASGCache(instanceInfo.getASGName());
18 InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
19 instanceInfo, leaseToRenew, isReplication);
20 if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
21 logger.info(“Instance status UNKNOWN possibly due to deleted override for instance {}”
22 + “; re-register required”, instanceInfo.getId());
23 RENEW_NOT_FOUND.increment(isReplication);
24 return false;
25 }
26 if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
27 logger.info(
28 "The instance status {} is different from overridden instance status {} for instance {}. "
29 + “Hence setting the status to overridden status”, instanceInfo.getStatus().name(),
30 overriddenInstanceStatus.name(),
31 instanceInfo.getId());
32 instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
33
34 }
35 }
36 // 最近一分钟续约计数器+1
37 renewsLastMin.increment();
38 // 续约
39 leaseToRenew.renew();
40 return true;
41 }
42 }
43
44 ////////////////////////////////////////////////
45
46 public void renew() {
47 // 更新最后更新时间,在当前时间的基础上加了周期时间,默认90秒
48 lastUpdateTimestamp = System.currentTimeMillis() + duration;
49 }
复制代码
回到顶部
八、服务下线
1、Eureka Client 下线

eureka client 服务关闭停止时,会触发 DiscoveryClient 的 shutdown 关闭 eureka-client,我们就从 shutdown 方法来看看 eureka-client 的下线。

首先移除了注册的状态变更器,这个时候不再需要监听实例状态的变更了
然后关闭了一系列的调度任务,停止与 eureka-server 的交互,比如定时发送心跳。同时也释放了资源。
之后调用了 unregister 取消注册,其实就是调用了 server 端的 DELETE /apps/{appName}/{instanceId} 下线实例
最后再关闭了一些其它资源,如 EurekaTransport。
复制代码
1 @PreDestroy
2 @Override
3 public synchronized void shutdown() {
4 if (isShutdown.compareAndSet(false, true)) {
5 logger.info(“Shutting down DiscoveryClient …”);
6
7 // 移除状态变更监听器
8 if (statusChangeListener != null && applicationInfoManager != null) {
9 applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
10 }
11
12 // 停止调度任务,释放资源:
13 // instanceInfoReplicator、heartbeatExecutor、cacheRefreshExecutor
14 // scheduler、cacheRefreshTask、heartbeatTask
15 cancelScheduledTasks();
16
17 // If APPINFO was registered
18 if (applicationInfoManager != null
19 && clientConfig.shouldRegisterWithEureka()
20 && clientConfig.shouldUnregisterOnShutdown()) {
21 applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
22 // 调用 eureka-server 的下线接口下线实例
23 unregister();
24 }
25
26 // 继续释放资源
27 if (eurekaTransport != null) {
28 eurekaTransport.shutdown();
29 }
30 heartbeatStalenessMonitor.shutdown();
31 registryStalenessMonitor.shutdown();
32
33 Monitors.unregisterObject(this);
34
35 logger.info(“Completed shut down of DiscoveryClient”);
36 }
37 }
38
39 void unregister() {
40 // It can be null if shouldRegisterWithEureka == false
41 if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
42 try {
43 logger.info(“Unregistering …”);
44 // 取消注册:DELETE /apps/{appName}/{instanceId}
45 EurekaHttpResponse httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
46 logger.info(PREFIX + “{} - deregister status: {}”, appPathIdentifier, httpResponse.getStatusCode());
47 } catch (Exception e) {
48 logger.error(PREFIX + “{} - de-registration failed{}”, appPathIdentifier, e.getMessage(), e);
49 }
50 }
51 }
复制代码
2、Eureka Server 服务下线

顺着 DELETE /apps/{appName}/{instanceId} 接口可以找到 InstanceResouce 的 cancelLease 方法就是客户端下线的入口。

进入注册的 cancel 方法,可以看到与前面的一些接口是类似的,先调用服务的 cancel 方法下线实例,然后调用 replicateToPeers 复制到集群中其它节点。然后 cancel 方法其实是调用的 internalCancel 方法。

复制代码
1 @DELETE
2 public Response cancelLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
3 try {
4 // 下线实例
5 boolean isSuccess = registry.cancel(app.getName(), id,
6 “true”.equals(isReplication));
7
8 if (isSuccess) {
9 logger.debug(“Found (Cancel): {} - {}”, app.getName(), id);
10 return Response.ok().build();
11 } else {
12 logger.info(“Not Found (Cancel): {} - {}”, app.getName(), id);
13 return Response.status(Status.NOT_FOUND).build();
14 }
15 } catch (Throwable e) {
16 logger.error(“Error (cancel): {} - {}”, app.getName(), id, e);
17 return Response.serverError().build();
18 }
19 }
20
21 //////////////////////////////////////////
22
23 public boolean cancel(final String appName, final String id,
24 final boolean isReplication) {
25 if (super.cancel(appName, id, isReplication)) {
26 replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
27
28 return true;
29 }
30 return false;
31 }
32
33 //////////////////////////////////////////
34
35 public boolean cancel(String appName, String id, boolean isReplication) {
36 return internalCancel(appName, id, isReplication);
37 }
复制代码
再来看下 internalCancel 方法:

首先根据服务名从注册表取出服务所有实例的租约信息,再根据实例ID移除实例租约信息
将移除的实例加入到最近下线的一个循环队列 recentCanceledQueue
下线实例,注意这里设置了实例的下线时间 evictionTimestamp 为当前时间
然后设置实例的 ActionType 为 DELETED,再将下线的实例加入最近变更的队列 recentlyChangedQueue
之后失效掉缓存 readWriteCacheMap,服务实例变更了就必须清理缓存。不过 readWriteCacheMap 可能要30秒才会同步到 readOnlyCacheMap。
最后将期望续约的客户端数量-1,然后更新了每分钟续约次数阈值
复制代码
1 protected boolean internalCancel(String appName, String id, boolean isReplication) {
2 read.lock();
3 try {
4 CANCEL.increment(isReplication);
5 // 根据服务名称取出服务租约信息
6 Map<String, Lease> gMap = registry.get(appName);
7 Lease leaseToCancel = null;
8 if (gMap != null) {
9 // 根据实例ID移除实例租约信息
10 leaseToCancel = gMap.remove(id);
11 }
12 // 将移除的实例ID加入到最近下线的队列中
13 recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + “(” + id + “)”));
14 InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
15 if (instanceStatus != null) {
16 logger.debug(“Removed instance id {} from the overridden map which has value {}”, id, instanceStatus.name());
17 }
18 if (leaseToCancel == null) {
19 CANCEL_NOT_FOUND.increment(isReplication);
20 logger.warn(“DS: Registry: cancel failed because Lease is not registered for: {}/{}”, appName, id);
21 return false;
22 } else {
23 // 下线实例,设置了实例的下线时间 evictionTimestamp 为当前时间戳
24 leaseToCancel.cancel();
25 InstanceInfo instanceInfo = leaseToCancel.getHolder();
26 String vip = null;
27 String svip = null;
28 if (instanceInfo != null) {
29 // 设置实例 ActionType 为 DELETED
30 instanceInfo.setActionType(ActionType.DELETED);
31 // 加入最近变更队列中
32 recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
33 instanceInfo.setLastUpdatedTimestamp();
34 vip = instanceInfo.getVIPAddress();
35 svip = instanceInfo.getSecureVipAddress();
36 }
37 // 失效缓存
38 invalidateCache(appName, vip, svip);
39 logger.info(“Cancelled instance {}/{} (replication={})”, appName, id, isReplication);
40 }
41 } finally {
42 read.unlock();
43 }
44
45 synchronized (lock) {
46 // 期望续约的客户端数量 - 1
47 if (this.expectedNumberOfClientsSendingRenews > 0) {
48 // Since the client wants to cancel it, reduce the number of clients to send renews.
49 this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
50 // 更新每分钟续约次数的阈值
51 updateRenewsPerMinThreshold();
52 }
53 }
54
55 return true;
56 }
复制代码
回到顶部
九、服务故障
服务正常停止时,会触发 DiscoveryClient 的 shutdown 方法关闭 eureka-client,并向注册中心发送下线的通知。但如果客户端宕机或非正常关机,注册中心就无法接收到客户端下线的通知了,这时注册中心就会有一个定时任务,根据心跳来判断客户端实例是否故障下线了,然后摘除故障的实例。

1、摘除实例的定时任务初始化

在 EurekaBootStrap 初始化的最后几步中,调用了注册表的 openForTraffic 做一些最后的设置,其中最后一步调用了 super.postInit 方法做最后的初始化,里面就创建了定时摘除过期实例的调度任务。

1 registry.openForTraffic(applicationInfoManager, registryCount);
View Code
postInit:

首先是开启了最近一分钟续约次数的计数器
然后创建了定时摘除过期实例的调度任务,调度任务每隔60秒执行一次
复制代码
1 protected void postInit() {
2 // 启动 统计最近一分钟续约次数的计数器
3 renewsLastMin.start();
4 if (evictionTaskRef.get() != null) {
5 evictionTaskRef.get().cancel();
6 }
7 // 定时剔除任务
8 evictionTaskRef.set(new EvictionTask());
9 evictionTimer.schedule(evictionTaskRef.get(),
10 serverConfig.getEvictionIntervalTimerInMs(),
11 // 每隔60秒执行一次
12 serverConfig.getEvictionIntervalTimerInMs());
13 }
复制代码
2、定时摘除过期的实例

① 摘除实例的定时任务

可以看到,每次运行摘除实例的任务时,会先获取一个补偿时间,因为两次 EvictionTask 执行的间隔时间可能超过了设置的60秒,比如 GC 导致的停顿或本地时间漂移导致计时不准确等。然后就调用了 evict 方法摘除实例。

在计算时间差的场景中,这种补偿时间的思路是值得学习的,要考虑到时间差的不准确性。

复制代码
1 class EvictionTask extends TimerTask {
2
3 private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
4
5 @Override
6 public void run() {
7 try {
8 // 获取补偿时间,因为两次 EvictionTask 执行的间隔时间可能超过了设置的60秒,比如 GC 导致的停顿或本地时间漂移导致计时不准确
9 long compensationTimeMs = getCompensationTimeMs();
10 logger.info(“Running the evict task with compensationTime {}ms”, compensationTimeMs);
11 evict(compensationTimeMs);
12 } catch (Throwable e) {
13 logger.error(“Could not run the evict task”, e);
14 }
15 }
16
17 long getCompensationTimeMs() {
18 long currNanos = getCurrentTimeNano();
19 long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
20 if (lastNanos == 0L) {
21 return 0L;
22 }
23 // 两次任务运行的间隔时间
24 long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
25 // 补偿时间 = 任务运行间隔时间 - 剔除任务的间隔时间(默认60秒)
26 long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
27 return compensationTime <= 0L ? 0L : compensationTime;
28 }
29
30 long getCurrentTimeNano() { // for testing
31 return System.nanoTime();
32 }
33 }
复制代码
② 摘除实例

摘除实例的过程如下:

首先判断是否启用了租约过期的机制(主要就是自我保护机制,下一章节再说)。
遍历注册表,判断实例是否过期,将过期的实例加入集合列表中。
计算摘除实例的数量限制,主要就是出于自我保护机制,避免一次摘除过多实例。
然后就是从要摘除的集合中随机选择限制数量内的过期实例来摘除掉。
摘除实例其实就是调用了实例下线的方法 internalCancel,主要就是从注册表中移除实例、加入最近变更的队列、失效缓存等,具体可以回看服务下线那节。
复制代码
1 public void evict(long additionalLeaseMs) {
2 logger.debug(“Running the evict task”);
3
4 // 是否启用了租约过期
5 if (!isLeaseExpirationEnabled()) {
6 logger.debug(“DS: lease expiration is currently disabled.”);
7 return;
8 }
9
10 // We collect first all expired items, to evict them in random order. For large eviction sets,
11 // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
12 // the impact should be evenly distributed across all applications.
13 List<Lease> expiredLeases = new ArrayList<>();
14 for (Entry<String, Map<String, Lease>> groupEntry : registry.entrySet()) {
15 Map<String, Lease> leaseMap = groupEntry.getValue();
16 if (leaseMap != null) {
17 for (Entry<String, Lease> leaseEntry : leaseMap.entrySet()) {
18 Lease lease = leaseEntry.getValue();
19 // 判断实例租约是否过期
20 if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
21 // 加入到过期的集合列表中
22 expiredLeases.add(lease);
23 }
24 }
25 }
26 }
27
28 // 先获取注册表已注册的实例数量
29 int registrySize = (int) getLocalRegistrySize();
30 // 注册表数量保留的阈值:已注册的实例数 * 续约百分比阈值(默认0.85)
31 int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
32 // 剔除的数量限制,也就是说一次最多只能剔除 15% 的实例
33 int evictionLimit = registrySize - registrySizeThreshold;
34
35 // 得到最小的剔除数量
36 int toEvict = Math.min(expiredLeases.size(), evictionLimit);
37 if (toEvict > 0) {
38 logger.info(“Evicting {} items (expired={}, evictionLimit={})”, toEvict, expiredLeases.size(), evictionLimit);
39
40 Random random = new Random(System.currentTimeMillis());
41 for (int i = 0; i < toEvict; i++) {
42 // 根据要剔除的数量从 expiredLeases 中随机剔除 toEvict 个过期实例
43 int next = i + random.nextInt(expiredLeases.size() - i);
44 Collections.swap(expiredLeases, i, next);
45 Lease lease = expiredLeases.get(i);
46
47 String appName = lease.getHolder().getAppName();
48 // 实例ID
49 String id = lease.getHolder().getId();
50 EXPIRED.increment();
51 logger.warn(“DS: Registry: expired lease for {}/{}”, appName, id);
52 // 调用下线的方法
53 internalCancel(appName, id, false);
54 }
55 }
56 }
复制代码
③ 分批次摘取实例

可以看到,过期的实例并不是一次性摘除的,而是计算了一个阈值 toEvict,一次只随机摘除 toEvict 个过期实例,采用了分批摘取+随机摘取的机制。

比如注册表一共有20个实例,那么最多可以摘除的实例数 toEvict = 20 - 20 * 0.85 = 3 个,也就是说就算有5个实例过期了,那这一次也只能随机摘除其中3个,另外两个要等到下一次执行摘除任务时再摘除。

这种分批摘取机制+随机摘取机制可能会导致有些过期实例要过很久才能下线,尤其是在开发环境这种频繁启动、停止服务的场景中。

3、如何判断实例是否过期

从上面可以看到,eureka 调用了 lease.isExpired(additionalLeaseMs) 来判断实例是否过期。进入 isExpired 这个方法可以看到,如果已经设置了摘除时间,或者 当前时间 > (实例最后更新时间 + 续约周期(90秒) + 补偿时间),就认为实例已经过期了,说明实例已经超过一个周期没有续约了,就认为这个客户端实例发生了故障,无法续约,要被摘除掉。

复制代码
1 /**
2 * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
3 *
4 * Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than
5 * what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect
6 * instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will
7 * not be fixed.
8 *
9 * @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.
10 */
11 public boolean isExpired(long additionalLeaseMs) {
12 // 已经设置过剔除时间,或者 当前时间 > (实例最后更新时间 + 续约周期(90秒) + 补偿时间)
13 return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
14 }
复制代码
这里其实要注意的是另外一个问题,可以看看 isExpired 的注释,eureka 说这其实是一个bug,但不打算修复了,因为它的 duration 其实是被加了两次的,下面来看看怎么回事。

先看下 lastUpdateTimestamp 是如何更新的,在客户端续约的时候会更新 lastUpdateTimestamp,将其设置为 当前时间 + duration 间隔周期(默认90秒),

1 public void renew() {
2 // 更新最后更新时间,在当前时间的基础上加了一个周期间隔时间,默认90秒
3 lastUpdateTimestamp = System.currentTimeMillis() + duration;
4 }
这个 duration 在注册的时候也有设置,我们通过这个来看看它的含义是什么。可以看到,如果客户端没有配置 durationInSecs,就会设置为默认的 90 秒。

从 getDurationInSecs 的注释可以看出,这个 duration 的意思是等待客户端多久没有续约之后就将其剔除,默认为 90 秒。比如客户端每隔 30 秒续约一次,那可能超过3次没有续约之后,就会认为客户端实例故障了,就要将其摘除掉。

复制代码
1 public void register(final InstanceInfo info, final boolean isReplication) {
2 int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
3 // 如果实例中没有周期的配置,就设置为默认的 90 秒
4 if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
5 leaseDuration = info.getLeaseInfo().getDurationInSecs();
6 }
7 // 注册实例
8 super.register(info, leaseDuration, isReplication);
9 // 复制到集群其它 server 节点
10 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
11 }
12
13 //////////////////////////////////////////////
14
15 /**
16 * Returns client specified setting for eviction (e.g. how long to wait w/o
17 * renewal event)
18 *
19 * @return time in milliseconds since epoch.
20 */
21 public int getDurationInSecs() {
22 return durationInSecs;
23 }
复制代码
但实际上并不是90秒后摘除实例,可以看到 isExpired 里面将 lastUpdateTimestamp 又加了一个 duration,也就是 180 秒了。也就是说客户端实例超过 180 秒未续约才被认为是故障了,然后要将其摘除。

isExpired 的注释也说了,续约的方法 renew() 错误的计算了 lastUpdateTimestamp,实际的过期周期是 2 * duration,但是 eureka 并不打算修复这个bug,因为它的影响范围很小。

所以这里得出一个结论,客户端关闭了(非正常下线),注册表中的实例并不是90秒后就摘除了,至少是 180 秒后才会被摘除。

回到顶部
十、自我保护机制
如果网段偶尔抖动或暂时不可用,就无法接收到客户端的续约,因此 eureka server 为了保证可用性,就会去判断最近一分钟收到的心跳次数是否小于指定的阈值,是的就会触发自我保护机制,关闭租约失效剔除,不再摘除实例,从而保护注册信息。

1、摘除实例时的自我保护机制

摘除实例的 evict 方法调用了 isLeaseExpirationEnabled 这个方法判断是否触发自我保护机制,如果返回 false,就不会摘除实例了。

先看看 isLeaseExpirationEnabled 这个方法:

首先,如果没有启用自我保护机制,就返回 true,那就可以摘除实例
如果启用了自我保护机制(默认启用),就判断每分钟续约阈值 > 0 且 最近一分钟续约次数 > 每分钟续约阈值 就是启用了租约过期
复制代码
1 public boolean isLeaseExpirationEnabled() {
2 // 先判断是否启用了自我保护机制
3 if (!isSelfPreservationModeEnabled()) {
4 // The self preservation mode is disabled, hence allowing the instances to expire.
5 return true;
6 }
7 // 每分钟续约阈值大于0, 且 最近一分钟续约次数 大于 每分钟续约阈值
8 return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
9 }
10
11 public boolean isSelfPreservationModeEnabled() {
12 return serverConfig.shouldEnableSelfPreservation();
13 }
复制代码
这个每分钟续约阈值 numberOfRenewsPerMinThreshold 在前面很多地方都看到过了,服务注册、下线、openForTraffic、以及有个定时任务每隔15分钟都会调用下面这个方法来更新 numberOfRenewsPerMinThreshold。

复制代码
1 protected void updateRenewsPerMinThreshold() {
2 // 每分钟续约阈值 = 期望续约的客户端数量 * (60 / 续约间隔时间) * 续约阈值
3 // 例如,一共注册了 10 个实例,那么期望续约的客户端数量为 10,间隔时间默认为 30秒,就是每个客户端应该每30秒发送一次心跳,续约百分比默认为 0.85
4 // 每分钟续约次数阈值 = 10 * (60.0 / 30) * 0.85 = 17,也就是说每分钟至少要接收到 17 此续约请求
5 this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
6 * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
7 * serverConfig.getRenewalPercentThreshold());
8 }
复制代码
expectedNumberOfClientsSendingRenews 在实例注册的时候会 + 1,在实例下线的时候会 - 1,其代表的就是期望续约的客户端数量。

复制代码
1 /////////////// 实例注册 ///////////////
2 synchronized (lock) {
3 if (this.expectedNumberOfClientsSendingRenews > 0) {
4 // Since the client wants to register it, increase the number of clients sending renews
5 // 期望续约的客户端数量 + 1
6 this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
7 // 更新每分钟续约请求次数的阈值,这个阈值在后面很多地方都会用到
8 updateRenewsPerMinThreshold();
9 }
10 }
11
12
13 /////////////// 实例下线(下线、故障摘除) ///////////////
14 synchronized (lock) {
15 // 期望续约的客户端数量 - 1
16 if (this.expectedNumberOfClientsSendingRenews > 0) {
17 // Since the client wants to cancel it, reduce the number of clients to send renews.
18 this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
19 // 更新每分钟续约次数的阈值
20 updateRenewsPerMinThreshold();
21 }
22 }
复制代码
而最近一分钟续约次数计数器 renewsLastMin 在每个客户端续约的时候就会+1,可以回看下 renew 方法,最后调用了 renewsLastMin.increment() 增加一次续约次数。而 renewsLastMin.getCount() 返回的是上一分钟总的续约次数。

1 public long getNumOfRenewsInLastMin() {
2 return renewsLastMin.getCount();
3 }
根据以上代码举个例子来看看实例故障时的自我保护机制:

比如注册了20个实例,实例默认发送心跳续约的间隔时间为30秒,续约的阈值为 0.85,并且开启了自我保护机制。
那么期望续约的客户端数量 expectedNumberOfClientsSendingRenews = 20,每分钟发送心跳的阈值 numberOfRenewsPerMinThreshold = 20 * (60 / 30 )* 0.85 = 34。
正常来说20个实例每分钟发送心跳的次数 renewsLastMin = 20 * (60 / 30)= 40 次。
那么 numberOfRenewsPerMinThreshold(34) > 0 && renewsLastMin(40)> numberOfRenewsPerMinThreshold(34)就是满足的,就允许摘除故障的实例。
那如果有 3 个实例上一分钟没有发送续约了,这个时候 renewsLastMin = 17 * (60 / 30)= 34 次,而 numberOfRenewsPerMinThreshold 还是不变,因为注册表的实例并未移除,因此这时条件就不满足了,就算实例真的故障了,也不能摘除实例了。
这就是 eureka-server 的自我保护机制,他认为如果短时间内有过的的实例未发送心跳(超过15%),它会认为是自己网络故障导致客户端不能发送心跳,就进入自我保护机制,避免误摘除实例。

2、自我保护机制导致实例未下线的情况

在开发环境中,因为会频繁重启服务,会发现有些服务已经是下线状态了(DOWN),但服务实例一直未被摘除,就是因为 eureka-server 的自我保护机制导致的,下面来看下。

① 启用自我保护机制的情况

首先 eureka-server 做了如下配置,启用注册中心:

1 eureka:
2 server:
3 # 是否启用自我保护机制
4 enable-self-preservation: true
启动几个客户端实例:

然后快速将 demo-consumer 停止掉(如果正常关闭,会调用 cancel 下线实例),这时就会看到 demo-consumer 已经DOWN了,但是实例一直未被移除。

可以看到,上一分钟续约的次数为 4 次,期望每分钟续约次数为6次,因为不满足判断的条件,所以就触发了自我保护机制,导致一直无法摘除实例。

注意期望续约的客户端数量为4,而实际注册的客户端实例是3个,这是因为 springcloud 在调用 openForTraffic 设置了初始值为 1。

② 关闭自我保护机制的情况

配置如下,关闭自我保护机制:

1 eureka:
2 server:
3 # 是否启用自我保护机制
4 enable-self-preservation: false
这时注册中心控制台会提示我们关闭了自我保护机制:

同样的操作,快速停掉实例,发现实例还是未被摘除:

那其实是因为实例要180秒后才会被认为是过期的,所以等3分钟以后,实例就会下线了。

1 public boolean isExpired(long additionalLeaseMs) {
2 return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
3 }
③ 快速关闭多个实例

这次同时关闭2个实例来看看,在2分钟之后,发现只有一个实例下线了,这因为eureka-server一次只会摘除15%的实例。

④ DOWN 是怎么来的

那么DOWN这个状态是怎么来的呢?由于我本地是用IDEA启动的客户端实例,其实在关闭的时候,会触发状态变更监听器,然后就会触发一次注册的调用,注册的状态是 DOWN,因此实例状态马上就变为 DOWN 了。

如果直接 kill 这个进程,就不会触发状态变更监听器了,注册中心的实例就不会变为 DOWN 了,但是实例已经下线变为不可用的状态了。

⑤ 实例快速下线

经过前面的测试可以总结出来,要想实例快速下线,可以调整如下一些参数。

eureka-server 配置:

复制代码
1 eureka:
2 server:
3 # 是否启用自我保护机制
4 enable-self-preservation: false
5 # 每分钟续约阈值
6 renewal-percent-threshold: 0
7 # 摘除实例的定时任务的间隔时间
8 eviction-interval-timer-in-ms: 10000
复制代码
eureka-client 配置:

1 eureka:
2 instance:
3 # 判断实例多久未发送心跳就判定为过期
4 lease-expiration-duration-in-seconds: 60
3、最近一分钟计数器的设计

来看下最近一分钟续约次数计数器 renewsLastMin 是如何统计上一分钟的续约次数的,renewsLastMin 的类型是 MeasuredRate,这个类的设计也是值得学习的一个点。

MeasuredRate 利用两个桶来计数,一个保存上一间隔时间的计数,一个保存当前这一间隔时间的计数,然后使用定时任务每隔一定间隔时间就将当前这个桶的计数替换到上一个桶里。然后增加计数的时候增加当前桶,获取数量的时候从上一个桶里获取,就实现了获取上一个间隔时间的计数。

复制代码
1 public class MeasuredRate {
2 private static final Logger logger = LoggerFactory.getLogger(MeasuredRate.class);
3 // 利用了两个桶来计数,一个是上一分钟,一个是当前这一分钟
4 private final AtomicLong lastBucket = new AtomicLong(0);
5 private final AtomicLong currentBucket = new AtomicLong(0);
6
7 private final long sampleInterval;
8 private final Timer timer;
9
10 private volatile boolean isActive;
11
12 /**
13 * @param sampleInterval in milliseconds
14 /
15 public MeasuredRate(long sampleInterval) {
16 // 间隔时间
17 this.sampleInterval = sampleInterval;
18 // 定时器
19 this.timer = new Timer(“Eureka-MeasureRateTimer”, true);
20 this.isActive = false;
21 }
22
23 public synchronized void start() {
24 if (!isActive) {
25 timer.schedule(new TimerTask() {
26 @Override
27 public void run() {
28 try {
29 // 每分钟执行一次,将当前这一分钟的次数设置到上一分钟的桶里
30 lastBucket.set(currentBucket.getAndSet(0));
31 } catch (Throwable e) {
32 logger.error(“Cannot reset the Measured Rate”, e);
33 }
34 }
35 }, sampleInterval, sampleInterval);
36
37 isActive = true;
38 }
39 }
40
41 public synchronized void stop() {
42 if (isActive) {
43 timer.cancel();
44 isActive = false;
45 }
46 }
47
48 /
*
49 * Returns the count in the last sample interval.
50 /
51 public long getCount() {
52 // 获取计数时是获取的上一分钟这个桶的计数
53 return lastBucket.get();
54 }
55
56 /
*
57 * Increments the count in the current sample interval.
58 */
59 public void increment() {
60 // 增加计数的时候是增加的当前这个桶的计数
61 currentBucket.incrementAndGet();
62 }
63 }
复制代码
4、服务故障摘除和自我保护机制图

下面用一张图来总结下服务故障摘除和自我保护机制。

作者:bojiangzhou
出处:http://www.cnblogs.com/chiangchou/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
分类: springcloud
标签: eureka, springcloud, 注册表
好文要顶 关注我 收藏该文
bojiangzhou
关注 - 7
粉丝 - 659
+加关注
0 0
« 上一篇: SpringCloud 源码系列(1)—— 注册中心 Eureka(上)
posted on 2020-12-03 01:46 bojiangzhou 阅读(72) 评论(0) 编辑 收藏
刷新评论刷新页面返回顶部
登录后才能发表评论,立即 登录 或 注册, 访问 网站首页
【推荐】News: 大型组态、工控、仿真、CADGIS 50万行VC++源码免费下载
【推荐】博客园 & 陌上花开HIMMR 给单身的程序员小哥哥助力脱单啦~
【推荐】了不起的开发者,挡不住的华为,园子里的品牌专区
【推荐】未知数的距离,毫秒间的传递,声网与你实时互动
【福利】AWS携手博客园为开发者送免费套餐与抵扣券
【推荐】阿里云爆款特惠,精选爆款产品低至0.95折
相关博文:
· SpringCloud一(eureka)
· springcloud-Eureka组件
· SpringCloud组件Eureka
· springcloud的eureka服务
· SpringCloud微服务的Eureka
» 更多推荐…
最新 IT 新闻:
· 谷歌开始允许员工在公司举行户外会议 回应称为改善协作
· 极兔为何“杀不死”?
· 科学家发明全新胶水 升华转换为气体就能释放粘合力
· 2021年第一个假期!元旦火车票今日开售
· 骁龙888的八个8:都在这一张图里了!
» 更多新闻…
公告

昵称: bojiangzhou
园龄: 4年11个月
粉丝: 659
关注: 7
+加关注
搜索

找找看
积分与排名
积分 - 124868
排名 - 6406
随笔分类 (52)

dbutils(1)
devops(1)
easyui(2)
ExtJs(1)
fastdfs(1)
hibernate(1)
IDE(1)
java(9)
JavaScript(6)
jvm(4)
mybatis(1)
mysql(7)
redis(3)
spring(1)
spring boot(2)
springcloud(2)
并发编程(1)
环境搭建(1)
设计模式(1)
项目(4)
项目管理(2)
最新评论

  1. Re:Kubernetes 深入学习(一) —— 入门和集群安装部署
    kube-flannel.yml 文件找不到,flannel启动一直失败

–把酒问青天92
2. Re:学生成绩管理系统/学生信息管理系统
为啥验证码显示不出来

–3070674739
3. Re:基于 Javassist 和 Javaagent 实现动态切面
内容太全,看的快睡着了。要是能拆成系列文章就好了

–安静的李先生
4. Re:JVM性能调优(4) —— 性能调优工具
大佬牛逼!

–Asimov001
5. Re:JVM性能调优(3) —— 内存分配和垃圾回收调优
非常的感谢,JVM性能调优这块讲得很详细

–太古里车神
阅读排行榜

  1. jQuery jsonp跨域请求(254543)
  2. 用FastDFS一步步搭建文件管理系统(203685)
  3. 在Intellij IDEA中使用Debug(192521)
  4. 学生成绩管理系统/学生信息管理系统(145731)
  5. 基于SpringBoot搭建应用开发框架(一) —— 基础架构(72630)
  6. easyui-textbox 和 easyui-validatebox 设置值和获取值(62453)
  7. 毕业设计之进销存管理系统 —— 一步步搭建自己的框架及系统(39945)
  8. hibernate基于注解的维护权反转:@OneToMany(mappedBy=)(37877)
  9. MySql学习(三) —— 子查询(where、from、exists) 及 连接查询(left join、right join、inner join、union join)(35315)
  10. JavaScript 面向对象(一) —— 基础篇(32772)
    评论排行榜
  11. 学生成绩管理系统/学生信息管理系统(82)
  12. 毕业设计之进销存管理系统 —— 一步步搭建自己的框架及系统(66)
  13. 基于SpringBoot搭建应用开发框架(一) —— 基础架构(62)
  14. 用FastDFS一步步搭建文件管理系统(45)
  15. 在Intellij IDEA中使用Debug(35)
  16. jQuery jsonp跨域请求(31)
  17. 基于SpringBoot搭建应用开发框架(二) —— 登录认证(15)
  18. 跟着视频做的SSH项目总结(11)
  19. 手机进销存系统/供应链管理系统(9)
  20. 基于 Javassist 和 Javaagent 实现动态切面(8)
    推荐排行榜
  21. 在Intellij IDEA中使用Debug(99)
  22. jQuery jsonp跨域请求(98)
  23. 基于SpringBoot搭建应用开发框架(一) —— 基础架构(61)
  24. 用FastDFS一步步搭建文件管理系统(61)
  25. 基于SpringBoot搭建应用开发框架(二) —— 登录认证(58)
    Copyright © 2020 bojiangzhou
    Powered by .NET 5.0.0 on Kubernetes
Logo

云原生社区为您提供最前沿的新闻资讯和知识内容

更多推荐