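Using the @LoadBalanced Annotation

The previous article showed how to give RestTemplate load-balancing capability: add the @LoadBalanced annotation when registering it as a bean in the container. So how does @LoadBalanced actually implement this?

1 Overview of the @LoadBalanced annotation

The source of @LoadBalanced is very short. The key part is the @Qualifier meta-annotation: it turns @LoadBalanced into a qualifier, so an injection point annotated with @LoadBalanced only receives beans that were themselves declared with @LoadBalanced.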

@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
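
To see why a marker annotation is enough to select a subset of beans, here is a minimal plain-Java sketch with no Spring involved. The `Marked` annotation, the `Config` class, and `collectMarked` are hypothetical names used only for illustration; Spring's real qualifier matching is more elaborate than this reflection loop.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class QualifierFilterSketch {

    // Hypothetical stand-in for @LoadBalanced: a marker with no members.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Marked {
    }

    // Hypothetical configuration class: two factory methods, one marked.
    static class Config {
        @Marked
        public String balancedClient() {
            return "balanced";
        }

        public String plainClient() {
            return "plain";
        }
    }

    // Invoke only the factory methods that carry the marker annotation.
    static List<String> collectMarked(Object config) {
        List<String> result = new ArrayList<>();
        for (Method method : config.getClass().getDeclaredMethods()) {
            if (method.isAnnotationPresent(Marked.class)) {
                try {
                    result.add((String) method.invoke(config));
                } catch (ReflectiveOperationException ex) {
                    throw new IllegalStateException(ex);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(collectMarked(new Config()));
    }
}
```

Only the method carrying the marker contributes to the result, which is the same idea behind the @LoadBalanced-qualified list of RestTemplate beans.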

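The auto-configuration class for the client-side load balancer is LoadBalancerAutoConfiguration (it is listed in META-INF/spring.factories inside the spring-cloud-commons jar).

1 The class-level annotations @ConditionalOnClass(RestTemplate.class) and @ConditionalOnBean(LoadBalancerClient.class) show that the auto-configuration takes effect only when RestTemplate is on the classpath and a LoadBalancerClient bean exists in the container.

2 The class registers a SmartInitializingSingleton bean. LoadBalancerAutoConfiguration.this.restTemplates holds every RestTemplate annotated with @LoadBalanced, and each RestTemplateCustomizer is applied to each of them. As the restTemplateCustomizer method further down shows, the default customizer adds a LoadBalancerInterceptor to the RestTemplate.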

/**
 * Auto-configuration for Ribbon (client-side load balancing).
 * @author Spencer Gibb
 * @author Dave Syer
 * @author Will Tran
 * @author Gang Li
 */
@Configuration
// Applies only when RestTemplate is on the classpath
@ConditionalOnClass(RestTemplate.class)
// Takes effect only if a LoadBalancerClient bean exists
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

	@LoadBalanced
	@Autowired(required = false)
	private List<RestTemplate> restTemplates = Collections.emptyList();

	@Bean
	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
			final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
	}

	@Autowired(required = false)
	private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

	@Bean
	@ConditionalOnMissingBean
	public LoadBalancerRequestFactory loadBalancerRequestFactory(
			LoadBalancerClient loadBalancerClient) {
		return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
	}

	@Configuration
	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
	static class LoadBalancerInterceptorConfig {
		@Bean
		public LoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
		}
	}
      
}    
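
The customization step can be illustrated without Spring. The sketch below uses hypothetical `Client`, `Interceptor`, and `Customizer` types to mirror what `restTemplateCustomizer` does: copy the existing interceptor list, append one more interceptor, and set the list back. It is a simplified model under those assumptions, not the Spring Cloud implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CustomizerSketch {

    // Hypothetical stand-in for ClientHttpRequestInterceptor.
    interface Interceptor {
        String name();
    }

    // Hypothetical stand-in for RestTemplate: it only holds interceptors.
    static final class Client {
        private List<Interceptor> interceptors = new ArrayList<>();

        List<Interceptor> getInterceptors() {
            return interceptors;
        }

        void setInterceptors(List<Interceptor> interceptors) {
            this.interceptors = new ArrayList<>(interceptors);
        }
    }

    // Hypothetical stand-in for RestTemplateCustomizer.
    interface Customizer {
        void customize(Client client);
    }

    // Build a customizer that appends the given interceptor to a client.
    static Customizer appending(Interceptor interceptor) {
        return client -> {
            List<Interceptor> list = new ArrayList<>(client.getInterceptors());
            list.add(interceptor);
            client.setInterceptors(list);
        };
    }

    // Apply every customizer to every client, as the initializer bean does.
    static void customizeAll(List<Client> clients, List<Customizer> customizers) {
        for (Client client : clients) {
            for (Customizer customizer : customizers) {
                customizer.customize(client);
            }
        }
    }

    // Names of the interceptors currently installed on a client.
    static List<String> interceptorNames(Client client) {
        List<String> names = new ArrayList<>();
        for (Interceptor interceptor : client.getInterceptors()) {
            names.add(interceptor.name());
        }
        return names;
    }

    public static void main(String[] args) {
        Interceptor logging = () -> "logging";
        Interceptor loadBalancer = () -> "loadBalancer";
        Client client = new Client();
        client.setInterceptors(Arrays.asList(logging));
        customizeAll(Arrays.asList(client), Arrays.asList(appending(loadBalancer)));
        System.out.println(interceptorNames(client));
    }
}
```

Because the new interceptor is appended, any interceptors already present on the client run before it.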

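When RestTemplate sends a request, the request is intercepted by LoadBalancerInterceptor. The intercept method shows that the request is actually executed through LoadBalancerClient.

The LoadBalancerInterceptor class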

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

	private LoadBalancerClient loadBalancer;
	private LoadBalancerRequestFactory requestFactory;

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
		this.loadBalancer = loadBalancer;
		this.requestFactory = requestFactory;
	}

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		// for backwards compatibility
		this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
	}

	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
		return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
	}
}
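
The `Assert.state` check in `intercept` matters in practice because `java.net.URI#getHost()` returns `null` when the authority is not a syntactically valid hostname, for example when a service name contains an underscore. A small plain-Java sketch of that behavior (the service names and the `serviceName` helper are made up for illustration):

```java
import java.net.URI;

final class ServiceNameSketch {

    // Extract the logical service name the same way the interceptor does.
    static String serviceName(String url) {
        return URI.create(url).getHost();
    }

    public static void main(String[] args) {
        // A hyphenated name is a valid hostname, so it is returned as-is.
        System.out.println(serviceName("http://user-service/users/1"));
        // An underscore is not allowed in a hostname, so getHost() yields null.
        System.out.println(serviceName("http://user_service/users/1"));
    }
}
```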

The LoadBalancerClient interface

public interface LoadBalancerClient extends ServiceInstanceChooser {

    // Execute the request on a service instance chosen by the load balancer
	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

    // Execute the request on the given service instance
	<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

    // Build a proper URI that uses the real host and port of the instance
	URI reconstructURI(ServiceInstance instance, URI original);
    
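    // Inherited from the parent interface ServiceInstanceChooser:
    // choose a service instance for the given service id from the client-side load balancer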
     ServiceInstance choose(String serviceId);
}

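The RibbonLoadBalancerClient class

This is an implementation of LoadBalancerClient. Its execute method looks up the ILoadBalancer for the service id, then calls getServer(ILoadBalancer, hint) to obtain a service instance; that method in turn calls the load balancer's chooseServer method.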

public class RibbonLoadBalancerClient implements LoadBalancerClient {	
	private SpringClientFactory clientFactory;

	public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
		this.clientFactory = clientFactory;
    }

	@Override
	public ServiceInstance choose(String serviceId) {
	    return choose(serviceId, null);
	}

	/**
	 * New: Select a server using a 'key'.
	 */
	public ServiceInstance choose(String serviceId, Object hint) {
		Server server = getServer(getLoadBalancer(serviceId), hint);
		if (server == null) {
			return null;
		}
		return new RibbonServer(serviceId, server, isSecure(server, serviceId),
				serverIntrospector(serviceId).getMetadata(server));
	}

	@Override
	public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
	    return execute(serviceId, request, null);
	}

	/**
	 * New: Execute a request by selecting server using a 'key'.
	 * The hint will have to be the last parameter to not mess with the `execute(serviceId, ServiceInstance, request)`
	 * method. This somewhat breaks the fluent coding style when using a lambda to define the LoadBalancerRequest.
	 */
	public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		Server server = getServer(loadBalancer, hint);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
				serviceId), serverIntrospector(serviceId).getMetadata(server));

		return execute(serviceId, ribbonServer, request);
	}

	@Override
	public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
		Server server = null;
		if(serviceInstance instanceof RibbonServer) {
			server = ((RibbonServer)serviceInstance).getServer();
		}
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}

		RibbonLoadBalancerContext context = this.clientFactory
				.getLoadBalancerContext(serviceId);
		RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

		try {
			T returnVal = request.apply(serviceInstance);
			statsRecorder.recordStats(returnVal);
			return returnVal;
		}
		// catch IOException and rethrow so RestTemplate behaves correctly
		catch (IOException ex) {
			statsRecorder.recordStats(ex);
			throw ex;
		}
		catch (Exception ex) {
			statsRecorder.recordStats(ex);
			ReflectionUtils.rethrowRuntimeException(ex);
		}
		return null;
	}
    
    
    protected Server getServer(ILoadBalancer loadBalancer) {
	    return getServer(loadBalancer, null);
	}

	protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
		if (loadBalancer == null) {
			return null;
		}
		// Use 'default' on a null hint, or just pass it on?
		return loadBalancer.chooseServer(hint != null ? hint : "default");
	}

    protected ILoadBalancer getLoadBalancer(String serviceId) {
		return this.clientFactory.getLoadBalancer(serviceId);
	}
    
}

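The ILoadBalancer interface

Among its implementations, DynamicServerListLoadBalancer and ZoneAwareLoadBalancer add the main extensions, and Spring Cloud uses ZoneAwareLoadBalancer by default, as the return value of the ribbonLoadBalancer method in the Ribbon client configuration class RibbonClientConfiguration shows.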

public interface ILoadBalancer {

	// Add service instances to the load balancer's server list
	public void addServers(List<Server> newServers);
	
    // Choose a service instance for the given key, according to some strategy
	public Server chooseServer(Object key);
	
    // Mark a server as down so that it is no longer used
	public void markServerDown(Server server);
	
	@Deprecated
	public List<Server> getServerList(boolean availableOnly);
	
    // Get the list of instances that are currently up and reachable
    public List<Server> getReachableServers();

    // Get all known instances, both reachable and down
	public List<Server> getAllServers();
}
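
To make the role of `chooseServer` concrete, here is a minimal sketch of one possible selection strategy, plain round-robin over a fixed list. The `RoundRobinSketch` class and the addresses are assumptions for illustration; Ribbon's actual rules (zone awareness, availability filtering, and so on) are considerably richer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {

    private final List<String> servers;
    private final AtomicInteger position = new AtomicInteger();

    RoundRobinSketch(List<String> servers) {
        // Defensive copy so later changes to the caller's list have no effect.
        this.servers = new ArrayList<>(servers);
    }

    // Pick the next server in rotation; returns null when the list is empty.
    String chooseServer() {
        if (servers.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even if the counter overflows.
        int index = Math.floorMod(position.getAndIncrement(), servers.size());
        return servers.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch lb =
                new RoundRobinSketch(Arrays.asList("10.0.0.5:8080", "10.0.0.6:8080"));
        for (int i = 0; i < 3; i++) {
            System.out.println(lb.chooseServer());
        }
    }
}
```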

The RibbonClientConfiguration class

This is where the ILoadBalancer bean is created; unless one is configured through properties, a ZoneAwareLoadBalancer is used by default.

	@Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
			return this.propertiesFactory.get(ILoadBalancer.class, config, name);
		}
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
				serverListFilter, serverListUpdater);
	}

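Back to the execute method of RibbonLoadBalancerClient: it first obtains a Server, wraps it in a RibbonServer, and then calls the overloaded execute method, which invokes LoadBalancerRequest.apply to send the request to a concrete service instance. The argument passed to apply is a ServiceInstance.

The ServiceInstance interface

Its implementations include RibbonServer. The interface mainly defines the basic information of a service instance.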

public interface ServiceInstance {
    default String getInstanceId() {
        return null;
    }

    String getServiceId();

    String getHost();

    int getPort();

    boolean isSecure();

    URI getUri();

    Map<String, String> getMetadata();

    default String getScheme() {
        return null;
    }
}

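The LoadBalancerRequest interface

This is an interface with no named implementation class. Tracing where the request argument comes from shows that it is created in the LoadBalancerInterceptor.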

public interface LoadBalancerRequest<T> {
	T apply(ServiceInstance instance) throws Exception;
}

The LoadBalancerInterceptor class

The LoadBalancerRequest is created by the requestFactory.createRequest method.

    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
        URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
    }

The LoadBalancerRequestFactory class

	public LoadBalancerRequest<ClientHttpResponse> createRequest(
			final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) {
		return instance -> {
			HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
					this.loadBalancer);
			if (this.transformers != null) {
				for (LoadBalancerRequestTransformer transformer : this.transformers) {
					serviceRequest = transformer.transformRequest(serviceRequest,
							instance);
				}
			}
			return execution.execute(serviceRequest, body);
		};
	}
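
`LoadBalancerRequest` needs no named implementation because the factory returns a lambda. What this buys is deferred execution: the request is described before an instance has been chosen and only runs once `apply` receives one. A plain-Java sketch of that idea, with hypothetical names (`Request`, `createRequest`, `execute`) and a string standing in for `ServiceInstance`:

```java
import java.util.function.Supplier;

final class DeferredRequestSketch {

    // Hypothetical stand-in for LoadBalancerRequest<T>.
    interface Request<T> {
        T apply(String instance);
    }

    // Describe the call now; the target instance is supplied later.
    static Request<String> createRequest(String path) {
        return instance -> "GET http://" + instance + path;
    }

    // Hypothetical stand-in for LoadBalancerClient.execute: choose, then apply.
    static <T> T execute(Supplier<String> chooser, Request<T> request) {
        String instance = chooser.get();
        if (instance == null) {
            throw new IllegalStateException("No instances available");
        }
        return request.apply(instance);
    }

    public static void main(String[] args) {
        Request<String> request = createRequest("/users/1");
        System.out.println(execute(() -> "10.0.0.5:8080", request));
    }
}
```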

The ServiceRequestWrapper class

This class is a subclass of HttpRequestWrapper; its main job is to override the parent's getURI method.

public class ServiceRequestWrapper extends HttpRequestWrapper {

	private final ServiceInstance instance;

	private final LoadBalancerClient loadBalancer;

	public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
			LoadBalancerClient loadBalancer) {
		super(request);
		this.instance = instance;
		this.loadBalancer = loadBalancer;
	}

	@Override
	public URI getURI() {
		URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
		return uri;
	}
}

The reconstructURI method of RibbonLoadBalancerClient obtains a Server object for the service instance and then calls reconstructURIWithServer to build the instance's URI.

    public URI reconstructURI(ServiceInstance instance, URI original) {
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId();
        RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
        URI uri;
        Server server;
        if (instance instanceof RibbonLoadBalancerClient.RibbonServer) {
            RibbonLoadBalancerClient.RibbonServer ribbonServer = (RibbonLoadBalancerClient.RibbonServer)instance;
            server = ribbonServer.getServer();
            uri = RibbonUtils.updateToSecureConnectionIfNeeded(original, ribbonServer);
        } else {
            server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
            IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = this.serverIntrospector(serviceId);
            uri = RibbonUtils.updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server);
        }

        return context.reconstructURIWithServer(server, uri);
    }

The LoadBalancerContext class

From the Server and the original URI, whose host is the service name, it builds the final address of the instance to call.

    public URI reconstructURIWithServer(Server server, URI original) {
        String host = server.getHost();
        int port = server.getPort();
        String scheme = server.getScheme();
        if (host.equals(original.getHost()) && port == original.getPort() && scheme == original.getScheme()) {
            return original;
        } else {
            if (scheme == null) {
                scheme = original.getScheme();
            }

            if (scheme == null) {
                scheme = (String)this.deriveSchemeAndPortFromPartialUri(original).first();
            }

            try {
                StringBuilder sb = new StringBuilder();
                sb.append(scheme).append("://");
                if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
                    sb.append(original.getRawUserInfo()).append("@");
                }

                sb.append(host);
                if (port >= 0) {
                    sb.append(":").append(port);
                }

                sb.append(original.getRawPath());
                if (!Strings.isNullOrEmpty(original.getRawQuery())) {
                    sb.append("?").append(original.getRawQuery());
                }

                if (!Strings.isNullOrEmpty(original.getRawFragment())) {
                    sb.append("#").append(original.getRawFragment());
                }

                URI newURI = new URI(sb.toString());
                return newURI;
            } catch (URISyntaxException var8) {
                throw new RuntimeException(var8);
            }
        }
    }
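
The effect of this method is easiest to see on a concrete URI. The following is a simplified sketch of the same rewriting idea using only `java.net.URI`; the `rewrite` helper and the addresses are hypothetical, and it omits the user-info and scheme-derivation handling that the real method performs.

```java
import java.net.URI;

final class UriRewriteSketch {

    // Replace the logical host in the original URI with a concrete host and
    // port, keeping the scheme, path, query, and fragment unchanged.
    static URI rewrite(URI original, String host, int port) {
        StringBuilder sb = new StringBuilder();
        sb.append(original.getScheme()).append("://").append(host);
        if (port >= 0) {
            sb.append(':').append(port);
        }
        sb.append(original.getRawPath());
        if (original.getRawQuery() != null) {
            sb.append('?').append(original.getRawQuery());
        }
        if (original.getRawFragment() != null) {
            sb.append('#').append(original.getRawFragment());
        }
        return URI.create(sb.toString());
    }

    public static void main(String[] args) {
        URI original = URI.create("http://user-service/users/1?active=true");
        System.out.println(rewrite(original, "10.0.0.5", 8080));
    }
}
```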

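The LoadBalancerRequest produced by LoadBalancerRequestFactory ends by calling the execute method of the ClientHttpRequestExecution interface.

The ClientHttpRequestExecution interface

This interface has a single implementation, InterceptingRequestExecution, an inner class of InterceptingClientHttpRequest.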

@FunctionalInterface
public interface ClientHttpRequestExecution {
    ClientHttpResponse execute(HttpRequest var1, byte[] var2) throws IOException;
}

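The InterceptingRequestExecution class

Once no interceptors remain, it creates a request object through the request factory. At this point getURI() returns the already rewritten URI, and the client request is finally sent.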

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            } else {
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> {
                    delegate.getHeaders().addAll(key, value);
                });
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate;
                        streamingOutputMessage.setBody((outputStream) -> {
                            StreamUtils.copy(body, outputStream);
                        });
                    } else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }

                return delegate.execute();
            }
        }
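
The iterator-driven chain above can be modeled in a few lines of plain Java. In this sketch the `Execution`, `Interceptor`, and `Chain` types are hypothetical stand-ins, a string plays the role of the request, and the host is rewritten by simple string replacement rather than through a request wrapper; it is meant only to show how control passes from interceptor to interceptor and finally to the actual send.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class InterceptorChainSketch {

    // Hypothetical stand-in for ClientHttpRequestExecution.
    interface Execution {
        String execute(String url);
    }

    // Hypothetical stand-in for ClientHttpRequestInterceptor.
    interface Interceptor {
        String intercept(String url, Execution execution);
    }

    // Walks the interceptors in order; when none are left, "sends" the request.
    static final class Chain implements Execution {
        private final Iterator<Interceptor> iterator;

        Chain(List<Interceptor> interceptors) {
            this.iterator = interceptors.iterator();
        }

        @Override
        public String execute(String url) {
            if (iterator.hasNext()) {
                return iterator.next().intercept(url, this);
            }
            return "sent " + url;
        }
    }

    // Run a request through a chain whose only interceptor rewrites the host.
    static String send(String url, String logicalName, String address) {
        Interceptor rewriteHost =
                (u, execution) -> execution.execute(u.replace(logicalName, address));
        return new Chain(Arrays.asList(rewriteHost)).execute(url);
    }

    public static void main(String[] args) {
        System.out.println(
                send("http://user-service/users/1", "user-service", "10.0.0.5:8080"));
    }
}
```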

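2 Summary

The @LoadBalanced annotation is applied when building a RestTemplate so that it gains load-balancing capability when sending requests. With the annotation in place, each request is intercepted by the internal LoadBalancerInterceptor, which resolves the service name in the request to a concrete instance address before the request is sent.

References: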

https://www.hxstrive.com/article/948.htm
