摘要: 原创出处 http://www.iocoder.cn/Eureka/server-cluster/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本




1. 概述

本文主要分享 Eureka-Server 集群同步注册信息

Eureka-Server 集群如下图:

  • Eureka-Server 集群不区分主从节点或者 Primary & Secondary 节点,所有节点相同角色( 也就是没有角色 ),完全对等
  • Eureka-Client 可以向任意 Eureka-Client 发起任意读写操作,Eureka-Server 将操作复制到另外的 Eureka-Server 以达到最终一致性。注意,Eureka-Server 是选择了 AP 的组件。

Eureka-Server 可以使用直接配置所有节点的服务地址,或者基于 DNS 配置。推荐阅读:《Spring Cloud构建微服务架构(六)高可用服务注册中心》

本文主要类在 com.netflix.eureka.cluster 包下。

OK,让我们开始愉快的遨游在代码的海洋。

推荐 Spring Cloud 书籍

ps :注意,本文提到的同步,准确来说是复制( Replication )

2. 集群节点初始化与更新

com.netflix.eureka.cluster.PeerEurekaNodes ,Eureka-Server 集群节点集合 。构造方法如下 :


      
      
public class PeerEurekaNodes {
private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);
/
* 应用实例注册表
/
protected final PeerAwareInstanceRegistry registry;
/
Eureka-Server 配置
/
protected final EurekaServerConfig serverConfig;
/
Eureka-Client 配置
/
protected final EurekaClientConfig clientConfig;
/
Eureka-Server 编解码
/
protected final ServerCodecs serverCodecs;
/
应用实例信息管理器
/
private final ApplicationInfoManager applicationInfoManager;
/
Eureka-Server 集群节点数组
/
private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
/
Eureka-Server 服务地址数组
/
private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
/
定时任务服务
*/
private ScheduledExecutorService taskExecutor;
@Inject
public PeerEurekaNodes(
PeerAwareInstanceRegistry registry,
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
ApplicationInfoManager applicationInfoManager) {
this.registry = registry;
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
this.applicationInfoManager = applicationInfoManager;
}
}
  • peerEurekaNodes, peerEurekaNodeUrls, taskExecutor 属性,在构造方法中未设置和初始化,而是在 PeerEurekaNodes#start() 方法,设置和初始化,下文我们会解析这个方法。
  • Eureka-Server 在初始化时,调用 EurekaBootStrap#getPeerEurekaNodes(...) 方法,创建 PeerEurekaNodes ,点击 链接 查看该方法的实现。

2.1 集群节点启动

调用 PeerEurekaNodes#start() 方法,集群节点启动,主要完成两个逻辑:

  • 初始化集群节点信息
  • 初始化固定周期( 默认:10 分钟,可配置 )更新集群节点信息的任务

代码如下:

      
      
1: public void start() {
2: // 创建 定时任务服务
3: taskExecutor = Executors.newSingleThreadScheduledExecutor(
4: new ThreadFactory() {
5: @Override
6: public Thread newThread(Runnable r) {
7: Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
8: thread.setDaemon( true);
9: return thread;
10: }
11: }
12: );
13: try {
14: // 初始化 集群节点信息
15: updatePeerEurekaNodes(resolvePeerUrls());
16: // 初始化 初始化固定周期更新集群节点信息的任务
17: Runnable peersUpdateTask = new Runnable() {
18: @Override
19: public void run() {
20: try {
21: updatePeerEurekaNodes(resolvePeerUrls());
22: } catch (Throwable e) {
23: logger.error( "Cannot update the replica Nodes", e);
24: }
25:
26: }
27: };
28: taskExecutor.scheduleWithFixedDelay(
29: peersUpdateTask,
30: serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
31: serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
32: TimeUnit.MILLISECONDS
33: );
34: } catch (Exception e) {
35: throw new IllegalStateException(e);
36: }
37: // 打印 集群节点信息
38: for (PeerEurekaNode node : peerEurekaNodes) {
39: logger.info( "Replica node URL: " + node.getServiceUrl());
40: }
41: }

  • 第 15 行 && 第 21 行 :调用 #updatePeerEurekaNodes() 方法,更新集群节点信息。

2.2 更新集群节点信息

调用 #resolvePeerUrls() 方法,获得 Eureka-Server 集群服务地址数组,代码如下:


      
      
1: protected List<String> resolvePeerUrls() {
2: // 获得 Eureka-Server 集群服务地址数组
3: InstanceInfo myInfo = applicationInfoManager.getInfo();
4: String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
5: List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
6:
7: // 移除自己(避免向自己同步)
8: int idx = 0;
9: while (idx < replicaUrls.size()) {
10: if (isThisMyUrl(replicaUrls.get(idx))) {
11: replicaUrls.remove(idx);
12: } else {
13: idx++;
14: }
15: }
16: return replicaUrls;
17: }

调用 #updatePeerEurekaNodes() 方法,更新集群节点信息,主要完成两部分逻辑:

  • 添加新增的集群节点
  • 关闭删除的集群节点

代码如下:

      
      
1: protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
2: if (newPeerUrls.isEmpty()) {
3: logger.warn( "The replica size seems to be empty. Check the route 53 DNS Registry");
4: return;
5: }
6:
7: // 计算 新增的集群节点地址
8: Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
9: toShutdown.removeAll(newPeerUrls);
10:
11: // 计算 删除的集群节点地址
12: Set<String> toAdd = new HashSet<>(newPeerUrls);
13: toAdd.removeAll(peerEurekaNodeUrls);
14:
15: if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
16: return;
17: }
18:
19: // 关闭删除的集群节点
20: // Remove peers no long available
21: List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
22: if (!toShutdown.isEmpty()) {
23: logger.info( "Removing no longer available peer nodes {}", toShutdown);
24: int i = 0;
25: while (i < newNodeList.size()) {
26: PeerEurekaNode eurekaNode = newNodeList.get(i);
27: if (toShutdown.contains(eurekaNode.getServiceUrl())) {
28: newNodeList.remove(i);
29: eurekaNode.shutDown(); // 关闭
30: } else {
31: i++;
32: }
33: }
34: }
35:
36: // 添加新增的集群节点
37: // Add new peers
38: if (!toAdd.isEmpty()) {
39: logger.info( "Adding new peer nodes {}", toAdd);
40: for (String peerUrl : toAdd) {
41: newNodeList.add(createPeerEurekaNode(peerUrl));
42: }
43: }
44:
45: // 赋值
46: this.peerEurekaNodes = newNodeList;
47: this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
48: }

  • 第 7 至 9 行 :计算新增的集群节点地址。
  • 第 11 至 13 行 :计算删除的集群节点地址。
  • 第 19 至 34 行 :关闭删除的集群节点。
  • 第 36 至 43 行 :添加新增的集群节点。调用 #createPeerEurekaNode(peerUrl) 方法,创建集群节点,代码如下:

            
            
    1: protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
    2: HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
    3: String targetHost = hostFromUrl(peerEurekaNodeUrl);
    4: if (targetHost == null) {
    5: targetHost = "host";
    6: }
    7: return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    8: }


2.3 集群节点

com.netflix.eureka.cluster.PeerEurekaNode ,单个集群节点。

点击 链接 查看构造方法

3. 获取初始注册信息

Eureka-Server 启动时,调用 PeerAwareInstanceRegistryImpl#syncUp() 方法,从集群的一个 Eureka-Server 节点获取初始注册信息,代码如下:


      
      
1: @Override
2: public int syncUp() {
3: // Copy entire entry from neighboring DS node
4: int count = 0;
5:
6: for ( int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
7: // 未读取到注册信息,sleep 等待
8: if (i > 0) {
9: try {
10: Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
11: } catch (InterruptedException e) {
12: logger.warn( "Interrupted during registry transfer..");
13: break;
14: }
15: }
16:
17: // 获取注册信息
18: Applications apps = eurekaClient.getApplications();
19: for (Application app : apps.getRegisteredApplications()) {
20: for (InstanceInfo instance : app.getInstances()) {
21: try {
22: if (isRegisterable(instance)) { // 判断是否能够注册
23: register(instance, instance.getLeaseInfo().getDurationInSecs(), true); // 注册
24: count++;
25: }
26: } catch (Throwable t) {
27: logger.error( "During DS init copy", t);
28: }
29: }
30: }
31: }
32: return count;
33: }
  • 第 7 至 15 行 :未获取到注册信息,sleep 等待再次重试。
  • 第 17 至 30 行 :获取注册信息,若获取到,注册到自身节点。


若调用 #syncUp() 方法,未获取到应用实例,则 Eureka-Server 会有一段时间( 默认:5 分钟,可配 )不允许被 Eureka-Client 获取注册信息,避免影响 Eureka-Client 。

  • 标记 Eureka-Server 启动时,未获取到应用实例,代码如下:

            
            
    // PeerAwareInstanceRegistryImpl.java
    private boolean peerInstancesTransferEmptyOnStartup = true;
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // ... 省略其他代码
    if (count > 0) {
    this.peerInstancesTransferEmptyOnStartup = false;
    }
    // ... 省略其他代码
    }
  • 判断 Eureka-Server 是否允许被 Eureka-Client 获取注册信息,代码如下:

            
            
    // PeerAwareInstanceRegistryImpl.java
    public boolean shouldAllowAccess(boolean remoteRegionRequired) {
    if ( this.peerInstancesTransferEmptyOnStartup) {
    // 设置启动时间
    this.startupTime = System.currentTimeMillis();
    if (!(System.currentTimeMillis() > this.startupTime + serverConfig.getWaitTimeInMsWhenSyncEmpty())) {
    return false;
    }
    }
    // ... 省略其他代码
    return true;
    }

4. 同步注册信息

Eureka-Server 集群同步注册信息如下图:

  • Eureka-Server 接收到 Eureka-Client 的 Register、Heartbeat、Cancel、StatusUpdate、DeleteStatusOverride 操作,固定间隔( 默认值 :500 毫秒,可配 )向 Eureka-Server 集群内其他节点同步( 准实时,非实时 )。

4.1 同步操作类型

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.Action ,同步操作类型,代码如下:


      
      
public enum Action {
Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
// ... 省略监控相关属性
}

4.2 发起 Eureka-Server 同步操作

Eureka-Server 在完成 Eureka-Client 发起的上述操作在自身节点的执行后,向集群内其他 Eureka-Server 发起同步操作。以 Register 操作举例子,代码如下:


      
      
// PeerAwareInstanceRegistryImpl.java
public void register(final InstanceInfo info, final boolean isReplication) {
// 租约过期时间
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 注册应用实例信息
super.register(info, leaseDuration, isReplication);
// Eureka-Server 复制
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
  • 最后一行,调用 #replicateToPeers(...) 方法,传递对应的同步操作类型,发起同步操作。

#replicateToPeers(...) 方法,代码如下:

      
      
1: private void replicateToPeers(Action action, String appName, String id,
2: InstanceInfo info /* optional /,
3: InstanceStatus newStatus / optional */, boolean isReplication) {
4: Stopwatch tracer = action.getTimer().start();
5: try {
6: if (isReplication) {
7: numberOfReplicationsLastMin.increment();
8: }
9:
10: // Eureka-Server 发起的请求 或者 集群为空
11: // If it is a replication already, do not replicate again as this will create a poison replication
12: if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
13: return;
14: }
15:
16: for ( final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
17: // If the url represents this host, do not replicate to yourself.
18: if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
19: continue;
20: }
21: replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
22: }
23: } finally {
24: tracer.stop();
25: }
26: }

  • 第 10 至 14 行 :Eureka-Server 在处理上述操作( Action ),无论来自 Eureka-Client 发起请求,还是 Eureka-Server 发起同步,调用的内部方法相同,通过 isReplication=true 参数,避免死循环同步。
  • 第 16 至 22 行 :循环集群内每个节点,调用 #replicateInstanceActionsToPeers(...) 方法,发起同步操作。

#replicateInstanceActionsToPeers(...) 方法,代码如下:

      
      
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error( "Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}

  • Cancel :调用 PeerEurekaNode#cancel(...) 方法,点击 链接 查看实现。
  • Heartbeat :调用 PeerEurekaNode#heartbeat(...) 方法,点击 链接 查看实现。
  • Register :调用 PeerEurekaNode#register(...) 方法,点击 链接 查看实现。
  • StatusUpdate :调用 PeerEurekaNode#statusUpdate(...) 方法,点击 链接 查看实现。
  • DeleteStatusOverride :调用 PeerEurekaNode#deleteStatusOverride(...) 方法,点击 链接 查看实现。
  • 上面的每个方法实现,我们会看到类似这么一段代码 :

            
            
    batchingDispatcher.process(
    taskId( "${action}", appName, id), // id
    new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
    @Override
    public EurekaHttpResponse<Void> execute() {
    return replicationClient.doString(...);
    }
    @Override
    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
    // do Something...
    }
    }, // ReplicationTask 子类
    expiryTime
    )
    • #task(...) 方法,生成同步操作任务编号。代码如下:

                
                
      private static String taskId(String requestType, String appName, String id) {
      return requestType + '#' + appName + '/' + id;
      }
      • 相同应用实例的相同同步操作使用相同任务编号。在 《Eureka 源码解析 —— 任务批处理》「2. 整体流程」 中,我们看到” 接收线程( Runner )合并任务,将相同任务编号的任务合并,只执行一次。 “,因此,相同应用实例的相同同步操作就能被合并,减少操作量。例如,Eureka-Server 同步某个应用实例的 Heartbeat 操作,接收同步的 Eureak-Server 挂了,一方面这个应用的这次操作会重试,另一方面,这个应用实例会发起新的 Heartbeat 操作,通过任务编号合并,接收同步的 Eureka-Server 恢复后,减少收到重复积压的任务。



4.1.1 同步操作任务

  • com.netflix.eureka.cluster.ReplicationTask ,同步任务抽象类
    • 点击 链接 查看 ReplicationTask 代码。
    • 定义了 #getTaskName() 抽象方法。
    • 定义了 #execute() 抽象方法,执行同步任务。
    • 实现了 #handleSuccess() 方法,处理成功执行同步结果。
    • 实现了 #handleFailure(...) 方法,处理失败执行同步结果。

  • com.netflix.eureka.cluster.InstanceReplicationTask ,同步应用实例任务抽象类

    • 点击 链接 查看 InstanceReplicationTask 代码。
    • 实现了父类 #getTaskName() 抽象方法。

  • com.netflix.eureka.cluster.AsgReplicationTask ,亚马逊 AWS 使用,暂时跳过。

从上面 PeerEurekaNode#同步操作(...) 方法,全部实现了 InstanceReplicationTask 类的 #execute() 方法,部分重写了 #handleFailure(...) 方法。

4.1.2 同步操作任务处理器

com.netflix.eureka.cluster.InstanceReplicationTask ,实现 TaskProcessor 接口,同步操作任务处理器。

ReplicationTaskProcessor#process(task)处理单任务,用于 Eureka-Server 向亚马逊 AWS 的 ASG ( Autoscaling Group ) 同步状态,暂时跳过,感兴趣的同学可以点击 链接 查看方法代码。

ReplicationTaskProcessor#process(tasks)处理批量任务,用于 Eureka-Server 集群注册信息的同步操作任务,通过调用被同步的 Eureka-Server 的 peerreplication/batch/ 接口,一次性将批量( 多个 )的同步操作任务发起请求,代码如下:

      
      
1: @Override
2: public ProcessingResult process(List<ReplicationTask> tasks) {
3: // 创建 批量提交同步操作任务的请求对象
4: ReplicationList list = createReplicationListOf(tasks);
5: try {
6: // 发起 批量提交同步操作任务的请求
7: EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
8: // 处理 批量提交同步操作任务的响应
9: int statusCode = response.getStatusCode();
10: if (!isSuccess(statusCode)) {
11: if (statusCode == 503) {
12: logger.warn( "Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
13: return ProcessingResult.Congestion;
14: } else {
15: // Unexpected error returned from the server. This should ideally never happen.
16: logger.error( "Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
17: return ProcessingResult.PermanentError;
18: }
19: } else {
20: handleBatchResponse(tasks, response.getEntity().getResponseList());
21: }
22: } catch (Throwable e) {
23: if (isNetworkConnectException(e)) {
24: logNetworkErrorSample( null, e);
25: return ProcessingResult.TransientError;
26: } else {
27: logger.error( "Not re-trying this exception because it does not seem to be a network exception", e);
28: return ProcessingResult.PermanentError;
29: }
30: }
31: return ProcessingResult.Success;
32: }

  • 第 4 行 :创建批量提交同步操作任务的请求对象( ReplicationList ) 。比较易懂,咱就不啰嗦贴代码了。
    • ReplicationList ,点击 链接 查看类。
    • ReplicationInstance ,点击 链接 查看类。
    • #createReplicationListOf(...) ,点击 链接 查看方法。
    • #createReplicationInstanceOf(...) ,点击 链接 查看方法。

  • 第 7 行 :调用 JerseyReplicationClient#submitBatchUpdates(...) 方法,请求 peerreplication/batch/ 接口,一次性将批量( 多个 )的同步操作任务发起请求。

    • JerseyReplicationClient#submitBatchUpdates(...) 方法,点击 链接 查看方法。
    • ReplicationListResponse ,点击 链接 查看类。
    • ReplicationInstanceResponse ,点击 链接 查看类。

  • 第 9 至 31 行 :处理批量提交同步操作任务的响应,在 「4.4 处理 Eureka-Server 同步结果」 详细解析。

4.3 接收 Eureka-Server 同步操作

com.netflix.eureka.resources.PeerReplicationResource ,同步操作任务 Resource ( Controller )。

peerreplication/batch/ 接口,映射 PeerReplicationResource#batchReplication(...) 方法,代码如下:

      
      
1: @Path( "batch")
2: @POST
3: public Response batchReplication(ReplicationList replicationList) {
4: try {
5: ReplicationListResponse batchResponse = new ReplicationListResponse();
6: // 逐个同步操作任务处理,并将处理结果( ReplicationInstanceResponse ) 合并到 ReplicationListResponse 。
7: for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
8: try {
9: batchResponse.addResponse(dispatch(instanceInfo));
10: } catch (Exception e) {
11: batchResponse.addResponse( new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
12: logger.error(instanceInfo.getAction() + " request processing failed for batch item "
13: + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e);
14: }
15: }
16: return Response.ok(batchResponse).build();
17: } catch (Throwable e) {
18: logger.error( "Cannot execute batch Request", e);
19: return Response.status(Status.INTERNAL_SERVER_ERROR).build();
20: }
21: }
22:
23: private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
24: ApplicationResource applicationResource = createApplicationResource(instanceInfo);
25: InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
26:
27: String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
28: String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
29: String instanceStatus = toString(instanceInfo.getStatus());
30:
31: Builder singleResponseBuilder = new Builder();
32: switch (instanceInfo.getAction()) {
33: case Register:
34: singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
35: break;
36: case Heartbeat:
37: singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
38: break;
39: case Cancel:
40: singleResponseBuilder = handleCancel(resource);
41: break;
42: case StatusUpdate:
43: singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
44: break;
45: case DeleteStatusOverride:
46: singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
47: break;
48: }
49: return singleResponseBuilder.build();
50: }

  • 第 7 至 15 行 :逐个处理单个同步操作任务,并将处理结果( ReplicationInstanceResponse ) 添加到 ReplicationListResponse 。
  • 第 23 至 50 行 :处理单个同步操作任务,返回处理结果( ReplicationInstanceResponse )。
    • 第 24 至 25 行 :创建 ApplicationResource , InstanceResource 。我们看到,实际该方法是把单个同步操作任务提交到其他 Resource ( Controller ) 处理,Eureka-Server 收到 Eureka-Client 请求响应的 Resource ( Controller ) 是相同的逻辑
    • Register :点击 链接 查看 #handleRegister(...) 方法。
    • Heartbeat :点击 链接 查看 #handleHeartbeat(...) 方法。
    • Cancel :点击 链接 查看 #handleCancel(...) 方法。
    • StatusUpdate :点击 链接 查看 #handleStatusUpdate(...) 方法。
    • DeleteStatusOverride :点击 链接 查看 #handleDeleteStatusOverride(...) 方法。


4.4 处理 Eureka-Server 同步结果

�� 想想就有小激动,终于写到这里了。

ReplicationTaskProcessor#process(tasks) 方法,处理批量提交同步操作任务的响应,代码如下:

      
      
1: @Override
2: public ProcessingResult process(List<ReplicationTask> tasks) {
3: // 创建 批量提交同步操作任务的请求对象
4: ReplicationList list = createReplicationListOf(tasks);
5: try {
6: // 发起 批量提交同步操作任务的请求
7: EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
8: // 处理 批量提交同步操作任务的响应
9: int statusCode = response.getStatusCode();
10: if (!isSuccess(statusCode)) {
11: if (statusCode == 503) {
12: logger.warn( "Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
13: return ProcessingResult.Congestion;
14: } else {
15: // Unexpected error returned from the server. This should ideally never happen.
16: logger.error( "Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
17: return ProcessingResult.PermanentError;
18: }
19: } else {
20: handleBatchResponse(tasks, response.getEntity().getResponseList());
21: }
22: } catch (Throwable e) {
23: if (isNetworkConnectException(e)) {
24: logNetworkErrorSample( null, e);
25: return ProcessingResult.TransientError;
26: } else {
27: logger.error( "Not re-trying this exception because it does not seem to be a network exception", e);
28: return ProcessingResult.PermanentError;
29: }
30: }
31: return ProcessingResult.Success;
32: }

  • 第 10 行 ,调用 #isSuccess(...) 方法,判断请求是否成功,响应状态码是否在 [200, 300) 范围内。
  • 第 11 至 13 行 :状态码 503 ,目前 Eureka-Server 返回 503 的原因是被限流。在 《Eureka 源码解析 —— 基于令牌桶算法的 RateLimiter》 详细解析。该情况为瞬时错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。
  • 第 14 至 18 行 :非预期状态码,目前 Eureka-Server 在代码上看下来,不会返回这样的状态码。该情况为永久错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。
  • 第 20 行 :请求成功,调用 #handleBatchResponse(...) 方法,逐个处理每个 ReplicationTask 和 ReplicationInstanceResponse 。这里有一点要注意下,请求成功指的是整个请求成功,实际每个 ReplicationInstanceResponse 可能返回的状态码不在 [200, 300) 范围内。该方法下文详细解析。
  • 第 23 至 25 行 :请求发生网络异常,例如网络超时,打印网络异常日志。目前日志的打印为部分采样,条件为网络发生异常每间隔 10 秒打印一条,避免网络发生异常打印超级大量的日志。该情况为永久错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。

    • #isNetworkConnectException(...) ,点击 链接 查看方法。
    • #logNetworkErrorSample(...) ,点击 链接 查看方法。

  • 第 26 至 29 行 :非预期异常,目前 Eureka-Server 在代码上看下来,不会抛出这样的异常。该情况为永久错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。



#handleBatchResponse(...) 方法,代码如下:

      
      
private void handleBatchResponse(List<ReplicationTask> tasks, List<ReplicationInstanceResponse> responseList) {
if (tasks.size() != responseList.size()) {
// This should ideally never happen unless there is a bug in the software.
logger.error( "Batch response size different from submitted task list ({} != {}); skipping response analysis", responseList.size(), tasks.size());
return;
}
for ( int i = 0; i < tasks.size(); i++) {
handleBatchResponse(tasks.get(i), responseList.get(i));
}
}
private void handleBatchResponse(ReplicationTask task, ReplicationInstanceResponse response) {
// 执行成功
int statusCode = response.getStatusCode();
if (isSuccess(statusCode)) {
task.handleSuccess();
return;
}
// 执行失败
try {
task.handleFailure(response.getStatusCode(), response.getResponseEntity());
} catch (Throwable e) {
logger.error( "Replication task " + task.getTaskName() + " error handler failure", e);
}
}

  • ReplicationTask#handleSuccess() 方法,无任务同步操作任务重写,是个空方法,代码如下:

            
            
    // ReplicationTask.java
    public void handleSuccess() {
    }
  • ReplicationTask#handleFailure() 方法,有两个同步操作任务重写:

    • Cancel :当 Eureka-Server 不存在下线的应用实例时,返回 404 状态码,此时打印错误日志,代码如下:

                
                
      // PeerEurekaNode#cancel(...)
      @Override
      public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
      super.handleFailure(statusCode, responseEntity);
      if (statusCode == 404) {
      logger.warn( "{}: missing entry.", getTaskName());
      }
      }
      • x

    • Heartbeat :情况较为复杂,我们换一行继续说,避免排版有问题,影响阅读。



噔噔噔恰,本文的重要头戏来啦!Last But Very Importment !!!

Eureka-Server 是允许同一时刻允许在任意节点被 Eureka-Client 发起写入相关的操作,网络是不可靠的资源,Eureka-Client 可能向一个 Eureka-Server 注册成功,但是网络波动,导致 Eureka-Client 误以为失败,此时恰好 Eureka-Client 变更了应用实例的状态,重试向另一个 Eureka-Server 注册,那么两个 Eureka-Server 对该应用实例的状态产生冲突。

再例如…… 我们不要继续举例子,网络波动真的很复杂。我们来看看 Eureka 是怎么处理的。

应用实例( InstanceInfo ) 的 lastDirtyTimestamp 属性,使用时间戳,表示应用实例的版本号,当请求方( 不仅仅是 Eureka-Client ,也可能是同步注册操作的 Eureka-Server ) 向 Eureka-Server 发起注册时,若 Eureka-Server 已存在拥有更大 lastDirtyTimestamp 该实例( 相同应用并且相同应用实例编号被认为是相同实例 ),则请求方注册的应用实例( InstanceInfo ) 无法覆盖注册此 Eureka-Server 的该实例( 见 AbstractInstanceRegistry#register(...) 方法 )。例如我们上面举的例子,第一个 Eureka-Server 向 第二个 Eureka-Server 同步注册应用实例时,不会注册覆盖,反倒是第二个 Eureka-Server 同步注册应用到第一个 Eureka-Server ,注册覆盖成功,因为 lastDirtyTimestamp ( 应用实例状态变更时,可以设置 lastDirtyTimestamp 为当前时间,见 ApplicationInfoManager#setInstanceStatus(status) 方法 )。

但是光靠注册请求判断 lastDirtyTimestamp 显然是不够的,因为网络异常情况下时,同步操作任务多次执行失败到达过期时间后,此时在 Eureka-Server 集群同步起到最终一致性最最最关键性出现了:Heartbeat 。因为 Heartbeat 会周期性的执行,通过它一方面可以判断 Eureka-Server 是否存在心跳对应的应用实例,另外一方面可以比较应用实例的 lastDirtyTimestamp 。当满足下面任意条件,Eureka-Server 返回 404 状态码:

请求方接收到 404 状态码返回后,认为 Eureka-Server 应用实例实际是不存在的,重新发起应用实例的注册。以本文的 Heartbeat 为例子,代码如下:

      
      
// PeerEurekaNode#heartbeat(...)
1: @Override
2: public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
3: super.handleFailure(statusCode, responseEntity);
4: if (statusCode == 404) {
5: logger.warn( "{}: missing entry.", getTaskName());
6: if (info != null) {
7: logger.warn( "{}: cannot find instance id {} and hence replicating the instance with status {}",
8: getTaskName(), info.getId(), info.getStatus());
9: register(info);
10: }
11: } else if (config.shouldSyncWhenTimestampDiffers()) {
12: InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
13: if (peerInstanceInfo != null) {
14: syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
15: }
16: }
17: }

  • 第 4 至 10 行 :接收到 404 状态码,调用 #register(...) 方法,向该被心跳同步操作失败的 Eureka-Server 发起注册本地的应用实例的请求。

    • 上述 3) ,会使用请求参数 overriddenStatus 存储到 Eureka-Server 的应用实例覆盖状态集合( AbstractInstanceRegistry.overriddenInstanceStatusMap ),点击 链接 查看触发条件代码位置。

  • 第 11 至 16 行 :恰好是 3) 反过来的情况,本地的应用实例的 lastDirtyTimestamp 小于 Eureka-Server 该应用实例的,此时 Eureka-Server 返回 409 状态码,点击 链接 查看触发条件代码位置。调用 #syncInstancesIfTimestampDiffers() 方法,覆盖注册本地应用实例,点击 链接 查看方法。


OK,撒花!记住:Eureka 通过 Heartbeat 实现 Eureka-Server 集群同步的最终一致性。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐