Nacos: Service Subscription
A source-code analysis of Nacos service subscription, explained through the service-based routing of Spring Cloud Gateway
1. Background
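After a service provider registers its instances with the Nacos server, service consumers need to subscribe to the provider's service in order to call it. Common consumers include Dubbo consumers and Spring Cloud consumers. I have never used Dubbo or Spring Cloud to subscribe to services from Nacos, but I have used Spring Cloud Gateway together with Nacos for service-based routing, so this article explains Nacos service subscription from the Spring Cloud Gateway side.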
2. Maven dependencies
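- spring-cloud-starter-gateway: 2.2.5.RELEASE
- spring-cloud-starter-alibaba-nacos-discovery: 2.2.4.RELEASE
- nacos-client: 1.4.2

The nacos-client bundled with the discovery starter is excluded, and version 1.4.2 is declared explicitly: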
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <exclusions>
        <exclusion>
            <groupId>com.alibaba.nacos</groupId>
            <artifactId>nacos-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>1.4.2</version>
</dependency>
3. Flowchart
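- The service provider registers its instances with the Nacos server cluster.
- When the gateway needs to route to a service, it subscribes to the provider's instance list.
- The gateway picks one instance from that list according to the load-balancing strategy and routes the request to it.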
4. Source code analysis
- Subscribing to itself at startup
NacosDiscoveryClientConfiguration initializes NacosWatch:
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = true)
public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
        NacosDiscoveryProperties nacosDiscoveryProperties,
        ObjectProvider<ThreadPoolTaskScheduler> taskExecutorObjectProvider) {
    return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties, taskExecutorObjectProvider);
}
NacosWatch implements the SmartLifecycle interface, so it performs work when the Spring container starts and shuts down, in start() and stop() respectively:
private final AtomicBoolean running = new AtomicBoolean(false);

private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);

@Override
public void start() {
    if (this.running.compareAndSet(false, true)) {
        // Event listener
        EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
                event -> new EventListener() {
                    @Override
                    public void onEvent(Event event) {
                        if (event instanceof NamingEvent) {
                            List<Instance> instances = ((NamingEvent) event)
                                    .getInstances();
                            // Pick out the current instance
                            Optional<Instance> instanceOptional = selectCurrentInstance(
                                    instances);
                            // Update the metadata
                            instanceOptional.ifPresent(currentInstance -> {
                                resetIfNeeded(currentInstance);
                            });
                        }
                    }
                });

        // Obtain the NamingService
        NamingService namingService = nacosServiceManager
                .getNamingService(properties.getNacosProperties());
        try {
            // Subscribe to itself
            namingService.subscribe(properties.getService(), properties.getGroup(),
                    Arrays.asList(properties.getClusterName()), eventListener);
        }
        catch (Exception e) {
            log.error("namingService subscribe failed, properties:{}", properties, e);
        }

        // Schedule periodic publishing of HeartbeatEvent
        this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
                this::nacosServicesWatch, this.properties.getWatchDelay());
    }
}

/**
 * Select the instance whose IP and port match the current application
 */
private Optional<Instance> selectCurrentInstance(List<Instance> instances) {
    return instances.stream()
            .filter(instance -> properties.getIp().equals(instance.getIp())
                    && properties.getPort() == instance.getPort())
            .findFirst();
}

/**
 * Update the local metadata if the subscribed instance's metadata differs
 */
private void resetIfNeeded(Instance instance) {
    if (!properties.getMetadata().equals(instance.getMetadata())) {
        properties.setMetadata(instance.getMetadata());
    }
}

/**
 * Publish a HeartbeatEvent so that consumers can refresh,
 * e.g. RouteRefreshListener in Spring Cloud Gateway
 */
public void nacosServicesWatch() {
    // nacos doesn't support watch now , publish an event every 30 seconds.
    this.publisher.publishEvent(
            new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}

@Override
public void stop() {
    if (this.running.compareAndSet(true, false)) {
        // Shut down the thread pool
        if (this.watchFuture != null) {
            // shutdown current user-thread,
            // then the other daemon-threads will terminate automatic.
            this.taskScheduler.shutdown();
            this.watchFuture.cancel(true);
        }

        EventListener eventListener = listenerMap.get(buildKey());
        try {
            NamingService namingService = nacosServiceManager
                    .getNamingService(properties.getNacosProperties());
            // Unsubscribe
            namingService.unsubscribe(properties.getService(), properties.getGroup(),
                    Arrays.asList(properties.getClusterName()), eventListener);
        }
        catch (Exception e) {
            log.error("namingService unsubscribe failed, properties:{}", properties, e);
        }
    }
}
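The start()/stop() pair above follows a common pattern: an AtomicBoolean makes repeated lifecycle callbacks idempotent, and a fixed-delay task publishes a monotonically increasing heartbeat index. The sketch below reproduces only that pattern in plain Java, without Spring. WatchSketch, the Consumer<Long> that stands in for the ApplicationEventPublisher/HeartbeatEvent pair, and the delay value are hypothetical, not part of Spring Cloud Alibaba.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for NacosWatch: idempotent start/stop plus a periodic "heartbeat" publisher.
class WatchSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicLong watchIndex = new AtomicLong(0);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Consumer<Long> publisher; // plays the role of publishEvent(new HeartbeatEvent(...))
    private final long delayMillis;         // plays the role of properties.getWatchDelay()
    private ScheduledFuture<?> watchFuture;
    int startCount = 0;                     // how many times start() actually did its work

    WatchSketch(Consumer<Long> publisher, long delayMillis) {
        this.publisher = publisher;
        this.delayMillis = delayMillis;
    }

    public void start() {
        // Only the first false -> true transition schedules the task.
        if (running.compareAndSet(false, true)) {
            startCount++;
            watchFuture = scheduler.scheduleWithFixedDelay(
                    () -> publisher.accept(watchIndex.getAndIncrement()),
                    0, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    public void stop() {
        // Only the first true -> false transition tears things down.
        if (running.compareAndSet(true, false)) {
            if (watchFuture != null) {
                watchFuture.cancel(true);
            }
            scheduler.shutdown();
        }
    }

    public boolean isRunning() {
        return running.get();
    }
}
```

Calling start() twice schedules only one task, and a second stop() is a no-op, which is the reason NacosWatch guards both methods with compareAndSet().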
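start() logic: declare an event listener, eventListener, and subscribe to the current service itself on the Nacos server. Whenever the subscription delivers an instance-change event, eventListener picks out the current instance and updates its own metadata. start() also schedules a task that periodically publishes a HeartbeatEvent. Next, let's look at NamingService.subscribe().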
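The listener registered in start() boils down to two steps: find the entry whose IP and port match the local application, then overwrite the local metadata if the server-side copy differs. A minimal sketch, assuming a hypothetical Inst class in place of Nacos's Instance and a plain field in place of NacosDiscoveryProperties:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical minimal instance model (stands in for Nacos's Instance).
class Inst {
    final String ip;
    final int port;
    final Map<String, String> metadata;

    Inst(String ip, int port, Map<String, String> metadata) {
        this.ip = ip;
        this.port = port;
        this.metadata = metadata;
    }
}

// Mirrors the listener body in NacosWatch.start(): find "me" by ip + port, then sync metadata.
class SelfMetadataSync {
    final String myIp;
    final int myPort;
    Map<String, String> myMetadata = new HashMap<>();

    SelfMetadataSync(String myIp, int myPort) {
        this.myIp = myIp;
        this.myPort = myPort;
    }

    Optional<Inst> selectCurrentInstance(List<Inst> instances) {
        return instances.stream()
                .filter(i -> myIp.equals(i.ip) && myPort == i.port)
                .findFirst();
    }

    // Called with the instance list carried by a change event.
    void onInstances(List<Inst> instances) {
        selectCurrentInstance(instances).ifPresent(current -> {
            if (!myMetadata.equals(current.metadata)) {
                myMetadata = current.metadata; // the server-side copy wins
            }
        });
    }
}
```

Events that do not contain the local instance leave the metadata untouched, just as selectCurrentInstance() returning an empty Optional does in NacosWatch.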
NacosNamingService
/**
 * Subscribe to a service
 */
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName),
            StringUtils.join(clusters, ","), listener);
}
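subscribe() first turns (serviceName, groupName) into a single grouped name, and HostReactor then combines it with the joined cluster list into the key shared by serviceInfoMap and the listener map. The sketch below reimplements those key formats for illustration only; it assumes the "@@" separator that Nacos 1.x uses in NamingUtils.getGroupedName() and ServiceInfo.getKey(), and NamingKeys is a hypothetical helper.

```java
import java.util.List;

// Hypothetical re-implementation of the key formats; "@@" is assumed to match Nacos 1.x.
final class NamingKeys {
    static final String SPLITER = "@@";

    private NamingKeys() {}

    // Like NamingUtils.getGroupedName(serviceName, groupName): group first, then service.
    static String groupedName(String serviceName, String groupName) {
        return groupName + SPLITER + serviceName;
    }

    // Like ServiceInfo.getKey(name, clusters): append the clusters only when they are non-empty.
    static String serviceInfoKey(String groupedName, List<String> clusters) {
        String joined = String.join(",", clusters);
        return joined.isEmpty() ? groupedName : groupedName + SPLITER + joined;
    }
}
```

For example, with group DEFAULT_GROUP, service order-service and cluster DEFAULT, the sketch yields the key DEFAULT_GROUP@@order-service@@DEFAULT.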
HostReactor
private final Map<String, ServiceInfo> serviceInfoMap;

private final InstancesChangeNotifier notifier;

public void subscribe(String serviceName, String clusters, EventListener eventListener) {
    // Register the listener
    notifier.registerListener(serviceName, clusters, eventListener);
    // Update the service
    getServiceInfo(serviceName, clusters);
}

/**
 * Get the service info from serviceInfoMap
 */
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // Look up the service in serviceInfoMap
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    // Not cached yet: create the service entry
    if (null == serviceObj) {
        // Initialize the ServiceInfo
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        // Pull data from the server and update
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
    } else if (updatingMap.containsKey(serviceName)) {
        // The service is currently being updated
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    // Wait up to 5 seconds; updateService() wakes us early with notifyAll()
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // Schedule periodic updates
    scheduleUpdateIfAbsent(serviceName, clusters);
    // Read the result from serviceInfoMap
    return serviceInfoMap.get(serviceObj.getKey());
}

private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
    String key = ServiceInfo.getKey(serviceName, clusters);
    return serviceInfoMap.get(key);
}

private void updateServiceNow(String serviceName, String clusters) {
    try {
        updateService(serviceName, clusters);
    } catch (NacosException e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    }
}

public void updateService(String serviceName, String clusters) throws NacosException {
    // Take the current entry in serviceInfoMap as the old service info
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // Call the Nacos server API GET /instance/list to fetch the service info
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        if (StringUtils.isNotEmpty(result)) {
            // Parse the result and update serviceInfoMap
            processServiceJson(result);
        }
    } finally {
        // Wake up threads waiting on the old service info
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}

public class UpdateTask implements Runnable {

    long lastRefTime = Long.MAX_VALUE;

    private final String clusters;

    private final String serviceName;

    /**
     * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
     */
    private int failCount = 0;

    public UpdateTask(String serviceName, String clusters) {
        this.serviceName = serviceName;
        this.clusters = clusters;
    }

    private void incFailCount() {
        int limit = 6;
        if (failCount == limit) {
            return;
        }
        failCount++;
    }

    private void resetFailCount() {
        failCount = 0;
    }

    @Override
    public void run() {
        long delayTime = DEFAULT_DELAY;

        try {
            ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

            if (serviceObj == null) {
                updateService(serviceName, clusters);
                return;
            }

            if (serviceObj.getLastRefTime() <= lastRefTime) {
                // Update the service
                updateService(serviceName, clusters);
                serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            } else {
                // if serviceName already updated by push, we should not override it
                // since the push data may be different from pull through force push
                refreshOnly(serviceName, clusters);
            }

            lastRefTime = serviceObj.getLastRefTime();

            if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                    .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                // abort the update task
                NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                return;
            }
            if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                incFailCount();
                return;
            }
            delayTime = serviceObj.getCacheMillis();
            resetFailCount();
        } catch (Throwable e) {
            incFailCount();
            NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
        }
    }
}
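The finally block of UpdateTask implements an exponential backoff: each failure (no hosts, or an exception) increments failCount up to 6, and the next pull is delayed by delayTime << failCount, capped at DEFAULT_DELAY * 60. On a failure path delayTime is still DEFAULT_DELAY, because getCacheMillis() is only read on success. The sketch below isolates that arithmetic; UpdateBackoff is hypothetical, and DEFAULT_DELAY = 1000 ms is an assumption about HostReactor's constant in 1.4.x.

```java
// Hypothetical helper reproducing UpdateTask's rescheduling formula:
// next = min(delayTime << failCount, DEFAULT_DELAY * 60), with failCount capped at 6.
final class UpdateBackoff {
    static final long DEFAULT_DELAY = 1000L; // assumed value of HostReactor.DEFAULT_DELAY
    static final int FAIL_LIMIT = 6;         // same cap as incFailCount()

    private UpdateBackoff() {}

    // Delay in milliseconds before the next pull.
    static long nextDelay(long delayTime, int failCount) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }

    // Same behavior as incFailCount(): stop counting once the limit is reached.
    static int incFailCount(int failCount) {
        return failCount == FAIL_LIMIT ? failCount : failCount + 1;
    }
}
```

Under that assumption, consecutive failures give delays of 2000, 4000, 8000, 16000 and 32000 ms, after which 1000 << 6 = 64000 is capped to 60000 ms. A successful pull resets failCount to 0 and uses the service's cacheMillis as the delay instead.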
InstancesChangeNotifier
private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();

/**
 * Register a listener
 */
public void registerListener(String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(serviceName, clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    // Double-checked locking
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<EventListener>();
                // Put the new set into listenerMap
                listenerMap.put(key, eventListeners);
            }
        }
    }
    // Add the listener to this service's listener set
    eventListeners.add(listener);
}

@Override
public void onEvent(InstancesChangeEvent event) {
    String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters());
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (CollectionUtils.isEmpty(eventListeners)) {
        return;
    }
    for (final EventListener listener : eventListeners) {
        final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
        if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
            ((AbstractEventListener) listener).getExecutor().execute(new Runnable() {
                @Override
                public void run() {
                    listener.onEvent(namingEvent);
                }
            });
            continue;
        }
        listener.onEvent(namingEvent);
    }
}
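InstancesChangeNotifier is a per-key listener registry: registration uses double-checked locking, and dispatch runs each listener either on its own executor (AbstractEventListener.getExecutor()) or on the notifying thread. A minimal sketch of the same idea with hypothetical class names; it swaps the hand-written double-checked locking for ConcurrentHashMap.computeIfAbsent(), which creates the per-key set atomically.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical simplification of InstancesChangeNotifier.
class ChangeNotifierSketch<E> {
    // A callback plus an optional executor, like AbstractEventListener.getExecutor().
    static final class Registration<T> {
        final Consumer<T> callback;
        final Executor executor; // null means "run on the notifying thread"

        Registration(Consumer<T> callback, Executor executor) {
            this.callback = callback;
            this.executor = executor;
        }
    }

    private final Map<String, Set<Registration<E>>> listenerMap = new ConcurrentHashMap<>();

    // computeIfAbsent creates the per-key set exactly once, replacing double-checked locking.
    void registerListener(String key, Registration<E> registration) {
        listenerMap.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(registration);
    }

    boolean isSubscribed(String key) {
        Set<Registration<E>> regs = listenerMap.get(key);
        return regs != null && !regs.isEmpty();
    }

    // Dispatch to every listener of the key, on its executor if it has one.
    void onEvent(String key, E event) {
        Set<Registration<E>> regs = listenerMap.get(key);
        if (regs == null) {
            return;
        }
        for (Registration<E> r : regs) {
            if (r.executor != null) {
                r.executor.execute(() -> r.callback.accept(event));
            } else {
                r.callback.accept(event);
            }
        }
    }
}
```

Passing Runnable::run as the executor runs the callback synchronously, which keeps a quick test deterministic; a real listener would usually supply a thread pool.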
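(Figure: logic diagram of HostReactor.subscribe())
One open question here: why does the application subscribe to itself? Is it only to keep its metadata up to date in real time?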
- Subscribing to a specified service
NacosDiscoveryClientConfiguration instantiates NacosDiscoveryClient, the Nacos implementation of DiscoveryClient:
@Bean
public DiscoveryClient nacosDiscoveryClient(
        NacosServiceDiscovery nacosServiceDiscovery) {
    return new NacosDiscoveryClient(nacosServiceDiscovery);
}
NacosDiscoveryClient has two main methods, getInstances() and getServices(), which return a service's instance list and all service names in the Nacos group, respectively. Both actually delegate to NacosServiceDiscovery:
private NacosServiceDiscovery serviceDiscovery;

@Override
public List<ServiceInstance> getInstances(String serviceId) {
    try {
        return serviceDiscovery.getInstances(serviceId);
    }
    catch (Exception e) {
        throw new RuntimeException(
                "Can not get hosts from nacos server. serviceId: " + serviceId, e);
    }
}

@Override
public List<String> getServices() {
    try {
        return serviceDiscovery.getServices();
    }
    catch (Exception e) {
        log.error("get service name from nacos server fail,", e);
        return Collections.emptyList();
    }
}
NacosServiceDiscovery
/**
 * Return all services in the Nacos group
 */
public List<String> getServices() throws NacosException {
    String group = discoveryProperties.getGroup();
    ListView<String> services = namingService().getServicesOfServer(1,
            Integer.MAX_VALUE, group);
    return services.getData();
}

/**
 * Get all instances of a service
 */
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
    String group = discoveryProperties.getGroup();
    List<Instance> instances = namingService().selectInstances(serviceId, group, true);
    return hostToServiceInstanceList(instances, serviceId);
}

/**
 * Convert Instance objects into ServiceInstance objects
 */
public static List<ServiceInstance> hostToServiceInstanceList(
        List<Instance> instances, String serviceId) {
    List<ServiceInstance> result = new ArrayList<>(instances.size());
    for (Instance instance : instances) {
        ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);
        if (serviceInstance != null) {
            result.add(serviceInstance);
        }
    }
    return result;
}

private NamingService namingService() {
    return nacosServiceManager
            .getNamingService(discoveryProperties.getNacosProperties());
}
As you can see, these also call NamingService methods, so let's look at the concrete logic of those methods.
NacosNamingService.getServicesOfServer(): calls the Nacos server API GET /service/list to get the names of all services in the group.

@Override
public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName,
        AbstractSelector selector) throws NacosException {
    // Fetch all services through an HTTP call to the Nacos API
    return serverProxy.getServiceList(pageNo, pageSize, groupName, selector);
}
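getServices() asks for page 1 with a page size of Integer.MAX_VALUE, so in practice a single request returns every service name in the group. As a sketch, assuming the Nacos 1.x Open API path /nacos/v1/ns/service/list with the pageNo, pageSize and groupName query parameters, the request looks roughly like the URL built below. ServiceListRequest and the server address are hypothetical, and the real client also sends further parameters such as namespaceId.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: builds the query that getServicesOfServer() roughly boils down to.
final class ServiceListRequest {
    private ServiceListRequest() {}

    static String url(String serverAddr, int pageNo, int pageSize, String groupName) {
        // LinkedHashMap keeps the parameters in a predictable order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("pageNo", String.valueOf(pageNo));
        params.put("pageSize", String.valueOf(pageSize));
        params.put("groupName", groupName);
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return "http://" + serverAddr + "/nacos/v1/ns/service/list?" + query;
    }
}
```

With pageSize = Integer.MAX_VALUE, the query carries pageSize=2147483647, which is how "all services" is expressed as one page.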
NacosNamingService.selectInstances(): subscribes to the specified service in the same way as the self-subscription above (through HostReactor.getServiceInfo(), which also schedules the periodic UpdateTask), then returns the filtered instances.

@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters,
        boolean healthy, boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    // Whether to subscribe
    if (subscribe) {
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor
                .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                        StringUtils.join(clusters, ","));
    }
    // Filter the instances
    return selectInstances(serviceInfo, healthy);
}

private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }

    Iterator<Instance> iterator = list.iterator();
    while (iterator.hasNext()) {
        Instance instance = iterator.next();
        if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
            iterator.remove();
        }
    }

    return list;
}
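The private selectInstances(ServiceInfo, boolean) keeps an instance only when its health flag equals the requested healthy value, it is enabled, and its weight is positive. A sketch of that predicate with a hypothetical Node class; unlike the original, it builds a new list instead of removing elements through an iterator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical instance with only the fields the filter inspects.
class Node {
    final String id;
    final boolean healthy;
    final boolean enabled;
    final double weight;

    Node(String id, boolean healthy, boolean enabled, double weight) {
        this.id = id;
        this.healthy = healthy;
        this.enabled = enabled;
        this.weight = weight;
    }
}

final class InstanceFilter {
    private InstanceFilter() {}

    // Keeps exactly what NacosNamingService.selectInstances() would not remove:
    // health equal to the requested flag, enabled, and weight > 0.
    static List<Node> select(List<Node> hosts, boolean healthy) {
        List<Node> result = new ArrayList<>();
        if (hosts == null) {
            return result;
        }
        for (Node n : hosts) {
            if (n.healthy == healthy && n.enabled && n.weight > 0) {
                result.add(n);
            }
        }
        return result;
    }
}
```

A consequence of the healthy != instance.isHealthy() check is that passing healthy = false returns only the unhealthy instances, not all of them.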
(Figure: logic diagram of getServicesOfServer() and selectInstances())
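Whether the client subscribes to itself or to a specified service, it starts a scheduled task that pulls instances from the server and stores them in HostReactor's serviceInfoMap.
- Spring Cloud Gateway service-based routing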
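Both approaches above, subscribing to itself and subscribing to a specified service, use serviceInfoMap as their data source, so one would expect Gateway's service-based routing to take its instances from serviceInfoMap as well. It does, but only indirectly.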
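By default, Gateway's service-based routing uses Ribbon. The flow is shown below:
(Figure: flowchart of Gateway's service-based routing through Ribbon)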
The detailed source code of this flow is beyond the scope of this article; it will be covered in a later article in the Spring Cloud Gateway series.
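Steps in the flow:
- At startup, Gateway schedules a task that periodically refreshes allServerList. The task calls NacosNamingService.selectInstances() and stores the returned instances in allServerList, which is how serviceInfoMap is used indirectly.
- When Gateway receives a request, the request passes through LoadBalancerClientFilter, which applies Ribbon's load-balancing logic and ultimately picks a server from allServerList.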
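The two steps amount to "refresh a cached list in the background, read a snapshot per request". The sketch below shows that shape only. CachedServerList is hypothetical, a Supplier stands in for the selectInstances() call, refresh() is what a scheduled task would invoke, and the round-robin choice is a placeholder, not Ribbon's default rule.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for Ribbon's cached allServerList plus a per-request chooser.
class CachedServerList {
    private final Supplier<List<String>> source; // stands in for selectInstances(), mapped to "ip:port"
    private volatile List<String> allServerList = Collections.emptyList();
    private final AtomicInteger cursor = new AtomicInteger();

    CachedServerList(Supplier<List<String>> source) {
        this.source = source;
    }

    // What a periodic refresh task would call: replace the whole snapshot at once.
    void refresh() {
        allServerList = Collections.unmodifiableList(new ArrayList<>(source.get()));
    }

    // Per-request selection reads only the cached snapshot (round robin as a placeholder rule).
    String choose() {
        List<String> snapshot = allServerList;
        if (snapshot.isEmpty()) {
            return null;
        }
        return snapshot.get(Math.floorMod(cursor.getAndIncrement(), snapshot.size()));
    }
}
```

In this sketch, as with allServerList, an instance added on the server becomes visible to routing only after the next refresh().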
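IRule is an extension point here. Its default implementation is ZoneAvoidanceRule, a subclass of PredicateBasedRule. Spring Cloud Alibaba provides a Nacos load-balancing rule, NacosRule, which you can enable simply by declaring it as a bean.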
NacosRule.choose()
@Override
public Server choose(Object key) {
    try {
        String clusterName = this.nacosDiscoveryProperties.getClusterName();
        String group = this.nacosDiscoveryProperties.getGroup();
        DynamicServerListLoadBalancer loadBalancer = (DynamicServerListLoadBalancer) getLoadBalancer();
        String name = loadBalancer.getName();

        NamingService namingService = nacosServiceManager
                .getNamingService(nacosDiscoveryProperties.getNacosProperties());
        // Key point: instances come straight from NamingService.selectInstances()
        List<Instance> instances = namingService.selectInstances(name, group, true);
        if (CollectionUtils.isEmpty(instances)) {
            LOGGER.warn("no instance in service {}", name);
            return null;
        }

        List<Instance> instancesToChoose = instances;
        if (StringUtils.isNotBlank(clusterName)) {
            List<Instance> sameClusterInstances = instances.stream()
                    .filter(instance -> Objects.equals(clusterName, instance.getClusterName()))
                    .collect(Collectors.toList());
            if (!CollectionUtils.isEmpty(sameClusterInstances)) {
                instancesToChoose = sameClusterInstances;
            }
            else {
                LOGGER.warn(
                        "A cross-cluster call occurs,name = {}, clusterName = {}, instance = {}",
                        name, clusterName, instances);
            }
        }

        Instance instance = ExtendBalancer.getHostByRandomWeight2(instancesToChoose);

        return new NacosServer(instance);
    }
    catch (Exception e) {
        LOGGER.warn("NacosRule error", e);
        return null;
    }
}
When the Nacos load-balancing rule is used, Gateway's service-based routing obtains instances by calling NacosNamingService.selectInstances() directly.
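After fetching instances, NacosRule.choose() does two things: it prefers instances in the local cluster (falling back to all instances with a cross-cluster warning), then picks one by random weight via ExtendBalancer.getHostByRandomWeight2(). The sketch below reproduces that flow with hypothetical WeightedNode and ClusterAwareChooser classes; the weighted pick uses a standard cumulative-weight technique and is not a copy of Nacos's own chooser code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;

// Hypothetical instance model: address, cluster and weight only.
class WeightedNode {
    final String address;
    final String clusterName;
    final double weight;

    WeightedNode(String address, String clusterName, double weight) {
        this.address = address;
        this.clusterName = clusterName;
        this.weight = weight;
    }
}

final class ClusterAwareChooser {
    private ClusterAwareChooser() {}

    // Step 1: keep same-cluster instances if there are any, otherwise fall back to all of them
    // (the case NacosRule logs as a cross-cluster call).
    static List<WeightedNode> preferCluster(List<WeightedNode> instances, String clusterName) {
        if (clusterName == null || clusterName.trim().isEmpty()) {
            return instances;
        }
        List<WeightedNode> sameCluster = new ArrayList<>();
        for (WeightedNode n : instances) {
            if (Objects.equals(clusterName, n.clusterName)) {
                sameCluster.add(n);
            }
        }
        return sameCluster.isEmpty() ? instances : sameCluster;
    }

    // Step 2: weighted random choice over cumulative weights.
    static WeightedNode pickByWeight(List<WeightedNode> candidates, Random random) {
        if (candidates.isEmpty()) {
            return null;
        }
        double total = 0;
        for (WeightedNode n : candidates) {
            total += n.weight;
        }
        double r = random.nextDouble() * total;
        double cumulative = 0;
        for (WeightedNode n : candidates) {
            cumulative += n.weight;
            if (r < cumulative) {
                return n;
            }
        }
        // Only reached through floating-point round-off.
        return candidates.get(candidates.size() - 1);
    }
}
```

With weights 1 and 3, the heavier instance should be chosen roughly three times out of four.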
5. Summary
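When reading the Nacos service-discovery source, the pieces of logic can seem disconnected, because the code relies heavily on scheduled tasks and Spring's event publishing and listening. The core idea, however, stays the same: keep pulling data from the server, cache it locally, and let consumers make calls based on that cache. One question remains open: why does an application subscribe to itself? Is it only to keep its metadata up to date in real time? As a registry, Nacos relies mainly on pulling for service subscription (the UDP port passed to queryList() and the "updated by push" branch in UpdateTask show that server pushes only supplement it), so it uses many scheduled tasks, and their performance cost deserves attention.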
Thanks for reading. That's all for now; to be continued…
Like-minded readers are always welcome.
Author's WeChat official account: Tarzan写bug