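Spring Cloud's Extensions to Eureka

We first introduce the core classes on the server and client sides, then summarize the execution flow.

1). Server

As a service registry and discovery center, the server mainly has to solve the following problems: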

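  • 1. How a service instance registers with the registry.

    In essence, when a service starts it calls the register operation of the Eureka Server REST API to register its application instance information.

  • 2. How a service instance is removed from the registry.

    Normally, a shutdown hook or another lifecycle method calls the deregister operation of the Eureka Server REST API. To handle instances that fail without removing their own information in time, Eureka Server requires clients to renew their lease periodically. If an instance does not renew within a certain period, Eureka Server evicts it on its own initiative.

  • 3. Consistency of service instance data.

    In a cluster, Eureka uses a peer-to-peer architecture: replicas are equal peers with no primary/secondary distinction, any replica can accept writes, and the replicas then update one another. After receiving a client request, Eureka Server replicates it to the other peer nodes. When replicating, it sets the HEADER_REPLICATION HTTP header so that the receiving node can tell the replicated request apart from an ordinary instance request. A peer-to-peer design also has to resolve conflicts between copies of the data, which Eureka handles with the lastDirtyTimestamp marker:

    • If the lastDirtyTimestamp in the request is greater than the lastDirtyTimestamp of the local instance, this indicates a conflict between servers; the server returns 404 and the application instance must register again.
    • If the lastDirtyTimestamp in the request is less than the lastDirtyTimestamp of the local instance and the request is a replication request from a peer node, the data is in conflict; the server returns 409 and asks the peer to synchronize to its latest data.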
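
The two rules above can be sketched as a small decision function. This is a minimal illustration, not Eureka's actual code: the class name DirtyTimestampCheck and the method validate are hypothetical, and the outcome for cases the text does not cover (equal timestamps, or an older timestamp on a non-replication request) is assumed here to be a plain 200.

```java
/** Illustrative sketch of the lastDirtyTimestamp rules; names are hypothetical. */
final class DirtyTimestampCheck {
    static final int OK = 200;
    static final int NOT_FOUND = 404;
    static final int CONFLICT = 409;

    /**
     * @param requestTs     lastDirtyTimestamp carried by the incoming request
     * @param localTs       lastDirtyTimestamp of the instance stored locally
     * @param isReplication true when the request was replicated from a peer
     * @return the HTTP status the server would answer with
     */
    static int validate(long requestTs, long localTs, boolean isReplication) {
        if (requestTs > localTs) {
            // Local copy is older than the caller's: the instance must register again.
            return NOT_FOUND;
        }
        if (requestTs < localTs && isReplication) {
            // The peer sent stale data: it should sync to the newer local copy.
            return CONFLICT;
        }
        // Assumption: every other case is accepted as-is.
        return OK;
    }
}
```

For example, validate(20, 10, false) yields 404, while validate(5, 10, true) yields 409.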

With the server design covered, we can now analyze how Spring Cloud extends Eureka.

We take the @EnableEurekaServer annotation as the entry point:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

@EnableEurekaServer uses the @Import annotation to import the EurekaServerMarkerConfiguration class:

/**
 * Responsible for adding in a marker bean to activate
 * {@link EurekaServerAutoConfiguration}
 *
 * @author Biju Kunjummen
 */
@Configuration
public class EurekaServerMarkerConfiguration {

	@Bean
	public Marker eurekaServerMarkerBean() {
		return new Marker();
	}

	class Marker {
	}
}

As the Javadoc on EurekaServerMarkerConfiguration states, it is a marker class used to activate EurekaServerAutoConfiguration.

As we know, at startup Spring Boot scans every META-INF/spring.factories file on the classpath and loads the classes listed under org.springframework.boot.autoconfigure.EnableAutoConfiguration. The spring.factories file in spring-cloud-netflix-eureka-server contains:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
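
To make the lookup concrete, the sketch below reads spring.factories-style text with java.util.Properties and returns the class names listed under the EnableAutoConfiguration key. It only illustrates the file format (a properties file whose value is a comma-separated list, with a trailing backslash as line continuation). The class FactoriesSketch is hypothetical and is not how Spring Boot itself is implemented.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper that parses spring.factories-style text. */
final class FactoriesSketch {
    static final String KEY =
            "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    /** Returns the auto-configuration class names listed under KEY. */
    static List<String> autoConfigurations(String factoriesText) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines that end with a backslash.
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(KEY, "").split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }
}
```

Fed the two lines shown above, autoConfigurations returns a single entry, the fully qualified name of EurekaServerAutoConfiguration.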

As expected, Spring Cloud Eureka hands its configuration over to Spring Boot. Next, let's look at EurekaServerAutoConfiguration.

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
// As mentioned above, EurekaServerMarkerConfiguration activates EurekaServerAutoConfiguration; this is done here through @ConditionalOnBean
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
// Load the dashboard and instance registry configuration properties
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
      InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    
    /**
	 * These two packages contain the resources Jersey needs; Eureka Server exposes its endpoints through the Jersey framework
	 */
	private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
			"com.netflix.eureka" };
	// Instance information
	@Autowired
	private ApplicationInfoManager applicationInfoManager;
	// Server configuration
	@Autowired
	private EurekaServerConfig eurekaServerConfig;
	// Client configuration
	@Autowired
	private EurekaClientConfig eurekaClientConfig;
    
    ....
    
    @Configuration
	protected static class EurekaServerConfigBeanConfiguration {
		@Bean
		@ConditionalOnMissingBean
		public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
			EurekaServerConfigBean server = new EurekaServerConfigBean();
			// Whether this server should register with Eureka; a standalone server often does not need to
			if (clientConfig.shouldRegisterWithEureka()) {
				// Set a sensible default if we are supposed to replicate
				server.setRegistrySyncRetries(5);
			}
			return server;
		}
	}
    
    // Eureka dashboard
    @Bean
	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
	public EurekaController eurekaController() {
		return new EurekaController(this.applicationInfoManager);
	}
    ....
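     // PeerAwareInstanceRegistry handles registry operations on the server side. The base behavior comes from AbstractInstanceRegistry (covered in the previous section); PeerAwareInstanceRegistryImpl adds cluster replication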
     @Bean
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
			ServerCodecs serverCodecs) {
		this.eurekaClient.getApplications(); // force initialization
		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
				serverCodecs, this.eurekaClient,
				this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
	}
    // Cluster peer nodes
    @Bean
	@ConditionalOnMissingBean
	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
			ServerCodecs serverCodecs) {
		return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
	}
    // Refreshable PeerEurekaNodes (peer nodes)
    static class RefreshablePeerEurekaNodes extends PeerEurekaNodes
			implements ApplicationListener<EnvironmentChangeEvent> {

		public RefreshablePeerEurekaNodes(
				final PeerAwareInstanceRegistry registry,
				final EurekaServerConfig serverConfig,
				final EurekaClientConfig clientConfig, 
				final ServerCodecs serverCodecs,
				final ApplicationInfoManager applicationInfoManager) {
			super(registry, serverConfig, clientConfig, serverCodecs, applicationInfoManager);
		}
		// Listens for EnvironmentChangeEvent and updates peerEurekaNodes
		@Override
		public void onApplicationEvent(final EnvironmentChangeEvent event) {
			if (shouldUpdate(event.getKeys())) {
				updatePeerEurekaNodes(resolvePeerUrls());
			}
		}
		
		/*
		 * Checks whether the peer nodes need to be updated
		 */
		protected boolean shouldUpdate(final Set<String> changedKeys) {
			assert changedKeys != null;
			
			// Read the configuration: nothing to update here when service URLs are fetched via DNS
			if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
				return false;
			}
			
			if (changedKeys.contains("eureka.client.region")) {
				return true;
			}
			
			for (final String key : changedKeys) {
				// property keys are not expected to be null.
				if (key.startsWith("eureka.client.service-url.") ||
					key.startsWith("eureka.client.availability-zones.")) {
					return true;
				}
			}
			
			return false;
		}
	}
    // Holds the Eureka Server state: PeerAwareInstanceRegistry and PeerEurekaNodes
    @Bean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
			PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
				registry, peerEurekaNodes, this.applicationInfoManager);
	}
    
    // Eureka Server bootstrap class
    @Bean
	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
			EurekaServerContext serverContext) {
		return new EurekaServerBootstrap(this.applicationInfoManager,
				this.eurekaClientConfig, this.eurekaServerConfig, registry,
				serverContext);
	}
    // Creates jerseyApplication and hands it the Eureka resource classes found under EUREKA_PACKAGES
    @Bean
	public javax.ws.rs.core.Application jerseyApplication(Environment environment,
			ResourceLoader resourceLoader) {

		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
				false, environment);

		// Filter to include only classes that have a particular annotation.
		//
		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

		// Find classes in Eureka packages (or subpackages)
		//
		Set<Class<?>> classes = new HashSet<>();
		for (String basePackage : EUREKA_PACKAGES) {
			Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
			for (BeanDefinition bd : beans) {
				Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
						resourceLoader.getClassLoader());
				classes.add(cls);
			}
		}

		// Construct the Jersey ResourceConfig
		//
		Map<String, Object> propsAndFeatures = new HashMap<>();
		propsAndFeatures.put(
				// Skip static content used by the webapp
				ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
				EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");

		DefaultResourceConfig rc = new DefaultResourceConfig(classes);
		rc.setPropertiesAndFeatures(propsAndFeatures);

		return rc;
	}
    
    
}
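
To see which property changes trigger a peer refresh, the shouldUpdate rule from RefreshablePeerEurekaNodes can be lifted into a standalone function and tried with sample keys. This is a sketch under assumptions: PeerRefreshCheck is a hypothetical class, and the useDns flag stands in for clientConfig.shouldUseDnsForFetchingServiceUrls().

```java
import java.util.Set;

/** Hypothetical standalone version of the shouldUpdate rule shown above. */
final class PeerRefreshCheck {

    /**
     * @param changedKeys property keys reported by an environment change
     * @param useDns      stand-in for shouldUseDnsForFetchingServiceUrls()
     */
    static boolean shouldUpdate(Set<String> changedKeys, boolean useDns) {
        if (useDns) {
            // Peer URLs come from DNS, so property changes are not acted on.
            return false;
        }
        if (changedKeys.contains("eureka.client.region")) {
            return true;
        }
        for (String key : changedKeys) {
            if (key.startsWith("eureka.client.service-url.")
                    || key.startsWith("eureka.client.availability-zones.")) {
                return true;
            }
        }
        return false;
    }
}
```

With useDns set to false, a change to eureka.client.service-url.defaultZone returns true, whereas a change to an unrelated key such as server.port returns false.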

As the code above shows, EurekaServerAutoConfiguration configures EurekaServerConfig, PeerAwareInstanceRegistry, PeerEurekaNodes, EurekaServerContext, EurekaServerBootstrap, and jerseyApplication. The core classes are described below.

EurekaServerContext:

@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);

    private final EurekaServerConfig serverConfig;
    private final ServerCodecs serverCodecs;
    private final PeerAwareInstanceRegistry registry;
    private final PeerEurekaNodes peerEurekaNodes;
    private final ApplicationInfoManager applicationInfoManager;

    @Inject
    public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
                               ServerCodecs serverCodecs,
                               PeerAwareInstanceRegistry registry,
                               PeerEurekaNodes peerEurekaNodes,
                               ApplicationInfoManager applicationInfoManager) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.registry = registry;
        this.peerEurekaNodes = peerEurekaNodes;
        this.applicationInfoManager = applicationInfoManager;
    }
	// A method annotated with @PostConstruct runs once, after the constructor and dependency injection have completed and before the object is put into service (comparable to a Servlet's init() hook).
    @PostConstruct
    @Override
    public void initialize() {
        logger.info("Initializing ...");
        peerEurekaNodes.start();
        try {
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }
	// A method annotated with @PreDestroy runs once, when the container is about to discard the object (comparable to a Servlet's destroy() hook).
    @PreDestroy
    @Override
    public void shutdown() {
        logger.info("Shutting down ...");
        registry.shutdown();
        peerEurekaNodes.shutdown();
        logger.info("Shut down");
    }

    @Override
    public EurekaServerConfig getServerConfig() {
        return serverConfig;
    }

    @Override
    public PeerEurekaNodes getPeerEurekaNodes() {
        return peerEurekaNodes;
    }

    @Override
    public ServerCodecs getServerCodecs() {
        return serverCodecs;
    }

    @Override
    public PeerAwareInstanceRegistry getRegistry() {
        return registry;
    }

    @Override
    public ApplicationInfoManager getApplicationInfoManager() {
        return applicationInfoManager;
    }

}

initialize() calls the start method of PeerEurekaNodes, which updates the peer node URL information.

PeerEurekaNodes :

public class PeerEurekaNodes {
	
    protected final PeerAwareInstanceRegistry registry;
    protected final EurekaServerConfig serverConfig;
    protected final EurekaClientConfig clientConfig;
    protected final ServerCodecs serverCodecs;
    private final ApplicationInfoManager applicationInfoManager;
	// Holds the peer node information
    private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
    private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
	// Scheduled executor used to update PeerEurekaNodes
    private ScheduledExecutorService taskExecutor;
	// Starts the scheduled task that updates peer node information
     public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }
    // Stops the scheduled task and clears the node information
    public void shutdown() {
        taskExecutor.shutdown();
        List<PeerEurekaNode> toRemove = this.peerEurekaNodes;

        this.peerEurekaNodes = Collections.emptyList();
        this.peerEurekaNodeUrls = Collections.emptySet();

        for (PeerEurekaNode node : toRemove) {
            node.shutDown();
        }
    }
    
    /**
    The remaining methods are straightforward
    */
}
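
The scheduling pattern used by start() can be tried in isolation with the JDK alone. The sketch below is a simplified stand-in, not Eureka code: PeerUpdaterSketch and its parameters are hypothetical, and the Runnable plays the role of updatePeerEurekaNodes(resolvePeerUrls()).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical, stripped-down version of the update scheduling in start(). */
final class PeerUpdaterSketch {

    /** Runs the task once right away, then repeats it with a fixed delay. */
    static ScheduledExecutorService start(Runnable updateTask, long intervalMs) {
        ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread thread = new Thread(r, "peer-updater-sketch");
                    thread.setDaemon(true); // must not keep the JVM alive
                    return thread;
                });
        updateTask.run(); // initial update on the calling thread
        executor.scheduleWithFixedDelay(() -> {
            try {
                updateTask.run();
            } catch (Throwable e) {
                // Swallow and report, so later runs are not cancelled.
                System.err.println("Cannot update the replica nodes: " + e);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

The try/catch inside the periodic task matters: if a task submitted to a ScheduledExecutorService throws, its subsequent executions are suppressed, so catching Throwable keeps the refresh loop alive after a failed update.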

EurekaServerBootstrap:

public class EurekaServerBootstrap {
    
    protected EurekaServerConfig eurekaServerConfig;

	protected ApplicationInfoManager applicationInfoManager;

	protected EurekaClientConfig eurekaClientConfig;

	protected PeerAwareInstanceRegistry registry;

	protected volatile EurekaServerContext serverContext;
	protected volatile AwsBinder awsBinder;
    // Initializes the Eureka environment and sets up EurekaServerContextHolder
    public void contextInitialized(ServletContext context) {
		try {
			initEurekaEnvironment();
			initEurekaServerContext();

			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
	}
	// Removes the context attribute, then destroys the Eureka server context and environment
	public void contextDestroyed(ServletContext context) {
		try {
			log.info("Shutting down Eureka Server..");
			context.removeAttribute(EurekaServerContext.class.getName());

			destroyEurekaServerContext();
			destroyEurekaEnvironment();

		}
		catch (Throwable e) {
			log.error("Error shutting down eureka", e);
		}
		log.info("Eureka Service is now shutdown...");
	}
}

Eureka Server exposes its REST endpoints through the Jersey framework. In the resources package of eureka-core you can find the ApplicationResource and InstanceResource classes, which define the Eureka Server API endpoints.

@Configuration
public class EurekaServerInitializerConfiguration
      implements ServletContextAware, SmartLifecycle, Ordered {
    .....

	@Override
	public void start() {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					// Start the Eureka server
					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
					log.info("Started Eureka Server");
					// Publish event notifications
					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
					EurekaServerInitializerConfiguration.this.running = true;
					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
				}
				catch (Exception ex) {
					// Help!
					log.error("Could not initialize Eureka servlet context", ex);
				}
			}
		}).start();
	}
	.....  
}

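EurekaServerInitializerConfiguration implements the SmartLifecycle interface, so its start() method is invoked from finishRefresh() during ApplicationContext.refresh(). start() runs eurekaServerBootstrap.contextInitialized(...) on a new thread to start Eureka Server, after which the server can serve requests normally.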
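
The idea of auto-started lifecycle beans can be imitated in a few lines of plain Java. The sketch below is a toy model and not Spring's implementation: LifecycleSketch, Startable, and bean are hypothetical names, and the ordering rule (lower phase starts first, beans with isAutoStartup() == false are skipped) is a simplification of what Spring's lifecycle processing does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Toy model of starting lifecycle beans once a context has been refreshed. */
final class LifecycleSketch {

    /** Hypothetical, minimal stand-in for the SmartLifecycle contract. */
    interface Startable {
        void start();
        boolean isAutoStartup();
        int getPhase();
    }

    /** Convenience factory so callers can pass a lambda for start(). */
    static Startable bean(int phase, boolean autoStartup, Runnable onStart) {
        return new Startable() {
            @Override public void start() { onStart.run(); }
            @Override public boolean isAutoStartup() { return autoStartup; }
            @Override public int getPhase() { return phase; }
        };
    }

    /** Starts auto-startup beans in ascending phase order. */
    static void startAll(List<? extends Startable> beans) {
        List<Startable> ordered = new ArrayList<>(beans);
        ordered.sort(Comparator.comparingInt(Startable::getPhase));
        for (Startable bean : ordered) {
            if (bean.isAutoStartup()) {
                bean.start();
            }
        }
    }
}
```

In this model, a bean such as EurekaServerInitializerConfiguration corresponds to one Startable whose start() kicks off the server bootstrap.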

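2). Client

Similarly, in the Eureka client artifact in the Maven repository (spring-cloud-netflix-eureka-client), the spring.factories file lists the following configuration classes: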

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration

We start with EurekaClientAutoConfiguration:

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
      CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
      "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
      "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {
    
    ....  
		// Eureka client configuration bean
       @Bean
	@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
	public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
		EurekaClientConfigBean client = new EurekaClientConfigBean();
		if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
			// We don't register during bootstrap by default, but there will be another
			// chance later.
			client.setRegisterWithEureka(false);
		}
		return client;
	}
        
    // Eureka instance configuration
    @Bean
	@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
	public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
															 ManagementMetadataProvider managementMetadataProvider) {
		String hostname = getProperty("eureka.instance.hostname");
		boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
		String ipAddress = getProperty("eureka.instance.ip-address");
		boolean isSecurePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));

		String serverContextPath = env.getProperty("server.context-path", "/");
		int serverPort = Integer.valueOf(env.getProperty("server.port", env.getProperty("port", "8080")));

		Integer managementPort = env.getProperty("management.server.port", Integer.class);// nullable. should be wrapped into optional
		String managementContextPath = env.getProperty("management.server.servlet.context-path");// nullable. should be wrapped into optional
		Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class);//nullable
		EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);

		instance.setNonSecurePort(serverPort);
		instance.setInstanceId(getDefaultInstanceId(env));
		instance.setPreferIpAddress(preferIpAddress);
		instance.setSecurePortEnabled(isSecurePortEnabled);
		if (StringUtils.hasText(ipAddress)) {
			instance.setIpAddress(ipAddress);
		}

		if(isSecurePortEnabled) {
			instance.setSecurePort(serverPort);
		}

		if (StringUtils.hasText(hostname)) {
			instance.setHostname(hostname);
		}
		String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
		String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");

		if (StringUtils.hasText(statusPageUrlPath)) {
			instance.setStatusPageUrlPath(statusPageUrlPath);
		}
		if (StringUtils.hasText(healthCheckUrlPath)) {
			instance.setHealthCheckUrlPath(healthCheckUrlPath);
		}

		ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort,
				serverContextPath, managementContextPath, managementPort);

		if(metadata != null) {
			instance.setStatusPageUrl(metadata.getStatusPageUrl());
			instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
			if(instance.isSecurePortEnabled()) {
				instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
			}
			Map<String, String> metadataMap = instance.getMetadataMap();
			if (metadataMap.get("management.port") == null) {
				metadataMap.put("management.port", String.valueOf(metadata.getManagementPort()));
			}
		} else {
			//without the metadata the status and health check URLs will not be set
			//and the status page and health check url paths will not include the
			//context path so set them here
			if(StringUtils.hasText(managementContextPath)) {
				instance.setHealthCheckUrlPath(managementContextPath + instance.getHealthCheckUrlPath());
				instance.setStatusPageUrlPath(managementContextPath + instance.getStatusPageUrlPath());
			}
		}

		setupJmxPort(instance, jmxPort);
		return instance;
	}
    
   // Client-side discovery class, used to look up services
    @Bean
	public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
		return new EurekaDiscoveryClient(config, client);
	}
    // Registers and deregisters instances via EurekaRegistration
    @Bean
	public EurekaServiceRegistry eurekaServiceRegistry() {
		return new EurekaServiceRegistry();
	}
    
    // Refreshable configuration; in most cases the client is refreshable
    @Configuration
	@ConditionalOnRefreshScope
	protected static class RefreshableEurekaClientConfiguration {

		@Autowired
		private ApplicationContext context;

		@Autowired
		private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
		
        // Client used to communicate with the server (discovery, registration, renewal); @Lazy defers initialization
		@Bean(destroyMethod = "shutdown")
		@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
		@org.springframework.cloud.context.config.annotation.RefreshScope
		@Lazy
		public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
			manager.getInfo(); // force initialization
			return new CloudEurekaClient(manager, config, this.optionalArgs,
					this.context);
		}
	// Application information
		@Bean
		@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
		@org.springframework.cloud.context.config.annotation.RefreshScope
		@Lazy
		public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
			InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
			return new ApplicationInfoManager(config, instanceInfo);
		}
	// Represents one application service instance in the service discovery system
		@Bean
		@org.springframework.cloud.context.config.annotation.RefreshScope
		@ConditionalOnBean(AutoServiceRegistrationProperties.class)
		@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
		public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
													 CloudEurekaInstanceConfig instanceConfig,
													 ApplicationInfoManager applicationInfoManager,
													 @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
			return EurekaRegistration.builder(instanceConfig)
					.with(applicationInfoManager)
					.with(eurekaClient)
					.with(healthCheckHandler)
					.build();
		}

	}
    
    
    @Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
	public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry,
																	   EurekaRegistration registration) {
		return new EurekaAutoServiceRegistration(context, registry, registration);
	}
    
}

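Most of the beans auto-configured by the code above carry a comment outlining their role; readers are encouraged to study them on their own.

3). Client startup flow

The main auto-configuration class is EurekaClientAutoConfiguration. As noted in its walkthrough above, the refreshable configuration is normally the one in effect, and in it EurekaClient is lazily initialized: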

     	@Bean(destroyMethod = "shutdown")
		@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
		@org.springframework.cloud.context.config.annotation.RefreshScope
		@Lazy
		public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
			manager.getInfo(); // force initialization
			return new CloudEurekaClient(manager, config, this.optionalArgs,
					this.context);
		}

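CloudEurekaClient extends DiscoveryClient. As covered in the previous section, when DiscoveryClient is initialized it sets up the instance information, registers with the server, and starts the scheduled lease renewal and cache refresh tasks. Because the bean above is declared lazy, it is not initialized immediately when Spring configures the bean. CloudEurekaClient is initialized through EurekaAutoServiceRegistration, whose purpose is evident from its name (descriptive naming is one of Spring's strengths).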

public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {
    .....
        
        @Override
	public void start() {
		// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
		if (this.port.get() != 0) {
			if (this.registration.getNonSecurePort() == 0) {
				this.registration.setNonSecurePort(this.port.get());
			}

			if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
				this.registration.setSecurePort(this.port.get());
			}
		}

		// only initialize if nonSecurePort is greater than 0 and it isn't already running
		// because of containerPortInitializer below
		if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

			this.serviceRegistry.register(this.registration);

			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
			this.running.set(true);
		}
	}
        
    ....
}

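Here again we see the familiar SmartLifecycle interface and its start method. As mentioned, for a class that implements SmartLifecycle, start() is invoked from finishRefresh() during ApplicationContext.refresh(). The key call here is this.serviceRegistry.register(this.registration):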

public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {
   
    ......
        
    @Override
	public void register(EurekaRegistration reg) {
         // Start the EurekaClient
		maybeInitializeClient(reg);

		if (log.isInfoEnabled()) {
			log.info("Registering application " + reg.getInstanceConfig().getAppname()
					+ " with eureka with status "
					+ reg.getInstanceConfig().getInitialStatus());
		}

		reg.getApplicationInfoManager()
				.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

		reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->
				reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
	}
	
	private void maybeInitializeClient(EurekaRegistration reg) {
		// Force initialization
		reg.getApplicationInfoManager().getInfo();
		reg.getEurekaClient().getApplications();
	}    
        
    ......
    
}

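This is where DiscoveryClient is initialized: it sets up the instance information, registers with the server, and starts the scheduled lease renewal and cache refresh tasks. This completes the client startup.

4). Server execution flow

The main auto-configuration class is EurekaServerAutoConfiguration. The pieces to focus on are EurekaServerContext, EurekaServerInitializerConfiguration, EurekaServerBootstrap, and javax.ws.rs.core.Application (jerseyApplication). Eureka Server uses jerseyApplication to expose its service endpoints for communication.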
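
The effect of lazy initialization followed by a forced first access can be illustrated without Spring. The sketch below is an analogy only and does not show how @Lazy or @RefreshScope are implemented: LazySketch is a hypothetical class in which the factory plays the role of the eurekaClient bean method and get() plays the role of the first real call, such as getApplications().

```java
import java.util.function.Supplier;

/** Hypothetical lazy holder: the value is created on first access only. */
final class LazySketch<T> implements Supplier<T> {
    private final Supplier<T> factory;
    private T instance;

    LazySketch(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Creates the value on the first call and reuses it afterwards. */
    @Override
    public synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }
}
```

Constructing a LazySketch does not run the factory; the first get() does, and later calls return the same object. In the flow above, maybeInitializeClient plays that role by touching the client once during registration.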

EurekaServerContext :

@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
		....
     // @PostConstruct again: as seen earlier, the annotated method is called when the bean is initialized
    @PostConstruct
    @Override
    public void initialize() {
        logger.info("Initializing ...");
        peerEurekaNodes.start();
        try {
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }
            
         ....

}

DefaultEurekaServerContext, the default implementation of EurekaServerContext, calls peerEurekaNodes.start() and registry.init(peerEurekaNodes) when it is initialized.

PeerEurekaNodes:

@Singleton
public class PeerEurekaNodes {
    .....
public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }
    .... 
}

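As PeerEurekaNodes#start() shows, it first updates the peer node information and then starts a scheduled task that keeps updating it.

Now look at the updatePeerEurekaNodes() method, together with resolvePeerUrls(), which supplies its argument:

// Resolve the peer URLs from the configuration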
protected List<String> resolvePeerUrls() {
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
        List<String> replicaUrls = EndpointUtils
                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

        int idx = 0;
        while (idx < replicaUrls.size()) {
            if (isThisMyUrl(replicaUrls.get(idx))) {
                replicaUrls.remove(idx);
            } else {
                idx++;
            }
        }
        return replicaUrls;
    }

protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }
	// Work out which peers to shut down and which URLs to add
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);

    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }

    // Remove peers no long available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }

    // Add new peers
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }

    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
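
The set arithmetic at the top of updatePeerEurekaNodes() can be checked on its own. In the sketch below, PeerDiffSketch and its two methods are hypothetical names, and the peer URLs in the usage note are made-up examples.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that mirrors the toShutdown / toAdd computation. */
final class PeerDiffSketch {

    /** URLs that are currently known but no longer resolved. */
    static Set<String> toShutdown(Set<String> currentUrls, List<String> resolvedUrls) {
        Set<String> result = new HashSet<>(currentUrls);
        result.removeAll(resolvedUrls);
        return result;
    }

    /** URLs that were resolved but are not known yet. */
    static Set<String> toAdd(Set<String> currentUrls, List<String> resolvedUrls) {
        Set<String> result = new HashSet<>(resolvedUrls);
        result.removeAll(currentUrls);
        return result;
    }
}
```

With current peers {http://peer1:8761/eureka/, http://peer2:8761/eureka/} and resolved URLs [http://peer2:8761/eureka/, http://peer3:8761/eureka/], toShutdown contains only the peer1 URL and toAdd contains only the peer3 URL. With an empty current set, as on first startup, toShutdown is empty and toAdd holds every resolved URL.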

When Eureka initializes, there are no URLs to remove and every resolved URL has to be added, so createPeerEurekaNode() is called for each of them.

 protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
        HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
        String targetHost = hostFromUrl(peerEurekaNodeUrl);
        if (targetHost == null) {
            targetHost = "host";
        }
        return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }

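createPeerEurekaNode() builds an HttpReplicationClient for the peer URL and wraps it in a PeerEurekaNode, through which this server sends its requests to that peer. This completes the registration between peers.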
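
The body of hostFromUrl() is not shown above. One plausible way to derive the target host, including the "host" fallback used by createPeerEurekaNode(), is sketched below with java.net.URI. PeerHostSketch is a hypothetical class and may differ from what Eureka actually does.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical sketch of extracting a peer's host name from its URL. */
final class PeerHostSketch {

    /** Returns the URL's host, or the literal "host" when none can be found. */
    static String targetHost(String peerUrl) {
        String host = null;
        try {
            host = new URI(peerUrl).getHost();
        } catch (URISyntaxException e) {
            // Malformed URL: keep host as null and use the fallback below.
        }
        return host != null ? host : "host";
    }
}
```

For a made-up URL such as http://peer1.example.com:8761/eureka/ this returns peer1.example.com; for a string that is not a valid URL it returns the fallback value.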

EurekaServerInitializerConfiguration:

@Configuration
public class EurekaServerInitializerConfiguration
      implements ServletContextAware, SmartLifecycle, Ordered {
	.....
        @Override
	public void start() {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					//TODO: is this class even needed now?
					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
					log.info("Started Eureka Server");

					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
					EurekaServerInitializerConfiguration.this.running = true;
					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
				}
				catch (Exception ex) {
					// Help!
					log.error("Could not initialize Eureka servlet context", ex);
				}
			}
		}).start();
	}
        
        .....

}

The familiar SmartLifecycle interface again. Going straight to start(): it calls EurekaServerBootstrap#contextInitialized, sets running to true, and publishes the Eureka events.

EurekaServerBootstrap :

public class EurekaServerBootstrap {

.....
    public void contextInitialized(ServletContext context) {
		try {
			// Initialize the Eureka environment
			initEurekaEnvironment();
			// Initialize the Eureka server context
			initEurekaServerContext();
			// Set the ServletContext attribute
			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
	}
    .....

}

At this point Eureka Server has started normally (using jerseyApplication to expose its service endpoints).
