SpringCloud之Eureka的定时任务详解(Server)
EureakServer的定时任务详解,了解Eureka服务发现组件内部的运行机制
1.EurekaServer内定时更新集群内其他Server节点
public class PeerEurekaNodes {
/**
* Eureka-Server 集群节点数组
*/
private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
/**
* Eureka-Server 服务地址数组
*/
private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
/**
* 启动 Eureka-Server 集群节点集合(复制)
*/
public void start() {
......
// 初始化 集群节点信息
updatePeerEurekaNodes(resolvePeerUrls());
// 初始化 初始化固定周期更新集群节点信息的任务
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
// 每隔10分钟更新集群节点
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
......
}
/**
* Resolve peer URLs. 获取Server集群的所有serviceUrl,不包括自身
*
* @return peer URLs with node's own URL filtered out
*/
protected List<String> resolvePeerUrls() {
// 获得 Eureka-Server 集群服务地址数组
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
// 获取相同Region下的所有serviceUrl
List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
// 移除自己(避免向自己同步)
int idx = 0;
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
}
}
EurekaServer在初始化时会根据配置的Server集群url地址,来实例化集群内其他Server节点的交互实例,用于集群间的数据同步。
EurekaServer在获取Server URL时用的还是EurekaClient,也就是环境里eureka.client开头的配置,客户端配置一模一样。
其实EurekaServer集成了EurekaClient,Client的配置都可以用到Server上,只不过很多可以略去,比如是否需要向Server发送心跳registerWithEureka,可以设置为false等。
Client里的serviceUrl的含义是可以向哪些Server节点注册和拉取服务信息;而Server里的serviceUrl的含义是集群里有哪些Server节点,当自身节点有服务操作是需要向哪些节点同步。
Client正常情况下只合Server集群中的一个交互,而Server在有服务操作时会同步至所有其他的节点。
应用的配置信息是可能发生变化的,所以Client和Server才需要定时的刷新集群节点信息,关闭那些不再连接的Server节点,初始化新增的节点。
2.每隔一分钟统计最近一分钟内所有Client的续约次数,也就是接收到的心跳次数,以此来作为是否触发服务信息回收的依据之一
public class MeasuredRate {
/**
* 间隔, 默认60S
*/
private final long sampleInterval;
public synchronized void start() {
if (!isActive) {
// 每隔一分钟执行一次定时任务, 更新最新的总的续约次数, 这样就能计算一分钟内续约的次数, 以此来判断续约次数是否低于阈值
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// Zero out the current bucket. 将 currentBucket 赋值给lastBucket, 然后重置 currentBucket
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error("Cannot reset the Measured Rate", e);
}
}
}, sampleInterval, sampleInterval);
isActive = true;
}
}
}
EurekaServer每隔1分钟执行一次服务信息回收,回收那些超过90S没有发送心跳,也就是续约的服务信息,当然前提是EurekaServer开启租约过期功能,且未触发自我保护临界值。
所谓自我保护,就是指最近一分钟内的续约总数 > 预估的续约总数 * 0.85。 近似的来讲,也就是一分钟内有超过85%的应用信息发送了心跳。如果这个条件未满足,那么不会执行服务回收操作。
比如当Server节点的网络不稳定,丢失了部分心跳信息,如果超过了15,那么就不会触发自我保护,停止服务信息的回收,而这也是我们希望服务发现组件应该具备的功能,强调可用性。
从这点上看其和Zookeeper的服务发现机制有很大不同。
3.EurekaServer每隔一分钟执行一次服务信息的回收
/**
* 租约过期任务
*/
/* visible for testing */
class EvictionTask extends TimerTask {
/**
* 上一次执行清理任务的时间
*/
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);
@Override
public void run() {
try {
// 获取 补偿时间毫秒数, 计算这次执行距离上次执行的时间差,与60S的距离
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
// 清理过期租约逻辑
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
/**
* compute a compensation time defined as the actual time this task was executed since the prev iteration,
* vs the configured amount of time for execution. This is useful for cases where changes in time (due to
* clock skew or gc for example) causes the actual eviction task to execute later than the desired time
* according to the configured cycle.
*/
long getCompensationTimeMs() {
long currNanos = getCurrentTimeNano();
long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
if (lastNanos == 0L) {
return 0L;
}
// 此次执行与上次执行的时间差
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
// 查看时间间隔是否比60S大
long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
// 如果未超过60S, 返回; 否则返回超过的时间差
return compensationTime <= 0L ? 0L : compensationTime;
}
long getCurrentTimeNano() { // for testing
return System.nanoTime();
}
}
EurekaServer每隔60S执行一次服务信息的回收任务,移除那些超过90S未更新租约信息的服务。
当然能够回收的前提是开启了租约回收功能,而且未触发自我保护。所谓的自我保护机制,就是最近一分钟内的实际续约次数比例超过期望总数的85%,如果未超过,那么认为是Server出现了问题,不进行服务回收。
4.定时更新续约次数的期望值和自我保护的临界值
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
/**
* Schedule the task that updates <em>renewal threshold</em> periodically.
* The renewal threshold would be used to determine if the renewals drop
* dramatically because of network partition and to protect expiring too
* many instances at a time.
*/
private void scheduleRenewalThresholdUpdateTask() {
// 15分钟后更新续约阈值,之后每隔15分分钟执行一次
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
/**
* 更新续约阈值,也就是每分钟期望续约的次数,以及触发自我保护的最低续约次数
* Updates the <em>renewal threshold</em> based on the current number of
* renewals. The threshold is a percentage as specified in
* {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
* received per minute {@link #getNumOfRenewsInLastMin()}.
*/
private void updateRenewalThreshold() {
try {
// 计算 应用实例数
Applications apps = eurekaClient.getApplications();
int count = 0;
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
// 计算 expectedNumberOfRenewsPerMin 、 numberOfRenewsPerMinThreshold 参数
synchronized (lock) {
// Update threshold only if the threshold is greater than the
// current expected threshold of if the self preservation is disabled.
// 不会一次性的把续约次数将至85%以下,也就是只有在存活应用信息数量超过总数的85%时才能更新,这样就不会修改续约的自我保护的临界值
if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold) || (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold = (int)((count * 2) * serverConfig.getRenewalPercentThreshold());
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
}
服务自身取消,会相应的降低续约期望总数和自我保护临界值,每取消一个,数值均减2。但因为续约时间超时而被动移除的服务信息,不会相应的减少期望总值和临界值。
如果不定时的更新期望总值和临界值,那么当服务逐渐的因心跳超时而被移除时,很容易就触发保护临界值,之后就不能再移除那些心跳超时的服务信息。
但是在更新总值和临界值时,如果当前Server处于自我保护状态,那么也不能强制的改变临界值,这会强制的退出自我保护状态。所以更新总值和临界值的前提是当前Server不处于自我保护状态,也就是上一分钟的续约总数的比例超过85%。
5.服务信息增量缓存更新任务
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
/**
* 最近租约变更记录队列
*/
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
......
// 30S后每隔30S执行一次, 移除3分钟前发生的续约记录
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs(), // 30S
serverConfig.getDeltaRetentionTimerIntervalInMs()); // 30S
}
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
RecentlyChangedItem item = it.next();
// 如果某个续约任务是3分钟前发生的,那么移除它
if (item.getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
}
EurekaClient在初始化时进行一次全量拉取,之后每隔30S执行一次增量拉取,也就是会返回recentlyChangedQueue里的记录,EurekaClient根据记录的操作类型和服务信息,相应的更新自身持有的可用服务信息。
recentlyChangedQueue 是一个有序队列,当Client向Server执行操作时,比如注册,状态变更,取消等(续约不会记录),那么会记录操作的时间,类型和相应的服务信息。
通过增量信息来保持同步,能够极大的减少Server和Client之间的数据的传输,降低IO消耗。
6.每隔30S执行一次,更新只读响应缓存
public class ResponseCacheImpl implements ResponseCache {
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
......
// 仅使用只读缓存, 因此每隔30S执行更新缓存任务
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
}
/**
* 缓存更新任务, 每隔30S执行一次
*
* @return
*/
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) { // 循环 readOnlyCacheMap 的缓存键
if (logger.isDebugEnabled()) {
Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) { // 不一致时,进行替换
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
}
}
}
};
}
}
EurekaServer会缓存数据信息,根据Key的不同值缓存相应的结果,当Client获取信息时,优先用只读缓存的数据返回,如果只读缓存不存在,那么从读写缓存处获取,然后存入只读缓存,最后返回结果。
读写缓存借助guava的CacheBuilder来实现缓存淘汰,在写入180S后失效,这样当只读缓存定期更新时,如果发现读写缓存的值和只读缓存的不一致时,进行替换。
当Client进行相应操作,比如注册,状态变更,取消等操作时,会时对应的缓存立即失效,保证Client获取到的是有效的服务信息。
更多推荐
所有评论(0)