前言:

    Ribbon作为客户端负载均衡的一种手段,被广泛应用在微服务项目中。

    有关于Ribbon的介绍和使用方式,读者可参考笔者的另一篇文章 https://blog.csdn.net/qq_26323323/article/details/78668776  

    本文主要介绍基于@LoadBalanced的RestTemplate来实现的负载均衡的源码解析

 

1.@LoadBalanced分析

    maven引入ribbon依赖后,

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-ribbon</artifactId>
    <version>1.3.1.RELEASE</version>
</dependency>

    只需要在RestTemplate的bean上面加入@LoadBalanced注解即可在使用RestTemplate发送HTTP请求时,自动实现负载均衡调用

@LoadBalanced
@Bean
public RestTemplate restTemplate(){
    return new RestTemplate();
}

    由此可见,主要的实现应该就在@LoadBalanced上,那我们就来分析下其源码

    @LoadBalanced源码如下:

@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}

    再次让大家失望了,这个注解没有什么特别之处,也没有像往常的注解引入其他类。按照常规惯例,我们就到引入的jar包下看下META-INF/spring.factories文件有没有引入什么特别的类。我们就到@LoadBalanced所在的jar包spring-cloud-commons-1.2.2.RELEASE-sources.jar来看下其spring.factories文件

# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.client.CommonsClientAutoConfiguration,\
org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration,\
org.springframework.cloud.client.hypermedia.CloudHypermediaAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.AsyncLoadBalancerAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration,\
org.springframework.cloud.client.serviceregistry.ServiceRegistryAutoConfiguration,\
org.springframework.cloud.commons.util.UtilAutoConfiguration,\
org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration


# Environment Post Processors
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.client.HostInfoEnvironmentPostProcessor

    AutoConfiguration引入的类都会在项目启动时被添加到Spring容器中,与LoadBalanced相关的的有两个,直觉上LoadBalancerAutoConfiguration更像我们需要看的类(当然,不是直觉,是笔者看了类源码之后写的决定,哈哈),那么我们来详细分析下这个类,看看是否有惊喜

 

2.LoadBalancerAutoConfiguration

 * Copyright 2013-2017 the original author or authors.

package org.springframework.cloud.client.loadbalancer;
...
@Configuration
@ConditionalOnClass(RestTemplate.class)// 当前环境需要有RestTemplate.class
@ConditionalOnBean(LoadBalancerClient.class)// 需要当前环境有LoadBalancerClient接口的实现类
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)// 初始化赋值LoadBalancerRetryProperties
public class LoadBalancerAutoConfiguration {

	@LoadBalanced
	@Autowired(required = false)
    // 1.@AutoWired也会自动装载集合类list,会将合适的RestTemplate添加到restTemplates中
    // 而至于加载哪些RestTemplate,就是标注了@LoadBalanced的RestTemplate
    // 上面我们看到@LoadBalanced有一个@Qualifier就是特殊标注的含义,所以普通的没有添加@LoadBalanced
    // 则不会被添加到restTemplates中的
	private List<RestTemplate> restTemplates = Collections.emptyList();

	@Bean
    // 2.SmartInitializingSingleton接口的实现类会在项目初始化之后被调用其afterSingletonsInstantiated方法
	public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
			final List<RestTemplateCustomizer> customizers) {
		return new SmartInitializingSingleton() {
			@Override
			public void afterSingletonsInstantiated() {
				for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
					for (RestTemplateCustomizer customizer : customizers) {
						customizer.customize(restTemplate);
					}
				}
			}
		};
	}

	@Autowired(required = false)
	private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

	@Bean
	@ConditionalOnMissingBean
    // 3.LoadBalancerRequestFactory被创建
	public LoadBalancerRequestFactory loadBalancerRequestFactory(
			LoadBalancerClient loadBalancerClient) {
		return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
	}

	@Configuration
	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
	static class LoadBalancerInterceptorConfig {
		@Bean
        // 4.将LoadBalancerClient接口的实现类和3方法中创建的LoadBalancerRequestFactory
        // 注入到该方法中,同时成为LoadBalancerInterceptor的参数
		public LoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}

		@Bean
		@ConditionalOnMissingBean
        // 5. 方法4中创建的LoadBalancerInterceptor会被作为方法参数注入进来
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return new RestTemplateCustomizer() {
				@Override
                // 5.1 customize方法会被2方法中的afterSingletonsInstantiated()遍历调用
				public void customize(RestTemplate restTemplate) {
					List<ClientHttpRequestInterceptor> list = new ArrayList<>(
							restTemplate.getInterceptors());
					list.add(loadBalancerInterceptor);
					restTemplate.setInterceptors(list);
				}
			};
		}
	}

    // 有关于RetryTemplate相关的bean在该例中不会被加载进来
	@Configuration
	@ConditionalOnClass(RetryTemplate.class)
	static class RetryAutoConfiguration {
		@Bean
		public RetryTemplate retryTemplate() {
			RetryTemplate template =  new RetryTemplate();
			template.setThrowLastExceptionOnExhausted(true);
			return template;
		}

		@Bean
		@ConditionalOnMissingBean
		public LoadBalancedRetryPolicyFactory loadBalancedRetryPolicyFactory() {
			return new LoadBalancedRetryPolicyFactory.NeverRetryFactory();
		}

		@Bean
		public RetryLoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties,
				LoadBalancedRetryPolicyFactory lbRetryPolicyFactory,
				LoadBalancerRequestFactory requestFactory) {
			return new RetryLoadBalancerInterceptor(loadBalancerClient, retryTemplate(), properties,
					lbRetryPolicyFactory, requestFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
			return new RestTemplateCustomizer() {
				@Override
				public void customize(RestTemplate restTemplate) {
					List<ClientHttpRequestInterceptor> list = new ArrayList<>(
							restTemplate.getInterceptors());
					list.add(loadBalancerInterceptor);
					restTemplate.setInterceptors(list);
				}
			};
		}
	}
}

    总结:经过以上的一堆注释可知,该类的主要作用就是给添加了@LoadBalanced注解的RestTemplate类,添加拦截器LoadBalancerInterceptor,该拦截器拦截到请求后将请求重新处理,就在这个拦截器中实现了负载均衡的相关功能。

 

    疑问:上文中一直有有LoadBalancerClient,那么这个接口的实现类是什么呢?读者可先自行思考下

 

 

3.RestTemplate.getForObject(String url, Class<T> responseType, Object... uriVariables)

    经过上面的一堆解释,我们知道,新创建的RestTemplate在项目启动完成之后,会被添加一个新的拦截器,在我们发出具体的HTTP请求时,拦截器会拦截该请求,并真正发挥负载均衡的作用,那么我们来看下,RestTemplate发出get请求的具体操作

 

    getForObject()源码如下:

public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
    RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
    HttpMessageConverterExtractor<T> responseExtractor =
        new HttpMessageConverterExtractor<T>(responseType, getMessageConverters(), logger);
    return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);
}

// execute
public <T> T execute(String url, HttpMethod method, RequestCallback requestCallback,
                     ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {

    URI expanded = getUriTemplateHandler().expand(url, uriVariables);
    return doExecute(expanded, method, requestCallback, responseExtractor);
}

//doExecute
protected <T> T doExecute(URI url, HttpMethod method, RequestCallback requestCallback,
                          ResponseExtractor<T> responseExtractor) throws RestClientException {

    ...
    ClientHttpResponse response = null;
    try {
        // 1.创一个request请求
        ClientHttpRequest request = createRequest(url, method);
        if (requestCallback != null) {
            requestCallback.doWithRequest(request);
        }
        // 2.执行该请求
        response = request.execute();
        // 3.对响应结果进行封装
        handleResponse(url, method, response);
        if (responseExtractor != null) {
            return responseExtractor.extractData(response);
        }
        else {
            return null;
        }
    }
    ...
}

    主要就分三步,下面逐个来分析下

 

    1)createRequest(url, method)创一个request请求

protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
    ClientHttpRequest request = getRequestFactory().createRequest(url, method);// 重点在这里
    if (logger.isDebugEnabled()) {
        logger.debug("Created " + method.name() + " request for \"" + url + "\"");
    }
    return request;
}

//getRequestFactory
public ClientHttpRequestFactory getRequestFactory() {
    ClientHttpRequestFactory delegate = super.getRequestFactory();
    // 我们的RestTemplate拦截器不为空
    if (!CollectionUtils.isEmpty(getInterceptors())) {
        return new InterceptingClientHttpRequestFactory(delegate, getInterceptors());
    }
    else {
        return delegate;
    }
}

// InterceptingClientHttpRequestFactory.createRequest(url, method)
protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
    // 最终返回的就是这个request
    return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
}

    2)request.execute()执行该请求

    当前的request为InterceptingClientHttpRequest

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    // 1.iterator即拦截器集合
    // private final Iterator<ClientHttpRequestInterceptor> iterator;
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        // 2.逐个拦截器来执行,我们就看最重要的LoadBalancerInterceptor
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
        for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) {
            List<String> values = entry.getValue();
            for (String value : values) {
                delegate.getHeaders().add(entry.getKey(), value);
            }
        }
        if (body.length > 0) {
            StreamUtils.copy(body, delegate.getBody());
        }
        return delegate.execute();
    }
}

    总结2):重点就是执行拦截器的intercept方法,下面我们来看下LoadBalancerInterceptor.intercept()方法

 

 

4.LoadBalancerInterceptor.intercept()执行负载均衡拦截器方法

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                                    final ClientHttpRequestExecution execution) throws IOException {
    final URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    // 真正执行的方法
    // private LoadBalancerClient loadBalancer; LoadBalancerClient默认实现类为RibbonLoadBalancerClient
    return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}

// RibbonLoadBalancerClient.execute()
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
    // 1.根据用户请求的serviceId来获取具体的LoadBalanced
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);

    // 2.获取具体的server(也就是定位到哪台服务器的哪个端口号的具体服务信息)
    Server server = getServer(loadBalancer);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                                                                             serviceId), serverIntrospector(serviceId).getMetadata(server));

    // 3.执行HTTP请求
    return execute(serviceId, ribbonServer, request);
}

    注意:通过这个方法的分析可以看到,里面通过一系列的算法根据用户输入的serviceId(也就是服务名)来获取到具体的服务所在host、port,然后重新封装HTTP请求,最后执行该HTTP请求即可

    下面逐个方法来分析

 

    1)getLoadBalancer()

protected ILoadBalancer getLoadBalancer(String serviceId) {
    return this.clientFactory.getLoadBalancer(serviceId);
}

// SpringClientFactory.getLoadBalancer()
public ILoadBalancer getLoadBalancer(String name) {
    return getInstance(name, ILoadBalancer.class);
}

// SpringClientFactory.getInstance()
public <C> C getInstance(String name, Class<C> type) {
    C instance = super.getInstance(name, type);
    if (instance != null) {
        return instance;
    }
    IClientConfig config = getInstance(name, IClientConfig.class);
    return instantiateWithConfig(getContext(name), type, config);
}

// NamedContextFactory.getInstance()
public <T> T getInstance(String name, Class<T> type) {
    AnnotationConfigApplicationContext context = getContext(name);
    if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
                                                            type).length > 0) {
        // 主要就是这句,从容器中获取ILoadBalancer的实现,目前默认实现为
        // ZoneAwareLoadBalancer
        return context.getBean(type);
    }
    return null;
}

    总结1):通过上述一连串的方法调用可知,在最终是从容器中获取ILoadBalancer的实现,目前默认实现为ZoneAwareLoadBalancer

    疑问:ZoneAwareLoadBalancer是什么时候被加载到容器中呢?读者可自行思考下

 

 

    2)getServer(loadBalancer)

protected Server getServer(ILoadBalancer loadBalancer) {
    if (loadBalancer == null) {
        return null;
    }
    // 具体执行为ZoneAwareLoadBalancer.chooseServer()
    return loadBalancer.chooseServer("default"); // TODO: better handling of key
}

// ZoneAwareLoadBalancer.chooseServer()
public Server chooseServer(Object key) {
    // 1.由于笔者测试的server,可用的zone为1个,所以会直接走super.chooseServer()
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        logger.debug("Zone aware logic disabled or there is only one zone");
        return super.chooseServer(key);
    }
    //如果是多region,则会走下面的方法,暂时注释掉
    ...
}
        
    //BaseLoadBalancer.chooseServer()
   public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                // rule为ZoneAvoidanceRule
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
        
//PredicateBasedRule.choose(key)
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        // 重点在这里,从所有的server中根据对应的rule来获取一个具体的server
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }

    总结2):getServer()的主要功能就是根据具体的rule来选择特定的Server,重要的实现实际都在这个方法里

    鉴于篇幅,笔者没有分析其他的规则使用,只分析当前案例中的规则

 

 

    3)RibbonLoadBalancerClient.execute(serviceId, ribbonServer, request)执行HTTP请求

public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
    ...
    RibbonLoadBalancerContext context = this.clientFactory
        .getLoadBalancerContext(serviceId);
    RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

    try {
        // 核心方法在这里,是一个回调方法,
        // 具体就是回调LoadBalancerRequestFactory.createRequest()中的apply()方法
        T returnVal = request.apply(serviceInstance);
        statsRecorder.recordStats(returnVal);
        return returnVal;
    }
    ...
}

    // 回调方法
    public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
                                                                 final byte[] body, final ClientHttpRequestExecution execution) {
        return new LoadBalancerRequest<ClientHttpResponse>() {

            @Override
            // 回调方法在这里
            public ClientHttpResponse apply(final ServiceInstance instance)
            throws Exception {
                HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
                if (transformers != null) {
                    for (LoadBalancerRequestTransformer transformer : transformers) {
                        serviceRequest = transformer.transformRequest(serviceRequest, instance);
                    }
                }
                // 真正要执行的方法
                return execution.execute(serviceRequest, body);
            }

        };
    }

    //InterceptingRequestExecution.execute()
    public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
        if (this.iterator.hasNext()) {
            ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
            return nextInterceptor.intercept(request, body, this);
        }
        // 注意:此时已经没有iterator,直接执行request请求
        else {
            // 1.根据URI获得请求,并封装头部
            ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
            for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) {
                List<String> values = entry.getValue();
                for (String value : values) {
                    delegate.getHeaders().add(entry.getKey(), value);
                }
            }
            if (body.length > 0) {
                StreamUtils.copy(body, delegate.getBody());
            }
            // 2.本质就是对HttpURLConnection的执行
            return delegate.execute();
        }
    }
}

    到此为止,URI的请求基本已经结束了。

 

 

总结:

    1)用户创建RestTemplate

    2)添加了ribbon依赖后,会在项目启动的时候自动往RestTemplate中添加LoadBalancerInterceptor拦截器

    3)用户根据RestTemplate发起请求时,会将请求转发到LoadBalancerInterceptor去执行,该拦截器会根据指定的负载均衡方式获取该次请求对应的应用服务端IP、port

    4)根据获取到的IP、port重新封装请求,发送HTTP请求,返回具体响应

 

 

参考:SpringCloud微服务实战

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐