Nacos: Service Subscription

1. Background

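After a service provider registers its instances with the Nacos server, service consumers must subscribe to the provider's service in order to call it. Common consumers are Dubbo consumers and Spring Cloud consumers. I have not used Dubbo or plain Spring Cloud to subscribe to services from Nacos. I have, however, used Spring Cloud Gateway with Nacos for service-based routing, so this article explains Nacos service subscription through Spring Cloud Gateway.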

2. Maven dependencies

spring-cloud-starter-gateway: 2.2.5.RELEASE

spring-cloud-starter-alibaba-nacos-discovery: 2.2.4.RELEASE

nacos-client: 1.4.2

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <exclusions>
        <exclusion>
            <groupId>com.alibaba.nacos</groupId>
            <artifactId>nacos-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>1.4.2</version>
</dependency>

3. Flow diagram

[Figure omitted: service registration and subscription flow]

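  • The service provider registers its instances with the Nacos server cluster
  • When the gateway needs to route to a service, it subscribes to the provider's instance list
  • A load-balancing strategy picks one instance from that list, and the request is routed to it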
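The three steps above can be modeled in a few lines of plain Java. This is a toy, not Nacos code. ToyRegistry, ToyGateway and the "ip:port" strings are hypothetical, and uniform random selection stands in for whatever load-balancing rule is actually configured.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;

/** Toy stand-in for the Nacos server: service name -> registered "ip:port" addresses. */
final class ToyRegistry {
    private final Map<String, List<String>> services = new ConcurrentHashMap<>();

    // Step 1: a provider registers one of its instances.
    void register(String service, String address) {
        services.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(address);
    }

    // Step 2: a consumer (e.g. the gateway) takes a snapshot of the instance list.
    List<String> subscribe(String service) {
        return new ArrayList<>(services.getOrDefault(service, Collections.emptyList()));
    }
}

/** Toy consumer side: picks one instance per request. */
final class ToyGateway {
    // Step 3: choose an instance; uniform random stands in for a real load-balancing rule.
    static String choose(List<String> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        return instances.get(ThreadLocalRandom.current().nextInt(instances.size()));
    }
}
```

In the real system, step 2 is not a one-off read. The client keeps a local cache that scheduled tasks refresh, as the source analysis below shows.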

4. Source code analysis

  • Subscribing to itself at startup

    NacosDiscoveryClientConfiguration creates the NacosWatch bean:

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
                           matchIfMissing = true)
    public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
                                 NacosDiscoveryProperties nacosDiscoveryProperties,
                                 ObjectProvider<ThreadPoolTaskScheduler> taskExecutorObjectProvider) {
        return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties,
                              taskExecutorObjectProvider);
    }
    

    NacosWatch implements the SmartLifecycle interface, so it runs logic when the Spring container starts and stops, in start() and stop() respectively:

    private final AtomicBoolean running = new AtomicBoolean(false);
    private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);
    
    @Override
    public void start() {
        if (this.running.compareAndSet(false, true)) {
            // Event listener
            EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
                         event -> new EventListener() {
                             @Override
                             public void onEvent(Event event) {
                                 if (event instanceof NamingEvent) {
                                     List<Instance> instances = ((NamingEvent) event)
                                         .getInstances();
                                     // Pick out the current instance
                                     Optional<Instance> instanceOptional = selectCurrentInstance(
                                         instances);
                                     // Update the metadata if needed
                                     instanceOptional.ifPresent(currentInstance -> {
                                         resetIfNeeded(currentInstance);
                                     });
                                 }
                             }
                         });
    
            // Get the NamingService
            NamingService namingService = nacosServiceManager
                .getNamingService(properties.getNacosProperties());
            try {
                // Subscribe to this application's own service
                namingService.subscribe(properties.getService(), properties.getGroup(),
                                        Arrays.asList(properties.getClusterName()), eventListener);
            }
            catch (Exception e) {
                log.error("namingService subscribe failed, properties:{}", properties, e);
            }
    
            // Schedule a task that periodically publishes HeartbeatEvent
            this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
                this::nacosServicesWatch, this.properties.getWatchDelay());
        }
    }
    
    /**
     * Select the instance whose IP and port match this application
     */
    private Optional<Instance> selectCurrentInstance(List<Instance> instances) {
        return instances.stream()
            .filter(instance -> properties.getIp().equals(instance.getIp())
                    && properties.getPort() == instance.getPort())
            .findFirst();
    }
    
    /**
     * Update the local metadata if the subscribed instance's metadata differs
     */
    private void resetIfNeeded(Instance instance) {
        if (!properties.getMetadata().equals(instance.getMetadata())) {
            properties.setMetadata(instance.getMetadata());
        }
    }
    
    /**
     * Publish a HeartbeatEvent so that consumers can refresh,
     * e.g. RouteRefreshListener in Spring Cloud Gateway
     */
    public void nacosServicesWatch() {
    
        // nacos doesn't support watch now , publish an event every 30 seconds.
        this.publisher.publishEvent(
            new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
    
    }
    
    @Override
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            // Shut down the scheduler's thread pool
            if (this.watchFuture != null) {
                // shutdown current user-thread,
                // then the other daemon-threads will terminate automatic.
                this.taskScheduler.shutdown();
                this.watchFuture.cancel(true);
            }
    
            EventListener eventListener = listenerMap.get(buildKey());
            try {
                NamingService namingService = nacosServiceManager
                    .getNamingService(properties.getNacosProperties());
                // Unsubscribe
                namingService.unsubscribe(properties.getService(), properties.getGroup(),
                                          Arrays.asList(properties.getClusterName()), eventListener);
            }
            catch (Exception e) {
                log.error("namingService unsubscribe failed, properties:{}", properties,
                          e);
            }
        }
    }
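NacosWatch guards start() and stop() with AtomicBoolean.compareAndSet. Each transition therefore takes effect only once, even if the container calls a method repeatedly. The sketch below is a hypothetical minimal model of that guard. WatchLifecycleSketch and its counters are illustrative only; the counters stand in for the real namingService.subscribe(...) and unsubscribe(...) calls.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical minimal model of the start()/stop() guard in NacosWatch.
 * The counters stand in for namingService.subscribe(...) and unsubscribe(...).
 */
final class WatchLifecycleSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    final AtomicInteger subscribeCalls = new AtomicInteger();
    final AtomicInteger unsubscribeCalls = new AtomicInteger();

    void start() {
        // Only the false -> true transition wins, so repeated start() calls subscribe once.
        if (running.compareAndSet(false, true)) {
            subscribeCalls.incrementAndGet();
        }
    }

    void stop() {
        // Only the true -> false transition wins, so repeated stop() calls unsubscribe once.
        if (running.compareAndSet(true, false)) {
            unsubscribeCalls.incrementAndGet();
        }
    }

    boolean isRunning() {
        return running.get();
    }
}
```

After stop(), a later start() succeeds again. This matches how a SmartLifecycle bean can be stopped and restarted.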
    

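    start() logic: create an event listener (eventListener) and subscribe to this application's own service on the Nacos server. Whenever instance changes for that service are delivered, eventListener picks out the current instance and updates the local metadata if it differs. Finally, start() schedules a task that periodically publishes HeartbeatEvent.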
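The listener's core logic can be sketched without Nacos on the classpath. It matches the current instance by IP and port, then copies the server-side metadata only when it differs from the local copy. SimpleInstance and SelfMetadataSync are hypothetical stand-ins, not Nacos or Spring Cloud Alibaba types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for Nacos's Instance: only the fields this sketch needs. */
final class SimpleInstance {
    final String ip;
    final int port;
    final Map<String, String> metadata;

    SimpleInstance(String ip, int port, Map<String, String> metadata) {
        this.ip = ip;
        this.port = port;
        this.metadata = metadata;
    }
}

/** Hypothetical model of what NacosWatch's eventListener does with a NamingEvent. */
final class SelfMetadataSync {
    private final String localIp;
    private final int localPort;
    private Map<String, String> localMetadata;

    SelfMetadataSync(String localIp, int localPort, Map<String, String> localMetadata) {
        this.localIp = localIp;
        this.localPort = localPort;
        this.localMetadata = new HashMap<>(localMetadata);
    }

    // Like selectCurrentInstance(): find the entry whose ip and port match this process.
    Optional<SimpleInstance> selectCurrentInstance(List<SimpleInstance> instances) {
        return instances.stream()
                .filter(i -> localIp.equals(i.ip) && localPort == i.port)
                .findFirst();
    }

    // Like resetIfNeeded(): adopt the server-side metadata only when it differs.
    // Returns true when the local copy was changed.
    boolean onInstancesChanged(List<SimpleInstance> instances) {
        Optional<SimpleInstance> self = selectCurrentInstance(instances);
        if (self.isPresent() && !localMetadata.equals(self.get().metadata)) {
            localMetadata = new HashMap<>(self.get().metadata);
            return true;
        }
        return false;
    }

    Map<String, String> metadata() {
        return localMetadata;
    }
}
```

This shows one plausible effect of subscribing to itself. Metadata edited on the server side (for example in the Nacos console) flows back into the local properties.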

    Next, let's look at NamingService.subscribe():

    NacosNamingService

    /**
     * Subscribe to a service
     */
    @Override
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
        hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
    }
    

    HostReactor

    
    private final Map<String, ServiceInfo> serviceInfoMap;
    
    private final InstancesChangeNotifier notifier;
    
    public void subscribe(String serviceName, String clusters, EventListener eventListener) {
        // Register the listener
        notifier.registerListener(serviceName, clusters, eventListener);
        // Fetch the service info (pulled from the server on first access) and schedule updates
        getServiceInfo(serviceName, clusters);
    }
    
    /**
     * Get the service info from serviceInfoMap
     */
    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
    
        // Look up the ServiceInfo in serviceInfoMap
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    
        // Not cached yet: create it
        if (null == serviceObj) {
            // Initialize an empty ServiceInfo
            serviceObj = new ServiceInfo(serviceName, clusters);
    
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
    
            updatingMap.put(serviceName, new Object());
            // Pull the data from the server right away
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
    
        } else if (updatingMap.containsKey(serviceName)) {
            // Another thread is currently updating this service
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        // Wait up to UPDATE_HOLD_INTERVAL (5 seconds); updateService() wakes us earlier via notifyAll()
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
    
        // Schedule periodic updates (at most one task per service)
        scheduleUpdateIfAbsent(serviceName, clusters);
        // Return the entry from serviceInfoMap
        return serviceInfoMap.get(serviceObj.getKey());
    }
    
    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
    
        String key = ServiceInfo.getKey(serviceName, clusters);
    
        return serviceInfoMap.get(key);
    }
    
    private void updateServiceNow(String serviceName, String clusters) {
        try {
            updateService(serviceName, clusters);
        } catch (NacosException e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }
    
    public void updateService(String serviceName, String clusters) throws NacosException {
        // Take the current ServiceInfo in serviceInfoMap as the old value
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            // Call the Nacos server API GET /instance/list to fetch the service info
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
    
            if (StringUtils.isNotEmpty(result)) {
                // Process the result and update serviceInfoMap
                processServiceJson(result);
            }
        } finally {
            // Wake up threads waiting on the old ServiceInfo (see getServiceInfo())
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
    
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
    
        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
    
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }
    
    public class UpdateTask implements Runnable {
    
        long lastRefTime = Long.MAX_VALUE;
    
        private final String clusters;
    
        private final String serviceName;
    
        /**
             * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
             */
        private int failCount = 0;
    
        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }
    
        private void incFailCount() {
            int limit = 6;
            if (failCount == limit) {
                return;
            }
            failCount++;
        }
    
        private void resetFailCount() {
            failCount = 0;
        }
    
        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;
    
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
    
                if (serviceObj == null) {
                    updateService(serviceName, clusters);
                    return;
                }
    
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    // Pull fresh data from the server
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }
    
                lastRefTime = serviceObj.getLastRefTime();
    
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                    .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    // abort the update task
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
    }
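UpdateTask reschedules itself in its finally block with an exponential backoff. The sketch below isolates that arithmetic. UpdateBackoff is a hypothetical helper, and the 1000 ms value used for DEFAULT_DELAY is an assumption about HostReactor's constant, not something shown above.

```java
/**
 * Sketch of the rescheduling arithmetic in HostReactor.UpdateTask.
 * DEFAULT_DELAY_MS = 1000 is an assumption about HostReactor.DEFAULT_DELAY.
 */
final class UpdateBackoff {
    static final long DEFAULT_DELAY_MS = 1000L;
    static final int FAIL_COUNT_LIMIT = 6; // same cap as incFailCount()

    // Mirrors incFailCount(): increments but saturates at the limit.
    static int incFailCount(int failCount) {
        return failCount == FAIL_COUNT_LIMIT ? failCount : failCount + 1;
    }

    // Mirrors the finally block: double the delay per failure, never above 60 x DEFAULT_DELAY.
    static long nextDelayMillis(long delayTime, int failCount) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY_MS * 60);
    }
}
```

delayTime is replaced by serviceObj.getCacheMillis() only on success, so after a failure the base stays at DEFAULT_DELAY. With the assumed 1000 ms, consecutive failures produce delays of 2 s, 4 s, 8 s, 16 s and 32 s. After that the delay is 60 s, because 1000 << 6 = 64 s is capped.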
    

    InstancesChangeNotifier

    private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();
    
    /**
     * Register a listener
     */
    public void registerListener(String serviceName, String clusters, EventListener listener) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        // Double-checked locking
        if (eventListeners == null) {
            synchronized (lock) {
                eventListeners = listenerMap.get(key);
                if (eventListeners == null) {
                    eventListeners = new ConcurrentHashSet<EventListener>();
                    // Add it to listenerMap
                    listenerMap.put(key, eventListeners);
                }
            }
        }
        // Add the given listener to this service's listener set
        eventListeners.add(listener);
    }
    
    
    @Override
    public void onEvent(InstancesChangeEvent event) {
        String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters());
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        if (CollectionUtils.isEmpty(eventListeners)) {
            return;
        }
        for (final EventListener listener : eventListeners) {
            final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
            if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
                ((AbstractEventListener) listener).getExecutor().execute(new Runnable() {
                    @Override
                    public void run() {
                        listener.onEvent(namingEvent);
                    }
                });
                continue;
            }
            listener.onEvent(namingEvent);
        }
    }
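InstancesChangeNotifier has two jobs. It keeps one listener set per service key, and on each change event it calls every listener in that set, either directly or on the listener's own executor. The sketch below is a hypothetical model of that behavior. SketchListener and ListenerRegistrySketch are illustrative types, and the "name@@clusters" key format is assumed to match ServiceInfo.getKey().

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/** Hypothetical listener type; executor() plays the role of AbstractEventListener.getExecutor(). */
interface SketchListener {
    void onEvent(String event);

    default Executor executor() {
        return null; // null means "call onEvent on the publishing thread"
    }
}

/** Hypothetical model of InstancesChangeNotifier's listener registry and dispatch. */
final class ListenerRegistrySketch {
    private final Map<String, Set<SketchListener>> listenerMap = new ConcurrentHashMap<>();

    // Assumed to match ServiceInfo.getKey(): "name@@clusters", or just the name when clusters is empty.
    static String key(String serviceName, String clusters) {
        return (clusters == null || clusters.isEmpty()) ? serviceName : serviceName + "@@" + clusters;
    }

    void registerListener(String serviceName, String clusters, SketchListener listener) {
        // computeIfAbsent gives the same "create the set once" guarantee as the double-checked lock.
        listenerMap.computeIfAbsent(key(serviceName, clusters), k -> ConcurrentHashMap.newKeySet())
                .add(listener);
    }

    void onEvent(String serviceName, String clusters, String event) {
        Set<SketchListener> listeners = listenerMap.get(key(serviceName, clusters));
        if (listeners == null || listeners.isEmpty()) {
            return;
        }
        for (SketchListener listener : listeners) {
            Executor executor = listener.executor();
            if (executor != null) {
                executor.execute(() -> listener.onEvent(event)); // delivered on the listener's executor
            } else {
                listener.onEvent(event);                         // delivered on the calling thread
            }
        }
    }
}
```

Because the key includes the clusters string, a listener registered for cluster "c1" does not receive events published for the same service with no cluster filter.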
    

    Logic diagram of HostReactor.subscribe():

    [Figure omitted]

    One open question: why does the application subscribe to itself? Is it only to keep its own metadata up to date in real time?

  • Subscribing to a specified service

    NacosDiscoveryClientConfiguration instantiates NacosDiscoveryClient, the Nacos implementation of DiscoveryClient:

    @Bean
    public DiscoveryClient nacosDiscoveryClient(
        NacosServiceDiscovery nacosServiceDiscovery) {
        return new NacosDiscoveryClient(nacosServiceDiscovery);
    }
    

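    NacosDiscoveryClient has two main methods, getInstances() and getServices(). They return the instances of a service and the names of all services in the Nacos group, respectively. Both simply delegate to NacosServiceDiscovery: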

    private NacosServiceDiscovery serviceDiscovery;
    
    @Override
    public List<ServiceInstance> getInstances(String serviceId) {
        try {
            return serviceDiscovery.getInstances(serviceId);
        }
        catch (Exception e) {
            throw new RuntimeException(
                "Can not get hosts from nacos server. serviceId: " + serviceId, e);
        }
    }
    
    @Override
    public List<String> getServices() {
        try {
            return serviceDiscovery.getServices();
        }
        catch (Exception e) {
            log.error("get service name from nacos server fail,", e);
            return Collections.emptyList();
        }
    }
    

    NacosServiceDiscovery

    /**
     * Return all services in the Nacos group
     */
    public List<String> getServices() throws NacosException {
        String group = discoveryProperties.getGroup();
        ListView<String> services = namingService().getServicesOfServer(1, Integer.MAX_VALUE, group);
        return services.getData();
    }
    
    /**
     * Get all healthy instances of the service
     */
    public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
        String group = discoveryProperties.getGroup();
        List<Instance> instances = namingService().selectInstances(serviceId, group, true);
        return hostToServiceInstanceList(instances, serviceId);
    }
    
    /**
     * Convert Instance objects to ServiceInstance objects
     */
    public static List<ServiceInstance> hostToServiceInstanceList(
        List<Instance> instances, String serviceId) {
        List<ServiceInstance> result = new ArrayList<>(instances.size());
        for (Instance instance : instances) {
            ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);
            if (serviceInstance != null) {
                result.add(serviceInstance);
            }
        }
        return result;
    }
    
    private NamingService namingService() {
        return nacosServiceManager
            .getNamingService(discoveryProperties.getNacosProperties());
    }
    

    Both end up calling NamingService methods, so let's look at what those methods do.

    NacosNamingService.getServicesOfServer(): calls the Nacos server API GET /service/list to get the names of all services in the group.

    @Override
    public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector)
        throws NacosException {
        // Call the Nacos API over HTTP to get all services
        return serverProxy.getServiceList(pageNo, pageSize, groupName, selector);
    }
    

    NacosNamingService.selectInstances(): subscribes to the specified service in the same way as subscribing to itself above, then returns the filtered instances.

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
    
        ServiceInfo serviceInfo;
        // Whether to subscribe
        if (subscribe) {
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                  StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor
                .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                   StringUtils.join(clusters, ","));
        }
        // Filter the instances
        return selectInstances(serviceInfo, healthy);
    }
    
    private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
    
        Iterator<Instance> iterator = list.iterator();
        while (iterator.hasNext()) {
            Instance instance = iterator.next();
            if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
                iterator.remove();
            }
        }
    
        return list;
    }
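The filter in selectInstances(serviceInfo, healthy) keeps an instance only if three conditions hold: its health status equals the requested flag, it is enabled, and its weight is positive. Below is a sketch of the same predicate using a hypothetical HostSketch type. Unlike the original, it builds a new list instead of removing elements through an Iterator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for Instance with the fields the filter looks at. */
final class HostSketch {
    final String address;
    final boolean healthy;
    final boolean enabled;
    final double weight;

    HostSketch(String address, boolean healthy, boolean enabled, double weight) {
        this.address = address;
        this.healthy = healthy;
        this.enabled = enabled;
        this.weight = weight;
    }
}

final class InstanceFilterSketch {
    // Same predicate as selectInstances(serviceInfo, healthy), but returns a new list.
    static List<HostSketch> select(List<HostSketch> hosts, boolean healthy) {
        if (hosts == null || hosts.isEmpty()) {
            return new ArrayList<>();
        }
        return hosts.stream()
                .filter(h -> h.healthy == healthy && h.enabled && h.weight > 0)
                .collect(Collectors.toList());
    }
}
```

healthy is compared for equality, so passing false returns only the unhealthy instances, not all of them.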
    

    Logic diagram of these two key methods:
    [Figure omitted]

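    Whether the client subscribes to itself or to a specified service, it starts a scheduled task that pulls instances from the server and stores them in HostReactor's serviceInfoMap.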
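The shared pattern is pull and cache. The first read fetches synchronously and fills the local map. A single background task per key then refreshes that entry, and later reads hit the cache. The sketch below is a hypothetical model, not HostReactor itself. remoteQuery stands in for serverProxy.queryList(...), and a fixed-delay task replaces the self-rescheduling UpdateTask and its backoff.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical model of HostReactor's pull-and-cache pattern. */
final class PullCacheSketch {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor;
    private final Function<String, List<String>> remoteQuery; // stand-in for the HTTP query
    private final long refreshMillis;

    PullCacheSketch(ScheduledExecutorService executor,
                    Function<String, List<String>> remoteQuery,
                    long refreshMillis) {
        this.executor = executor;
        this.remoteQuery = remoteQuery;
        this.refreshMillis = refreshMillis;
    }

    List<String> getServiceInfo(String key) {
        // First access pulls synchronously; later calls are served from the local cache.
        serviceInfoMap.computeIfAbsent(key, remoteQuery);
        scheduleUpdateIfAbsent(key);
        return serviceInfoMap.get(key);
    }

    private void scheduleUpdateIfAbsent(String key) {
        // At most one background refresh task per key, like futureMap in HostReactor.
        futureMap.computeIfAbsent(key, k -> executor.scheduleWithFixedDelay(() -> {
            try {
                serviceInfoMap.put(k, remoteQuery.apply(k));
            } catch (RuntimeException e) {
                // Keep the stale entry; an exception escaping here would cancel the task.
            }
        }, refreshMillis, refreshMillis, TimeUnit.MILLISECONDS));
    }

    int scheduledTaskCount() {
        return futureMap.size();
    }
}
```

The catch block matters. With scheduleWithFixedDelay, an uncaught exception silently stops further runs, which is why the real UpdateTask catches Throwable and reschedules itself in finally.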

  • Service-based routing in Spring Cloud Gateway

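    Both subscription paths described above use serviceInfoMap as their data source. One would therefore expect Gateway's service-based routing to take its instances from serviceInfoMap too. It does, but only indirectly.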

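    By default, Gateway's service-based routing uses Ribbon. The flow is shown below:

[Figure omitted: Ribbon-based routing flow in Spring Cloud Gateway]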

The detailed source code for this flow is not covered here. It will appear in a later article in the Spring Cloud Gateway series, so stay tuned.

Steps in the flow:

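  1. At startup, Gateway schedules a task that periodically refreshes allServerList. The task calls NacosNamingService.selectInstances() and writes the returned instances into allServerList. This is how serviceInfoMap is used indirectly;
  2. When Gateway receives a request, the request passes through LoadBalancerClientFilter. This filter applies Ribbon's load-balancing logic, which ultimately picks a server from allServerList.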
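The two steps can be pictured as follows. A refresh task swaps in a new snapshot of the server list, and each request picks from whatever snapshot is current. The sketch is a simplified, hypothetical model. ServerListSketch is not a Ribbon class, and plain round-robin is used here as a stand-in for the configured IRule; check which rule your Ribbon version actually uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of a periodically replaced server list with round-robin choice. */
final class ServerListSketch {
    private volatile List<String> allServerList = Collections.emptyList();
    private final AtomicInteger nextIndex = new AtomicInteger();

    // Called by the periodic refresh with the instances returned by selectInstances(...).
    void updateAllServerList(List<String> fresh) {
        allServerList = Collections.unmodifiableList(new ArrayList<>(fresh));
    }

    // Called per request; reads one snapshot so a concurrent refresh cannot change it mid-choice.
    String choose() {
        List<String> servers = allServerList;
        if (servers.isEmpty()) {
            return null;
        }
        int i = Math.floorMod(nextIndex.getAndIncrement(), servers.size());
        return servers.get(i);
    }
}
```

Under this model, a newly registered instance becomes routable only after the next refresh of the list, not at the moment it registers.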

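The extension point here is the IRule interface. Its default implementation is ZoneAvoidanceRule, a subclass of PredicateBasedRule. Spring Cloud Alibaba provides a Nacos-specific load-balancing strategy, NacosRule. To use it, simply register it as a bean.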

NacosRule.choose()

@Override
public Server choose(Object key) {
    try {
        String clusterName = this.nacosDiscoveryProperties.getClusterName();
        String group = this.nacosDiscoveryProperties.getGroup();
        DynamicServerListLoadBalancer loadBalancer = (DynamicServerListLoadBalancer) getLoadBalancer();
        String name = loadBalancer.getName();

        NamingService namingService = nacosServiceManager
            .getNamingService(nacosDiscoveryProperties.getNacosProperties());
        // Key point: fetch the instances through NamingService
        List<Instance> instances = namingService.selectInstances(name, group, true);
        if (CollectionUtils.isEmpty(instances)) {
            LOGGER.warn("no instance in service {}", name);
            return null;
        }

        List<Instance> instancesToChoose = instances;
        if (StringUtils.isNotBlank(clusterName)) {
            List<Instance> sameClusterInstances = instances.stream()
                .filter(instance -> Objects.equals(clusterName, instance.getClusterName()))
                .collect(Collectors.toList());
            if (!CollectionUtils.isEmpty(sameClusterInstances)) {
                instancesToChoose = sameClusterInstances;
            }
            else {
                LOGGER.warn(
                    "A cross-cluster call occurs,name = {}, clusterName = {}, instance = {}",
                    name, clusterName, instances);
            }
        }

        Instance instance = ExtendBalancer.getHostByRandomWeight2(instancesToChoose);

        return new NacosServer(instance);
    }
    catch (Exception e) {
        LOGGER.warn("NacosRule error", e);
        return null;
    }
}

When NacosRule is used, Gateway's service-based routing obtains instances by calling NacosNamingService.selectInstances() directly.
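NacosRule.choose() first prefers instances in the caller's own cluster. If there are none, it falls back to all instances, which is a cross-cluster call. It then picks one instance at random, weighted by instance weight. The sketch below models that logic with hypothetical types. Its cumulative-weight loop replaces ExtendBalancer.getHostByRandomWeight2 and is not Nacos's actual implementation.

```java
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.stream.Collectors;

/** Hypothetical stand-in for Instance with the fields NacosRule.choose() uses. */
final class WeightedInstance {
    final String address;
    final String clusterName;
    final double weight;

    WeightedInstance(String address, String clusterName, double weight) {
        this.address = address;
        this.clusterName = clusterName;
        this.weight = weight;
    }
}

/** Hypothetical model: same-cluster preference, then weighted random choice. */
final class NacosRuleSketch {
    static WeightedInstance choose(List<WeightedInstance> instances, String clusterName, Random random) {
        if (instances == null || instances.isEmpty()) {
            return null; // NacosRule logs "no instance in service" and returns null
        }
        List<WeightedInstance> candidates = instances;
        if (clusterName != null && !clusterName.trim().isEmpty()) {
            List<WeightedInstance> sameCluster = instances.stream()
                    .filter(i -> Objects.equals(clusterName, i.clusterName))
                    .collect(Collectors.toList());
            if (!sameCluster.isEmpty()) {
                candidates = sameCluster; // otherwise this becomes a cross-cluster call
            }
        }
        double total = 0;
        for (WeightedInstance i : candidates) {
            total += i.weight;
        }
        // Draw a point in [0, total) and walk the cumulative weights until it is passed.
        double r = random.nextDouble() * total;
        for (WeightedInstance i : candidates) {
            r -= i.weight;
            if (r < 0) {
                return i;
            }
        }
        return candidates.get(candidates.size() - 1); // rounding or all-zero weights
    }
}
```

Because selectInstances() has already dropped instances with weight <= 0, the fallback on the last line should rarely matter in practice.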

5. Summary

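When reading the Nacos service discovery source code, the logic can seem disconnected, because it relies heavily on scheduled tasks and Spring's event publishing. The core idea stays the same, though: the client keeps pulling data from the server and caching it locally, and consumers call services based on that local copy. One question remains open: why does the application subscribe to itself? Is it only to keep its metadata up to date? As a registry, Nacos relies mainly on pulling for service subscription, although the 1.x client also listens for UDP pushes, as the pushReceiver in updateService() shows. It therefore uses many scheduled tasks, so keep an eye on performance.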



Thanks for reading. That's all for now; to be continued…

To those on the same wavelength: welcome.

Author's WeChat official account: Tarzan写bug
