Eureka 源码解析 —— 应用实例注册发现(六)之全量获取
摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-fetch-all/ 「芋道源码」欢迎转载,保留摘要,谢谢!本文主要基于 Eureka 1.8.X 版本1. 概述2. Eureka-Client 发起全量获取2.1 初始化全量获取2.2 定时获取2.3 刷新注册信息缓存2.4 发起获取注册信息3. Eureka-Serve
摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-fetch-all/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Eureka 1.8.X 版本
- 1. 概述
- 2. Eureka-Client 发起全量获取
- 3. Eureka-Server 接收全量获取
1. 概述
本文主要分享 Eureka-Client 向 Eureka-Server 获取全量注册信息的过程。
Eureka-Client 获取注册信息,分成全量获取和增量获取。默认配置下,Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,而后每 30 秒增量获取刷新本地缓存( 非“正常”情况下会是全量获取 )。
本文重点在于全量获取。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版,等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与Docker微服务架构实战》
- 两书齐买,京东包邮。
2. Eureka-Client 发起全量获取
本小节调用关系如下:
2.1 初始化全量获取
Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,首先代码如下:
// DiscoveryClient.java/*Applications 在本地的缓存*/private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider) {// ... 省略无关代码// 【3.2.5】初始化应用集合在本地的缓存localRegionApps.set( new Applications());// ... 省略无关代码// 【3.2.12】从 Eureka-Server 拉取注册信息if (clientConfig.shouldFetchRegistry() && !fetchRegistry( false)) {fetchRegistryFromBackup();}// ... 省略无关代码}com.netflix.discovery.shared.Applications
,注册的应用集合。较为容易理解,点击 链接 链接查看带中文注释的类,这里就不啰嗦了。Applications 与 InstanceInfo 类关系如下:配置
eureka.shouldFetchRegistry = true
,开启从 Eureka-Server 获取注册信息。默认值:true
。- 调用
#fetchRegistry(false)
方法,从 Eureka-Server 全量获取注册信息,在 「2.4 发起获取注册信息」 详细解析。
2.2 定时获取
Eureka-Client 在初始化过程中,创建获取注册信息线程,固定间隔向 Eureka-Server 发起获取注册信息( fetch ),刷新本地注册信息缓存。实现代码如下:
// DiscoveryClient.javaDiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider) {// ... 省略无关代码// 【3.2.9】初始化线程池// default size of 2 - 1 each for heartbeat and cacheRefreshscheduler = Executors.newScheduledThreadPool( 2,new ThreadFactoryBuilder().setNameFormat( "DiscoveryClient-%d").setDaemon( true).build());cacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat( "DiscoveryClient-CacheRefreshExecutor-%d").setDaemon( true).build()); // use direct handoff// ... 省略无关代码// 【3.2.14】初始化定时任务initScheduledTasks();// ... 省略无关代码}private void initScheduledTasks() {// 向 Eureka-Server 心跳(续租)执行器if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}// ... 省略无关代码}- 初始化定时任务代码,和续租的定时任务代码类似,在 《Eureka 源码解析 —— 应用实例注册发现(二)之续租
》 有详细解析,这里不重复分享。 com.netflix.discovery.DiscoveryClient.CacheRefreshThread
,注册信息缓存刷新任务,实现代码如下:class CacheRefreshThread implements Runnable {public void run() {refreshRegistry();}}- 调用
#refreshRegistry(false)
方法,刷新注册信息缓存,在 「2.3 刷新注册信息缓存」 详细解析。
- 调用
2.3 刷新注册信息缓存
调用
#refreshRegistry(false)
方法,刷新注册信息缓存,实现代码如下:
// DiscoveryClient.java1: void refreshRegistry() {2: try {3: // TODO 芋艿:TODO[0009]:RemoteRegionRegistry4: boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();5:6: boolean remoteRegionsModified = false;7: // This makes sure that a dynamic change to remote regions to fetch is honored.8: String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();9: if ( null != latestRemoteRegions) {10: String currentRemoteRegions = remoteRegionsToFetch.get();11: if (!latestRemoteRegions.equals(currentRemoteRegions)) {12: // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync13: synchronized (instanceRegionChecker.getAzToRegionMapper()) {14: if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {15: String[] remoteRegions = latestRemoteRegions.split( ",");16: remoteRegionsRef.set(remoteRegions);17: instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);18: remoteRegionsModified = true;19: } else {20: logger.info( "Remote regions to fetch modified concurrently," +21: " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);22: }23: }24: } else {25: // Just refresh mapping to reflect any DNS/Property change26: instanceRegionChecker.getAzToRegionMapper().refreshMapping();27: }28: }29:30: boolean success = fetchRegistry(remoteRegionsModified);31: if (success) {32: // 设置 注册信息的应用实例数33: registrySize = localRegionApps.get().size();34: // 设置 最后获取注册信息时间35: lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();36: }37:38: // 打印日志39: if (logger.isDebugEnabled()) {40: StringBuilder allAppsHashCodes = new StringBuilder();41: allAppsHashCodes.append( "Local region apps hashcode: ");42: allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());43: allAppsHashCodes.append( ", is fetching remote regions? ");44: allAppsHashCodes.append(isFetchingRemoteRegionRegistries);45: for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {46: allAppsHashCodes.append( ", Remote region: ");47: allAppsHashCodes.append(entry.getKey());48: allAppsHashCodes.append( " , apps hashcode: ");49: allAppsHashCodes.append(entry.getValue().getAppsHashCode());50: }51: logger.debug( "Completed cache refresh task for discovery. All Apps hash code is {} ",52: allAppsHashCodes.toString());53: }54: } catch (Throwable e) {55: logger.error( "Cannot fetch registry from server", e);56: }57: }- 第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry
- 第 30 行 :调用
#fetchRegistry(false)
方法,从 Eureka-Server 获取注册信息,在 「2.4 发起获取注册信息」 详细解析。 第 31 至 36 行 :获取注册信息成功,设置注册信息的应用实例数,最后获取注册信息时间。变量代码如下:
/*** 注册信息的应用实例数*/private volatile int registrySize = 0;/*** 最后成功从 Eureka-Server 拉取注册信息时间戳*/private volatile long lastSuccessfulRegistryFetchTimestamp = - 1;第 38 至 53 行 :打印调试日志。
- 第 54 至 56 行 :打印异常日志。
2.4 发起获取注册信息
调用
#fetchRegistry(false)
方法,从 Eureka-Server 获取注册信息( 根据条件判断,可能是全量,也可能是增量 ),实现代码如下:
1: private boolean fetchRegistry(boolean forceFullRegistryFetch) {2: Stopwatch tracer = FETCH_REGISTRY_TIMER.start();3:4: try {5: // 获取 本地缓存的注册的应用实例集合6: // If the delta is disabled or if it is the first time, get all7: // applications8: Applications applications = getApplications();9:10: // 全量获取11: if (clientConfig.shouldDisableDelta() // 禁用增量获取12: || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))13: || forceFullRegistryFetch14: || (applications == null) // 空15: || (applications.getRegisteredApplications().size() == 0) // 空16: || (applications.getVersion() == - 1)) //Client application does not have latest library supporting delta17: {18: logger.info( "Disable delta property : {}", clientConfig.shouldDisableDelta());19: logger.info( "Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());20: logger.info( "Force full registry fetch : {}", forceFullRegistryFetch);21: logger.info( "Application is null : {}", (applications == null));22: logger.info( "Registered Applications size is zero : {}",23: (applications.getRegisteredApplications().size() == 0));24: logger.info( "Application version is -1: {}", (applications.getVersion() == - 1));25: // 执行 全量获取26: getAndStoreFullRegistry();27: } else {28: // 执行 增量获取29: getAndUpdateDelta(applications);30: }31: // 设置 应用集合 hashcode32: applications.setAppsHashCode(applications.getReconcileHashCode());33: // 打印 本地缓存的注册的应用实例数量34: logTotalInstances();35: } catch (Throwable e) {36: logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);37: return false;38: } finally {39: if (tracer != null) {40: tracer.stop();41: }42: }43:44: // Notify about cache refresh before updating the instance remote status45: onCacheRefreshed();46:47: // Update remote status based on refreshed data held in the cache48: updateInstanceRemoteStatus();49:50: // registry was fetched successfully, so return true51: return true;52: }第 5 至 8 行 :获取本地缓存的注册的应用实例集合,实现代码如下:
public Applications getApplications() {return localRegionApps.get();}第 10 至 26 行 :全量获取注册信息。
- 第 11 行 :配置
eureka.disableDelta = true
,禁用增量获取注册信息。默认值:false
。 - 第 12 行 :只获得一个
vipAddress
对应的应用实例们的注册信息。 - 第 13 行 :方法参数
forceFullRegistryFetch
强制全量获取注册信息。 - 第 14 至 15 行 :本地缓存为空。
- 第 25 至 26 行 :调用
#getAndStoreFullRegistry()
方法,全量获取注册信息,并设置到本地缓存。下文详细解析。
- 第 11 行 :配置
- 第 27 至 30 行 :增量获取注册信息,并刷新本地缓存,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
- 第 31 至 32 行 :计算应用集合
hashcode
。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。 第 33 至 34 行 :打印调试日志,输出本地缓存的注册的应用实例数量。实现代码如下:
private void logTotalInstances() {if (logger.isDebugEnabled()) {int totInstances = 0;for (Application application : getApplications().getRegisteredApplications()) {totInstances += application.getInstancesAsIsFromEureka().size();}logger.debug( "The total number of all instances in the client now is {}", totInstances);}}
第 44 至 45 行 :触发 CacheRefreshedEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。
#onCacheRefreshed()
方法,实现代码如下:/*** Eureka 事件监听器*/private final CopyOnWriteArraySet<EurekaEventListener> eventListeners = new CopyOnWriteArraySet<>();protected void onCacheRefreshed() {fireEvent( new CacheRefreshedEvent());}protected void fireEvent(final EurekaEvent event) {for (EurekaEventListener listener : eventListeners) {listener.onEvent(event);}}- x
笔者的YY :你可以实现自定义的事件监听器监听 CacheRefreshedEvent 事件,以达到持久化最新的注册信息到存储器( 例如,本地文件 ),通过这样的方式,配合实现 BackupRegistry 接口读取存储器。BackupRegistry 接口调用如下:
// 【3.2.12】从 Eureka-Server 拉取注册信息if (clientConfig.shouldFetchRegistry() && !fetchRegistry( false)) {fetchRegistryFromBackup();}
第47 至 48 行 :更新本地缓存的当前应用实例在 Eureka-Server 的状态。
1: private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;2:3: private synchronized void updateInstanceRemoteStatus() {4: // Determine this instance's status for this app and set to UNKNOWN if not found5: InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;6: if (instanceInfo.getAppName() != null) {7: Application app = getApplication(instanceInfo.getAppName());8: if (app != null) {9: InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());10: if (remoteInstanceInfo != null) {11: currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();12: }13: }14: }15: if (currentRemoteInstanceStatus == null) {16: currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;17: }18:19: // Notify if status changed20: if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {21: onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);22: lastRemoteInstanceStatus = currentRemoteInstanceStatus;23: }24: }- 第 4 至 14 行 :从注册信息中获取当前应用在 Eureka-Server 的状态。
第 19 至 23 行 :对比本地缓存和最新的的当前应用实例在 Eureka-Server 的状态,若不同,更新本地缓存( 注意,只更新该缓存变量,不更新本地当前应用实例的状态(
instanceInfo.status
) ),触发 StatusChangeEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。#onRemoteStatusChanged(...)
实现代码如下:protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {fireEvent( new StatusChangeEvent(oldStatus, newStatus));}- Eureka-Client 本地应用实例与 Eureka-Server 的该应用实例状态不同的原因,因为应用实例的覆盖状态,在 《Eureka 源码解析 —— 应用实例注册发现 (八)之覆盖状态》 有详细解析。
2.4.1 全量获取注册信息,并设置到本地缓存
调用
#getAndStoreFullRegistry()
方法,全量获取注册信息,并设置到本地缓存。下实现代码如下:
1: private void getAndStoreFullRegistry() throws Throwable {2: long currentUpdateGeneration = fetchRegistryGeneration.get();3:4: logger.info( "Getting all instance registry info from the eureka server");5:6: // 全量获取注册信息7: Applications apps = null;8: EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null9: ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())10: : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());11: if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {12: apps = httpResponse.getEntity();13: }14: logger.info( "The response status is {}", httpResponse.getStatusCode());15:16: // 设置到本地缓存17: if (apps == null) {18: logger.error( "The application is null for some reason. Not storing this information");19: } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {20: localRegionApps.set( this.filterAndShuffle(apps));21: logger.debug( "Got full registry with apps hashcode {}", apps.getAppsHashCode());22: } else {23: logger.warn( "Not updating applications as another thread is updating it already");24: }25: }第 6 至 14 行 :全量获取注册信息,实现代码如下:
// AbstractJerseyEurekaHttpClient.javapublic EurekaHttpResponse<Applications> getApplications(String... regions) {return getApplicationsInternal( "apps/", regions);}private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {ClientResponse response = null;String regionsParamValue = null;try {WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);if (regions != null && regions.length > 0) {regionsParamValue = StringUtil.join(regions);webResource = webResource.queryParam( "regions", regionsParamValue);}Builder requestBuilder = webResource.getRequestBuilder();addExtraHeaders(requestBuilder);response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSONApplications applications = null;if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {applications = response.getEntity(Applications.class);}return anEurekaHttpResponse(response.getStatus(), Applications.class).headers(headersOf(response)).entity(applications).build();} finally {if (logger.isDebugEnabled()) {logger.debug( "Jersey HTTP GET {}/{}?{}; statusCode={}",serviceUrl, urlPath,regionsParamValue == null ? "" : "regions=" + regionsParamValue,response == null ? "N/A" : response.getStatus());}if (response != null) {response.close();}}}- 调用
AbstractJerseyEurekaHttpClient#getApplications(...)
方法,GET 请求 Eureka-Server 的apps/
接口,参数为regions
,返回格式为 JSON ,实现全量获取注册信息。
- 调用
第 16 至 24 行 :设置到本地注册信息缓存。
- 第 19 行 :TODO[0025] :并发更新的情况???
- 第 20 行 :调用
#filterAndShuffle(...)
方法,根据配置eureka.shouldFilterOnlyUpInstances = true
( 默认值 :true
) 过滤只保留状态为开启( UP )的应用实例,并随机打乱应用实例顺序。打乱后,实现调用应用服务的随机性。代码比较易懂,点击链接查看方法实现。
3. Eureka-Server 接收全量获取
3.1 接收全量获取请求
com.netflix.eureka.resources.ApplicationsResource
,处理所有应用的请求操作的 Resource ( Controller )。接收全量获取请求,映射
ApplicationsResource#getContainers()
方法,实现代码如下:1:2: public Response getContainers(@PathParam("version") String version,3: @HeaderParam(HEADER_ACCEPT) String acceptHeader,4: @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,5: @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,6: @Context UriInfo uriInfo,7: @Nullable @QueryParam("regions") String regionsStr) {8: // TODO[0009]:RemoteRegionRegistry9: boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();10: String[] regions = null;11: if (!isRemoteRegionRequested) {12: EurekaMonitors.GET_ALL.increment();13: } else {14: regions = regionsStr.toLowerCase().split( ",");15: Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.16: EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();17: }18:19: // 判断是否可以访问20: // Check if the server allows the access to the registry. The server can21: // restrict access if it is not22: // ready to serve traffic depending on various reasons.23: if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {24: return Response.status(Status.FORBIDDEN).build();25: }26:27: // API 版本28: CurrentRequestVersion.set(Version.toEnum(version));29:30: // 返回数据格式31: KeyType keyType = Key.KeyType.JSON;32: String returnMediaType = MediaType.APPLICATION_JSON;33: if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {34: keyType = Key.KeyType.XML;35: returnMediaType = MediaType.APPLICATION_XML;36: }37:38: // 响应缓存键( KEY )39: Key cacheKey = new Key(Key.EntityType.Application,40: ResponseCacheImpl.ALL_APPS,41: keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions42: );43:44: //45: Response response;46: if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {47: response = Response.ok(responseCache.getGZIP(cacheKey))48: .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)49: .header(HEADER_CONTENT_TYPE, returnMediaType)50: .build();51: } else {52: response = Response.ok(responseCache.get(cacheKey))53: .build();54: }55: return response;56: }- 第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry
- 第 19 至 25 行 :Eureka-Server 启动完成,但是未处于就绪( Ready )状态,不接受请求全量应用注册信息的请求,例如,Eureka-Server 启动时,未能从其他 Eureka-Server 集群的节点获取到应用注册信息。
第 27 至 28 行 :设置 API 版本号。默认最新 API 版本为 V2。实现代码如下:
public enum Version {V1, V2;public static Version toEnum(String v) {for (Version version : Version.values()) {if (version.name().equalsIgnoreCase(v)) {return version;}}//Defaults to v2return V2;}}第 30 至 36 行 :设置返回数据格式,默认 JSON 。
- 第 38 至 42 行 :创建响应缓存( ResponseCache ) 的键( KEY ),在 「3.2.1 缓存键」详细解析。
- 第 44 至 55 行 :从响应缓存读取全量注册信息,在 「3.3 缓存读取」详细解析。
3.2 响应缓存 ResponseCache
com.netflix.eureka.registry.ResponseCache
,响应缓存接口,接口代码如下:
public interface ResponseCache {String get(Key key);byte[] getGZIP(Key key);void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);AtomicLong getVersionDelta();AtomicLong getVersionDeltaWithRegions();}其中,
#getVersionDelta()
和#getVersionDeltaWithRegions()
已经废弃。这里保留的原因主要是考虑兼容性。判断依据来自如下代码:// Applications.javapublic void setVersion(Long version) {this.versionDelta = version;}// AbstractInstanceRegistry.javapublic Applications getApplicationDeltas() {// ... 省略其它无关代码apps.setVersion(responseCache.getVersionDelta().get()); // 唯一调用到 ResponseCache#getVersionDelta() 方法的地方// ... 省略其它无关代码}#get()
:获得缓存。#getGZIP()
:获得缓存,并 GZIP 。#invalidate()
:过期缓存。
3.2.1 缓存键
com.netflix.eureka.registry.Key
,缓存键。实现代码如下:
public class Key {public enum KeyType {JSON, XML}/* An enum to define the entity that is stored in this cache for this key./public enum EntityType {Application, VIP, SVIP}/实体名/private final String entityName;/TODO[0009]:RemoteRegionRegistry/private final String[] regions;/请求参数类型/private final KeyType requestType;/请求 API 版本号/private final Version requestVersion;/hashKey/private final String hashKey;/实体类型{@link EntityType}/private final EntityType entityType;/{@link EurekaAccept}*/private final EurekaAccept eurekaAccept;public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {this.regions = regions;this.entityType = entityType;this.entityName = entityName;this.requestType = type;this.requestVersion = v;this.eurekaAccept = eurekaAccept;hashKey = this.entityType + this.entityName + ( null != this.regions ? Arrays.toString( this.regions) : "")+ requestType.name() + requestVersion.name() + this.eurekaAccept.name();}public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {this.regions = regions;this.entityType = entityType;this.entityName = entityName;this.requestType = type;this.requestVersion = v;this.eurekaAccept = eurekaAccept;hashKey = this.entityType + this.entityName + ( null != this.regions ? Arrays.toString( this.regions) : "")+ requestType.name() + requestVersion.name() + this.eurekaAccept.name();}public int hashCode() {String hashKey = getHashKey();return hashKey.hashCode();}public boolean equals(Object other) {if (other instanceof Key) {return getHashKey().equals(((Key) other).getHashKey());} else {return false;}}}
3.2.2 响应缓存实现类
com.netflix.eureka.registry.ResponseCacheImpl
,响应缓存实现类。在 ResponseCacheImpl 里,将缓存拆分成两层 :
- 只读缓存(
readOnlyCacheMap
) - 固定过期 + 固定大小的读写缓存(
readWriteCacheMap
)
默认配置下,缓存读取策略如下:
缓存过期策略如下:
- 应用实例注册、下线、过期时,只只只过期
readWriteCacheMap
。 readWriteCacheMap
写入一段时间( 可配置 )后自动过期。- 定时任务对比
readWriteCacheMap
和readOnlyCacheMap
的缓存值,若不一致,以前者为主。通过这样的方式,实现了readOnlyCacheMap
的定时过期。
注意:应用实例注册、下线、过期时,不会很快刷新到
readWriteCacheMap
缓存里。默认配置下,最大延迟在 30 秒。为什么可以使用缓存?
在 CAP 的选择上,Eureka 选择了 AP ,不同于 Zookeeper 选择了 CP 。
推荐阅读:
3.3 缓存读取
调用
ResponseCacheImpl#get(...)
方法(#getGzip(...)
类似 ),读取缓存,实现代码如下:
1: private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();2:3: private final LoadingCache<Key, Value> readWriteCacheMap;4:5: public String get(final Key key) {6: return get(key, shouldUseReadOnlyResponseCache);7: }8:9: String get(final Key key, boolean useReadOnlyCache) {10: Value payload = getValue(key, useReadOnlyCache);11: if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {12: return null;13: } else {14: return payload.getPayload();15: }16: }17:18: Value getValue(final Key key, boolean useReadOnlyCache) {19: Value payload = null;20: try {21: if (useReadOnlyCache) {22: final Value currentPayload = readOnlyCacheMap.get(key);23: if (currentPayload != null) {24: payload = currentPayload;25: } else {26: payload = readWriteCacheMap.get(key);27: readOnlyCacheMap.put(key, payload);28: }29: } else {30: payload = readWriteCacheMap.get(key);31: }32: } catch (Throwable t) {33: logger.error( "Cannot get value for key :" + key, t);34: }35: return payload;36: }- 第 5 至 7 行 :调用
#get(key, useReadOnlyCache)
方法,读取缓存。其中shouldUseReadOnlyResponseCache
通过配置eureka.shouldUseReadOnlyResponseCache = true
(默认值 :true
) 开启只读缓存。如果你对数据的一致性有相对高的要求,可以关闭这个开关,当然因为少了readOnlyCacheMap
,性能会有一定的下降。 第 9 至 16 行 :调用
getValue(key, useReadOnlyCache)
方法,读取缓存。从readOnlyCacheMap
和readWriteCacheMap
变量可以看到缓存值的类为com.netflix.eureka.registry.ResponseCacheImpl.Value
,实现代码如下:public class Value {/*** 原始值*/private final String payload;/*** GZIP 压缩后的值*/private byte[] gzipped;public Value(String payload) {this.payload = payload;if (!EMPTY_PAYLOAD.equals(payload)) {// ... 省略 GZIP 压缩代码gzipped = bos.toByteArray();} else {gzipped = null;}}public String getPayload() {return payload;}public byte[] getGzipped() {return gzipped;}}第 21 至 31 行 :读取缓存。
- 第 21 至 28 行 :先读取
readOnlyCacheMap
。读取不到,读取readWriteCacheMap
,并设置到readOnlyCacheMap
。 - 第 29 至 31 行 :读取
readWriteCacheMap
。 readWriteCacheMap
实现代码如下:this.readWriteCacheMap =CacheBuilder.newBuilder().initialCapacity( 1000).expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS).removalListener( new RemovalListener<Key, Value>() {public void onRemoval(RemovalNotification<Key, Value> notification) {// TODO[0009]:RemoteRegionRegistryKey removedKey = notification.getKey();if (removedKey.hasRegions()) {Key cloneWithNoRegions = removedKey.cloneWithoutRegions();regionSpecificKeys.remove(cloneWithNoRegions, removedKey);}}}).build( new CacheLoader<Key, Value>() {public Value load(Key key) throws Exception {// // TODO[0009]:RemoteRegionRegistryif (key.hasRegions()) {Key cloneWithNoRegions = key.cloneWithoutRegions();regionSpecificKeys.put(cloneWithNoRegions, key);}Value value = generatePayload(key);return value;}});readWriteCacheMap
最大缓存数量为 1000 。- 调用
#generatePayload(key)
方法,生成缓存值。
- 第 21 至 28 行 :先读取
#generatePayload(key)
方法,实现代码如下:
1: private Value generatePayload(Key key) {2: Stopwatch tracer = null;3: try {4: String payload;5: switch (key.getEntityType()) {6: case Application:7: boolean isRemoteRegionRequested = key.hasRegions();8:9: if (ALL_APPS.equals(key.getName())) {10: if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry11: tracer = serializeAllAppsWithRemoteRegionTimer.start();12: payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));13: } else {14: tracer = serializeAllAppsTimer.start();15: payload = getPayLoad(key, registry.getApplications());16: }17: } else if (ALL_APPS_DELTA.equals(key.getName())) {18: // ... 省略增量获取相关的代码19: } else {20: tracer = serializeOneApptimer.start();21: payload = getPayLoad(key, registry.getApplication(key.getName()));22: }23: break;24: // ... 省略部分代码25: }26: return new Value(payload);27: } finally {28: if (tracer != null) {29: tracer.stop();30: }31: }32: }- 第 10 至 12 行 :TODO[0009]:RemoteRegionRegistry
- 第 13 至 16 行 :调用
AbstractInstanceRegistry#getApplications()
方法,获得注册的应用集合。后调用#getPayLoad()
方法,将注册的应用集合转换成缓存值。�� 这两个方法代码较多,下面详细解析。 - 第 17 至 18 行 :获取增量注册信息的缓存值,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
3.3.1 获得注册的应用集合
调用
AbstractInstanceRegistry#getApplications()
方法,获得注册的应用集合,实现代码如下:
1: // AbstractInstanceRegistry.java2:3: private static final String[] EMPTY_STR_ARRAY = new String[ 0];4:5: public Applications getApplications() {6: boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();7: if (disableTransparentFallback) { // TODO[0009]:RemoteRegionRegistry8: return getApplicationsFromLocalRegionOnly();9: } else {10: return getApplicationsFromAllRemoteRegions(); // Behavior of falling back to remote region can be disabled.11: }12: }13:14: public Applications getApplicationsFromLocalRegionOnly() {15: return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);16: }- 第 6 至 8 行 :TODO[0009]:RemoteRegionRegistry
第 9 至 16 行 :调用
#getApplicationsFromMultipleRegions(...)
方法,获得注册的应用集合,实现代码如下:1: public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {2: // TODO[0009]:RemoteRegionRegistry3: boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;4: logger.debug( "Fetching applications registry with remote regions: {}, Regions argument {}",5: includeRemoteRegion, Arrays.toString(remoteRegions));6: if (includeRemoteRegion) {7: GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();8: } else {9: GET_ALL_CACHE_MISS.increment();10: }11: // 获得获得注册的应用集合12: Applications apps = new Applications();13: apps.setVersion( 1L);14: for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {15: Application app = null;16:17: if (entry.getValue() != null) {18: for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {19: Lease<InstanceInfo> lease = stringLeaseEntry.getValue();20: if (app == null) {21: app = new Application(lease.getHolder().getAppName());22: }23: app.addInstance(decorateInstanceInfo(lease));24: }25: }26: if (app != null) {27: apps.addApplication(app);28: }29: }30: // TODO[0009]:RemoteRegionRegistry31: if (includeRemoteRegion) {32: for (String remoteRegion : remoteRegions) {33: RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);34: if ( null != remoteRegistry) {35: Applications remoteApps = remoteRegistry.getApplications();36: for (Application application : remoteApps.getRegisteredApplications()) {37: if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {38: logger.info( "Application {} fetched from the remote region {}",39: application.getName(), remoteRegion);40:41: Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());42: if (appInstanceTillNow == null) {43: appInstanceTillNow = new Application(application.getName());44: apps.addApplication(appInstanceTillNow);45: }46: for (InstanceInfo instanceInfo : application.getInstances()) {47: appInstanceTillNow.addInstance(instanceInfo);48: }49: } else {50: logger.debug( "Application {} not fetched from the remote region {} as there exists a "51: + "whitelist and this app is not in the whitelist.",52: application.getName(), remoteRegion);53: }54: }55: } else {56: logger.warn( "No remote registry available for the remote region {}", remoteRegion);57: }58: }59: }60: // 设置 应用集合 hashcode61: apps.setAppsHashCode(apps.getReconcileHashCode());62: return apps;63: }- 第 2 至 第 10 行 :TODO[0009]:RemoteRegionRegistry
- 第 11 至 29 行 :获得获得注册的应用集合。
- 第 30 至 59 行 :TODO[0009]:RemoteRegionRegistry
- 第 61 行 :计算应用集合
hashcode
。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
3.3.2 转换成缓存值
调用
#getPayLoad()
方法,将注册的应用集合转换成缓存值,实现代码如下:
/*Generate pay load with both JSON and XML formats for all applications.*/private String getPayLoad(Key key, Applications apps) {// 获得编码器EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());String result;try {// 编码result = encoderWrapper.encode(apps);} catch (Exception e) {logger.error( "Failed to encode the payload for all apps", e);return "";}if(logger.isDebugEnabled()) {logger.debug( "New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());}return result;}
3.4 主动过期读写缓存
应用实例注册、下线、过期时,调用
ResponseCacheImpl#invalidate()
方法,主动过期读写缓存(readWriteCacheMap
),实现代码如下:
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {for (Key.KeyType type : Key.KeyType.values()) {for (Version v : Version.values()) {invalidate(new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact));if ( null != vipAddress) {invalidate( new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));}if ( null != secureVipAddress) {invalidate( new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));}}}}调用
#invalidate(keys)
方法,逐个过期每个缓存键值,实现代码如下:public void invalidate(Key... keys) {for (Key key : keys) {logger.debug( "Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());// 过期读写缓存readWriteCacheMap.invalidate(key);// TODO[0009]:RemoteRegionRegistryCollection<Key> keysWithRegions = regionSpecificKeys.get(key);if ( null != keysWithRegions && !keysWithRegions.isEmpty()) {for (Key keysWithRegion : keysWithRegions) {logger.debug( "Invalidating the response cache key : {} {} {} {} {}",key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());readWriteCacheMap.invalidate(keysWithRegion);}}}}
3.5 被动过期读写缓存
读写缓存(
readWriteCacheMap
) 写入后,一段时间自动过期,实现代码如下:
expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds())- 配置
eureka.responseCacheAutoExpirationInSeconds
,设置写入过期时长。默认值 :180 秒。
3.6 定时刷新只读缓存
定时任务对比
readWriteCacheMap
和readOnlyCacheMap
的缓存值,若不一致,以前者为主。通过这样的方式,实现了readOnlyCacheMap
的定时过期。实现代码如下:
1: ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {2: // ... 省略无关代码3:4: long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();5: // ... 省略无关代码6:7: if (shouldUseReadOnlyResponseCache) {8: timer.schedule(getCacheUpdateTask(),9: new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)10: + responseCacheUpdateIntervalMs),11: responseCacheUpdateIntervalMs);12: }13:14: // ... 省略无关代码15: }16:17: private TimerTask getCacheUpdateTask() {18: return new TimerTask() {19:20: public void run() {21: logger.debug( "Updating the client cache from response cache");22: for (Key key : readOnlyCacheMap.keySet()) { // 循环 readOnlyCacheMap 的缓存键23: if (logger.isDebugEnabled()) {24: Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};25: logger.debug( "Updating the client cache from response cache for key : {} {} {} {}", args);26: }27: try {28: CurrentRequestVersion.set(key.getVersion());29: Value cacheValue = readWriteCacheMap.get(key);30: Value currentCacheValue = readOnlyCacheMap.get(key);31: if (cacheValue != currentCacheValue) { // 不一致时,进行替换32: readOnlyCacheMap.put(key, cacheValue);33: }34: } catch (Throwable th) {35: logger.error( "Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);36: }37: }38: }39: };40: }- 第 7 至 12 行 :初始化定时任务。配置
eureka.responseCacheUpdateIntervalMs
,设置任务执行频率,默认值 :30 * 1000 毫秒。 - 第 17 至 39 行 :创建定时任务。
- 第 22 行 :循环
readOnlyCacheMap
的缓存键。为什么不循环readWriteCacheMap
呢?readOnlyCacheMap
的缓存过期依赖readWriteCacheMap
,因此缓存键会更多。 - 第 28 行 至 33 行 :对比
readWriteCacheMap
和readOnlyCacheMap
的缓存值,若不一致,以前者为主。通过这样的方式,实现了readOnlyCacheMap
的定时过期。
- 第 22 行 :循环
更多推荐
所有评论(0)