Using the @LoadBalanced Annotation
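The previous post showed how to give RestTemplate load-balancing capability: annotate it with @LoadBalanced when registering it in the container. So how does @LoadBalanced actually provide that capability?
1 Introduction to the @LoadBalanced annotation
The source of @LoadBalanced is very short. The key part is the @Qualifier meta-annotation: it turns @LoadBalanced into a qualifier, so an injection point annotated with @LoadBalanced only receives beans that are themselves annotated with @LoadBalanced.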
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
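To make the marker role of the annotation more concrete, here is a minimal plain-Java sketch with no Spring involved. The annotation Marked and the classes MarkerFilterSketch and Clients are hypothetical names invented for illustration; Spring's real qualifier resolution is considerably more involved, so treat this only as an analogy for "select what carries the marker".

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation, standing in for @LoadBalanced.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Marked {
}

class MarkerFilterSketch {

    // Hypothetical holder: only one of the two fields carries the marker.
    static class Clients {
        @Marked
        String balancedClient = "balanced";
        String plainClient = "plain";
    }

    // Returns the names of the fields that carry the marker annotation.
    static List<String> markedFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            if (field.isAnnotationPresent(Marked.class)) {
                names.add(field.getName());
            }
        }
        return names;
    }
}
```

Calling MarkerFilterSketch.markedFieldNames(MarkerFilterSketch.Clients.class) returns only the annotated field, which loosely mirrors how only the annotated RestTemplate beans end up in a @LoadBalanced-qualified list.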
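The auto-configuration class for the client-side load balancer is LoadBalancerAutoConfiguration (it is listed in META-INF/spring.factories inside the spring-cloud-commons jar).
1 From the class-level annotations @ConditionalOnClass(RestTemplate.class) and @ConditionalOnBean(LoadBalancerClient.class), the auto-configuration takes effect only when RestTemplate is on the classpath and a LoadBalancerClient bean exists in the container.
2 The class registers a SmartInitializingSingleton bean. LoadBalancerAutoConfiguration.this.restTemplates holds every RestTemplate annotated with @LoadBalanced, and each RestTemplateCustomizer is applied to all of them. According to the restTemplateCustomizer method further down, the customizer adds a LoadBalancerInterceptor to each RestTemplate.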
/**
 * Auto-configuration for Ribbon (client-side load balancing).
 * @author Spencer Gibb
 * @author Dave Syer
 * @author Will Tran
 * @author Gang Li
 */
@Configuration
// Applies only when RestTemplate is on the classpath
@ConditionalOnClass(RestTemplate.class)
// Takes effect only when a LoadBalancerClient bean exists
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
            final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
    }

    @Autowired(required = false)
    private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(
            LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
    }

    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {

        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }
}
When a RestTemplate sends a request, the request is intercepted by LoadBalancerInterceptor. The intercept method shows that the request is actually executed through LoadBalancerClient.
LoadBalancerInterceptor class
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;
    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }
}
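The first step of intercept, treating the URI host as the service name, can be tried in isolation with java.net.URI. The sketch below is only an illustration: the class ServiceNameSketch and the example URLs are hypothetical. It also shows why the Assert.state check matters, since URI.getHost() returns null for a name such as user_service, which is not a valid hostname.

```java
import java.net.URI;

class ServiceNameSketch {

    // Mirrors the interceptor's first step: the URI host is treated as the service name.
    static String serviceName(String url) {
        String host = URI.create(url).getHost();
        if (host == null) {
            // Same message as the Assert.state call in LoadBalancerInterceptor.
            throw new IllegalStateException("Request URI does not contain a valid hostname: " + url);
        }
        return host;
    }
}
```

With this sketch, serviceName("http://user-service/users/1") yields the service name, while a URL whose host contains an underscore is rejected.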
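LoadBalancerClient class
public interface LoadBalancerClient extends ServiceInstanceChooser {

    // Execute the request using a service instance picked by the load balancer
    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

    // Execute the request using the given service instance
    <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

    // Build a proper URI with the real host and port of the instance from the service-name URI
    URI reconstructURI(ServiceInstance instance, URI original);

    // Inherited from the parent interface ServiceInstanceChooser:
    // pick a service instance for the given service id from the client-side load balancer
    ServiceInstance choose(String serviceId);
}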
RibbonLoadBalancerClient class
This is an implementation of LoadBalancerClient. In execute, it obtains the ILoadBalancer for the service id and then calls getServer(ILoadBalancer, hint) to get a service instance; that method in turn calls the load balancer's chooseServer method.
public class RibbonLoadBalancerClient implements LoadBalancerClient {

    private SpringClientFactory clientFactory;

    public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
        this.clientFactory = clientFactory;
    }

    @Override
    public ServiceInstance choose(String serviceId) {
        return choose(serviceId, null);
    }

    /**
     * New: Select a server using a 'key'.
     */
    public ServiceInstance choose(String serviceId, Object hint) {
        Server server = getServer(getLoadBalancer(serviceId), hint);
        if (server == null) {
            return null;
        }
        return new RibbonServer(serviceId, server, isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));
    }

    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        return execute(serviceId, request, null);
    }

    /**
     * New: Execute a request by selecting server using a 'key'.
     * The hint will have to be the last parameter to not mess with the `execute(serviceId, ServiceInstance, request)`
     * method. This somewhat breaks the fluent coding style when using a lambda to define the LoadBalancerRequest.
     */
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer, hint);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));
        return execute(serviceId, ribbonServer, request);
    }

    @Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
        Server server = null;
        if (serviceInstance instanceof RibbonServer) {
            server = ((RibbonServer) serviceInstance).getServer();
        }
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
        try {
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ex) {
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }

    protected Server getServer(ILoadBalancer loadBalancer) {
        return getServer(loadBalancer, null);
    }

    protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        if (loadBalancer == null) {
            return null;
        }
        // Use 'default' on a null hint, or just pass it on?
        return loadBalancer.chooseServer(hint != null ? hint : "default");
    }

    protected ILoadBalancer getLoadBalancer(String serviceId) {
        return this.clientFactory.getLoadBalancer(serviceId);
    }
}
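ILoadBalancer class
Among its implementations, DynamicServerListLoadBalancer and ZoneAwareLoadBalancer add the main extensions, and Spring Cloud uses ZoneAwareLoadBalancer by default. This can be seen from the return value of the ribbonLoadBalancer method in the Ribbon client configuration class RibbonClientConfiguration.
public interface ILoadBalancer {

    // Add service instances to the load balancer's server list
    public void addServers(List<Server> newServers);

    // Pick one service instance from the load balancer for the given key, according to some strategy
    public Server chooseServer(Object key);

    // Mark a server as down so that it is no longer used
    public void markServerDown(Server server);

    @Deprecated
    public List<Server> getServerList(boolean availableOnly);

    // Get the list of instances that are currently up and reachable
    public List<Server> getReachableServers();

    // Get the list of all known instances (both reachable and down)
    public List<Server> getAllServers();
}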
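What "pick one instance according to some strategy" means in chooseServer can be illustrated with the simplest possible strategy, round robin. This is a plain-Java sketch under stated assumptions: the class RoundRobinChooserSketch is hypothetical, servers are represented as plain host:port strings, and Ribbon's own rules are more elaborate than this.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinChooserSketch {

    private final List<String> servers;
    private final AtomicInteger position = new AtomicInteger();

    RoundRobinChooserSketch(List<String> servers) {
        this.servers = servers;
    }

    // Returns the next server in rotation, or null when no server is registered.
    String chooseServer() {
        if (servers.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(position.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}
```

Returning null for an empty list matches how the caller shown earlier, RibbonLoadBalancerClient.execute, turns a null server into "No instances available".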
RibbonClientConfiguration class
Here the ILoadBalancer is created; unless one is configured through properties, a ZoneAwareLoadBalancer is used by default.
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
            serverListFilter, serverListUpdater);
}
Back to the execute method of RibbonLoadBalancerClient: it first obtains a Server, wraps it in a RibbonServer, and then calls the overloaded execute method, which calls LoadBalancerRequest.apply to send the request to one specific service instance. The argument passed to apply is a ServiceInstance.
ServiceInstance class
Its implementations include RibbonServer. The interface mainly defines the information that describes a service instance.
public interface ServiceInstance {

    default String getInstanceId() {
        return null;
    }

    String getServiceId();

    String getHost();

    int getPort();

    boolean isSecure();

    URI getUri();

    Map<String, String> getMetadata();

    default String getScheme() {
        return null;
    }
}
LoadBalancerRequest class
This is an interface with no named implementation classes. Tracing where the request argument comes from shows that it is created in the LoadBalancerInterceptor.
public interface LoadBalancerRequest<T> {
    T apply(ServiceInstance instance) throws Exception;
}
LoadBalancerInterceptor class
The LoadBalancerRequest is created by the requestFactory.createRequest method.
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
    URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
LoadBalancerRequestFactory class
public LoadBalancerRequest<ClientHttpResponse> createRequest(
        final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) {
    return instance -> {
        HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
                this.loadBalancer);
        if (this.transformers != null) {
            for (LoadBalancerRequestTransformer transformer : this.transformers) {
                serviceRequest = transformer.transformRequest(serviceRequest,
                        instance);
            }
        }
        return execution.execute(serviceRequest, body);
    };
}
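The point of createRequest is that the request is built before the target instance is known, and only runs once the load balancer supplies one. A rough plain-Java sketch of that deferred pattern follows; the names DeferredRequestSketch and BalancedRequest are hypothetical stand-ins, instances are plain host:port strings, and the checked exception of the real apply method is left out to keep the example small.

```java
class DeferredRequestSketch {

    // Hypothetical stand-in for LoadBalancerRequest<T>.
    interface BalancedRequest<T> {
        T apply(String instance);
    }

    // Builds the request now; the target instance is supplied later by the caller.
    static BalancedRequest<String> createRequest(String path) {
        return instance -> "GET http://" + instance + path;
    }

    // Hypothetical stand-in for execute(): the chosen instance is applied to the request.
    static <T> T execute(String chosenInstance, BalancedRequest<T> request) {
        return request.apply(chosenInstance);
    }
}
```

Because the request is a lambda, the factory needs no named implementation class, which is consistent with LoadBalancerRequest having none.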
ServiceRequestWrapper class
This class is a subclass of HttpRequestWrapper; its main job is to override the parent's getURI method.
public class ServiceRequestWrapper extends HttpRequestWrapper {

    private final ServiceInstance instance;
    private final LoadBalancerClient loadBalancer;

    public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
            LoadBalancerClient loadBalancer) {
        super(request);
        this.instance = instance;
        this.loadBalancer = loadBalancer;
    }

    @Override
    public URI getURI() {
        URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
        return uri;
    }
}
The reconstructURI method of RibbonLoadBalancerClient obtains a Server object from the service instance and then calls reconstructURIWithServer to produce the URI of that instance.
public URI reconstructURI(ServiceInstance instance, URI original) {
    Assert.notNull(instance, "instance can not be null");
    String serviceId = instance.getServiceId();
    RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
    URI uri;
    Server server;
    if (instance instanceof RibbonLoadBalancerClient.RibbonServer) {
        RibbonLoadBalancerClient.RibbonServer ribbonServer = (RibbonLoadBalancerClient.RibbonServer)instance;
        server = ribbonServer.getServer();
        uri = RibbonUtils.updateToSecureConnectionIfNeeded(original, ribbonServer);
    } else {
        server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
        IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
        ServerIntrospector serverIntrospector = this.serverIntrospector(serviceId);
        uri = RibbonUtils.updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server);
    }
    return context.reconstructURIWithServer(server, uri);
}
LoadBalancerContext class
From the chosen server and the original URI, whose host is the service name, it builds the final address of the instance to call.
public URI reconstructURIWithServer(Server server, URI original) {
    String host = server.getHost();
    int port = server.getPort();
    String scheme = server.getScheme();
    if (host.equals(original.getHost()) && port == original.getPort() && scheme == original.getScheme()) {
        return original;
    } else {
        if (scheme == null) {
            scheme = original.getScheme();
        }
        if (scheme == null) {
            scheme = (String)this.deriveSchemeAndPortFromPartialUri(original).first();
        }
        try {
            StringBuilder sb = new StringBuilder();
            sb.append(scheme).append("://");
            if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
                sb.append(original.getRawUserInfo()).append("@");
            }
            sb.append(host);
            if (port >= 0) {
                sb.append(":").append(port);
            }
            sb.append(original.getRawPath());
            if (!Strings.isNullOrEmpty(original.getRawQuery())) {
                sb.append("?").append(original.getRawQuery());
            }
            if (!Strings.isNullOrEmpty(original.getRawFragment())) {
                sb.append("#").append(original.getRawFragment());
            }
            URI newURI = new URI(sb.toString());
            return newURI;
        } catch (URISyntaxException var8) {
            throw new RuntimeException(var8);
        }
    }
}
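The effect of reconstructURIWithServer is easy to check with java.net.URI alone. The following is a simplified sketch rather than Ribbon's code: the class UriRewriteSketch and the sample addresses are hypothetical, and user-info and scheme fallback handling are omitted for brevity.

```java
import java.net.URI;

class UriRewriteSketch {

    // Replaces host and port of the original URI, keeping scheme, path, query and fragment.
    static URI withServer(URI original, String host, int port) {
        StringBuilder sb = new StringBuilder();
        sb.append(original.getScheme()).append("://").append(host);
        if (port >= 0) {
            sb.append(':').append(port);
        }
        sb.append(original.getRawPath());
        if (original.getRawQuery() != null) {
            sb.append('?').append(original.getRawQuery());
        }
        if (original.getRawFragment() != null) {
            sb.append('#').append(original.getRawFragment());
        }
        return URI.create(sb.toString());
    }
}
```

For example, rewriting http://user-service/users/1?verbose=true for a server at 10.0.0.5 on port 8081 gives http://10.0.0.5:8081/users/1?verbose=true, so only the authority part changes.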
The LoadBalancerRequest created in LoadBalancerRequestFactory ends by calling the execute method of the ClientHttpRequestExecution interface.
ClientHttpRequestExecution class
Its only implementation is InterceptingRequestExecution, an inner class of InterceptingClientHttpRequest.
@FunctionalInterface
public interface ClientHttpRequestExecution {
    ClientHttpResponse execute(HttpRequest var1, byte[] var2) throws IOException;
}
InterceptingRequestExecution class
Once all interceptors have run, a request object is created through the request factory. At this point getURI() returns the already rewritten URI, and the client request is finally sent.
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    } else {
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
        request.getHeaders().forEach((key, value) -> {
            delegate.getHeaders().addAll(key, value);
        });
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate;
                streamingOutputMessage.setBody((outputStream) -> {
                    StreamUtils.copy(body, outputStream);
                });
            } else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}
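The iterator-driven hand-off in execute can be reduced to a few lines of plain Java. This is a sketch for illustration only: InterceptorChainSketch, Interceptor and Chain are hypothetical names, requests are plain strings, and the terminal sender is a simple function instead of a real HTTP call.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

class InterceptorChainSketch {

    // Hypothetical interceptor: may change the request before passing it on.
    interface Interceptor {
        String intercept(String request, Chain chain);
    }

    // Walks the interceptors in order, then hands the request to the terminal sender.
    static class Chain {
        private final Iterator<Interceptor> iterator;
        private final Function<String, String> sender;

        Chain(List<Interceptor> interceptors, Function<String, String> sender) {
            this.iterator = interceptors.iterator();
            this.sender = sender;
        }

        String execute(String request) {
            if (iterator.hasNext()) {
                // Each interceptor receives the chain so it can continue the walk.
                return iterator.next().intercept(request, this);
            }
            // No interceptors left: actually send the (possibly rewritten) request.
            return sender.apply(request);
        }
    }
}
```

An interceptor that replaces the service name and then calls chain.execute plays the role of LoadBalancerInterceptor here: by the time the sender runs, it only ever sees the rewritten address.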
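2 Summary
@LoadBalanced is mainly applied when building a RestTemplate so that it gains load-balancing capability when sending requests. Once the annotation is added, each request is intercepted by the internal LoadBalancerInterceptor, which resolves the service name in the request to a concrete instance address and then sends the request.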
Reference:
https://www.hxstrive.com/article/948.htm