How Eureka Server Clusters Synchronize Data

Eureka, as a service registry, keeps the nodes of a cluster consistent by synchronizing data between them. The synchronization is implemented as a message-broadcast mechanism built on an Acceptor-Worker pattern. At a high level the flow is:

1) When a node receives a client request (register, heartbeat, cancel, status change, and so on), it first updates its local registry;

2) It then iterates over all the nodes in the cluster (excluding itself) and forwards the message to each of them.

To support this replication (Eureka chooses AP in CAP terms), every node maintains a list of its peers. That list is PeerEurekaNodes, which manages all the PeerEurekaNode instances.
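For orientation, here is a condensed sketch of that fan-out step, based on PeerAwareInstanceRegistryImpl.replicateToPeers (metrics and the per-action dispatch are omitted, so treat it as illustrative rather than the exact source):

// Condensed sketch of PeerAwareInstanceRegistryImpl.replicateToPeers.
// isReplication is true when the incoming request itself came from a peer;
// such events are not forwarded again, which prevents replication loops.
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info, InstanceStatus newStatus,
                              boolean isReplication) {
    if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
        return; // no peers, or the event already came from a peer
    }
    for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
        // never replicate back to yourself
        if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
            continue;
        }
        // dispatches to node.register(...), node.cancel(...), node.heartbeat(...), etc.
        replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
    }
}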

1 Task processing flow on a node (message broadcast flow)

(figure: message broadcast processing flow, omitted)

2 Related class diagram

(figure: class diagram, omitted)

3 Detailed analysis

3.1 PeerEurekaNode

This class models a server node and owns the initialization of that node's task-processing threads as well as the task handling itself. A PeerEurekaNode represents one Eureka Server node (a peer in the cluster) and sends replication events (Register, Renew, Cancel, StatusChange) to that peer. Let's start with the class definition.

1) The constructor initializes the TaskDispatchers, one for batched tasks and one for non-batched tasks;

2) The register, cancel, heartbeat, and statusUpdate methods replicate the current node's registration, cancellation, renewal, and status information to the peer server node;

3) PeerEurekaNode objects are created when the Eureka Server initializes, one per serviceUrl, where the serviceUrls of every zone (the default zone is "default" when none is configured) are resolved either from the serviceUrl configuration or from DNS;

4) During initialization, the PeerEurekaNodes class also starts a scheduled thread that refreshes the PeerEurekaNode list, by default every 10 minutes (with configuration-based resolution this relies on configuration refresh taking effect dynamically; in DNS mode the scheduled thread re-resolves DNS, so the refresh is automatic);

5) Key parameters:

Name                    Type                     Description
id                      String                   Executor identifier, default "target_${host}"
maxBufferSize           int                      Maximum number of buffered replication tasks, default 10000
workloadSize            int                      Maximum number of tasks sent per batch, default 250
workerCount             int                      Number of worker threads, default 20
maxBatchDelay           long                     Maximum batching delay, default 500 ms
congestionRetryDelayMs  long                     Retry delay after congestion, default 1000 ms
networkFailureRetryMs   long                     Retry delay after a network failure, default 100 ms
taskProcessor           TaskProcessor<T>         Task processor that sends task requests to the other nodes
acceptorExecutor        AcceptorExecutor<ID, T>  Task acceptor that receives tasks coming from clients
taskExecutor            TaskExecutors<ID, T>     Task executors that invoke the task processor

6) Selected source:

public class PeerEurekaNode {
    // other code omitted...
	
    PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                     HttpReplicationClient replicationClient, EurekaServerConfig config,
                                     int batchSize, long maxBatchingDelayMs,
                                     long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
        this.registry = registry;
        this.targetHost = targetHost;
        this.replicationClient = replicationClient;
        this.serviceUrl = serviceUrl;
        this.config = config;
        // the maximum time an event may wait to be replicated before it is discarded
        this.maxProcessingDelayMs = config.getMaxTimeForReplication();
        String batcherName = getBatcherName();
        // create the replication task processor
        ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
        // initialize the batching task dispatcher
        this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
                batcherName,
                config.getMaxElementsInPeerReplicationPool(),
                batchSize,
                config.getMaxThreadsForPeerReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
        // initialize the non-batching task dispatcher
        this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
                targetHost,
                config.getMaxElementsInStatusReplicationPool(),
                config.getMaxThreadsForStatusReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
    }
	
    // instance registration task
    public void register(final InstanceInfo info) throws Exception {
        // task expiry: now + the lease renewal interval (default 30s; 90s when no lease info is present)
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }
	
    // instance cancellation (offline) task
    public void cancel(final String appName, final String id) throws Exception {
        // task expiry: now + maxProcessingDelayMs (default 30s)
        long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
        batchingDispatcher.process(
                taskId("cancel", appName, id),
                new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                    @Override
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.cancel(appName, id);
                    }
                    @Override
                    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                        super.handleFailure(statusCode, responseEntity);
                        if (statusCode == 404) {
                            logger.warn("{}: missing entry.", getTaskName());
                        }
                    }
                },
                expiryTime
        );
    }
	
    // instance lease renewal (heartbeat) task
    public void heartbeat(final String appName, final String id,
                          final InstanceInfo info, final InstanceStatus overriddenStatus,
                          boolean primeConnection) throws Throwable {
        if (primeConnection) {
            // We do not care about the result for priming request.
            replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            return;
        }
        ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
            @Override
            public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
                return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            }
            @Override
            public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                super.handleFailure(statusCode, responseEntity);
                if (statusCode == 404) {
                    logger.warn("{}: missing entry.", getTaskName());
                    if (info != null) {
                        logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                                getTaskName(), info.getId(), info.getStatus());
                        register(info);
                    }
                } else if (config.shouldSyncWhenTimestampDiffers()) {
                    InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                    if (peerInstanceInfo != null) {
                        syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                    }
                }
            }
        };
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
    }
	
    public void statusUpdate(final String asgName, final ASGStatus newStatus) {
        long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
        nonBatchingDispatcher.process(
                asgName,
                new AsgReplicationTask(targetHost, Action.StatusUpdate, asgName, newStatus) {
                    public EurekaHttpResponse<?> execute() {
                        return replicationClient.statusUpdate(asgName, newStatus);
                    }
                },
                expiryTime
        );
    }
	
    public void statusUpdate(final String appName, final String id,
                             final InstanceStatus newStatus, final InstanceInfo info) {
        long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
        batchingDispatcher.process(
                taskId("statusUpdate", appName, id),
                new InstanceReplicationTask(targetHost, Action.StatusUpdate, info, null, false) {
                    @Override
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.statusUpdate(appName, id, newStatus, info);
                    }
                },
                expiryTime
        );
    }
	
    public void deleteStatusOverride(final String appName, final String id, final InstanceInfo info) {
        long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
        batchingDispatcher.process(
                taskId("deleteStatusOverride", appName, id),
                new InstanceReplicationTask(targetHost, Action.DeleteStatusOverride, info, null, false) {
                    @Override
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.deleteStatusOverride(appName, id, info);
                    }
                },
                expiryTime);
    }
    
	// other code omitted...
}

3.2 TaskDispatchers

An initialization utility for task dispatching: it creates the acceptor executor, which then takes care of scheduling, processing, and shutdown.

TaskDispatchers creates the two kinds of TaskDispatcher (a batching TaskDispatcher and a non-batching one):

public class TaskDispatchers {

    public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
                                                                                int maxBufferSize,
                                                                                int workerCount,
                                                                                long maxBatchingDelay,
                                                                                long congestionRetryDelayMs,
                                                                                long networkFailureRetryMs,
                                                                                TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, 1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        // workerCount defaults to 1
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }
            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }
	
    public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                             int maxBufferSize,
                                                                             int workloadSize,
                                                                             int workerCount,
                                                                             long maxBatchingDelay,
                                                                             long congestionRetryDelayMs,
                                                                             long networkFailureRetryMs,
                                                                             TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        // workerCount defaults to 20
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }
			
            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }
}
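To make the wiring concrete, here is a minimal usage sketch. The demo class and task strings are invented for illustration; only the factory call and the TaskDispatcher/TaskProcessor interfaces come from the source above:

import java.util.List;

import com.netflix.eureka.util.batcher.TaskDispatcher;
import com.netflix.eureka.util.batcher.TaskDispatchers;
import com.netflix.eureka.util.batcher.TaskProcessor;

public class DispatcherDemo {
    public static void main(String[] args) throws Exception {
        // Trivial processor that "replicates" by printing; Eureka itself uses ReplicationTaskProcessor.
        TaskProcessor<String> processor = new TaskProcessor<String>() {
            @Override
            public ProcessingResult process(String task) {
                System.out.println("single: " + task);
                return ProcessingResult.Success;
            }

            @Override
            public ProcessingResult process(List<String> tasks) {
                System.out.println("batch of " + tasks.size() + ": " + tasks);
                return ProcessingResult.Success;
            }
        };
        // Same parameter shape PeerEurekaNode uses: 10000 buffer, 250 per batch, 20 workers,
        // 500 ms max batching delay, 1000 ms congestion retry, 100 ms network-failure retry.
        TaskDispatcher<String, String> dispatcher = TaskDispatchers.createBatchingTaskDispatcher(
                "demo", 10000, 250, 20, 500, 1000, 100, processor);

        long expiry = System.currentTimeMillis() + 30_000;
        dispatcher.process("heartbeat#app/instance-1", "payload-1", expiry);
        dispatcher.process("heartbeat#app/instance-1", "payload-2", expiry);

        Thread.sleep(1000); // give the acceptor and worker threads time to drain
        // If "payload-1" was still pending, it was coalesced away: only "payload-2" is sent.
        dispatcher.shutdown();
    }
}

Because both calls share the same task ID, the second payload overrides the first while it sits in the pending buffer; this is how repeated events for the same instance collapse into a single replication call.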

3.3 AcceptorExecutor

The executor that accepts incoming tasks:

  • 1) It receives the tasks sent over from PeerEurekaNode and adds them to its managed queues;

  • 2) A dedicated AcceptorRunner thread moves the tasks between the internal queues;

  • 3) The AcceptorRunner thread polls the task queues in a loop; if a polling pass hands no new tasks to a work queue, it sleeps for 10 ms;

  • 4) Handing tasks to a work queue may be deliberately delayed (see TrafficShaper below), which avoids bursts of traffic right after transient failures;

  • 5) If the pending-task buffer is full, the reprocessQueue is cleared, and the oldest pending task is dropped to make room for new tasks arriving from the acceptorQueue;

  • 6) When batch work is assigned to the work queue, at most 250 tasks (by default) go into a single batch, the pending buffer holds at most 10000 tasks by default, and a batch is dispatched once its oldest task has waited 500 ms (by default); a toy model of this buffer follows below.
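The bookkeeping behind points 5) and 6) is essentially a HashMap plus a Deque. A toy model (not Eureka code) of the override-and-evict behavior:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of pendingTasks + processingOrder: tasks with the same ID are coalesced,
// and when the buffer is full the oldest entry is evicted to make room.
public class CoalescingBufferDemo {
    static final int MAX_BUFFER = 3; // Eureka's default is 10000
    static final Map<String, String> pendingTasks = new HashMap<>();
    static final Deque<String> processingOrder = new ArrayDeque<>();

    static void append(String id, String task) {
        if (pendingTasks.size() >= MAX_BUFFER) {
            pendingTasks.remove(processingOrder.poll()); // evict the oldest task
        }
        if (pendingTasks.put(id, task) == null) {
            processingOrder.add(id); // new ID: remember arrival order
        }                            // existing ID: task overridden in place
    }

    public static void main(String[] args) {
        append("a", "a1"); append("b", "b1");
        append("a", "a2");                    // "a" coalesced: a2 replaces a1
        append("c", "c1"); append("d", "d1"); // buffer full: oldest ID "a" is evicted
        System.out.println(pendingTasks);     // {b=b1, c=c1, d=d1}
        System.out.println(processingOrder);  // [b, c, d]
    }
}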

class AcceptorExecutor<ID, T> {
    // executor identifier
    private final String id;
    // max number of tasks in the pending pool (pendingTasks), default 10000
    private final int maxBufferSize;
    // max batch size for batched sends, default 250
    private final int maxBatchingSize;
    // max batching delay, default 500 ms
    private final long maxBatchingDelay;
    // signals the acceptorThread to stop
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    // acceptorQueue and reprocessQueue hold the incoming broadcast tasks
    // queue of newly accepted tasks
    private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
    // queue of tasks that need to be retried
    private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();
    private final Thread acceptorThread;
    // tasks waiting to be processed
    private final Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<>();
    private final Deque<ID> processingOrder = new LinkedList<>();
    private final Semaphore singleItemWorkRequests = new Semaphore(0);
    // work queue in single-item mode (tasks about to be handed to a worker)
    private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();
    private final Semaphore batchWorkRequests = new Semaphore(0);
    // work queue in batch mode (task batches about to be handed to a worker)
    private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();
    // traffic shaping for retried tasks
    private final TrafficShaper trafficShaper;
	
    AcceptorExecutor(String id,
                     int maxBufferSize,
                     int maxBatchingSize,
                     long maxBatchingDelay,
                     long congestionRetryDelayMs,
                     long networkFailureRetryMs) {
        this.id = id;
        this.maxBufferSize = maxBufferSize;
        this.maxBatchingSize = maxBatchingSize;
        this.maxBatchingDelay = maxBatchingDelay;
        this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);
        ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
        // create and start the acceptor thread
        this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
        this.acceptorThread.setDaemon(true);
        this.acceptorThread.start();
        ... // metrics initialization (metrics are covered in a separate article)
    }
	
    // enqueue a regular task
    void process(ID id, T task, long expiryTime) {
        acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
        acceptedTasks++;
    }
	
    // enqueue several tasks for retry
    void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
        reprocessQueue.addAll(holders);
        replayedTasks += holders.size(); // metrics counter
        trafficShaper.registerFailure(processingResult);
    }
	
    // enqueue a single task for retry
    void reprocess(TaskHolder<ID, T> taskHolder, ProcessingResult processingResult) {
        reprocessQueue.add(taskHolder);
        replayedTasks++; // metrics counter
        trafficShaper.registerFailure(processingResult);
    }
	
    // request the single-item work queue (called by a worker)
    BlockingQueue<TaskHolder<ID, T>> requestWorkItem() {
        singleItemWorkRequests.release();
        return singleItemWorkQueue;
    }
	
    // request the batch work queue (called by a worker)
    BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
        batchWorkRequests.release();
        return batchWorkQueue;
    }
	
    // stop the acceptor thread
    void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            Monitors.unregisterObject(id, this);
            acceptorThread.interrupt();
        }
    }
	
    class AcceptorRunner implements Runnable {
	
        @Override
        public void run() {
            long scheduleTime = 0;
            while (!isShutdown.get()) {
                try {
                    drainInputQueues();
                    int totalItems = processingOrder.size();
                    long now = System.currentTimeMillis();
                    if (scheduleTime < now) {
                        // if a recent network event (Congestion or TransientError) was recorded, push scheduleTime into the future
                        scheduleTime = now + trafficShaper.transmissionDelay();
                    }
                    if (scheduleTime <= now) {
                        assignBatchWork();
                        assignSingleItemWork();
                    }
                    // If no worker is requesting data or there is a delay injected by the traffic shaper,
                    // sleep for some time to avoid tight loop.
                    if (totalItems == processingOrder.size()) {
                        Thread.sleep(10);
                    }
                } catch (InterruptedException ex) {
                    // Ignore
                } catch (Throwable e) {
                    // Safe-guard, so we never exit this loop in an uncontrolled way.
                    logger.warn("Discovery AcceptorThread error", e);
                }
            }
        }
		
        private boolean isFull() {
            return pendingTasks.size() >= maxBufferSize;
        }
		
        private void drainInputQueues() throws InterruptedException {
            do {
                // move queued tasks into the pendingTasks map
                drainReprocessQueue();
                drainAcceptorQueue();
                if (!isShutdown.get()) {
                    // if all the queues are empty, block on acceptorQueue for up to 10 ms
                    if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                        TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                        if (taskHolder != null) {
                            appendTaskHolder(taskHolder);
                        }
                    }
                }
            } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
        }
		
        /**
         * Drain the acceptor queue: move every task from acceptorQueue into processingOrder and pendingTasks.
         */
        private void drainAcceptorQueue() {
            while (!acceptorQueue.isEmpty()) {
                appendTaskHolder(acceptorQueue.poll());
            }
        }
		
        /**
         * Drain the reprocess queue: move the retry tasks from reprocessQueue into processingOrder and pendingTasks.
         */
        private void drainReprocessQueue() {
            long now = System.currentTimeMillis();
            // while reprocessQueue has tasks and pendingTasks is not full, move its non-expired tasks into pendingTasks
            while (!reprocessQueue.isEmpty() && !isFull()) {
                // take from the tail of reprocessQueue (newest retry first, presumably so fresher tasks are favored)
                TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
                ID id = taskHolder.getId();
                if (taskHolder.getExpiryTime() <= now) {
                    expiredTasks++;
                } else if (pendingTasks.containsKey(id)) {
                    overriddenTasks++;
                } else {
                    pendingTasks.put(id, taskHolder);
                    // retries go to the head of the processing order
                    processingOrder.addFirst(id);
                }
            }
            if (isFull()) {
                queueOverflows += reprocessQueue.size();
                // if the buffer is full, the entire retry queue is discarded
                reprocessQueue.clear();
            }
        }
		
        /**
         * Append a task to the pending set. A task with the same ID is overridden (and the override counted); if pendingTasks is full, the task at the head of processingOrder is evicted first.
         */
        private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
            if (isFull()) {
                // if full, evict from pendingTasks the entry at the head of processingOrder (i.e., drop the oldest task)
                pendingTasks.remove(processingOrder.poll());
                queueOverflows++;
            }
            TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
            if (previousTask == null) {
                processingOrder.add(taskHolder.getId());
            } else {
                overriddenTasks++;
            }
        }
		
        /**
         * Single-item assignment: hand one non-expired task to a worker via singleItemWorkQueue.
         */
        void assignSingleItemWork() {
            if (!processingOrder.isEmpty()) {
                if (singleItemWorkRequests.tryAcquire(1)) {
                    long now = System.currentTimeMillis();
                    while (!processingOrder.isEmpty()) {
                        ID id = processingOrder.poll();
                        TaskHolder<ID, T> holder = pendingTasks.remove(id);
                        if (holder.getExpiryTime() > now) {
                            singleItemWorkQueue.add(holder);
                            return;
                        }
                        expiredTasks++;
                    }
                    singleItemWorkRequests.release();
                }
            }
        }
		
        /**
         * Batch assignment: hand up to maxBatchingSize non-expired tasks to a worker via batchWorkQueue.
         */
        void assignBatchWork() {
            if (hasEnoughTasksForNextBatch()) {
                // acquire a work-request permit
                if (batchWorkRequests.tryAcquire(1)) {
                    long now = System.currentTimeMillis();
                    int len = Math.min(maxBatchingSize, processingOrder.size());
                    List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                    // take up to len tasks out of pendingTasks and enqueue them as one batch
                    while (holders.size() < len && !processingOrder.isEmpty()) {
                        ID id = processingOrder.poll();
                        TaskHolder<ID, T> holder = pendingTasks.remove(id);
                        if (holder.getExpiryTime() > now) {
                            // non-expired task
                            holders.add(holder);
                        } else {
                            expiredTasks++;
                        }
                    }
                    if (holders.isEmpty()) {
                        // no usable tasks were taken, so give the permit back
                        batchWorkRequests.release();
                    } else {
                        batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                        batchWorkQueue.add(holders);
                    }
                }
            }
        }
		
        /**
         * Decide whether enough tasks (or enough delay) have accumulated to dispatch the next batch.
         */
        private boolean hasEnoughTasksForNextBatch() {
            if (processingOrder.isEmpty()) {
                return false;
            }
            if (pendingTasks.size() >= maxBufferSize) {
                return true;
            }
            // has the task at the head of the queue waited longer than maxBatchingDelay?
            TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
            long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
            return delay >= maxBatchingDelay;
        }
    }
}

3.4 TrafficShaper (task admission shaping)

Before tasks are assigned to the worker threads, TrafficShaper applies an admission-control policy: when network congestion (Congestion) or a transient failure (TransientError) has just occurred, task assignment is delayed.

  • 1) After congestion (Congestion), the retry delay defaults to 1 s and is capped at 30 s;

  • 2) After a transient network failure (TransientError), the retry delay defaults to 100 ms and is likewise capped at 30 s;

  • 3) transmissionDelay returns how much of the configured delay is still outstanding, i.e., the configured delay minus the time elapsed since the last recorded failure (see the demo after the source below).

class TrafficShaper {
    // maximum delay, 30 s
    private static final long MAX_DELAY = 30 * 1000;
    // retry delay after congestion
    private final long congestionRetryDelayMs;
    // retry delay after a network failure
    private final long networkFailureRetryMs;
    // timestamps of the most recent congestion error and network failure
    private volatile long lastCongestionError;
    private volatile long lastNetworkFailure;
	
    // congestionRetryDelayMs comes from PeerEurekaNode, default 1000 ms; networkFailureRetryMs defaults to 100 ms
    TrafficShaper(long congestionRetryDelayMs, long networkFailureRetryMs) {
        this.congestionRetryDelayMs = Math.min(MAX_DELAY, congestionRetryDelayMs);
        this.networkFailureRetryMs = Math.min(MAX_DELAY, networkFailureRetryMs);
    }
	
    void registerFailure(ProcessingResult processingResult) {
        if (processingResult == ProcessingResult.Congestion) {
            lastCongestionError = System.currentTimeMillis();
        } else if (processingResult == ProcessingResult.TransientError) {
            lastNetworkFailure = System.currentTimeMillis();
        }
    }
	
    long transmissionDelay() {
        if (lastCongestionError == -1 && lastNetworkFailure == -1) {
            return 0;
        }
        long now = System.currentTimeMillis();
        if (lastCongestionError != -1) {
            long congestionDelay = now - lastCongestionError;
            if (congestionDelay >= 0 && congestionDelay < congestionRetryDelayMs) {
                return congestionRetryDelayMs - congestionDelay;
            }
            lastCongestionError = -1;
        }
        if (lastNetworkFailure != -1) {
            long failureDelay = now - lastNetworkFailure;
            if (failureDelay >= 0 && failureDelay < networkFailureRetryMs) {
                return networkFailureRetryMs - failureDelay;
            }
            lastNetworkFailure = -1;
        }
        return 0;
    }
}
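A small demo of the delay semantics. TrafficShaper is package-private, so this sketch assumes it is compiled into com.netflix.eureka.util.batcher; the printed values are approximate:

package com.netflix.eureka.util.batcher;

import com.netflix.eureka.util.batcher.TaskProcessor.ProcessingResult;

public class TrafficShaperDemo {
    public static void main(String[] args) throws InterruptedException {
        // The defaults that PeerEurekaNode passes in: 1000 ms congestion, 100 ms network failure.
        TrafficShaper shaper = new TrafficShaper(1000, 100);
        System.out.println(shaper.transmissionDelay()); // 0: no failure recorded yet

        shaper.registerFailure(ProcessingResult.Congestion);
        System.out.println(shaper.transmissionDelay()); // ~1000: the full delay remains

        Thread.sleep(400);
        System.out.println(shaper.transmissionDelay()); // ~600: part of the window has elapsed

        Thread.sleep(700);
        System.out.println(shaper.transmissionDelay()); // 0: the 1 s window has fully elapsed
    }
}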

3.5 TaskExecutors

TaskExecutors manages the worker threads; each worker pulls tasks from the AcceptorExecutor's work queues and hands them to the TaskProcessor. There are two kinds of worker threads (how they are created is sketched after the two classes below):

First, SingleTaskWorkerRunnable: the single-task worker thread, 1 by default;

// single-task worker thread
static class SingleTaskWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

	SingleTaskWorkerRunnable(String workerName,
							 AtomicBoolean isShutdown,
							 TaskExecutorMetrics metrics,
							 TaskProcessor<T> processor,
							 AcceptorExecutor<ID, T> acceptorExecutor) {
		super(workerName, isShutdown, metrics, processor, acceptorExecutor);
	}
	
	@Override
	public void run() {
		try {
			while (!isShutdown.get()) {
				// get the single-item work queue from the AcceptorExecutor
				BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
				TaskHolder<ID, T> taskHolder;
				// block for up to 1 s at a time when there is no task
				while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
					if (isShutdown.get()) {
						return;
					}
				}
				metrics.registerExpiryTime(taskHolder);
				if (taskHolder != null) {
					// send the task (instance change/event) to the peer node
					ProcessingResult result = processor.process(taskHolder.getTask());
					switch (result) {
						case Success:
							break;
						case Congestion:
						case TransientError: // re-queue for retry
							taskDispatcher.reprocess(taskHolder, result);
							break;
						case PermanentError:
							logger.warn("Discarding a task of {} due to permanent error", workerName);
					}
					metrics.registerTaskResult(result, 1);
				}
			}
		} catch (InterruptedException e) {
			// Ignore
		} catch (Throwable e) {
			// Safe-guard, so we never exit this loop in an uncontrolled way.
			logger.warn("Discovery WorkerThread error", e);
		}
	}
}

Second, BatchWorkerRunnable: the batch-task worker thread, 20 by default;

// batch-task worker thread
static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

	BatchWorkerRunnable(String workerName,
						AtomicBoolean isShutdown,
						TaskExecutorMetrics metrics,
						TaskProcessor<T> processor,
						AcceptorExecutor<ID, T> acceptorExecutor) {
		super(workerName, isShutdown, metrics, processor, acceptorExecutor);
	}
	
	@Override
	public void run() {
		try {
			while (!isShutdown.get()) {
				// fetch a batch of tasks
				List<TaskHolder<ID, T>> holders = getWork();
				metrics.registerExpiryTimes(holders);
				List<T> tasks = getTasksOf(holders);
				// send the batch (instance changes/events) to the peer node
				ProcessingResult result = processor.process(tasks);
				switch (result) {
					case Success:
						break;
					case Congestion:
					case TransientError:
						// re-queue for retry
						taskDispatcher.reprocess(holders, result);
						break;
					case PermanentError:
						logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
				}
				metrics.registerTaskResult(result, tasks.size());
			}
		} catch (InterruptedException e) {
			// Ignore
		} catch (Throwable e) {
			// Safe-guard, so we never exit this loop in an uncontrolled way.
			logger.warn("Discovery WorkerThread error", e);
		}
	}
	
	private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
		// get the batch work queue from the AcceptorExecutor
		BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
		List<TaskHolder<ID, T>> result;
		do {
			// block for up to 1 s at a time when there is no batch
			result = workQueue.poll(1, TimeUnit.SECONDS);
		} while (!isShutdown.get() && result == null);
		return (result == null) ? new ArrayList<>() : result;
	}
	
	private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
		List<T> tasks = new ArrayList<>(holders.size());
		for (TaskHolder<ID, T> holder : holders) {
			tasks.add(holder.getTask());
		}
		return tasks;
	}
}
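For completeness, this is roughly how the factory methods referenced in 3.2 create these workers (condensed from TaskExecutors.batchExecutors; the real source uses an explicit WorkerRunnableFactory rather than a lambda, and details may differ slightly):

// Condensed sketch: batchExecutors creates workerCount BatchWorkerRunnable instances,
// all pulling batches from the same AcceptorExecutor.
static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name,
                                                   int workerCount,
                                                   final TaskProcessor<T> processor,
                                                   final AcceptorExecutor<ID, T> acceptorExecutor) {
    final AtomicBoolean isShutdown = new AtomicBoolean();
    final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
    return new TaskExecutors<>(
            idx -> new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx,
                    isShutdown, metrics, processor, acceptorExecutor),
            workerCount, isShutdown);
}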

3.6 PeerEurekaNodes

PeerEurekaNodes is the cluster's server list: it manages all the PeerEurekaNode instances.

3.6.1 PeerEurekaNodes initialization

1) Create a single-threaded ScheduledExecutorService;

2) Resolve the peer URLs, exclude this node itself, and build the initial PeerEurekaNode set from them;

3) Schedule a task, managed by that executor, that creates and refreshes the PeerEurekaNode instances, by default every 10 minutes (a sketch of this scheduling follows the refresh code below);

Note: in a Spring environment the configuration beans are loaded when the container initializes, so as long as the configuration does not change, the URL list stays the same and the refresh logic below effectively does nothing.

The refresh logic is as follows:

 protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        if (newPeerUrls.isEmpty()) {
            logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
            return;
        }
        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        toShutdown.removeAll(newPeerUrls);
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        toAdd.removeAll(peerEurekaNodeUrls);
        // nothing changed, nothing to do
        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
            return;
        }
        // Remove peers no longer available
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
        // first shut down the PeerEurekaNodes that are going away
        if (!toShutdown.isEmpty()) {
            logger.info("Removing no longer available peer nodes {}", toShutdown);
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }
        // instantiate PeerEurekaNodes for the newly added URLs
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }
        this.peerEurekaNodes = newNodeList;
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }  
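The scheduling around this refresh is straightforward; a condensed sketch of PeerEurekaNodes.start() (thread-factory and logging details simplified):

// Condensed sketch of PeerEurekaNodes.start(): populate the peer list once, then
// re-resolve the URLs and call updatePeerEurekaNodes() at a fixed interval
// (peerEurekaNodesUpdateIntervalMs, 10 minutes by default).
public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
        thread.setDaemon(true);
        return thread;
    });
    try {
        updatePeerEurekaNodes(resolvePeerUrls()); // initial population
        Runnable peersUpdateTask = () -> {
            try {
                updatePeerEurekaNodes(resolvePeerUrls());
            } catch (Throwable e) {
                logger.error("Cannot update the replica Nodes", e);
            }
        };
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
}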

3.6.2 Resolving the peer list

Each time PeerEurekaNodes refreshes its PeerEurekaNode instances, it first obtains the latest list of peer service URLs. The resolution flow is as follows:

(figure: peer URL resolution flow, omitted)
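In code, the resolution boils down to PeerEurekaNodes.resolvePeerUrls(), condensed below (treat the details as illustrative and see the class itself for exact behavior): resolve every serviceUrl of the local zone, then drop this node's own URL:

protected List<String> resolvePeerUrls() {
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    String zone = InstanceInfo.getZone(
            clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
    // from configuration or DNS, depending on how the client is set up
    List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(
            clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
    int idx = 0;
    while (idx < replicaUrls.size()) {
        if (isThisMyUrl(replicaUrls.get(idx))) {
            replicaUrls.remove(idx); // exclude self
        } else {
            idx++;
        }
    }
    return replicaUrls;
}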

3.7 TaskProcessor

The task-processing interface TaskProcessor<T> defines two methods, one for a single task and one for a batch. The implementation used for replication is ReplicationTaskProcessor, which sends each ReplicationTask to the peer Eureka node, giving the cluster eventual consistency.

package com.netflix.eureka.util.batcher;

import java.util.List;

/**
 * An interface to be implemented by clients for task execution.
 *
 * @author Tomasz Bak
 */
public interface TaskProcessor<T> {

    /**
     * A processed task/task list ends up in one of the following states:
     * <ul>
     *     <li>{@code Success} processing finished successfully</li>
     *     <li>{@code TransientError} processing failed, but shall be retried later</li>
     *     <li>{@code PermanentError} processing failed, and is non recoverable</li>
     * </ul>
     */
    enum ProcessingResult {
        Success, Congestion, TransientError, PermanentError
    }

    /**
     * In non-batched mode a single task is processed at a time.
     */
    ProcessingResult process(T task);

    /**
     * For batched mode a collection of tasks is run at a time. The result is provided for the aggregated result,
     * and all tasks are handled in the same way according to what is returned (for example are rescheduled, if the
     * error is transient).
     */
    ProcessingResult process(List<T> tasks);
}
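As an illustration of how these result values are typically derived (this mirrors the summary in section 4.3 below; the real ReplicationTaskProcessor additionally routes non-retryable HTTP failures through ReplicationTask.handleFailure, so treat this as a sketch):

import java.io.IOException;

import com.netflix.eureka.util.batcher.TaskProcessor.ProcessingResult;

// Illustrative mapping from a replication attempt's outcome to a ProcessingResult.
public final class ResultMapping {
    static ProcessingResult classify(int statusCode, Exception error) {
        if (error instanceof IOException) {
            return ProcessingResult.TransientError; // network failure: re-queue and retry
        }
        if (statusCode == 503) {
            return ProcessingResult.Congestion;     // peer overloaded: back off, then retry
        }
        if (statusCode >= 200 && statusCode < 300) {
            return ProcessingResult.Success;
        }
        return ProcessingResult.PermanentError;     // non-retryable: the task is discarded
    }
}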

4 Appendix

4.1 Instance statuses

See: InstanceStatus

UP: the instance registered successfully and can serve traffic

DOWN: the instance has gone offline

STARTING: the instance is starting up (an intermediate state during registration)

OUT_OF_SERVICE: the instance is not serving traffic

UNKNOWN: unknown status

4.2 Instance action types

See: PeerAwareInstanceRegistryImpl.Action

Heartbeat: lease renewal (heartbeat)

Register: instance registration

Cancel: instance cancellation

StatusUpdate: status change

DeleteStatusOverride: delete a status override

4.3 Task processing results

See: TaskProcessor.ProcessingResult

Success: success (the server returned a status in [200, 300))

Congestion: congestion (e.g., the peer is busy and returns 503, or the request times out)

TransientError: a retryable failure, detected mainly by checking for IOException

PermanentError: a non-retryable failure; the task is discarded
