dubbo源码:dubbo group解析
文章目录背景源码解析1. 服务端启动2. 消费端启动2.1 引用服务生成MockClusterInvoker过程2.2 生成MockClusterInvoker后的check2.2.1 消费者在订阅服务提供者后,刷新invoker列表之前会构造消费URL对应的服务者提供URL2.2.2 ZookeeperRegistry类中doSubscribe通知刷新invoker列表3.总结背景工程中经常..
文章目录
背景
工程中经常见到不同的环境(如dev、test、pre、prod等),配置示例如下:
// 服务端
<dubbo:application name="debbo-provider" owner="programmer" organization="dubbox"/>
<dubbo:registry address="zookeeper://localhost:2181"/>
<dubbo:protocol name="dubbo" port="20880"/>
<dubbo:service interface="service.DemoService" ref="demoService" protocol="dubbo" group="dubbo_test"/>
<bean id="demoService" class="Impl.DemoServiceImpl"/>
// 消费端
<dubbo:application name="dubbo-consumer" owner="programmer" organization="dubbox"/>
<dubbo:registry address="zookeeper://localhost:2181"/>
<dubbo:reference id="permissionService" interface="service.DemoService" group="dubbo_test"/>
源码解析
1. 服务端启动
服务启动时会把配置信息加载到ServiceConfig中,ServiceConfig继承于AbstractServiceConfig,服务端配置的group属性就存放在AbstractServiceConfig类的 group 属性中(此时group值为"dubbo_test")
暴露本地服务
本地暴露的url是local变量:
injvm://127.0.0.1/service.DemoService?anyhost=true&application=debbo-provider&dubbo=2.5.0&group=dubbo_test&interface=service.DemoService&methods=getPermission&organization=dubbox&owner=programmer&pid=4730&side=provider×tamp=1554218524916
启动时会把本地url的serviceKey和url本身用于封装InjvmExporter,其中 local.getServiceKey()值是:
dubbo_test/service.DemoService
暴露远程服务
注册url,即 registryURL 变量值:
registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=debbo-provider&dubbo=2.5.0&export=dubbo%3A%2F%2F192.168.1.102%3A20880%2Fservice.DemoService%3Fanyhost%3Dtrue%26application%3Ddebbo-provider%26dubbo%3D2.5.0%26group%3Ddubbo_test%26interface%3Dservice.DemoService%26methods%3DgetPermission%26organization%3Ddubbox%26owner%3Dprogrammer%26pid%3D4730%26side%3Dprovider%26timestamp%3D1554218524916&organization=dubbox&owner=programmer&pid=4730®istry=zookeeper×tamp=1554218524735
registryURL 会被封装到 AbstractProxyInvoker 实例类中,RegistryProtocol利用该实例通过export方法重新构造出Exporter对象。
2. 消费端启动
消费端启动时会把消费端配置解析到ReferenceConfig文件中,ReferenceConfig继承于AbstractReferenceConfig,消费端配置的group属性就存放在AbstractReferenceConfig类的 group 属性中(此时group值为"dubbo_test")
2.1 引用服务生成MockClusterInvoker过程
消费端在引用服务时会把配置参数组装到 ReferenceConfig 中的 urls 属性中,值即:
registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.5.0&organization=dubbox&owner=programmer&pid=4769&refer=application%3Ddubbo-consumer%26dubbo%3D2.5.0%26group%3Ddubbo_test%26interface%3Dservice.DemoService%26methods%3DgetPermission%26organization%3Ddubbox%26owner%3Dprogrammer%26pid%3D4769%26side%3Dconsumer%26timestamp%3D1554220185130®istry=zookeeper×tamp=1554220496811
然后由调用RegistryProtocol的refer方法引用远程服务,源码如下:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//省略部分代码......
// group="a,b" or group="*"
Map<String, String> qs=StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// 获取消费端配置的group,这里是“dubbo_test”
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0 ) {
if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1 || "*".equals( group ) ) { // 若配置的group形式是 group="a,b" or group="*"
return doRefer( getMergeableCluster(), registry, type, url );
}
}
// 若只配置一个group
return doRefer(cluster, registry, type, url);
}
这里 url.getParameterAndDecoded(Constants.REFER_KEY) 得:
application=dubbo-consumer&dubbo=2.5.0&group=dubbo_test&interface=service.DemoService&methods=getPermission&organization=dubbox&owner=programmer&pid=4776&side=consumer×tamp=1554220877806
然后把上面的字符串解析成key-value形式放到map中,即 qs 的变量值是:
这样得到 group 的值即"dubbo_test" 。
若配置的group形式是 group=“a,b” or group="*",则调用getMergeableCluster方法,即MergeableCluster实例,否则(即只配置了一个group)会生成Cluster的Adaptive字节码。doRefer源码如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
}
doRefer方法中check分组是在cluster的join方法中进行的,此时由于消费端配置的仅一个分组,即"dubbo_test",所以可以直接看cluster的适配类字节码如下:
package com.alibaba.dubbo.rpc.cluster;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.cluster.Directory {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("cluster", "failover");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
}
上面字节码会依次调用MockClusterWrapper、FailoverCluster的join方法,FailoverCluster的join方法构造出FailoverClusterInvoker并返回。由此可见,对group的check没有在引用服务生成MockClusterInvoker过程中。
2.2 生成MockClusterInvoker后的check
该过程发生在引用服务生成MockClusterInvoker过程之后,生成代理之前,ReferenceConfig类createProxy方法中,源码如下:
private T createProxy(Map<String, String> map) {
//省略很多代码.......
if (c && ! invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
即调用MockClusterInvoker的isAvailable方法时,会直接调用RegistryDirectory的isAvailable方法(源码如下),
public boolean isAvailable() {
if (isDestroyed()) {
return false;
}
Map<String, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) {
for (Invoker<T> invoker : new ArrayList<Invoker<T>>(localUrlInvokerMap.values())) {
if (invoker.isAvailable()) {
return true;
}
}
}
return false;
}
RegistryDirectory中的isDestroyed方法是父类AbstractDirectory的方法,但此时RegistryDirectory中的urlInvokerMap为null(RegistryDirectory中的urlInvokerMap用于缓存服务端url和invoker的映射关系,消费端在启动过程中会刷新invoker列表并更新urlInvokerMap中的内容),所以直接返回了false,所以此时check不通过,抛出异常。
2.2.1 消费者在订阅服务提供者后,刷新invoker列表之前会构造消费URL对应的服务者提供URL
消费端在启动过程创建RegistryDirectory并订阅服务提供者,直接看ZookeeperRegistry类的doSubscribe方法源码:
/**
** url:消费端URL,即consumer://10.88.81.54/service.DemoService?application=dubbo-consumer&category=providers,configurators,routers&dubbo=2.5.0&group=dubbo_testt&interface=service.DemoService&methods=getPermission&organization=dubbox&owner=programmer&pid=16383&side=consumer×tamp=1554690117459
** listener:RegistryDirectory实例
**/
protected void doSubscribe(final URL url, final NotifyListener listener) {
//......
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
//......
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// 重点一
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
/** 重点二,经上面转换后,urls长度为3,内容为:
** [0]:empty://10.88.81.54/service.DemoService?application=dubbo-consumer&category=providers&dubbo=2.5.0&group=dubbo_testt&interface=service.DemoService&methods=getPermission&organization=dubbox&owner=programmer&pid=16407&side=consumer×tamp=1554692062537
** [1]:empty://10.88.81.54/service.DemoService?application=dubbo-consumer&category=configurators&dubbo=2.5.0&group=dubbo_testt&interface=service.DemoService&methods=getPermission&organization=dubbox&owner=programmer&pid=16407&side=consumer×tamp=1554692062537
** [2]:empty://10.88.81.54/service.DemoService?application=dubbo-consumer&category=routers&dubbo=2.5.0&group=dubbo_testt&interface=service.DemoService&methods=getPermission&organization=dubbox&owner=programmer&pid=16407&side=consumer×tamp=1554692062537
** 注册完成后会刷新invoker列表(即根据invokerURL列表转换为invoker列表)
**/
notify(url, listener, urls);
}
private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
if (urls == null || urls.isEmpty()) {
int i = path.lastIndexOf('/');
String category = i < 0 ? path : path.substring(i + 1);
URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
urls.add(empty);
}
return urls;
}
/**
** consumer:consumer://10.88.81.54/service.DemoService?application=dubbo-consumer&category=providers,configurators,routers&dubbo=2.5.0&group=dubbo_testt&interface=service.DemoService&methods=getPermission&organization=dubbox&owner=programmer&pid=16407&side=consumer×tamp=1554692062537
** providers长度为1,URL.decode(providers.get(0)):dubbo://10.88.81.54:20880/service.DemoService?anyhost=true&application=debbo-provider&dubbo=2.5.0&group=dubbo_test&interface=service.DemoService&methods=getPermission&organization=dubbox&owner=programmer&pid=16381&side=provider×tamp=1554690111074
*/
private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
List<URL> urls = new ArrayList<URL>();
if (providers != null && providers.size() > 0) {
for (String provider : providers) {
provider = URL.decode(provider);
if (provider.contains("://")) {
URL url = URL.valueOf(provider);
// 重点在这里的匹配
if (UrlUtils.isMatch(consumer, url)) {
urls.add(url);
}
}
}
}
return urls;
}
UrlUtils.isMatch:
public static boolean isMatch(URL consumerUrl, URL providerUrl) {
//......
String consumerGroup = consumerUrl.getParameter(Constants.GROUP_KEY);
String consumerVersion = consumerUrl.getParameter(Constants.VERSION_KEY);
//......
String providerGroup = providerUrl.getParameter(Constants.GROUP_KEY);
String providerVersion = providerUrl.getParameter(Constants.VERSION_KEY);
// 返回(其中包含了group的匹配)....
return (Constants.ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup, providerGroup))
&& (Constants.ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))
&& (consumerClassifier == null || Constants.ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));
}
2.2.2 ZookeeperRegistry类中doSubscribe通知刷新invoker列表
这里直接看AbstractRegistry类中notify方法源码:
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
//......
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
// 重点
listener.notify(categoryList);
}
}
RegistryDirectory 中的notify方法源码如下:
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
// 此例子条件不会满足(urls的前缀均为empty,这里条件不成立)
invokerUrls.add(url);
} else {
//.......
}
}
// configurators
if (configuratorUrls != null && configuratorUrls.size() >0 ){
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && routerUrls.size() >0 ){
List<Router> routers = toRouters(routerUrls);
if(routers != null){ // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// 合并override参数
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers 此例子invokerUrls为空集合
refreshInvoker(invokerUrls);
}
3.总结
Step 1:生成消费端创建RegistryDirectory并发送订阅请求时,会匹配服务提供者URL,匹配失败时会构造empty URL
Step 2:刷新invoker列表时更新RegistryDirectory中的urlInvokerMap属性
Step 3:生成代理前根据消费端引用的接口对invoker进行check
更多推荐
所有评论(0)