How a Eureka Server Cluster Synchronizes Data
Eureka is a service registry. In a clustered deployment, consistency across the cluster's nodes is achieved by replicating data between them, using a message-broadcast mechanism built on the Acceptor-Worker model. The overall flow is:
1) When a node receives a client message (register, heartbeat, cancel, status change, and so on), it updates its local registry;
2) It then iterates over all nodes (excluding itself) and forwards the message to each peer.
To drive this replication (Eureka chooses AP out of CAP), every node maintains a list of its peers: PeerEurekaNodes, which manages all of the PeerEurekaNode instances.
1 Data processing flow on a node (message broadcast flow)
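The flow described above maps onto PeerAwareInstanceRegistryImpl, whose replicateToPeers method fans each client action out to all peers. A simplified sketch of it (paraphrased, not the verbatim source; metrics and tracing omitted):
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info, InstanceStatus newStatus,
                              boolean isReplication) {
    // A message that is itself a replica is not forwarded again,
    // which prevents replication loops between peers.
    if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
        return;
    }
    for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
        // Never replicate to ourselves
        if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
            continue;
        }
        // Dispatches to node.register(...) / node.cancel(...) / node.heartbeat(...) etc.
        replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
    }
}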
2 Related class diagram
3 Detailed analysis
3.1 PeerEurekaNode
A peer-node object: it initializes the node's task-dispatching threads and handles task submission. A PeerEurekaNode represents one Eureka Server node (one peer in the cluster) and sends replication events (Register, Renew, Cancel, StatusChange) to that peer. First, the class definition:
1) The constructor initializes the TaskDispatchers, one batching and one non-batching;
2) The register, cancel, heartbeat, and statusUpdate methods replicate the corresponding local changes (registrations and so on) to the peer node;
3) PeerEurekaNode objects are created when the Eureka Server initializes, from the serviceUrls of every zone obtained via static configuration or DNS (if no zone is configured, the default zone is used); each serviceUrl corresponds to one node;
4) At startup, PeerEurekaNodes schedules a background task that refreshes the PeerEurekaNode list, every 10 minutes by default (with URL-based configuration, this lets configuration changes take effect dynamically; in DNS mode, the task re-resolves DNS to achieve the same effect);
5) Selected parameters:
Name | Type | Description |
id | String | Dispatcher id, defaults to "target_${host}" |
maxBufferSize | int | Maximum number of buffered replication tasks, default 10000 |
workloadSize | int | Maximum number of tasks per batch, default 250 |
workerCount | int | Number of worker threads, default 20 |
maxBatchDelay | long | Maximum batching delay, default 500 ms |
congestionRetryDelayMs | long | Retry delay after congestion, default 1000 ms |
networkFailureRetryMs | long | Retry delay after a network failure, default 100 ms |
taskProcessor | TaskProcessor<T> | Task processor that sends tasks to the peer node |
acceptorExecutor | AcceptorExecutor<ID, T> | Acceptor that receives tasks submitted by clients |
taskExecutor | TaskExecutors<ID, T> | Task executors whose workers invoke the task processor |
6) Selected source code
public class PeerEurekaNode {
// ... other code omitted ...
PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
HttpReplicationClient replicationClient, EurekaServerConfig config,
int batchSize, long maxBatchingDelayMs,
long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
this.registry = registry;
this.targetHost = targetHost;
this.replicationClient = replicationClient;
this.serviceUrl = serviceUrl;
this.config = config;
// Maximum time a replication event may be held before it is dropped
this.maxProcessingDelayMs = config.getMaxTimeForReplication();
String batcherName = getBatcherName();
// Create the task processor that performs the replication calls
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
// Initialize the batching task dispatcher
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
// Initialize the non-batching task dispatcher
this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
targetHost,
config.getMaxElementsInStatusReplicationPool(),
config.getMaxThreadsForStatusReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
}
// Replicate an instance registration to the peer
public void register(final InstanceInfo info) throws Exception {
// Task expiry time: 90s by default when the instance has no lease info, otherwise its lease renewal interval (default 30s)
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
// Replicate an instance cancellation (shutdown) to the peer
public void cancel(final String appName, final String id) throws Exception {
// Task expiry time: maxProcessingDelayMs, default 30s
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
batchingDispatcher.process(
taskId("cancel", appName, id),
new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
@Override
public EurekaHttpResponse<Void> execute() {
return replicationClient.cancel(appName, id);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
logger.warn("{}: missing entry.", getTaskName());
}
}
},
expiryTime
);
}
// Replicate an instance heartbeat (lease renewal) to the peer
public void heartbeat(final String appName, final String id,
final InstanceInfo info, final InstanceStatus overriddenStatus,
boolean primeConnection) throws Throwable {
if (primeConnection) {
// We do not care about the result for priming request.
replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
return;
}
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
@Override
public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
logger.warn("{}: missing entry.", getTaskName());
if (info != null) {
logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
getTaskName(), info.getId(), info.getStatus());
register(info);
}
} else if (config.shouldSyncWhenTimestampDiffers()) {
InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
if (peerInstanceInfo != null) {
syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
}
}
}
};
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}
public void statusUpdate(final String asgName, final ASGStatus newStatus) {
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
nonBatchingDispatcher.process(
asgName,
new AsgReplicationTask(targetHost, Action.StatusUpdate, asgName, newStatus) {
public EurekaHttpResponse<?> execute() {
return replicationClient.statusUpdate(asgName, newStatus);
}
},
expiryTime
);
}
public void statusUpdate(final String appName, final String id,
final InstanceStatus newStatus, final InstanceInfo info) {
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
batchingDispatcher.process(
taskId("statusUpdate", appName, id),
new InstanceReplicationTask(targetHost, Action.StatusUpdate, info, null, false) {
@Override
public EurekaHttpResponse<Void> execute() {
return replicationClient.statusUpdate(appName, id, newStatus, info);
}
},
expiryTime
);
}
public void deleteStatusOverride(final String appName, final String id, final InstanceInfo info) {
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
batchingDispatcher.process(
taskId("deleteStatusOverride", appName, id),
new InstanceReplicationTask(targetHost, Action.DeleteStatusOverride, info, null, false) {
@Override
public EurekaHttpResponse<Void> execute() {
return replicationClient.deleteStatusOverride(appName, id, info);
}
},
expiryTime);
}
// ... other code omitted ...
}
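One detail worth calling out: each replication task carries an id built by taskId(...). In the batching dispatcher, a newer task with the same id replaces an older, still-pending one (see appendTaskHolder in AcceptorExecutor below), so repeated heartbeats for one instance collapse into a single pending task instead of piling up. The helper looks roughly like this (paraphrased sketch, not the verbatim source):
private static String taskId(String requestType, String appName, String id) {
    // Tasks with equal ids collapse in the dispatcher's pending map
    return requestType + '#' + appName + '/' + id;
}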
3.2 TaskDispatchers
A factory that wires up task dispatching: it creates the acceptor executor and the task executors, and the dispatcher it returns delegates task submission and shutdown to them.
TaskDispatchers creates two kinds of TaskDispatcher, a batching one and a non-batching one:
public class TaskDispatchers {
public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
int maxBufferSize,
int workerCount,
long maxBatchingDelay,
long congestionRetryDelayMs,
long networkFailureRetryMs,
TaskProcessor<T> taskProcessor) {
final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
id, maxBufferSize, 1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
);
// workerCount is 1 by default here
final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
return new TaskDispatcher<ID, T>() {
@Override
public void process(ID id, T task, long expiryTime) {
acceptorExecutor.process(id, task, expiryTime);
}
@Override
public void shutdown() {
acceptorExecutor.shutdown();
taskExecutor.shutdown();
}
};
}
public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
int maxBufferSize,
int workloadSize,
int workerCount,
long maxBatchingDelay,
long congestionRetryDelayMs,
long networkFailureRetryMs,
TaskProcessor<T> taskProcessor) {
final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
);
// workerCount is 20 by default here
final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
return new TaskDispatcher<ID, T>() {
@Override
public void process(ID id, T task, long expiryTime) {
acceptorExecutor.process(id, task, expiryTime);
}
@Override
public void shutdown() {
acceptorExecutor.shutdown();
taskExecutor.shutdown();
}
};
}
}
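As a usage illustration, the following hypothetical wiring (names are illustrative; the classes come from com.netflix.eureka.util.batcher, plus java.util.List) creates a batching dispatcher with the default sizing quoted earlier and submits one task to it:
// Illustrative only: a dispatcher fed by a TaskProcessor that merely logs.
TaskProcessor<String> logProcessor = new TaskProcessor<String>() {
    @Override
    public ProcessingResult process(String task) {
        System.out.println("single task: " + task);
        return ProcessingResult.Success;
    }
    @Override
    public ProcessingResult process(List<String> tasks) {
        System.out.println("batch of " + tasks.size());
        return ProcessingResult.Success;
    }
};
TaskDispatcher<String, String> dispatcher = TaskDispatchers.createBatchingTaskDispatcher(
        "demo-peer",  // id
        10000,        // maxBufferSize
        250,          // workloadSize: max tasks per batch
        20,           // workerCount
        500,          // maxBatchingDelay (ms)
        1000,         // congestionRetryDelayMs
        100,          // networkFailureRetryMs
        logProcessor);
// Tasks submitted with the same id within the batching window collapse into one.
dispatcher.process("heartbeat#app/instance-1", "payload", System.currentTimeMillis() + 90_000);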
3.3 AcceptorExecutor
The task acceptor:
1) It receives tasks submitted by PeerEurekaNode and adds them to its input queues;
2) A background AcceptorRunner thread moves tasks between the internal queues;
3) If a polling pass over the task queues adds no new work to the work queues, AcceptorRunner sleeps for 10 ms;
4) When a delay is called for (after recent failures), handing tasks to the work queues is postponed, which damps bursts of transient errors;
5) If the pending-task buffer is full, the reprocessQueue is cleared, and draining the acceptorQueue evicts the oldest pending task to make room;
6) When batches are assigned to the work queue, each batch holds at most 250 tasks by default; the pending buffer holds at most 10000 tasks by default, and a batch is assigned once tasks have waited out the default 500 ms batching delay.
class AcceptorExecutor<ID, T> {
// Executor id (used for thread names and metrics)
private final String id;
// Maximum size of the pending task pool (pendingTasks), default 10000
private final int maxBufferSize;
// Maximum batch size for batched dispatch, default 250
private final int maxBatchingSize;
// Maximum batching delay, default 500 ms
private final long maxBatchingDelay;
// Signals the acceptor thread to shut down
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
// acceptorQueue and reprocessQueue hold the incoming broadcast tasks
// Queue of newly accepted tasks
private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
// Queue of failed tasks that need to be retried
private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();
private final Thread acceptorThread;
// Tasks accepted and waiting to be dispatched to workers
private final Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<>();
// Ids of pending tasks in arrival order (head = oldest)
private final Deque<ID> processingOrder = new LinkedList<>();
// Permits released by workers requesting a single item
private final Semaphore singleItemWorkRequests = new Semaphore(0);
// Work queue for single-item mode (tasks about to be handed to a worker)
private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();
// Permits released by workers requesting a batch
private final Semaphore batchWorkRequests = new Semaphore(0);
// Work queue for batch mode (batches about to be handed to a worker)
private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();
// Traffic shaping for retried tasks
private final TrafficShaper trafficShaper;
AcceptorExecutor(String id,
int maxBufferSize,
int maxBatchingSize,
long maxBatchingDelay,
long congestionRetryDelayMs,
long networkFailureRetryMs) {
this.id = id;
this.maxBufferSize = maxBufferSize;
this.maxBatchingSize = maxBatchingSize;
this.maxBatchingDelay = maxBatchingDelay;
this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);
ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
// Create and start the acceptor thread
this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
this.acceptorThread.setDaemon(true);
this.acceptorThread.start();
// ... metrics registration omitted (covered in a separate post)
}
// Enqueue a newly accepted task
void process(ID id, T task, long expiryTime) {
acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
acceptedTasks++;
}
// Re-enqueue a list of failed tasks for retry
void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
reprocessQueue.addAll(holders);
replayedTasks += holders.size(); // metrics counter
trafficShaper.registerFailure(processingResult);
}
// Re-enqueue a single failed task for retry
void reprocess(TaskHolder<ID, T> taskHolder, ProcessingResult processingResult) {
reprocessQueue.add(taskHolder);
replayedTasks++; // metrics counter
trafficShaper.registerFailure(processingResult);
}
// Hand out the single-item work queue (releases a demand permit)
BlockingQueue<TaskHolder<ID, T>> requestWorkItem() {
singleItemWorkRequests.release();
return singleItemWorkQueue;
}
// Hand out the batch work queue (releases a demand permit)
BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
batchWorkRequests.release();
return batchWorkQueue;
}
// Stop the acceptor thread
void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
Monitors.unregisterObject(id, this);
acceptorThread.interrupt();
}
}
class AcceptorRunner implements Runnable {
@Override
public void run() {
long scheduleTime = 0;
while (!isShutdown.get()) {
try {
drainInputQueues();
int totalItems = processingOrder.size();
long now = System.currentTimeMillis();
if (scheduleTime < now) {
// If a network failure (Congestion or TransientError) was recorded recently, push scheduleTime out by the traffic shaper's delay
scheduleTime = now + trafficShaper.transmissionDelay();
}
if (scheduleTime <= now) {
assignBatchWork();
assignSingleItemWork();
}
// If no worker is requesting data or there is a delay injected by the traffic shaper,
// sleep for some time to avoid tight loop.
if (totalItems == processingOrder.size()) {
Thread.sleep(10);
}
} catch (InterruptedException ex) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery AcceptorThread error", e);
}
}
}
private boolean isFull() {
return pendingTasks.size() >= maxBufferSize;
}
private void drainInputQueues() throws InterruptedException {
do {
// Move tasks from the input queues into the pendingTasks map
drainReprocessQueue();
drainAcceptorQueue();
if (!isShutdown.get()) {
// If all input queues and pendingTasks are empty, block on acceptorQueue for up to 10 ms
if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
if (taskHolder != null) {
appendTaskHolder(taskHolder);
}
}
}
} while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
}
/**
 * Drain the acceptor queue: move its tasks into the processingOrder queue and the pendingTasks map
 */
private void drainAcceptorQueue() {
while (!acceptorQueue.isEmpty()) {
appendTaskHolder(acceptorQueue.poll());
}
}
/**
 * Drain the reprocess queue: move its tasks into the processingOrder queue and the pendingTasks map
 */
private void drainReprocessQueue() {
long now = System.currentTimeMillis();
// While reprocessQueue has tasks and pendingTasks is not full, move its non-expired tasks into pendingTasks
while (!reprocessQueue.isEmpty() && !isFull()) {
// Take from the tail of reprocessQueue (presumably so the most recent retries are handled first)
TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
ID id = taskHolder.getId();
if (taskHolder.getExpiryTime() <= now) {
expiredTasks++;
} else if (pendingTasks.containsKey(id)) {
overriddenTasks++;
} else {
pendingTasks.put(id, taskHolder);
// Insert at the head so retries are processed before new tasks
processingOrder.addFirst(id);
}
}
if (isFull()) {
queueOverflows += reprocessQueue.size();
// If the buffer is full, drop the entire retry queue
reprocessQueue.clear();
}
}
/**
 * Add a task to pendingTasks. If the id already exists, the old task is overridden (and counted);
 * if pendingTasks is full, the task at the head of processingOrder is evicted first.
 */
private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
if (isFull()) {
// When full, drop the oldest task: remove the head of processingOrder from pendingTasks
pendingTasks.remove(processingOrder.poll());
queueOverflows++;
}
TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
if (previousTask == null) {
processingOrder.add(taskHolder.getId());
} else {
overriddenTasks++;
}
}
/**
 * Single-item dispatch: move one non-expired task into singleItemWorkQueue for a worker
 */
void assignSingleItemWork() {
if (!processingOrder.isEmpty()) {
if (singleItemWorkRequests.tryAcquire(1)) {
long now = System.currentTimeMillis();
while (!processingOrder.isEmpty()) {
ID id = processingOrder.poll();
TaskHolder<ID, T> holder = pendingTasks.remove(id);
if (holder.getExpiryTime() > now) {
singleItemWorkQueue.add(holder);
return;
}
expiredTasks++;
}
singleItemWorkRequests.release();
}
}
}
/**
 * Batch dispatch: move a batch of non-expired tasks into batchWorkQueue for a worker
 */
void assignBatchWork() {
if (hasEnoughTasksForNextBatch()) {
// Acquire a work-request permit (a worker has asked for a batch)
if (batchWorkRequests.tryAcquire(1)) {
long now = System.currentTimeMillis();
int len = Math.min(maxBatchingSize, processingOrder.size());
List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
// Move up to len tasks from pendingTasks into batchWorkQueue
while (holders.size() < len && !processingOrder.isEmpty()) {
ID id = processingOrder.poll();
TaskHolder<ID, T> holder = pendingTasks.remove(id);
if (holder.getExpiryTime() > now) {
// Not expired yet
holders.add(holder);
} else {
expiredTasks++;
}
}
if (holders.isEmpty()) {
// Nothing usable (all candidates expired): give the work-request permit back
batchWorkRequests.release();
} else {
batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
batchWorkQueue.add(holders);
}
}
}
}
/**
 * Decide whether the next batch is ready to be dispatched
 */
private boolean hasEnoughTasksForNextBatch() {
if (processingOrder.isEmpty()) {
return false;
}
if (pendingTasks.size() >= maxBufferSize) {
return true;
}
// Has the head-of-queue task waited longer than the batching delay?
TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
return delay >= maxBatchingDelay;
}
}
}
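To make the override-and-evict semantics of appendTaskHolder concrete, here is a tiny self-contained model of the pendingTasks/processingOrder bookkeeping (illustrative only, not Eureka code):
import java.util.*;

public class PendingBufferDemo {
    static final int MAX_BUFFER_SIZE = 2;
    static final Map<String, String> pendingTasks = new HashMap<>();
    static final Deque<String> processingOrder = new LinkedList<>();

    static void append(String id, String task) {
        if (pendingTasks.size() >= MAX_BUFFER_SIZE) {
            // Full: evict the oldest task (head of processingOrder)
            pendingTasks.remove(processingOrder.poll());
        }
        if (pendingTasks.put(id, task) == null) {
            processingOrder.add(id);  // new id: appended in arrival order
        }                             // existing id: old task silently overridden
    }

    public static void main(String[] args) {
        append("hb#app/i-1", "hb-1");
        append("hb#app/i-1", "hb-2");  // same id: overrides hb-1, order unchanged
        append("hb#app/i-2", "hb-3");
        append("hb#app/i-3", "hb-4");  // buffer full: evicts the oldest (i-1)
        System.out.println(processingOrder);  // [hb#app/i-2, hb#app/i-3]
    }
}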
3.4 TrafficShaper: traffic shaping for task dispatch
Before tasks are handed to worker threads, TrafficShaper applies an admission-control delay: after network congestion (Congestion) or a transient network failure (TransientError), dispatch is postponed.
1) The congestion retry delay defaults to 1 s and is capped at 30 s;
2) The transient-failure retry delay defaults to 100 ms and is likewise capped at 30 s;
3) transmissionDelay() returns how much of the configured delay is still outstanding, i.e. the retry delay minus the time elapsed since the last recorded failure.
class TrafficShaper {
// Maximum delay: 30 s
private static final long MAX_DELAY = 30 * 1000;
// Delay after congestion
private final long congestionRetryDelayMs;
// Delay after a transient network failure
private final long networkFailureRetryMs;
// Timestamps of the most recent congestion error / network failure
private volatile long lastCongestionError;
private volatile long lastNetworkFailure;
// Both values come from PeerEurekaNode: congestionRetryDelayMs defaults to 1000 ms, networkFailureRetryMs to 100 ms
TrafficShaper(long congestionRetryDelayMs, long networkFailureRetryMs) {
this.congestionRetryDelayMs = Math.min(MAX_DELAY, congestionRetryDelayMs);
this.networkFailureRetryMs = Math.min(MAX_DELAY, networkFailureRetryMs);
}
void registerFailure(ProcessingResult processingResult) {
if (processingResult == ProcessingResult.Congestion) {
lastCongestionError = System.currentTimeMillis();
} else if (processingResult == ProcessingResult.TransientError) {
lastNetworkFailure = System.currentTimeMillis();
}
}
long transmissionDelay() {
if (lastCongestionError == -1 && lastNetworkFailure == -1) {
return 0;
}
long now = System.currentTimeMillis();
if (lastCongestionError != -1) {
long congestionDelay = now - lastCongestionError;
if (congestionDelay >= 0 && congestionDelay < congestionRetryDelayMs) {
return congestionRetryDelayMs - congestionDelay;
}
lastCongestionError = -1;
}
if (lastNetworkFailure != -1) {
long failureDelay = now - lastNetworkFailure;
if (failureDelay >= 0 && failureDelay < networkFailureRetryMs) {
return networkFailureRetryMs - failureDelay;
}
lastNetworkFailure = -1;
}
return 0;
}
}
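A quick worked example of the delay arithmetic (assuming the caller sits in the same package, since TrafficShaper is package-private, and runs inside a method that declares throws InterruptedException):
TrafficShaper shaper = new TrafficShaper(1000, 100);
shaper.registerFailure(ProcessingResult.Congestion); // congestion recorded "now"
System.out.println(shaper.transmissionDelay());      // ~1000: the full delay remains
Thread.sleep(400);
System.out.println(shaper.transmissionDelay());      // ~600 = 1000 - 400
Thread.sleep(700);                                   // 1100 ms elapsed in total
System.out.println(shaper.transmissionDelay());      // 0, and the failure mark is reset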
3.5 TaskExecutors
TaskExecutors manages the worker threads. A worker pulls work from the AcceptorExecutor's work queues and hands it to the TaskProcessor. There are two kinds of worker threads:
First, SingleTaskWorkerRunnable: the single-task worker, 1 by default;
// Single-task worker thread
static class SingleTaskWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {
SingleTaskWorkerRunnable(String workerName,
AtomicBoolean isShutdown,
TaskExecutorMetrics metrics,
TaskProcessor<T> processor,
AcceptorExecutor<ID, T> acceptorExecutor) {
super(workerName, isShutdown, metrics, processor, acceptorExecutor);
}
@Override
public void run() {
try {
while (!isShutdown.get()) {
// Request the single-item work queue from the AcceptorExecutor
BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
TaskHolder<ID, T> taskHolder;
// Blocks for up to 1 s when no task is available
while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
if (isShutdown.get()) {
return;
}
}
metrics.registerExpiryTime(taskHolder);
if (taskHolder != null) {
// Send the task (instance change/event) to the peer node
ProcessingResult result = processor.process(taskHolder.getTask());
switch (result) {
case Success:
break;
case Congestion:
case TransientError: // re-enqueue into the retry queue
taskDispatcher.reprocess(taskHolder, result);
break;
case PermanentError:
logger.warn("Discarding a task of {} due to permanent error", workerName);
}
metrics.registerTaskResult(result, 1);
}
}
} catch (InterruptedException e) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery WorkerThread error", e);
}
}
}
Second, BatchWorkerRunnable: the batch worker, 20 by default;
// Batch worker thread
static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {
BatchWorkerRunnable(String workerName,
AtomicBoolean isShutdown,
TaskExecutorMetrics metrics,
TaskProcessor<T> processor,
AcceptorExecutor<ID, T> acceptorExecutor) {
super(workerName, isShutdown, metrics, processor, acceptorExecutor);
}
@Override
public void run() {
try {
while (!isShutdown.get()) {
// Fetch a batch of tasks
List<TaskHolder<ID, T>> holders = getWork();
metrics.registerExpiryTimes(holders);
List<T> tasks = getTasksOf(holders);
// Send the tasks (instance changes/events) to the peer node
ProcessingResult result = processor.process(tasks);
switch (result) {
case Success:
break;
case Congestion:
case TransientError:
// Re-enqueue the whole batch for retry
taskDispatcher.reprocess(holders, result);
break;
case PermanentError:
logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
}
metrics.registerTaskResult(result, tasks.size());
}
} catch (InterruptedException e) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery WorkerThread error", e);
}
}
private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
// Request the batch work queue from the AcceptorExecutor
BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
List<TaskHolder<ID, T>> result;
do {
// Blocks for up to 1 s when no batch is available
result = workQueue.poll(1, TimeUnit.SECONDS);
} while (!isShutdown.get() && result == null);
return (result == null) ? new ArrayList<>() : result;
}
private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
List<T> tasks = new ArrayList<>(holders.size());
for (TaskHolder<ID, T> holder : holders) {
tasks.add(holder.getTask());
}
return tasks;
}
}
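Note the handshake implied by getWork(): requestWorkItems() releases a permit on batchWorkRequests, and assignBatchWork() in the AcceptorExecutor only assembles a batch after winning that permit via tryAcquire. In other words, batches are built on demand, never faster than workers can drain them. A stripped-down, self-contained model of the pattern (illustrative, not the Eureka classes):
import java.util.List;
import java.util.concurrent.*;

public class WorkRequestDemo {
    static final Semaphore batchWorkRequests = new Semaphore(0);
    static final BlockingQueue<List<String>> batchWorkQueue = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException {
        // Worker side: announce demand for one batch, then block waiting for it.
        Thread worker = new Thread(() -> {
            batchWorkRequests.release();  // "I can take one batch"
            try {
                List<String> batch = batchWorkQueue.poll(1, TimeUnit.SECONDS);
                System.out.println("worker got: " + batch);
            } catch (InterruptedException ignored) {
            }
        });
        worker.start();

        Thread.sleep(100);  // let the worker announce demand first
        // Acceptor side: hand out a batch only if some worker asked for one.
        if (batchWorkRequests.tryAcquire(1)) {
            batchWorkQueue.add(List.of("task-a", "task-b"));
        }
        worker.join();
    }
}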
3.6 PeerEurekaNodes
PeerEurekaNodes is the cluster peer list; it manages all of the PeerEurekaNode instances.
3.6.1 PeerEurekaNodes initialization
1) Create a single-threaded ScheduledExecutorService;
2) Resolve the peer URLs, excluding this node's own URL, and create the initial set of PeerEurekaNode instances from them;
3) Schedule a task on that executor that re-resolves the URLs and refreshes the PeerEurekaNode set, by default every 10 minutes (a sketch of this scheduling follows the refresh code below);
Note: in a Spring environment the configuration beans are bound when the container starts, so with static configuration the URL list never changes and the refresh logic effectively does nothing.
The refresh logic:
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
// Neither removals nor additions: nothing to do
if (toShutdown.isEmpty() && toAdd.isEmpty()) {
return;
}
// Remove peers that are no longer available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
// First shut down the PeerEurekaNode of each peer being removed
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// Create PeerEurekaNode instances for the newly added peers
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
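For reference, the scheduling that drives this refresh looks roughly like the following (paraphrased from PeerEurekaNodes.start(); the interval comes from EurekaServerConfig.getPeerEurekaNodesUpdateIntervalMs(), 10 minutes by default):
public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "Eureka-PeerNodesUpdater");
        t.setDaemon(true);
        return t;
    });
    // Build the initial peer list synchronously ...
    updatePeerEurekaNodes(resolvePeerUrls());
    // ... then refresh it on a fixed delay (default: every 10 minutes)
    taskExecutor.scheduleWithFixedDelay(() -> {
        try {
            updatePeerEurekaNodes(resolvePeerUrls());
        } catch (Throwable e) {
            logger.error("Cannot update the replica Nodes", e);
        }
    },
    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
    TimeUnit.MILLISECONDS);
}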
3.6.2 Resolving the peer list
Whenever PeerEurekaNodes refreshes its PeerEurekaNode set, it first resolves the latest list of peer service URLs (from configuration or DNS, per zone), excluding this node's own URL.
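A paraphrased sketch of that resolution (based on PeerEurekaNodes.resolvePeerUrls(); simplified):
protected List<String> resolvePeerUrls() {
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    // Determine this node's zone from the client configuration
    String zone = InstanceInfo.getZone(
            clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
    // Resolve all replica URLs for the zone (static configuration or DNS)
    List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(
            clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
    // Drop our own URL so we never replicate to ourselves
    replicaUrls.removeIf(this::isThisMyUrl);
    return replicaUrls;
}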
3.7 TaskProcessor
The TaskProcessor<T> interface defines single-task and batch processing methods. The implementation used for replication is ReplicationTaskProcessor, which sends each ReplicationTask to the peer PeerEurekaNode, giving the cluster eventual consistency.
package com.netflix.eureka.util.batcher;
import java.util.List;
/**
* An interface to be implemented by clients for task execution.
*
* @author Tomasz Bak
*/
public interface TaskProcessor<T> {
/**
* A processed task/task list ends up in one of the following states:
* <ul>
* <li>{@code Success} processing finished successfully</li>
* <li>{@code TransientError} processing failed, but shall be retried later</li>
* <li>{@code PermanentError} processing failed, and is non recoverable</li>
* </ul>
*/
enum ProcessingResult {
Success, Congestion, TransientError, PermanentError
}
/**
* In non-batched mode a single task is processed at a time.
*/
ProcessingResult process(T task);
/**
* For batched mode a collection of tasks is run at a time. The result is provided for the aggregated result,
* and all tasks are handled in the same way according to what is returned (for example are rescheduled, if the
* error is transient).
*/
ProcessingResult process(List<T> tasks);
}
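For illustration, here is a schematic implementation showing how results are typically classified (loosely modeled on ReplicationTaskProcessor: a 503 maps to Congestion, I/O errors to TransientError; note the real processor sends a whole batch in a single replication HTTP call, while this sketch simply iterates):
import java.io.IOException;
import java.util.List;

public class SketchTaskProcessor implements TaskProcessor<ReplicationTask> {
    @Override
    public ProcessingResult process(ReplicationTask task) {
        try {
            int status = task.execute().getStatusCode();
            if (status == 503) {
                return ProcessingResult.Congestion;      // peer busy: retry later
            }
            if (status >= 500) {
                return ProcessingResult.TransientError;  // server error: retryable
            }
            return ProcessingResult.Success;
        } catch (IOException e) {
            return ProcessingResult.TransientError;      // network problem: retryable
        } catch (Throwable t) {
            return ProcessingResult.PermanentError;      // anything else: give up
        }
    }

    @Override
    public ProcessingResult process(List<ReplicationTask> tasks) {
        // Aggregated result: the whole batch is treated uniformly (see the javadoc above)
        for (ReplicationTask task : tasks) {
            ProcessingResult result = process(task);
            if (result != ProcessingResult.Success) {
                return result;
            }
        }
        return ProcessingResult.Success;
    }
}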
4 Appendix
4.1 Instance status
See: InstanceStatus
UP | The instance is registered and ready to receive traffic |
DOWN | The instance is down |
STARTING | The instance is starting up (a transitional state) |
OUT_OF_SERVICE | The instance is taken out of service |
UNKNOWN | Unknown state |
4.2 Instance action types
See: PeerAwareInstanceRegistryImpl.Action
Heartbeat | Heartbeat (lease renewal) |
Register | Registration |
Cancel | Deregistration |
StatusUpdate | Status change |
DeleteStatusOverride | Removal of a status override |
4.3 Task processing results
See: TaskProcessor.ProcessingResult
Success | Success (HTTP status in [200, 300)) |
Congestion | Congestion (e.g. 503 busy responses or timeouts) |
TransientError | Retryable failure, detected mainly by checking for IOException |
PermanentError | Non-recoverable failure |
Reference: Eureka数据同步原理
https://mp.weixin.qq.com/s?__biz=Mzg3NjM1ODM5Ng==&mid=2247483694&idx=1&sn=34072f722204daeda8665afaa91992d8&chksm=cf323ebdf845b7ab76d2f2f161475442f60419e0b49714c74116f86a81daf04266fbe698c212&token=1280653793&lang=zh_CN#rd