深入理解Spring cloud源码篇之Eureka源码
Spring Colud是一个
1.eureka功能分析
首先,eureka在springcloud中充当服务注册功能,相当于dubbo+zk里面得zk,但是比zk要简单得多,zk可以做得东西太多了,包括分布式锁,分布式队列都是基于zk里面得四种节点加watch机制通过长连接来实现得,但是eureka不一样,eureka是基于HTTPrest来实现的,就是把服务的信息放到一个ConcurrentHashMap中,然后服务启动的时候去读取这个map,来把所有服务关联起来,然后服务器之间调用的时候通过信息,进行http调用。eureka包括两部分,一部分就是服务提供者(对于eureka来说就是客户端),一部分是服务端,客户端需要每个读取每个服务的信息,然后注册到服务端,很明显了,这个服务端就是接受客户端提供的自身的一些信息。
2.eureka客户端源码分析
如果看spring的源码的话我们一般会找到Spring 源码包里面的META-INF文件夹下面的spring.handlers文件,然后直接找到XXXHandler的源码文件,紧着着就会分析springxml里面的各种标签解析。在看cloud源码的时候,我们则是找到META-INF文件下的spring.factories,找到里面的类去分析功能。
我们根据上面的描述首先找到eureka-client(1.4.0)包下面的spring.factories文件中的EurekaClientAutoConfiguration配置类。我们知道一个eureka客户端最重要的功能也就是四点:
- 2.1读取该项目的ip,instance_id,端口号,注册到服务端
- 2.2服务下架
- 2.3心跳机制
- 2.4获取其他服务器信息
2.1服务注册
基于这个思想,我们先找到第一个配置就是在哪读取的application.properties文件,我们看到eurekaInstanceConfigBean()方法,就是读取配置文件到EurekaInstanceConfigBean对象中,并且有@bean注册到ioc的容器中。EurekaInstanceConfigBean对象就包括客户端的ip,instance_id,端口号等等信息。我们看到以下代码是对EurekaInstanceConfigBean的一个包装:
@Bean
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {//上文说的eurekaInstanceConfigBean是EurekaInstanceConfig的实现类
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
接着就是服务注册了:
@Bean(destroyMethod = "shutdown")
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
manager.getInfo(); // force initialization
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
我们直接看到super里面的方法,在initScheduledTasks();之上就是创建一些线程池,initScheduledTasks里面开启了一个线程heartbeat,我们看到了:
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
在renew方法里,如果返回为404的话,则会调用register()方法去注册,这个发送心跳的时间间隔也可配置,在配置源码的定时器里可以找到,跟读源码的时候发现调用这个register方法除了renew还有InstanceInfoReplicator线程里面的run方法,这个定时器的时间间隔是40秒,在服务启动的时候也会去设置条件合适去执行定时器,这个定时器的作用就是当配置信息改变的时候去调用register,当初次启动的时候也会去调用一下,因为调用了refreshInstanceInfo(),所以isInstanceInfoDirty的值就变成了true,所以,初次注册的时候也会注册到这里,之后除了特殊情况其他的的都不会走register().特殊情况包括:IP的改变,某些配置文件参数的改变,从下面代码可以看出来:
public void refreshDataCenterInfoIfRequired() {
String existingAddress = instanceInfo.getHostName();
String newAddress;
if (config instanceof RefreshableInstanceConfig) {
// Refresh data center info, and return up to date address
newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
} else {
newAddress = config.getHostName(true);
}
String newIp = config.getIpAddress();
if (newAddress != null && !newAddress.equals(existingAddress)) {
logger.warn("The address changed from : {} => {}", existingAddress, newAddress);
// :( in the legacy code here the builder is acting as a mutator.
// This is hard to fix as this same instanceInfo instance is referenced elsewhere.
// We will most likely re-write the client at sometime so not fixing for now.
InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
builder.setHostName(newAddress).setIPAddr(newIp).setDataCenterInfo(config.getDataCenterInfo());
instanceInfo.setIsDirty();//设置isInstanceInfoDirty为true,lastDirtyTimestamp为当前时间
}
}
public void refreshLeaseInfoIfRequired() {
LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
if (leaseInfo == null) {
return;
}
int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) {//配置参数变了
LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(currentLeaseRenewal)
.setDurationInSecs(currentLeaseDuration)
.build();
instanceInfo.setLeaseInfo(newLeaseInfo);
instanceInfo.setIsDirty();
}
}
以上就是eureka客户端的注册。
2.2服务下架
我们看EurekaClient接口,里面有个shutdown,我们看到@PreDestroy当servlet关闭的时候就会触发。
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();//关闭心跳,服务替换,缓存刷新等定时器
// If APPINFO was registered
if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);//设置状态为down
unregister();//通知服务端客户端下线
}
logger.info("Completed shut down of DiscoveryClient");
}
2.3心跳机制
2.1我们分析了服务注册,设计到了renew()当返回404的时候是服务注册,200的时候就是发送心跳的机制默认30秒发送一次。
2.4服务获取
当eureka客户端启动的时候会注册到eureka服务端上,其他客户端也需要感知该eureka启动,从而读取配置信息,服务之间的信息获取也是通过定时器获取的,在initScheduledTasks();方法中,我们看到启动了一个CacheRefreshThread线程,时间间隔默认为30秒,我们直接看该线程里面的fetchRegistry(boolean forceFullRegistryFetch);方法,这里有两种拉取,一种是全量拉取,一种是增量拉取。全量拉取方法为getAndStoreFullRegistry()代码:
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())//rest请求服务器获得实例信息
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));//存放到DiscoveryClient对象的localRegionApps的AtomicReference对象中
} else {
}
}
3.eureka服务端源码分析
分析eureka客户端功能的时候我们发现客户端是通过httprest请求来注册/拉取信息的,那么eureka服务端一定是一个类似spring MVC的项目结构。找到EurekaServerAutoConfiguration类,看到jerseyApplication()方法,在容器中存放了一个jerseyApplication对象,jerseyApplication()方法里的东西和Spring源码里扫描@Component逻辑类似,扫描@Path和@Provider标签,然后封装成beandefinition,封装到Application的set容器里。通过filter过滤器来过滤url进行映射到对象的Controller。
@Bean
public FilterRegistrationBean jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();//核心是一个filter
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));//拦截/eureka开头的所有请求
return bean;
}
以上是对jersey的初步介绍,通过分析eureka客户端,我们大概知道客户端有这几个功能
- 服务接受请求(认识jersey)
- 接受客户端注册/心跳/下架请求并处理
- 服务剔除
以下是eureka服务端自身高可用层面的功能点
- 自我保护
- 服务之间的信息同步
3.1服务怎么接受请求
上面介绍了jersey和eureka怎么集成jersey,这里就不多说。
3.2接受客户端注册/心跳/下架请求并处理
服务端接受客户端的注册
在eurekawiki上https://github.com/Netflix/eureka/wiki/Eureka-REST-operations我们我们知道注册到服务端是调用的POST /eureka/v2/apps/appID 接口,找到了ApplicationsResource类中调用了ApplicationResource的addInstance()方法,找到register()方法
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;//默认有效时长90m
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);//实例注册,下面具体看这个
//同步到其他服务
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();//线程安全的一个服务实例map,name为cloud项目中的实例名字,嵌套里面的map是以key为instanceId,Lease对象为value的一个map
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());//appname就为cloud配置里的spring.application.name
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);//如果第一个实例注册进来的时候会给registryput进去一个空的lease
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());//这个id就是instanceId
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
//put了一个lease
gMap.put(registrant.getId(), lease);
} finally {
read.unlock();
}
}
服务端接受客户端的续约(心跳)
接口在InstanceResource#renewLease()。服务续约其实就是维护实例状态,更新一下最后更新时间,然后同步到其他服务端。直接看renew()方法
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);//得到实例对应的lease对象
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);//得到实例
}
if (leaseToRenew == null) {//error
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
...
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
instanceInfo.setStatus(overriddenInstanceStatus);//修改实例状态
}
}
renewsLastMin.increment();
leaseToRenew.renew();//更新组后更新时间
return true;
}
}
服务端接受客户端要下架请求
服务下架接口在InstanceResource#cancelLease()方法,直接看internalCancel()方法
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);//得到所有实例
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);//从map中移除掉下架实例
}
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
}
}
3.3服务剔除
当客户端长时间(默认90秒)没有给服务端发送请求的时候,就说明客户端down了,看过Spring源码得都明白,Spring源码比较重要得方法就在AbstractApplicationContext#refresh()方法,里面从扫描了xml/java文件到扫描注解,到进行DI到ioc容器然后再到销毁bean,最后有一个finishRefresh();方法,这是Spring所有工作做完之后调用得方法,一直调到了DefaultLifecycleProcessor#onRefresh()下得#startBeans(true);下的#start();下的doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);下的start()方法。在这里会调用到实现了Lifecycle接口的所有的start()方法,而在EurekaServerAutoConfiguration类中,我们看到import了一个实现了Lifecycle接口的EurekaServerInitializerConfiguration类,在start方法里初始化了一个单独的EurekaServerContext的上下文。在initEurekaServerContext()方法中,
执行了registry.openForTraffic(applicationInfoManager, registryCount);最后一句调用了AbstractInstanceRegistry#postInit()方法,在此方法里开启了一个每60秒调用一次EvictionTask#evict()的定时器。
public void evict(long additionalLeaseMs) {
if (!isLeaseExpirationEnabled()) {//如果开启自我保护,则不自动剔除。默认开启
logger.debug("DS: lease expiration is currently disabled.");
return;
}
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);//如果过期了,加入到expiredLeases的list中
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);//移除服务器缓存,同步其他服务器
}
}
}
3.4服务自我保护模式
客户端长时间不发送续约(心跳),服务端默认每一分钟会进行一次服务剔除,3.3里又一个isLeaseExpirationEnabled()方法:
/**
* 期望 最大 每分钟 续租 次数。 计算公式 当前注册的应用实例数 x 2
*/
protected volatile int expectedNumberOfRenewsPerMin ;
/**
* 期望 最小 每分钟 续租 次数。 计算公式 expectedNumberOfRenewsPerMin * 续租百分比( eureka.renewalPercentThreshold )
*/
protected volatile int numberOfRenewsPerMinThreshold ;
@Override
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {//默认打开自我保护,false则关闭自我保护
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;//每分钟心跳数大于期望最小每分钟续租次数代表这个实例还活着
}
3.5服务之间信息同步
上面说到的服务注册,服务剔除,服务续约等功能的时候在修改完本地业务之后会调用PeerAwareInstanceRegistryImpl#replicateToPeers()方法,同步到其他服务器。
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {//
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
Eureka二次传播问题:当eureka A配置到B,B配置到C的时候,客户端注册到服务器A,这个时候服务器A,B会有客户端信息,C则没有。代码分析结果如下:
当客户端注册到服务端A的时候A上有客户端信息,这个时候会同步一遍B服务端,则,B同步到C的时候isReplication则为false,就不会同步过去了。
更多推荐
所有评论(0)