dubbo源码分析:服务引用
服务引用的方式(1)服务直连的方式引用服务(2)基于注册中心引用服务服务引用的入口(1)饿汉式:spring容器启动时调用ReferenceBean#afterPropertiesSet()#getObject(),可以通过dubbo:reference 的 init=true开启饿汉式,默认为懒汉式(2)懒汉式:默认为懒汉式,服务被注入到其他类中时引用,ReferenceBea...
-
服务引用的方式
(1)服务直连的方式引用服务
(2)基于注册中心引用服务 -
服务引用的入口
(1)饿汉式:spring容器启动时调用ReferenceBean#afterPropertiesSet()#getObject(),可以通过dubbo:reference 的 init=true开启饿汉式,默认为懒汉式
(2)懒汉式:默认为懒汉式,服务被注入到其他类中时引用,ReferenceBean#getObject()
从入口分析源码:ReferenceBean#getObject()
ReferenceBean#getObject() --> ReferenceBean#get() --> ReferenceBean#init()
/**
* 整体实现逻辑:
* (1)配置的校验,比如consumer对象是否存在
* (2)从System或配置文件中加载与接口名相对应的配置,并将解析结果赋值给url字段,url字段的作用一般是用于点对点调用
* (3)检查核心配置类是否为空,比如:application、monitor、registries
* (4)将一些配置添加到map中
* (5)处理<dubbo:method> MethodConfig 实例
* (6)创建接口类对应代理类createProxy(map),并赋值给成员变量ref
*/
private void init() {
//省略代码:一些配置的校验
// 创建代理类
ref = createProxy(map);
}
ReferenceConfig#createProxy:
/**
* 创建接口类对应代理对象整体流程:
* (1)根据scope判断是本地引用还是远程引用
* (2)本地引用:构建InjvmInvoker实例
* (3)远程引用
* a. 用户在System或dubbo-resolve.properties文件有指定配置(一般都是直连配置),则使用指定的配置URL
* b. 不满足a,则使用dubbo配置文件中的注册中心URL
* c. 将url都添加到urls中
* d.
* urls列表数量为1,则直接通过自适应Protocol扩展类构建Invoker实例
* urls列表数量大于1,构建每个url对应的invoker并添加到invokers列表,通过Cluster合并多个Invoker
* e. 调用ProxyFactory生成代理类
*
* @param map
* @return
*/
@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
isJvmRefer = false;
}
// 根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用
// 比如如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 true
else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
//默认情况下如果本地有服务暴露,则引用本地服务.
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
// 获取 injvm 配置值
isJvmRefer = isInjvm().booleanValue();
}
// 本地引用
if (isJvmRefer) {
// 生成本地引用 URL,协议为 injvm
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// 调用 refer 方法构建 InjvmInvoker 实例
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
}
// 远程引用
else {
if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是点对点直连地址,也可能是注册中心URL
// 当需要配置多个 url 时,用分号进行分割,这里会进行切分
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
// 设置接口全限定名为 url 路径
url = url.setPath(interfaceName);
}
// 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
// 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
// 最后将合并后的配置设置为 url 查询字符串中。
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // 通过注册中心配置拼装URL
//加载获取注册中心url
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 添加 refer 参数到 url 中,并将 url 添加到 urls 中
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
// 未配置注册中心,抛出异常
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
// 单个注册中心或服务提供者(服务直连,下同)
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
// 多个注册中心或多个服务提供者,或者两者混合
else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 获取所有的 Invoker
for (URL url : urls) {
// 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
// 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL
// 对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
// invoker 可用性检查
if (c && ! invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 创建服务代理类
return (T) proxyFactory.getProxy(invoker);
}
以上代码有两个关键点,重点分析这两个点:
(1)refprotocol.refer
(2)proxyFactory.getProxy(invoker)
先来看refprotocol.refer:
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
由于url的protocol为registry,所以调用RegistryProtocol#refer
RegistryProtocol#refer:
/**
* 创建Invoker对象
* (1)url设置协议头protocol,默认为dubbo
* (2)根据url加载对应的Registry实例:DubboRegistry
* (3)从url获取group,根据group决定使用哪个Cluster的实例
* (4)调用doRefer方法生成Invoker
* @param type 扩张接口类Class
* @param url 远程服务的URL地址
* @param <T>
* @return
* @throws RpcException
*/
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-spi&dubbo=2.5.3&pid=35068&refer=application%3Ddubbo-spi%26check%3Dfalse%26dubbo%3D2.5.3%26interface%3Dcom.kl.dubbotest.provider.export.ProviderExport%26methods%3DproviderExport%26pid%3D35068%26retries%3D0%26side%3Dconsumer%26timestamp%3D1564102096811®istry=zookeeper×tamp=1564102096848
// 取 registry 参数值,并将其设置为协议头
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-spi&dubbo=2.5.3&pid=35068&refer=application%3Ddubbo-spi%26check%3Dfalse%26dubbo%3D2.5.3%26interface%3Dcom.kl.dubbotest.provider.export.ProviderExport%26methods%3DproviderExport%26pid%3D35068%26retries%3D0%26side%3Dconsumer%26timestamp%3D1564102096811×tamp=1564102096848
// 获取注册中心实例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*" 将 url 查询字符串转为 Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// 获取 group 配置
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0 ) {
if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
|| "*".equals( group ) ) {
// 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
return doRefer( getMergeableCluster(), registry, type, url );
}
}
// 调用 doRefer 继续执行服务引用逻辑
return doRefer(cluster, registry, type, url);
}
继续跟踪:RegistryProtocol#doRefer
/**
* 生成Invoker
* (1)创建RegistryDirectory
* (2)生成消费者链接URL
* (3)向注册中心注册:在consumers目录下新建zk节点
* (4)订阅providers、configurators、routers等节点下的数据
* (5)一个服务会部署在多台机器上,这样就会在providers节点下产生多个节点,就需要Cluster将多个服务节点合并为一个,并生成一个Invoker
* @param cluster
* @param registry
* @param type 扩张接口Class
* @param url
* @param <T>
* @return
*/
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 实例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置注册中心和协议
directory.setRegistry(registry);
directory.setProtocol(protocol);
// 生成服务消费者链接
//consumer://192.168.0.110/com.kl.dubbotest.provider.export.ProviderExport?application=dubbo-spi&dubbo=2.5.3&interface=com.kl.dubbotest.provider.export.ProviderExport&methods=providerExport&pid=35033&retries=0&side=consumer×tamp=1564101892793
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
// 注册服务消费者,在 consumers 目录下新建节点
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 订阅 providers、configurators、routers 等节点数据, RegistryDirectory会收到这几个节点下的子节点信息
//消费者和提供者建立nio连接也在directory.subscribe完成
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
//一个服务会部署在多台机器上,这样就会在providers产生多个节点,就需要Cluster将多个服务节点合并为一个,并生成一个Invoker
//由于dubbo的aop,所以会调MockClusterWrapper#join
//返回的是MockClusterInvoker<T> (RegistryDirectory, FailoverClusterInvoker)对象
return cluster.join(directory);
}
//Cluster$Adpative#join
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
//通过cluster获取集群策略,默认是failover
//本例是使用failover机制
String extName = url.getParameter("cluster", "failover");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
//通过spi这里得到FailoverCluster对象
com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
继续跟踪:cluster.join(directory),由于dubbo的aop,cluster是通过SPI得到的MockClusterWrapper,来看MockClusterWrapper#join
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
//cluster为Cluster$Adpative#join,FailoverClusterInvoker
return new MockClusterInvoker<T>(directory, this.cluster.join(directory));
}
小结:refprotocol#refer
这个方法,会返回一个MockClusterInvoker(FailoverClusterInvoker)
继续来看上边说到的第2个关键点:proxyFactory.getProxy(invoker),其实是通过javaassit生成代理类Proxy01
Proxy01代理类内部封装了:refprotocol#refer生成的MockClusterInvoker(FailoverClusterInvoker),调用Invoker的invoke方法的逻辑
//--------------- Proxy0
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.common.bytecode.ClassGenerator.DC;
import java.lang.reflect.InvocationHandler;
public class Proxy0 extends Proxy implements DC {
public Object newInstance(InvocationHandler var1) {
return new proxy01(var1);
}
public Proxy0_my() {
}
}
//--------------- Proxy01
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.rpc.service.EchoService;
import demo.dubbo.api.DemoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class proxy01 implements ClassGenerator.DC, EchoService, DemoService {
public static Method[] methods;
//new InvokerInvocationHandler(invoker), invoker:refprotocol#refer生成的MockClusterInvoker(FailoverClusterInvoker)
private InvocationHandler handler;
//实现了接口方法
public String sayHello(String var1) {
Object[] var2 = new Object[]{var1};
Object var3 = null;
try {
var3 = this.handler.invoke(this, methods[1], var2);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return (String)var3;
}
public Object $echo(Object var1) {
Object[] var2 = new Object[]{var1};
Object var3 = null;
try {
var3 = this.handler.invoke(this, methods[3], var2);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return (Object)var3;
}
public proxy01() {
}
//public 构造函数,这里handler是
//由Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))语句传入的InvokerInvocationHandler对象
public proxy01(InvocationHandler var1) {
this.handler = var1;
}
}
//InvokerInvocationHandler类
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler){
//通过构造函数传入Invoker,Invoker内部封装了服务方法的调用
this.invoker = handler;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
//如果是Object类方法
if (method.getDeclaringClass() == Object.class) {
//反射调用
return method.invoke(invoker, args);
}
//对3个特殊方法的调用,做了处理
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
//其他业务方法通过invoker.invoke方法调用(***看这里***)
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
上边的分析中好像只有建立消费者的zk节点和监听,并没有看到消费者与提供者建立nio连接,实际上也在消费者监听部分代码中
来看:RegistryDirectory#subscribe --> FailbackRegistry#subscribe --> ZookeeperRegistry#doSubscribe
FailbackRegistry#subscribe:
(1)订阅:ZookeeperRegistry#doSubscribe
(2)订阅失败,则会添加到定时任务中进行重试
ZookeeperRegistry#doSubscribe:
/**
* (1)对providers/routers/configurator三个节点进行创建和子节点监听
* (2)调用notify(url,listener,urls) 将已经可用的列表进行通知
* @param url
* @param listener
*/
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
if (! anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && services.size() > 0) {
anyServices.addAll(services);
for (String service : services) {
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
在ZookeeperRegistry#doSubscribe的代理逻辑里边会有“消费者建立与提供者的nio连接”,最终调用的逻辑是DubboProtocol#refer:
DubboProtocol#refer:
/**
* 在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。
* Invoker是由Protocol实现类构建而来
* @param serviceType 扩张接口类Class
* @param url 远程服务的URL地址
* @param <T>
* @return
* @throws RpcException
*/
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker. 创建 DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
DubboProtocol#getClients:
/**
* ExchangeClient不具备通信能力,底层的NettyClient具备通信能力
* (1)判断是创建共享连接还是多个连接(每个服务每个连接),默认为共享连接
* (2)创建连接ExchangeClient
* @param url
* @return
*/
private ExchangeClient[] getClients(URL url){
//是否共享连接
boolean service_share_connect = false;
// 获取连接数,默认为0,表示未配置
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//如果connections不配置,则共享连接,否则每服务每连接
if (connections == 0){
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect){
// 获取共享客户端
clients[i] = getSharedClient(url);
} else {
// 创建新的客户端
clients[i] = initClient(url);
}
}
return clients;
}
DubboProtocol#getSharedClient:
/**
* 获取共享连接
* (1)从缓存获取ExchangeClient
* (2)缓存获取不到,调initClient(url)创建ExchangeClient,并使用ReferenceCountExchangeClient包装ExchangeClient
* (3)置入缓存
*/
private ExchangeClient getSharedClient(URL url){
String key = url.getAddress();
// 获取带有“引用计数”功能的 ExchangeClient
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if ( client != null ){
if ( !client.isClosed()){
// 增加引用计数
client.incrementAndGetCount();
return client;
} else {
// logger.warn(new IllegalStateException("client is closed,but stay in clientmap .client :"+ client));
referenceClientMap.remove(key);
}
}
// 创建 ExchangeClient 客户端
ExchangeClient exchagneclient = initClient(url);
// 将 ExchangeClient 实例传给 ReferenceCountExchangeClient,这里使用了装饰模式
client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
DubboProtocol#initClient:
/**
* 创建新连接.ExchangeClient实例
*/
private ExchangeClient initClient(URL url) {
// client type setting. 获取客户端类型,默认为 netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
boolean compatible = (version != null && version.startsWith("1.0."));
// 添加编解码参数到url中
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
//添加心跳包参数到url中,默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO存在严重性能问题,暂时不允许使用
// 检测客户端类型是否存在,不存在则抛出异常
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client ;
try {
//设置连接应该是lazy的
// 获取 lazy 配置,并根据配置值决定创建的客户端类型
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
// 创建懒加载 ExchangeClient 实例
client = new LazyConnectExchangeClient(url ,requestHandler);
} else {
// 创建普通 ExchangeClient 实例
client = Exchangers.connect(url ,requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url
+ "): " + e.getMessage(), e);
}
return client;
}
Exchangers#connect:
/**
* 创建ExchangeClient实例
* @param url
* @param handler
* @return
* @throws RemotingException
*/
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取 Exchanger 实例,默认为 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
}
HeaderExchanger#connect:
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 这里包含了多个调用,分别如下:
// 1. 创建 HeaderExchangeHandler 对象
// 2. 创建 DecodeHandler 对象
// 3. 通过 Transporters 构建 Client 实例
// 4. 创建 HeaderExchangeClient 对象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
Transporters#connect:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handler 数量大于1,则创建一个 ChannelHandler 分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取 Transporter 自适应拓展类,并调用 connect 方法生成 Client 实例
return getTransporter().connect(url, handler);
}
NettyTransporter#connect:
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
// 创建 NettyClient 对象
return new NettyClient(url, listener);
}
更多推荐
所有评论(0)