Eureka 源码解析 —— Eureka-Server 集群同步
摘要: 原创出处 http://www.iocoder.cn/Eureka/server-cluster/ 「芋道源码」欢迎转载,保留摘要,谢谢!本文主要基于 Eureka 1.8.X 版本1. 概述2. 集群节点初始化与更新2.1 集群节点启动2.2 更新集群节点信息2.3 集群节点3. 获取初始注册信息4. 同步注册信息4.1 同步操作类型4.2 发起 Eureka-Serve
摘要: 原创出处 http://www.iocoder.cn/Eureka/server-cluster/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Eureka 1.8.X 版本
1. 概述
本文主要分享 Eureka-Server 集群同步注册信息。
Eureka-Server 集群如下图:
- Eureka-Server 集群不区分主从节点或者 Primary & Secondary 节点,所有节点相同角色( 也就是没有角色 ),完全对等。
- Eureka-Client 可以向任意 Eureka-Client 发起任意读写操作,Eureka-Server 将操作复制到另外的 Eureka-Server 以达到最终一致性。注意,Eureka-Server 是选择了 AP 的组件。
Eureka-Server 可以使用直接配置所有节点的服务地址,或者基于 DNS 配置。推荐阅读:《Spring Cloud构建微服务架构(六)高可用服务注册中心》 。
本文主要类在 com.netflix.eureka.cluster
包下。
OK,让我们开始愉快的遨游在代码的海洋。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版,等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与Docker微服务架构实战》
- 两书齐买,京东包邮。
ps :注意,本文提到的同步,准确来说是复制( Replication )。
2. 集群节点初始化与更新
com.netflix.eureka.cluster.PeerEurekaNodes
,Eureka-Server 集群节点集合 。构造方法如下 :
|
peerEurekaNodes
,peerEurekaNodeUrls
,taskExecutor
属性,在构造方法中未设置和初始化,而是在PeerEurekaNodes#start()
方法,设置和初始化,下文我们会解析这个方法。- Eureka-Server 在初始化时,调用
EurekaBootStrap#getPeerEurekaNodes(...)
方法,创建 PeerEurekaNodes ,点击 链接 查看该方法的实现。
2.1 集群节点启动
调用 PeerEurekaNodes#start()
方法,集群节点启动,主要完成两个逻辑:
- 初始化集群节点信息
- 初始化固定周期( 默认:10 分钟,可配置 )更新集群节点信息的任务
代码如下:
|
- 第 15 行 && 第 21 行 :调用
#updatePeerEurekaNodes()
方法,更新集群节点信息。
2.2 更新集群节点信息
调用 #resolvePeerUrls()
方法,获得 Eureka-Server 集群服务地址数组,代码如下:
|
- 第 2 至 5 行 :获得 Eureka-Server 集群服务地址数组。
EndpointUtils#getDiscoveryServiceUrls(...)
方法,逻辑与 《Eureka 源码解析 —— EndPoint 与 解析器》「3.4 ConfigClusterResolver」 基本类似。EndpointUtils 正在逐步,猜测未来这里会替换。 - 第 7 至 15 行 :移除自身节点,避免向自己同步。
调用 #updatePeerEurekaNodes()
方法,更新集群节点信息,主要完成两部分逻辑:
- 添加新增的集群节点
- 关闭删除的集群节点
代码如下:
|
- 第 7 至 9 行 :计算新增的集群节点地址。
- 第 11 至 13 行 :计算删除的集群节点地址。
- 第 19 至 34 行 :关闭删除的集群节点。
第 36 至 43 行 :添加新增的集群节点。调用
#createPeerEurekaNode(peerUrl)
方法,创建集群节点,代码如下:1: protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {2: HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);3: String targetHost = hostFromUrl(peerEurekaNodeUrl);4: if (targetHost == null) {5: targetHost = "host";6: }7: return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);8: }- 第 2 行 :创建 Eureka-Server 集群通信客户端,在 《Eureka 源码解析 —— 网络通信》「4.2 JerseyReplicationClient」 有详细解析。
- 第 7 行 :创建 PeerEurekaNode ,在 「2.3 PeerEurekaNode」 有详细解析。
2.3 集群节点
com.netflix.eureka.cluster.PeerEurekaNode
,单个集群节点。
点击 链接 查看构造方法
- 第 129 行 :创建 ReplicationTaskProcessor 。在 「4.1.2 同步操作任务处理器」 详细解析
- 第 131 至 140 行 :创建批量任务分发器,在 《Eureka 源码解析 —— 任务批处理》 有详细解析。
- 第 142 至 151 行 :创建单任务分发器,用于 Eureka-Server 向亚马逊 AWS 的 ASG (
Autoscaling Group
) 同步状态。暂时跳过。
3. 获取初始注册信息
Eureka-Server 启动时,调用 PeerAwareInstanceRegistryImpl#syncUp()
方法,从集群的一个 Eureka-Server 节点获取初始注册信息,代码如下:
|
- 第 7 至 15 行 :未获取到注册信息,
sleep
等待再次重试。 - 第 17 至 30 行 :获取注册信息,若获取到,注册到自身节点。
- 第 22 行 :判断应用实例是否能够注册到自身节点。主要用于亚马逊 AWS 环境下的判断,若非部署在亚马逊里,都返回
true
。点击 链接 查看实现。 - 第 23 行 :调用
#register()
方法,注册应用实例到自身节点。在 《Eureka 源码解析 —— 应用实例注册发现(一)之注册》 有详细解析。
- 第 22 行 :判断应用实例是否能够注册到自身节点。主要用于亚马逊 AWS 环境下的判断,若非部署在亚马逊里,都返回
若调用 #syncUp()
方法,未获取到应用实例,则 Eureka-Server 会有一段时间( 默认:5 分钟,可配 )不允许被 Eureka-Client 获取注册信息,避免影响 Eureka-Client 。
标记 Eureka-Server 启动时,未获取到应用实例,代码如下:
// PeerAwareInstanceRegistryImpl.javaprivate boolean peerInstancesTransferEmptyOnStartup = true;public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {// ... 省略其他代码if (count > 0) {this.peerInstancesTransferEmptyOnStartup = false;}// ... 省略其他代码}
判断 Eureka-Server 是否允许被 Eureka-Client 获取注册信息,代码如下:
// PeerAwareInstanceRegistryImpl.javapublic boolean shouldAllowAccess(boolean remoteRegionRequired) {if ( this.peerInstancesTransferEmptyOnStartup) {// 设置启动时间this.startupTime = System.currentTimeMillis();if (!(System.currentTimeMillis() > this.startupTime + serverConfig.getWaitTimeInMsWhenSyncEmpty())) {return false;}}// ... 省略其他代码return true;}
4. 同步注册信息
Eureka-Server 集群同步注册信息如下图:
- Eureka-Server 接收到 Eureka-Client 的 Register、Heartbeat、Cancel、StatusUpdate、DeleteStatusOverride 操作,固定间隔( 默认值 :500 毫秒,可配 )向 Eureka-Server 集群内其他节点同步( 准实时,非实时 )。
4.1 同步操作类型
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.Action
,同步操作类型,代码如下:
|
- Register ,在 《Eureka 源码解析 —— 应用实例注册发现(一)之注册》 有详细解析
- Heartbeat ,在 《Eureka 源码解析 —— 应用实例注册发现(二)之续租》 有详细解析
- Cancel ,在 《Eureka 源码解析 —— 应用实例注册发现(三)之下线》 有详细解析
- StatusUpdate ,在 《Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态》 有详细解析
- DeleteStatusOverride ,在 《Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态》 有详细解析
4.2 发起 Eureka-Server 同步操作
Eureka-Server 在完成 Eureka-Client 发起的上述操作在自身节点的执行后,向集群内其他 Eureka-Server 发起同步操作。以 Register 操作举例子,代码如下:
|
- 最后一行,调用
#replicateToPeers(...)
方法,传递对应的同步操作类型,发起同步操作。
#replicateToPeers(...)
方法,代码如下:
|
- 第 10 至 14 行 :Eureka-Server 在处理上述操作( Action ),无论来自 Eureka-Client 发起请求,还是 Eureka-Server 发起同步,调用的内部方法相同,通过
isReplication=true
参数,避免死循环同步。 - 第 16 至 22 行 :循环集群内每个节点,调用
#replicateInstanceActionsToPeers(...)
方法,发起同步操作。
#replicateInstanceActionsToPeers(...)
方法,代码如下:
|
- Cancel :调用
PeerEurekaNode#cancel(...)
方法,点击 链接 查看实现。 - Heartbeat :调用
PeerEurekaNode#heartbeat(...)
方法,点击 链接 查看实现。 - Register :调用
PeerEurekaNode#register(...)
方法,点击 链接 查看实现。 - StatusUpdate :调用
PeerEurekaNode#statusUpdate(...)
方法,点击 链接 查看实现。 - DeleteStatusOverride :调用
PeerEurekaNode#deleteStatusOverride(...)
方法,点击 链接 查看实现。 上面的每个方法实现,我们都会看到类似这么一段代码 :
batchingDispatcher.process(taskId( "${action}", appName, id), // idnew InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {public EurekaHttpResponse<Void> execute() {return replicationClient.doString(...);}public void handleFailure(int statusCode, Object responseEntity) throws Throwable {// do Something...}}, // ReplicationTask 子类expiryTime)#task(...)
方法,生成同步操作任务编号。代码如下:private static String taskId(String requestType, String appName, String id) {return requestType + '#' + appName + '/' + id;}- 相同应用实例的相同同步操作使用相同任务编号。在 《Eureka 源码解析 —— 任务批处理》「2. 整体流程」 中,我们看到” 接收线程( Runner )合并任务,将相同任务编号的任务合并,只执行一次。 “,因此,相同应用实例的相同同步操作就能被合并,减少操作量。例如,Eureka-Server 同步某个应用实例的 Heartbeat 操作,接收同步的 Eureak-Server 挂了,一方面这个应用的这次操作会重试,另一方面,这个应用实例会发起新的 Heartbeat 操作,通过任务编号合并,接收同步的 Eureka-Server 恢复后,减少收到重复积压的任务。
- InstanceReplicationTask ,同步操作任务,在 「4.1.1 同步操作任务」 详细解析。
expiryTime
,任务过期时间。
4.1.1 同步操作任务
com.netflix.eureka.cluster.ReplicationTask
,同步任务抽象类- 点击 链接 查看 ReplicationTask 代码。
- 定义了
#getTaskName()
抽象方法。 - 定义了
#execute()
抽象方法,执行同步任务。 - 实现了
#handleSuccess()
方法,处理成功执行同步结果。 - 实现了
#handleFailure(...)
方法,处理失败执行同步结果。
com.netflix.eureka.cluster.InstanceReplicationTask
,同步应用实例任务抽象类
- 点击 链接 查看 InstanceReplicationTask 代码。
- 实现了父类
#getTaskName()
抽象方法。
com.netflix.eureka.cluster.AsgReplicationTask
,亚马逊 AWS 使用,暂时跳过。
从上面 PeerEurekaNode#同步操作(...)
方法,全部实现了 InstanceReplicationTask 类的 #execute()
方法,部分重写了 #handleFailure(...)
方法。
4.1.2 同步操作任务处理器
com.netflix.eureka.cluster.InstanceReplicationTask
,实现 TaskProcessor 接口,同步操作任务处理器。
- TaskProcessor ,在 《Eureka 源码解析 —— 任务批处理》「10. 任务执行器【执行任务】」 有详细解析。
- 点击 链接 查看 InstanceReplicationTask 代码。
ReplicationTaskProcessor#process(task)
,处理单任务,用于 Eureka-Server 向亚马逊 AWS 的 ASG ( Autoscaling Group
) 同步状态,暂时跳过,感兴趣的同学可以点击 链接 查看方法代码。
ReplicationTaskProcessor#process(tasks)
,处理批量任务,用于 Eureka-Server 集群注册信息的同步操作任务,通过调用被同步的 Eureka-Server 的 peerreplication/batch/
接口,一次性将批量( 多个 )的同步操作任务发起请求,代码如下:
|
- 第 4 行 :创建批量提交同步操作任务的请求对象( ReplicationList ) 。比较易懂,咱就不啰嗦贴代码了。
- 第 7 行 :调用
JerseyReplicationClient#submitBatchUpdates(...)
方法,请求peerreplication/batch/
接口,一次性将批量( 多个 )的同步操作任务发起请求。
JerseyReplicationClient#submitBatchUpdates(...)
方法,点击 链接 查看方法。- ReplicationListResponse ,点击 链接 查看类。
- ReplicationInstanceResponse ,点击 链接 查看类。
- 第 9 至 31 行 :处理批量提交同步操作任务的响应,在 「4.4 处理 Eureka-Server 同步结果」 详细解析。
4.3 接收 Eureka-Server 同步操作
com.netflix.eureka.resources.PeerReplicationResource
,同步操作任务 Resource ( Controller )。
peerreplication/batch/
接口,映射 PeerReplicationResource#batchReplication(...)
方法,代码如下:
|
- 第 7 至 15 行 :逐个处理单个同步操作任务,并将处理结果( ReplicationInstanceResponse ) 添加到 ReplicationListResponse 。
- 第 23 至 50 行 :处理单个同步操作任务,返回处理结果( ReplicationInstanceResponse )。
- 第 24 至 25 行 :创建 ApplicationResource , InstanceResource 。我们看到,实际该方法是把单个同步操作任务提交到其他 Resource ( Controller ) 处理,Eureka-Server 收到 Eureka-Client 请求响应的 Resource ( Controller ) 是相同的逻辑。
- Register :点击 链接 查看
#handleRegister(...)
方法。 - Heartbeat :点击 链接 查看
#handleHeartbeat(...)
方法。 - Cancel :点击 链接 查看
#handleCancel(...)
方法。 - StatusUpdate :点击 链接 查看
#handleStatusUpdate(...)
方法。 - DeleteStatusOverride :点击 链接 查看
#handleDeleteStatusOverride(...)
方法。
4.4 处理 Eureka-Server 同步结果
�� 想想就有小激动,终于写到这里了。
接 ReplicationTaskProcessor#process(tasks)
方法,处理批量提交同步操作任务的响应,代码如下:
|
- 第 10 行 ,调用
#isSuccess(...)
方法,判断请求是否成功,响应状态码是否在 [200, 300) 范围内。 - 第 11 至 13 行 :状态码 503 ,目前 Eureka-Server 返回 503 的原因是被限流。在 《Eureka 源码解析 —— 基于令牌桶算法的 RateLimiter》 详细解析。该情况为瞬时错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。
- 第 14 至 18 行 :非预期状态码,目前 Eureka-Server 在代码上看下来,不会返回这样的状态码。该情况为永久错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。
- 第 20 行 :请求成功,调用
#handleBatchResponse(...)
方法,逐个处理每个 ReplicationTask 和 ReplicationInstanceResponse 。这里有一点要注意下,请求成功指的是整个请求成功,实际每个 ReplicationInstanceResponse 可能返回的状态码不在 [200, 300) 范围内。该方法下文详细解析。 第 23 至 25 行 :请求发生网络异常,例如网络超时,打印网络异常日志。目前日志的打印为部分采样,条件为网络发生异常每间隔 10 秒打印一条,避免网络发生异常打印超级大量的日志。该情况为永久错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。
第 26 至 29 行 :非预期异常,目前 Eureka-Server 在代码上看下来,不会抛出这样的异常。该情况为永久错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。
#handleBatchResponse(...)
方法,代码如下:
|
ReplicationTask#handleSuccess()
方法,无任务同步操作任务重写,是个空方法,代码如下:// ReplicationTask.javapublic void handleSuccess() {}ReplicationTask#handleFailure()
方法,有两个同步操作任务重写:Cancel :当 Eureka-Server 不存在下线的应用实例时,返回 404 状态码,此时打印错误日志,代码如下:
// PeerEurekaNode#cancel(...)public void handleFailure(int statusCode, Object responseEntity) throws Throwable {super.handleFailure(statusCode, responseEntity);if (statusCode == 404) {logger.warn( "{}: missing entry.", getTaskName());}}- x
Heartbeat :情况较为复杂,我们换一行继续说,避免排版有问题,影响阅读。
噔噔噔恰,本文的重要头戏来啦!Last But Very Importment !!!
Eureka-Server 是允许同一时刻允许在任意节点被 Eureka-Client 发起写入相关的操作,网络是不可靠的资源,Eureka-Client 可能向一个 Eureka-Server 注册成功,但是网络波动,导致 Eureka-Client 误以为失败,此时恰好 Eureka-Client 变更了应用实例的状态,重试向另一个 Eureka-Server 注册,那么两个 Eureka-Server 对该应用实例的状态产生冲突。
再例如…… 我们不要继续举例子,网络波动真的很复杂。我们来看看 Eureka 是怎么处理的。
应用实例( InstanceInfo ) 的 lastDirtyTimestamp
属性,使用时间戳,表示应用实例的版本号,当请求方( 不仅仅是 Eureka-Client ,也可能是同步注册操作的 Eureka-Server ) 向 Eureka-Server 发起注册时,若 Eureka-Server 已存在拥有更大 lastDirtyTimestamp
该实例( 相同应用并且相同应用实例编号被认为是相同实例 ),则请求方注册的应用实例( InstanceInfo ) 无法覆盖注册此 Eureka-Server 的该实例( 见 AbstractInstanceRegistry#register(...)
方法 )。例如我们上面举的例子,第一个 Eureka-Server 向 第二个 Eureka-Server 同步注册应用实例时,不会注册覆盖,反倒是第二个 Eureka-Server 同步注册应用到第一个 Eureka-Server ,注册覆盖成功,因为 lastDirtyTimestamp
( 应用实例状态变更时,可以设置 lastDirtyTimestamp
为当前时间,见 ApplicationInfoManager#setInstanceStatus(status)
方法 )。
但是光靠注册请求判断 lastDirtyTimestamp
显然是不够的,因为网络异常情况下时,同步操作任务多次执行失败到达过期时间后,此时在 Eureka-Server 集群同步起到最终一致性最最最关键性出现了:Heartbeat 。因为 Heartbeat 会周期性的执行,通过它一方面可以判断 Eureka-Server 是否存在心跳对应的应用实例,另外一方面可以比较应用实例的 lastDirtyTimestamp
。当满足下面任意条件,Eureka-Server 返回 404 状态码:
- 1)Eureka-Server 应用实例不存在,点击 链接 查看触发条件代码位置。
- 2)Eureka-Server 应用实例状态为
UNKNOWN
,点击 链接 查看触发条件代码位置。为什么会是UNKNOWN
,在 《Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态》「 4.3 续租场景」 有详细解析。 - 3)请求的
lastDirtyTimestamp
更大,点击 链接 查看触发条件代码位置。
请求方接收到 404 状态码返回后,认为 Eureka-Server 应用实例实际是不存在的,重新发起应用实例的注册。以本文的 Heartbeat 为例子,代码如下:
|
第 4 至 10 行 :接收到 404 状态码,调用
#register(...)
方法,向该被心跳同步操作失败的 Eureka-Server 发起注册本地的应用实例的请求。- 上述 3) ,会使用请求参数
overriddenStatus
存储到 Eureka-Server 的应用实例覆盖状态集合(AbstractInstanceRegistry.overriddenInstanceStatusMap
),点击 链接 查看触发条件代码位置。
- 上述 3) ,会使用请求参数
第 11 至 16 行 :恰好是 3) 反过来的情况,本地的应用实例的
lastDirtyTimestamp
小于 Eureka-Server 该应用实例的,此时 Eureka-Server 返回 409 状态码,点击 链接 查看触发条件代码位置。调用#syncInstancesIfTimestampDiffers()
方法,覆盖注册本地应用实例,点击 链接 查看方法。
OK,撒花!记住:Eureka 通过 Heartbeat 实现 Eureka-Server 集群同步的最终一致性。
更多推荐
所有评论(0)