在这里插入图片描述

博主 默语带您 Go to New World.
个人主页—— 默语 的博客👦🏻 优秀内容
《java 面试题大全》
《java 专栏》
《idea技术专区》
《spring boot 技术专区》
《MyBatis从入门到精通》
《23种设计模式》
《经典算法学习》
《spring 学习》
《MYSQL从入门到精通》数据库是开发者必会基础之一~
🍩惟余辈才疏学浅,临摹之作或有不妥之处,还请读者海涵指正。☕🍭
🪁 吾期望此文有资助于尔,即使粗浅难及深广,亦备添少许微薄之助。苟未尽善尽美,敬请批评指正,以资改进。!💻⌨


默语是谁?

大家好,我是 默语,别名默语博主,擅长的技术领域包括Java、运维和人工智能。我的技术背景扎实,涵盖了从后端开发到前端框架的各个方面,特别是在Java 性能优化、多线程编程、算法优化等领域有深厚造诣。

目前,我活跃在CSDN、掘金、阿里云和 51CTO等平台,全网拥有超过10万的粉丝,总阅读量超过1400 万。统一 IP 名称为 默语 或者 默语博主。我是 CSDN 博客专家、阿里云专家博主和掘金博客专家,曾获博客专家、优秀社区主理人等多项荣誉,并在 2023 年度博客之星评选中名列前 50。我还是 Java 高级工程师、自媒体博主,北京城市开发者社区的主理人,拥有丰富的项目开发经验和产品设计能力。希望通过我的分享,帮助大家更好地了解和使用各类技术产品,在不断的学习过程中,可以帮助到更多的人,结交更多的朋友.


我的博客内容涵盖广泛,主要分享技术教程、Bug解决方案、开发工具使用、前沿科技资讯、产品评测与使用体验。我特别关注云服务产品评测、AI 产品对比、开发板性能测试以及技术报告,同时也会提供产品优缺点分析、横向对比,并分享技术沙龙与行业大会的参会体验。我的目标是为读者提供有深度、有实用价值的技术洞察与分析。


Netflix Ribbon

Netflix Ribbon是一个客户端负载均衡组件,用于将用户请求根据负载均衡算法负载到后端不同的服务集群节点上,从而降低单点服务器压力。

这里从代码层次了解底层原理

Netflix Ribbon引入方式为:在spring-cloud-commons的META-INF/spring.factories中引入自动配置类:LoadBalancerAutoConfiguration

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {...}

自动配置类上的condition注解是引入条件,LoadBalancerAutoConfiguration会类路径下存在RestTemplateIOC中存在LoadBalancerClient的条件下开启

LoadBalancerAutoConfiguration开启后具有以下功能

  • 在类路径下不存在 RetryTemplate注入LoadBalancerInterceptor、RestTemplateCustomizer
  • 对IOC中所有的带有@LoadBalanced注解的RestTemplate的Bean调用其customize方法
  • 如果开发者未注入LoadBalancerRequestFactory,则自动注入默认的LoadBalancerRequestFactory
  • 针对RetryTemplate进行自动配置与支持

对带有@LoadBalanced注解的RestTemplate的Bean调用其customize方法十分关键,细节如下

@Configuration
// 缺失RetryTemplate时开启以下功能
@ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
static class LoadBalancerInterceptorConfig {
	LoadBalancerInterceptorConfig() {
	}

	// 注入负载均衡拦截器
	@Bean
	public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
		return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
	}

    // 注入一个RestTemplateCustomizer,配合loadBalancedRestTemplateInitializerDeprecated
	@Bean
	@ConditionalOnMissingBean
	public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
		return (restTemplate) -> {
			List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
			list.add(loadBalancerInterceptor);
			// 给restTemplate添加拦截器链
			restTemplate.setInterceptors(list);
		};
	}
}

Netflix Ribbon是基于RestTemplate的,因此需要先了解下与RestTemplate相关的细节

RestTemplate:负载均衡拦截

RestTemplate存在以下继承关系,其中父类InterceptingHttpAccessor支持ClientHttpRequestInterceptor拦截器

HttpAccessor (org.springframework.http.client.support)
    InterceptingHttpAccessor (org.springframework.http.client.support)
        RestTemplate (org.springframework.web.client)

RestTemplate发起http请求的流程主要为:execute–>doExecute–>createRequest->request.execute->handleResponse

HttpAccessor#createRequest

其中createRequest对Http请求进行了封装

protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
    // 使用工厂构造,该工厂被InterceptingHttpAccessor重写
	ClientHttpRequest request = getRequestFactory().createRequest(url, method);
	// 初始化
	initialize(request);
	return request;
}

getRequestFactory()默认为:SimpleClientHttpRequestFactory,因为注入了ClientHttpRequestInterceptor的子类LoadBalancerInterceptor,则会包装为InterceptingClientHttpRequestFactory,构建的Request=InterceptingClientHttpRequest

InterceptingRequestExecution#execute

调用方法封装为executeInternal,如果存在拦截器则返回的是拦截器的结果,细节如下

@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
	// 链式调用  
	if (this.iterator.hasNext()) {
		ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
		// 返回拦截调用结果
		return nextInterceptor.intercept(request, body, this);
	}
	else {
		// 拦截器处理完毕则通过策略模式发起请求
		HttpMethod method = request.getMethod();
		Assert.state(method != null, "No standard HTTP method");
		// RestTemplate的requestFactory构建底层HTTP请求客户端
		ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
		request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
		if (body.length > 0) {
			if (delegate instanceof StreamingHttpOutputMessage) {
				StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
				streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
			}
			else {
				StreamUtils.copy(body, delegate.getBody());
			}
		}
		// ClientHttpRequest发起请求
		return delegate.execute();
	}
}

这里的拦截器为:LoadBalancerInterceptor,拦截细节如下:

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
	URI originalUri = request.getURI();
	String serviceName = originalUri.getHost();
	Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
	// 调用负载均衡器的execute方法
	return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

这里loadBalancer=LoadBalancerClient,requestFactory=LoadBalancerRequestFactory

Ribbon 负载均衡

在spring-cloud-starter-netflix-ribbon包中的META-INF/spring.factories中引入自动配置类:RibbonAutoConfiguration

@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
@EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class})
public class RibbonAutoConfiguration {...}

RibbonAutoConfiguration中注册的Bean会在LoadBalancerAutoConfiguration之前进行注入

默认注册一个客户端

@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
	return new RibbonLoadBalancerClient(springClientFactory());
}

使用的客户端工厂

@Bean
public SpringClientFactory springClientFactory() {
	SpringClientFactory factory = new SpringClientFactory();
	factory.setConfigurations(this.configurations);
	return factory;
}

以上引入与LoadBalancerAutoConfiguration中使用RestTemplate进行负载均衡相关,其它暂不描述

LoadBalancerClient

具有以下功能

public interface ServiceInstanceChooser {
    // 根据服务ID获取服务实例 
    ServiceInstance choose(String serviceId);
}
public interface LoadBalancerClient extends ServiceInstanceChooser {
	// 根据服务ID,进行请求
    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
    <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
	// 根据服务ID,转换为对应服务实例的请求URI(ip:port//xxx)
    URI reconstructURI(ServiceInstance instance, URI original);
}

核心方法:execute

@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
	return execute(serviceId, request, null);
}

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
	// 1.根据配置信息获取负载均衡实例
	ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
	// 2. 获取可用的服务
	Server server = getServer(loadBalancer, hint);
	if (server == null) {
		throw new IllegalStateException("No instances available for " + serviceId);
	}
	// 3. RibbonServer构建
	RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
			serviceId), serverIntrospector(serviceId).getMetadata(server));
	// 4.执行
	return execute(serviceId, ribbonServer, request);
}

明确serviceId=serviceName=Host,hint=null=“default”,RestTemplate中调用的方法为execute

ILoadBalancer

默认实现为:ZoneAwareLoadBalancer

AbstractLoadBalancer (com.netflix.loadbalancer)
    NoOpLoadBalancer (com.netflix.loadbalancer)
    BaseLoadBalancer (com.netflix.loadbalancer)
        DynamicServerListLoadBalancer (com.netflix.loadbalancer)
            ZoneAwareLoadBalancer (com.netflix.loadbalancer)

其中顶级父类BaseLoadBalancer的构造器中设置了核心功能实现

// 默认负载均衡规则:线性轮询规则
private final static IRule DEFAULT_RULE = new RoundRobinRule()
// 默认名称
private static final String DEFAULT_NAME = "default";
// ping策略
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
public BaseLoadBalancer() {
	this.name = DEFAULT_NAME;
	this.ping = null;
	// 负载均衡规则  
	setRule(DEFAULT_RULE);
	// ping检测调度
	setupPingTask();
	// 负载均衡状态
	lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
IRule

负载均衡规则:整体分为3种:1.轮询,2.随机,3.重试

AbstractLoadBalancerRule (com.netflix.loadbalancer)
    ClientConfigEnabledRoundRobinRule (com.netflix.loadbalancer)
        BestAvailableRule (com.netflix.loadbalancer)
        PredicateBasedRule (com.netflix.loadbalancer)
            ZoneAvoidanceRule (com.netflix.loadbalancer)
            AvailabilityFilteringRule (com.netflix.loadbalancer)
    RoundRobinRule (com.netflix.loadbalancer)
        WeightedResponseTimeRule (com.netflix.loadbalancer)
        ResponseTimeWeightedRule (com.netflix.loadbalancer)
    RandomRule (com.netflix.loadbalancer)
    RetryRule (com.netflix.loadbalancer)
Server

负载均衡的服务实例,获取方式为调用负载均衡客户端的chooseServer,在顶级父类BaseLoadBalancer进行了实现

public Server chooseServer(Object key) {
	if (counter == null) {
		// 计数器
		counter = createCounter();
	}
	counter.increment();
	if (rule == null) {
		return null;
	} else {
		try {
			// 由负载均衡规则实现
			return rule.choose(key);
		} catch (Exception e) {
			logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
			return null;
		}
	}
}

以线性轮询规则为例:

public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;

    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    public RoundRobinRule(ILoadBalancer lb) {
        this();
        setLoadBalancer(lb);
    }

    /**
	 * 负载均衡实现方案
	 */ 
    public Server choose(ILoadBalancer lb, Object key) {
	    // 负载均衡对象,整个调用链上是一个负载均衡实例
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
		// 默认在10次尝试中获取到Server
        while (server == null && count++ < 10) {
		    // 获取可达的Server 
            List<Server> reachableServers = lb.getReachableServers();
			// 获取所有的Server
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();
            // 可达Server数量必须大于0,否则说明所有Server都已经挂掉了
            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }
            // 获取目标Server的下标
            int nextServerIndex = incrementAndGetModulo(serverCount);
			// 获取Server
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }
            // 保证Server是可达的
            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

    /* 线性轮询的实现方案 */ 
    private int incrementAndGetModulo(int modulo) {
	    // 自旋加CAS
        for (;;) {
		    // 原子Integer获取
            int current = nextServerCyclicCounter.get();
			// % 求模运算,每次自增1,然后首尾循环
            int next = (current + 1) % modulo;
			// CAS 保证原子性
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

回到LoadBalancerClient客户端调用的execute方法,getServer之后,构建了一个RibbonServer对象,然后继续调用execute方法。

@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
	Server server = null;
	// 这一步为true
	if(serviceInstance instanceof RibbonServer) {
		server = ((RibbonServer)serviceInstance).getServer();
	}
	if (server == null) {
		throw new IllegalStateException("No instances available for " + serviceId);
	}
    // 构建一个上下文
	RibbonLoadBalancerContext context = this.clientFactory
			.getLoadBalancerContext(serviceId);
	RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

	try {
	    // apply调用
		T returnVal = request.apply(serviceInstance);
		// 返回状态
		statsRecorder.recordStats(returnVal);
		// 调用完毕
		return returnVal;
	}
	// catch IOException and rethrow so RestTemplate behaves correctly
	catch (IOException ex) {
		statsRecorder.recordStats(ex);
		throw ex;
	}
	catch (Exception ex) {
		statsRecorder.recordStats(ex);
		ReflectionUtils.rethrowRuntimeException(ex);
	}
	return null;
}

将请求LoadBalancerRequest通过服务实例去apply,因此具体的请求流程与该Request的构建有关,而request最初来源是LoadBalancerAutoConfiguration中的自动注入的LoadBalancerRequestFactory构建,在LoadBalancerInterceptor#createRequest中实例化用通过函数式接口实现

public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
		final byte[] body, final ClientHttpRequestExecution execution) {
	// apply方法
	return instance -> {
		// request包装,封装一些负载均衡信息
		HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
		if (transformers != null) {
			for (LoadBalancerRequestTransformer transformer : transformers) {
				serviceRequest = transformer.transformRequest(serviceRequest, instance);
			}
		}
		// 调用execution方法,这个execution是RestTemplate中的内部类
		return execution.execute(serviceRequest, body);
	};
}

这里的execution为InterceptingRequestExecution,这里又回到了InterceptingClientHttpRequest中execute的地方

@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
	// 链式调用  
	if (this.iterator.hasNext()) {
		// 拦截器调用:返回ribbon拦截的结果
	}
	// 拦截器执行完毕后会回归到正常调用流程
	else {
		// 拦截器处理完毕则通过策略模式发起请求
		HttpMethod method = request.getMethod();
		Assert.state(method != null, "No standard HTTP method");
		// RestTemplate的SimpleClientHttpRequestFactory构建底层HTTP请求客户端
		// 根据是否具有bufferRequestBody选择不同的策略
		ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
		request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
		if (body.length > 0) {
			if (delegate instanceof StreamingHttpOutputMessage) {
				StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
				streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
			}
			else {
				StreamUtils.copy(body, delegate.getBody());
			}
		}
		// 发起请求
		return delegate.execute();
	}
}

这里其实就是正常的RestTemplate的请求流程了

在这里插入图片描述


🪁🍁 希望本文能够给您带来一定的帮助🌸文章粗浅,敬请批评指正!🍁🐥

如对本文内容有任何疑问、建议或意见,请联系作者,作者将尽力回复并改进📓;(联系微信:Solitudemind )

点击下方名片,加入IT技术核心学习团队。一起探索科技的未来,共同成长。

在这里插入图片描述

更多推荐