在dubbo中服务的暴露的逻辑起源于ServiceBean这个类,而这个类的注册激活在spring中可以通过标签也可以通过xml的配置方式进行。下面讲解这两种方式不同之处。

1.xml的方式

在这里讲解GitHub上面最新的版本的Dubbo4.3.16版本
 对spring研究过的肯定知道,spring可以通过自定xsd文件来扩展自定义标签的。对应的标签解析类定义在spring.handlers文件中。关于自定义XSD扩展在前面的Spring源码解析中讲解过,也进行过举例,自定义标签的使用

http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler

 Dubbo也对Spring进行兼容,所以我们能通过配置文件找到了DubboNamespaceHandler类

public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    
    //校验是否有多个不同版本的dubbo包
    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    //在这里在注册不同的配置元素对应的解析器。
    @Override
    public void init() {
       ....
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
          ....
    }

}
2.标签的方式

在前面已经讲解过@EnableDubbo这个标签已经对应的@Service和@Reference标签的解析,而在@service标签的解析的过程中,有提到过注册service时候注册的Bean的类型是ServiceBean,不了解的可以看看。Service标签的解析。这里贴一下代码

public class ServiceAnnotationBeanPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,
        ResourceLoaderAware, BeanClassLoaderAware {
        ...
    private AbstractBeanDefinition buildServiceBeanDefinition(Service service, Class<?> interfaceClass,
                                                              String annotatedServiceBeanName) {

        BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);
        ...
    }
        ... 
        }

可以看到这个时候就对ServiceBean进行了注入,并且在创建的时候设置了其他重要的信息。

1.什么时候进行的服务导出


既然是服务的导出,这里就进入ServiceBean这个类进行分析。ServiceBean实现了ApplicationListener类。ApplicationListener是Spring的一个上下文监听器的接口类,作用是当完成Spring完成某项流程的时候就会发布一个广播事件,如果对这个事件进行了监听就会触发对应的逻辑。也就是监听器模式。

    public void onApplicationEvent(ContextRefreshedEvent event) {
     //如果还没有导出服务并且服务没有被取消导出
        if (!isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            //进行服务导出操作
            export();
        }
    }

 在上面我们看到了onApplicationEvent方法传入的对象是ContextRefreshedEvent。这个对象是当Spring的上下文被刷新或者加载完毕的时候触发的。因此服务就是在Spring的上下文刷新后进行导出操作的。其中是否延迟导出和服务是否被取消导出都可以进行设置。

2.进行服务导出

 ServiceBean继承于ServiceConfig类(这个类是我们进行service配置的属性的所在的类)。并对export方法进行了重载

    @Override
    public void export() {
        //调用父类的export方法
        super.export();
        // 服务导出完毕之后发布事件 ServiceBeanExportedEvent
        publishExportEvent();
    }

ServiceConfig类

    public synchronized void export() {
        //1.服务发布之前的参数检查和更新
        checkAndUpdateSubConfigs();
        //2.检查是否进行服务的导出
        if (!shouldExport()) {
            return;
        }
        //3.是否需要对服务进行延迟导出,就是使用定时任务对4步骤进行调用
        if (shouldDelay()) {
            delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
        } else {
            //4.如果不需要进行延迟发布则现在就进行导出
            doExport();
        }
    }

 现在对上面可以看到整个导出流程进行解析

1.1服务发布之前的参数检查和更新
    public void checkAndUpdateSubConfigs() {
        // 使用在全局配置中明确定义的默认配置,主要是对信息的补全可以理解为,
        // 在我们使用的时候很多配置都是没有配置,他会使用默认的配置对我们没有配置的位置进行补全
        completeCompoundConfigs();
        // 先启动配置中心,这个配置中心是ConfigManager类,其中保存了所有的服务导出和服务引用的配置
        startConfigCenter();
        //检查服务提供配置如果还没有创建就进行创建,并刷新
        checkDefault();
        //检查AppLication配置是否还没有创建,如果还没有创建就进行创建,,并刷新
        checkApplication();
        //同上面一样这里是检查注册列表配置
        checkRegistry();
        //检查协议配置
        checkProtocol();
        //进行刷新
        this.refresh();
        //元数据报告配置
        checkMetadataReport();
        //简介服务的接口名是不是空
        /*
            <bean id="demoService" class="org.apache.dubbo.samples.basic.impl.DemoServiceImpl"/>
            <dubbo:service interface="org.apache.dubbo.samples.basic.api.DemoService" ref="demoService"/>
           */
        if (StringUtils.isEmpty(interfaceName)) {
            throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
        }
        //如果服务的实现类是GenericService的实现类,
        if (ref instanceof GenericService) {
            interfaceClass = GenericService.class;
            if (StringUtils.isEmpty(generic)) {
                generic = Boolean.TRUE.toString();
            }
        } else {
            //如果服务的实现类不是GenericService的实现类
            try {
                //根据配置的接口地址获取接口的Class对象
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            //检查远程服务接口和方法是否符合Dubbo的要求。主要检查配置文件中配置的方法是否包含在远程服务接口中
            checkInterfaceAndMethods(interfaceClass, methods);
            //reference引用不应该为null,并且是给定接口的实现,即配置中ref指向的Bean是interface的实现类
            checkRef();
            generic = Boolean.FALSE.toString();
        }
        //local指的是服务接口的本地impl类名
        if (local != null) {
            if ("true".equals(local)) {
                local = interfaceName + "Local";
            }
            Class<?> localClass;
            try {
                //获取实现类的Class,是对Class.forName()的一种替换
                localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(localClass)) {
                throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
            }
        }
        //stub 服务接口类的本地代理类名,用于在客户端执行本地逻辑,如本地缓存等,该本地代理类的构造函数必须允许传入远程代理对象。local 和 stub 在功能应该是一致的,用于配置本地存根
        if (stub != null) {
            if ("true".equals(stub)) {
                stub = interfaceName + "Stub";
            }
            Class<?> stubClass;
            try {
                //获取代理类的Class,
                stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(stubClass)) {
                throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
            }
        }
        //检查实现类和代理类是不是服务接口类的实现
        checkStubAndLocal(interfaceClass);
        //
        checkMock(interfaceClass);
    }

可以看到这里的逻辑主要就是对配置的读取和设置,和一些类关系的检查

2和3. 对服务导出相关的检查就是对配置的读取

 这里的检查主要是对配置的获取

    private boolean shouldExport() {
        Boolean shouldExport = getExport();
        if (shouldExport == null && provider != null) {
            shouldExport = provider.getExport();
        }

        // default value is true
        if (shouldExport == null) {
            return true;
        }

        return shouldExport;
    }
    
    private boolean shouldDelay() {
        Integer delay = getDelay();
        if (delay == null && provider != null) {
            delay = provider.getDelay();
        }
        return delay != null && delay > 0;
    }    
4.服务的导出

 在 doExportUrls 方法中对多协议,多注册中心进行了支持。

     protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
    }  
    private void doExportUrls() {
        //1.加载注册中心链接
        List<URL> registryURLs = loadRegistries(true);
        //遍历 protocols,并在每个协议下导出服务
        for (ProtocolConfig protocolConfig : protocols) {
            //拼接path,group跟version组成一个pathKey(serviceName)
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            //将相关的信息封装比如:接口类的Class对象,实现类对象,pathKey(serviceName)跟暴露的服务方法methodsToExport
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            //将providerModel保存到providedServices中
            ApplicationModel.initProviderModel(pathKey, providerModel);
            //2.组装URL
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
4.1加载注册中心链接loadRegistries方法

 这个方法作用是加载注册表并将其转换为URL类型,优先级顺序为:system property> dubbo registry config。话不多说,直接上代码

    protected List<URL> loadRegistries(boolean provider) {
        // check && override if necessary
        List<URL> registryList = new ArrayList<URL>();
        if (CollectionUtils.isNotEmpty(registries)) {
            //遍历配置中心的配置列表
            for (RegistryConfig config : registries) {
                //获取配置中心地址
                String address = config.getAddress();
                //如果address是空的则用默认的0.0.0.0
                if (StringUtils.isEmpty(address)) {
                    address = Constants.ANYHOST_VALUE;
                }
                //如果address是合法的;NO_AVAILABLE="N/A"
                if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                    Map<String, String> map = new HashMap<String, String>();
                    // 添加 ApplicationConfig 中的字段信息到 map 中
                    appendParameters(map, application);
                    // 添加 RegistryConfig 字段信息到 map 中
                    appendParameters(map, config);
                    // 添加 path到 map 中
                    map.put(Constants.PATH_KEY, RegistryService.class.getName());
                    //添加dubbo版本,release版本,系统时间timestamp和pid到map中
                    appendRuntimeParameters(map);
                    //如果没有定义protocol类型,则使用默认的dubbo协议
                    if (!map.containsKey(Constants.PROTOCOL_KEY)) {
                        map.put(Constants.PROTOCOL_KEY, Constants.DUBBO_PROTOCOL);
                    }
                    // 解析得到 URL 列表,address 可能包含多个注册中心 ip,
                    // 因此解析得到的是一个 URL 列表
                    List<URL> urls = UrlUtils.parseURLs(address, map);
                    //便利URl
                    for (URL url : urls) {
                        // 将 URL 协议的protocol设置为 registry
                        url = URLBuilder.from(url)
                                .addParameter(Constants.REGISTRY_KEY, url.getProtocol())
                                .setProtocol(Constants.REGISTRY_PROTOCOL)
                                .build();
                        // 通过判断条件,决定是否添加 url 到 registryList 中,条件满足以下两个之一就可以:
                        //  1. (服务提供者 && register = true 或 null)
                        // 2. (非服务提供者 && subscribe = true 或 null)
                        if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                                || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                            registryList.add(url);
                        }
                    }
                }
            }
        }
        return registryList;
    }
4.2组装URL并导出服务

 URL是Dubbo配置的载体,通过 URL 可让 Dubbo 的各种配置在各个模块之间传递。因此URL组装的过程比较复杂

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        //获取协议的类型名
        String name = protocolConfig.getName();
        //没有则为默认的dubbo
        if (StringUtils.isEmpty(name)) {
            name = Constants.DUBBO;
        }
        Map<String, String> map = new HashMap<String, String>();
         添加 side、版本、时间戳以及进程号等信息到 map 中,通过反射将对象的字段信息添加到 map 中
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        appendRuntimeParameters(map);
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        // methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
        if (CollectionUtils.isNotEmpty(methods)) {
            for (MethodConfig method : methods) {
                //将method封装到map中
                appendParameters(map, method, method.getName());
                //获取method中的retry属性
                String retryKey = method.getName() + ".retry";
                //如果map中包含retry,则移除,如果设置retry为false则设置retries属性为0
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                //如果方法的参数配置列表不为空
                if (CollectionUtils.isNotEmpty(arguments)) {
                    //获取 ArgumentConfig 列表
                    for (ArgumentConfig argument : arguments) {
                        // 参数类型不是null并且不是空字符串
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            //通过反射获取 interfaceClass 的方法列表
                            Method[] methods = interfaceClass.getMethods();
                            // 如果方法不为空
                            if (methods != null && methods.length > 0) {
                                //便利所有的方法
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // 用反射得到的方法名对比MethodConfig中的method,查找目标方法,
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        //如果参数列表不是空
                                        if (argument.getIndex() != -1) {
                                            //根据获取的方法的参数类型跟ArgumentConfig中的参数类型比较,如果相同就将ArgumentConfig 字段信息到 map 中,或抛出异常
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // 如果存在多个回调方法
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                // 从参数类型列表中查找类型名称为 argument.type 的参数,然后封装
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                            // 用户未配置 type 属性,但配置了 index 属性,且 index != -1
                        } else if (argument.getIndex() != -1) {
                            // 添加 ArgumentConfig 字段信息到 map 中
                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }

                    }
                }
            } // end of methods for
        }
        //当前服务是不是基础服务类是的则添加不同的信息
        if (ProtocolUtils.isGeneric(generic)) {
            map.put(Constants.GENERIC_KEY, generic);
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            //
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }
            //为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            // 添加方法名到 map 中,如果包含多个方法名,则用逗号隔开,比如 method = init,destroy
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
            } else {
                // 将逗号作为分隔符连接方法名,并将连接后的字符串放入 map 中
                map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        //添加 token 到 map 中
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(Constants.TOKEN_KEY, token);
            }
        }
        // 获取注册中心的ip和服务注册的ip,并保存到map中
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        // 获取服务注册的端口,并保存到map中
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        //根据map中封装的信息组装成一个URL
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(Constants.SCOPE_KEY);
        // 如果 scope = none,则什么都不做
        if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
            // scope != remote,导出到本地
            if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            // scope != local,导出到远程
            if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        // 加载监视器链接
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            // 将监视器链接作为参数添加到 url 中
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(Constants.PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                        }
                        // 为服务提供类(ref)生成 Invoker
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // 导出服务,并生成 Exporter
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    //导出服务
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                MetadataReportService metadataReportService = null;
                if ((metadataReportService = getMetadataReportService()) != null) {
                    metadataReportService.publishProvider(url);
                }
            }
        }
        this.urls.add(url);
    }

4.2.1到处服务到本地

 服务导出到本地的时候,即配置的scope不是remote的时候。

private void exportLocal(URL url) {
    // 如果 URL 的协议头等于 injvm,说明已经导出到本地了,无需再次导出
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        URL local = URL.valueOf(url.toFullString())
            .setProtocol(Constants.LOCAL_PROTOCOL)    // 设置协议头为 injvm
            .setHost(LOCALHOST)
            .setPort(0);
        ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
        // 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法
        Exporter<?> exporter = protocol.export(
            proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
    }
}

 exportLocal 方法比较简单,首先根据 URL 协议头决定是否导出服务。若需导出,则创建一个新的 URL 并将协议头、主机名以及端口设置成新的值。然后创建 Invoker,并调用 InjvmProtocol 的 export 方法导出服务。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 创建 InjvmExporter
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
4.2.2服务导出前invoker的创建

 这里引用Dubbo官方文档中的说明:Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        //为目标类创建 Wrapper,但是包装器无法正确处理此方案:类名包含'$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
               // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
    
//------Wrapper类
public static Wrapper getWrapper(Class<?> c) {
        //如果是动态代理的类,则获取他的父类
        while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
        {
            c = c.getSuperclass();
        }
        //如果是Object类,则按照Object类来创建
        if (c == Object.class) {
            return OBJECT_WRAPPER;
        }
        //从缓存中获取 Wrapper 实例
        Wrapper ret = WRAPPER_MAP.get(c);
        if (ret == null) {
            // 缓存未命中,创建 Wrapper
            ret = makeWrapper(c);
            // 写入缓存
            WRAPPER_MAP.put(c, ret);
        }
        return ret;
    }
//----------Wrapper类
    private static Wrapper makeWrapper(Class<?> c) {
        //检测 c 是否为基本类型,若是则抛出异常
        if (c.isPrimitive()) {
            throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
        }
        String name = c.getName();
        ClassLoader cl = ClassHelper.getClassLoader(c);
        // c1 用于存储 setPropertyValue 方法代码
        StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
        // c2 用于存储 getPropertyValue 方法代码
        StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
        // c3 用于存储 invokeMethod 方法代码
        StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
        // 生成类型转换代码及异常捕捉代码,比如:
        c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
        c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
        c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
        // pts 用于存储成员变量名和类型
        Map<String, Class<?>> pts = new HashMap<>(); // <property name, property types>
        // ms 用于存储方法描述信息(可理解为方法签名)及 Method 实例
        Map<String, Method> ms = new LinkedHashMap<>(); // <method desc, Method instance>
        // mns 为方法名列表
        List<String> mns = new ArrayList<>(); // method names.
        //dmns 用于存储“定义在当前类中的方法”的名称
        List<String> dmns = new ArrayList<>(); // declaring method names.
        //  获取 public 访问级别的字段,并为所有字段生成条件判断语句
        for (Field f : c.getFields()) {
            String fn = f.getName();
            Class<?> ft = f.getType();
            // 忽略关键字 static 或 transient 修饰的变量
            if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) {
                continue;
            }
            // 生成条件判断及赋值语句,比如:
            // if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
            // if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}
            c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
            // 生成条件判断及返回语句,比如:
            // if( $2.equals("name") ) { return ($w)w.name; }
            c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
            // 存储 <字段名, 字段类型> 键值对到 pts 中
            pts.put(fn, ft);
        }

        Method[] methods = c.getMethods();
        // 检测 c 中是否包含在当前类中声明的方法
        boolean hasMethod = hasMethods(methods);
        if (hasMethod) {
            c3.append(" try{");
            for (Method m : methods) {
                // 忽略 Object 中定义的方法
                if (m.getDeclaringClass() == Object.class) {
                    continue;
                }
                String mn = m.getName();
                // 生成方法名判断语句,比如:
                // if ( "sayHello".equals( $2 )
                c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
                int len = m.getParameterTypes().length;
                // 生成“运行时传入的参数数量与方法参数列表长度”判断语句,比如:
                // && $3.length == 2
                c3.append(" && ").append(" $3.length == ").append(len);
                boolean override = false;
                for (Method m2 : methods) {
                    // 检测方法是否存在重载情况,条件为:方法对象不同 && 方法名相同
                    if (m != m2 && m.getName().equals(m2.getName())) {
                        override = true;
                        break;
                    }
                }
                // 对重载方法进行处理,考虑下面的方法:
                //    1. void sayHello(Integer, String)
                //    2. void sayHello(Integer, Integer)
                // 方法名相同,参数列表长度也相同,因此不能仅通过这两项判断两个方法是否相等。
                // 需要进一步判断方法的参数类型
                if (override) {
                    if (len > 0) {
                        for (int l = 0; l < len; l++) {
                            // 生成参数类型进行检测代码,比如:
                            // && $3[0].getName().equals("java.lang.Integer")
                            //    && $3[1].getName().equals("java.lang.String")
                            c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
                                    .append(m.getParameterTypes()[l].getName()).append("\")");
                        }
                    }
                }
                // 添加 ) {,完成方法判断语句,此时生成的代码可能如下(已格式化):
                // if ("sayHello".equals($2)
                //     && $3.length == 2
                //     && $3[0].getName().equals("java.lang.Integer")
                //     && $3[1].getName().equals("java.lang.String")) {
                c3.append(" ) { ");
                // 根据返回值类型生成目标方法调用语句
                if (m.getReturnType() == Void.TYPE) {
                    // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;
                    c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
                } else {
                    // return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
                    c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
                }
                // 添加 }, 生成的代码形如(已格式化):
                // if ("sayHello".equals($2)
                //     && $3.length == 2
                //     && $3[0].getName().equals("java.lang.Integer")
                //     && $3[1].getName().equals("java.lang.String")) {
                //
                //     w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
                //     return null;
                // }
                c3.append(" }");
                // 添加方法名到 mns 集合中
                mns.add(mn);
                // 检测当前方法是否在 c 中被声明的
                if (m.getDeclaringClass() == c) {
                    // 若是,则将当前方法名添加到 dmns 中
                    dmns.add(mn);
                }
                ms.put(ReflectUtils.getDesc(m), m);
            }
            // 添加异常捕捉语句
            c3.append(" } catch(Throwable e) { ");
            c3.append("     throw new java.lang.reflect.InvocationTargetException(e); ");
            c3.append(" }");
        }
        // 添加 NoSuchMethodException 异常抛出代码
        c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");

        // 处理 get/set 方法
        Matcher matcher;
        for (Map.Entry<String, Method> entry : ms.entrySet()) {
            String md = entry.getKey();
            Method method = entry.getValue();
            // 匹配以 get 开头的方法
            if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
                // 获取属性名
                String pn = propertyName(matcher.group(1));
                // 生成属性判断以及返回语句,示例如下:
                // if( $2.equals("name") ) { return ($w).w.getName(); }
                c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
                pts.put(pn, method.getReturnType());
                // 匹配以 is/has/can 开头的方法
            } else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
                String pn = propertyName(matcher.group(1));
                // 生成属性判断以及返回语句,示例如下:
                // if( $2.equals("dream") ) { return ($w).w.hasDream(); }
                c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
                pts.put(pn, method.getReturnType());
                // 匹配以 set 开头的方法
            } else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
                Class<?> pt = method.getParameterTypes()[0];
                String pn = propertyName(matcher.group(1));
                // 生成属性判断以及 setter 调用语句,示例如下:
                // if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
                c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");
                pts.put(pn, pt);
            }
        }
        // 添加 NoSuchPropertyException 异常抛出代码
        c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
        c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
        long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
        // 创建类生成器
        ClassGenerator cc = ClassGenerator.newInstance(cl);
        // 设置类名及超类
        cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
        cc.setSuperClass(Wrapper.class);
    // 添加默认构造方法
        cc.addDefaultConstructor();
        // 添加字段
        cc.addField("public static String[] pns;"); // property name array.
        cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
        cc.addField("public static String[] mns;"); // all method name array.
        cc.addField("public static String[] dmns;"); // declared method name array.
        for (int i = 0, len = ms.size(); i < len; i++) {
            cc.addField("public static Class[] mts" + i + ";");
        }
        // 添加方法代码
        cc.addMethod("public String[] getPropertyNames(){ return pns; }");
        cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
        cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
        cc.addMethod("public String[] getMethodNames(){ return mns; }");
        cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
        cc.addMethod(c1.toString());
        cc.addMethod(c2.toString());
        cc.addMethod(c3.toString());

        try {
            // 生成类
            Class<?> wc = cc.toClass();
             // 设置字段值
            wc.getField("pts").set(null, pts);
            wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
            wc.getField("mns").set(null, mns.toArray(new String[0]));
            wc.getField("dmns").set(null, dmns.toArray(new String[0]));
            int ix = 0;
            for (Method m : ms.values()) {
                wc.getField("mts" + ix++).set(null, m.getParameterTypes());
            }
            // 创建 Wrapper 实例
            return (Wrapper) wc.newInstance();
        } catch (RuntimeException e) {
            throw e;
        } catch (Throwable e) {
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            cc.release();
            ms.clear();
            mns.clear();
            dmns.clear();
        }
    }

 其中cc.toClass()这个方法是对Javassist包的toClass方法的封装。对于Javassist可以去网上搜索相关的说明。

4.2.3服务导出

export方法定义在Protocol接口中,实现类很多。直接进入到RegistryProtocol类中。

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //从Invoker中获取注册中心的URl
        URL registryUrl = getRegistryUrl(originInvoker);
        // 从Invoker中服务提供的URL
        URL providerUrl = getProviderUrl(originInvoker);
        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the  subscription information to cover.
        //获取服务 URL,保存新的服务提供者url,此时可能会有覆盖的情况,订阅是带有服务名称的缓存密钥,它会导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        // 创建监听器
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        //在服务导出之前获取现有配置规则并覆盖提供者URL
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //1.导出服务--------------------
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        //根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        // 获取已注册的服务提供者 URL,比如:
        // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        // 向服务提供者与消费者注册表中注册服务提供者
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //获取 register 参数
        boolean register = registeredProviderUrl.getParameter("register", true);
        // 根据 register 的值决定是否注册服务
        if (register) {
            // 向注册中心注册服务
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }
        // 向注册中心进行订阅 override 数据
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //设置注册中心的URL
        exporter.setRegisterUrl(registeredProviderUrl);
        //设置新的服务提供的URL
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

 上面逻辑主要有以下几步:


1.从用配置的ref对象生成的Invoker中获取,注册中心的URL跟服务提供方的URL;


2.调用doLocalExport导出服务;


3.向注册中心注册服务;


4.向注册中心进行订阅 override 数据;


5.创建并返回 DestroyableExporter

1.进入到doLocalExport方法进行分析
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            // 创建 Invoker 为委托类对象
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            // 调用 protocol 的 export 方法导出服务
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

 这里调用的export方法会根据使用的协议类型来进入不同的Protocol实现类进行调用,我们常用的是dubbo协议,我们就直接进入DubboProtocol类进行分析。逻辑比较复杂。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //从包装过的invoker中获取providerUrl
        URL url = invoker.getUrl();
        // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
        // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
        String key = serviceKey(url);
        // 创建 DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        // 将 <key, exporter> 键值对放入缓存中
        exporterMap.put(key, exporter);
        // 保存相关的信息一份到本地内存中
        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
        // 启动服务器
        openServer(url);
        // 优化序列化
        optimizeSerialization(url);

        return exporter;
    }
//--------------
    private void openServer(URL url) {
        // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        //服务提供方是服务的时候才能够导出
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            // 访问缓存
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    // 创建服务器实例
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                // 服务器已创建,则根据 url 中的配置重置服务器
                server.reset(url);
            }
        }
    }
    
//----------
    private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                //添加服务器关闭时发送readonly事件,默认情况下启用
                .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                // 添加心跳检测配置到 url 中
                .addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT))
                // 添加编码解码器参数
                .addParameter(Constants.CODEC_KEY, DubboCodec.NAME)
                .build();
        // 获取 server 参数,当没有指定的时候使用默认值默认为 netty
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
        // 通过 SPI 检测是否存在 server 参数所代表的 Transporter(传输层) 拓展,不存在则抛出异常
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            // 创建 ExchangeServer
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        // 获取 client 参数,可指定 netty,mina
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
            // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return server;
    }
//----------Exchangers类
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        // 获取 Exchanger,默认为 HeaderExchanger。
        // 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }
//------HeaderExchanger类
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        // 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:
        //   1. new HeaderExchangeHandler(handler)
        //   2. new DecodeHandler(new HeaderExchangeHandler(handler))
        //   3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
//--------Transporters类
    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            // 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // 获取自适应 Transporter 实例,并调用实例方法
        return getTransporter().bind(url, handler);
    }
//-------由于默认的使用的是netty所以这里进入NettyTransporter,这个类有两个包分别是对netty不同版本的支持,我们选择对netty4.0支持的类
    
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    //创建NettyServer
        return new NettyServer(url, listener);
    }
    
//----最终调用父类的AbstractServer类的AbstractServer方法
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        //设置基本属性
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        // 获取 ip 和端口
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            // 设置 ip 为 0.0.0.0
            bindIp = Constants.ANYHOST_VALUE;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        // 获取最大可接受连接数
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            // 调用模板方法 doOpen 启动服务器
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }
//---------doOpen方法
    protected void doOpen() throws Throwable {
        // 创建 ServerBootstrap
        bootstrap = new ServerBootstrap();
        // 创建 boss 和 worker 线程池
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        //  绑定到指定的 ip 和端口上
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }
4.2.4服务注册

 在RegistryProtocol的export方法中有一个 向注册中心注册服务的方法register

    public void register(URL registryUrl, URL registeredProviderUrl) {
        // 获取 Registry
        Registry registry = registryFactory.getRegistry(registryUrl);
        // 注册服务
        registry.register(registeredProviderUrl);
    }

 第一个方法getRegistry方法在AbstractRegistryFactory类中实现

   public Registry getRegistry(URL url) {
        //创建URL
        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY)
                .build();
        String key = url.toServiceStringWithoutResolving();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            // 访问缓存
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            // 缓存未命中,创建 Registry 实例
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            // 写入缓存
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }

 其中createRegistry有很多实现,这里选择ZookeeperRegistryFactory

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
    // zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive

    private ZookeeperTransporter zookeeperTransporter;

    /**
     * Invisible injection of zookeeper client via IOC/SPI
     * @param zookeeperTransporter
     */
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}

&esmp;进入ZookeeperRegistry方法

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        //获取组名,如果为空则为默认值dubbo
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        //如果组名不是以“/”开头则需要加上
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            // group = "/" + group
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        // 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
        zkClient = zookeeperTransporter.connect(url);
        // 添加状态监听器
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }

 进入AbstractZookeeperTransporter类的connect方法

    public ZookeeperClient connect(URL url) {
        ZookeeperClient zookeeperClient;
        List<String> addressList = getURLBackupAddress(url);
        // The field define the zookeeper server , including protocol, host, port, username, password
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }
        // avoid creating too many connections, so add lock
        synchronized (zookeeperClientMap) {
            if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
                logger.info("find valid zookeeper client from the cache for address: " + url);
                return zookeeperClient;
            }

            zookeeperClient = createZookeeperClient(toClientURL(url));
            logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
            writeToClientMap(addressList, zookeeperClient);
        }
        return zookeeperClient;
    }

都比较容易理解就是创建zk的连接,先从缓存拿,拿不到就创建,然后继续查看CuratorZookeeperClient这个创建方法

    public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);
            // 创建 CuratorFramework 构造器
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000))
                    .connectionTimeoutMs(timeout);
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            // 构建 CuratorFramework 实例
            client = builder.build();
            // 添加监听器
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                    } else if (state == ConnectionState.CONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                    } else if (state == ConnectionState.RECONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                    }
                }
            });
            // 启动客户端
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

也可以去官网查看解析

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐