Spring Cloud feign客户端执行流程概述
ReflectiveFeign$FeignInvocationHandler使用者所定义的feign客户端(通过注解@FeignClient)会在应用启动时被发现,会以一个ReflectiveFeign$FeignInvocationHandler动态代理的形式注册到容器。feign客户端中对应远程服务端点功能每个函数会对应一个MethodHandler SynchronousMethod...
概述
应用程序初始化之后,开发人员定义的每个feign
客户端对应一个 ReflectiveFeign$FeignInvocationHandler
实例。实例具体创建的过程可以参考文章"注解 @EnableFeignClients 工作原理"。本文仅分析一个方法请求发生时,一个feign
客户端如何将其转换成最终的远程服务调用并处理响应,涉及以下几个问题点:
feign
方法和服务的对应- 选用哪个服务节点
- 发起服务请求和接收响应
- 处理服务响应
源代码分析
1. feign
方法和服务的对应
实际上,在初始化过程中创建feign
客户端实例时,已经完成了feign
方法和服务对应,从最终生成的实例所包含的信息就可以看得出来。
比如feign
客户端定义如下 :
package xxx;
import com.google.common.collect.Maps;
import xxx.TestModel;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(name = "test-service", path = "/test")
public interface TestService {
@GetMapping(value = "/echo")
TestModel echoGet(@RequestParam("parameter") String parameter);
@PostMapping(value = "/echo/post",consumes = {"application/x-www-form-urlencoded"})
TestModel echoPostForm(Map<String, ?> formParams);
@PostMapping(value = "/echo/post")
TestModel echoPost(@RequestParam("parameter") String parameter);
}
则最终生成的客户端实例会如下所示 :
this = {ReflectiveFeign$FeignInvocationHandler@5249}
target = {Target$HardCodedTarget@5257}
"HardCodedTarget(type=TestService, name=test-service, url=http://test-service/test)"
dispatch = {LinkedHashMap@5258} size = 3
0 = {LinkedHashMap$Entry@5290} "public abstract xxx.TestModel xxx.TestService.echoPostForm(java.util.Map)"
1 = {LinkedHashMap$Entry@5291} "public abstract xxx.TestModel xxx.TestService.echoGet(java.lang.String)"
2 = {LinkedHashMap$Entry@5292} "public abstract xxx.TestModel xxx.TestService.echoPost(java.lang.String)"
这里feign
客户端实例的属性target
指向了目标服务http://test-service/test
,每个feign
客户端方法的注解也指明了更进一步的url
路径,比如方法echoGet
最终会对应到http://test-service/test/echo
,而方法echoPostForm/echoPost
最终会对应到http://test-service/test/echo/post
。而dispatch
是一个Map<Method, MethodHandler>
,key
是feign
客户端的方法,value
实现类则是SynchronousMethodHandler
,它也包含了和属性target
一样的信息以及对应方法上的元数据。
有了上述信息,一旦某个客户端方法被调用,就相当于调用了dispatch
中对应方法的SynchronousMethodHandler
:
// 内部类 ReflectiveFeign$FeignInvocationHandler 方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("equals".equals(method.getName())) {
try {
Object otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
// 对 feign client 方法的调用最终转换成dispatch中对应方法的SynchronousMethodHandler的调用
return dispatch.get(method).invoke(args);
}
由此可见,当feign
客户端的某个服务方法被调用时,对应的SynchronousMethodHandler#invoke
会被调用。而SynchronousMethodHandler
已经掌握了定位目标服务所需要的几乎所有的信息。目前尚不清楚的是,如果使用了负载均衡,test-service
会最终对应哪个服务节点呢?
2. 选用哪个服务节点
带着上述问题,我们继续来看 SynchronousMethodHandler#invoke
方法的执行 :
- 构建
RequestTemplate
这里构造
RequestTemplate
使用了一个RequestTemplate.Factory buildTemplateFromArgs
,该对象是通过SynchronousMethodHandler
构造函数传入的 ,所构造的RequestTemplate
对象会包含来自接口方法定义上的path
属性,http method
属性,参数等,但是还不包含目标服务节点信息。对于上面例子中提到的三个方法,生成的
RequestTemplate
的请求分别是 :echoGet : URL : http://test-service/test/echo?parameter=GET%20request Body : [] echoPost : URL : http://test-service/test/echo/post?parameter=POST%20request Body : [] echoPostForm : URL : http://test-service/test/echo/post Content Type : application/x-www-form-urlencoded Body : [parameter=POST+FORM+request]
#executeAndDecode
上面所构建的RequestTemplate
- 应用请求拦截器
RequestInterceptor
和应用目标到RestTemplate
形成一个Request
对象此时
RestTemplate
已经拥有除了具体服务节点之外所有的请求信息了。 - 调用
LoadBalancerFeignClient#execute
这里
LoadBalancerFeignClient
是通过SynchronousMethodHandler
构造函数传入的
- 应用请求拦截器
RetryableException
异常的话进行一次重试处理
上面流程 SynchronousMethodHandler#invoke
最终调用LoadBalancerFeignClient#execute
:
- 构建请求对象
FeignLoadBalancer.RibbonRequest ribbonRequest
- 获取请求配置
IClientConfig requestConfig
IClientConfig requestConfig = getClientConfig(options, clientName)
- 获取针对当前服务名称的负载均衡器
FeignLoadBalancer
(注意这里使用了缓存机制) - 调用所构建负载均衡器
FeignLoadBalancer
的方法executeWithLoadBalancer
而FeignLoadBalancer#executeWithLoadBalancer
方法实现如下 :
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig)
throws ClientException {
// 构建一个LoadBalancerCommand对象
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
// 构建一个ServerOperation并提交到所构建的LoadBalancerCommand
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
// 例子输入 :
// server : localhost:8080 所选定的服务节点
// request.uri : http://test/echo?parameter=value 请求URL含参数,
// 不含主机端口部分
// 例子输出 :
// 最终可以用于发起请求的完整的URL finalUri :
// http://localhost:8080/test/echo?parameter=value
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
// 发起最终的服务请求和处理相应
// 实现在FeignLoadBalancer基类中,GET/POST等各种方法都会在这里被处理
return Observable.just(
AbstractLoadBalancerAwareClient.this.execute(
requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
上面的方法FeignLoadBalancer#executeWithLoadBalancer
实现中,通过#buildLoadBalancerCommand
方法构建了一个LoadBalancerCommand
,它的#submit
方法会从多个负载均衡服务节点中选出一个供这次调用使用,具体的选择逻辑实现在方法LoadBalancerCommand#selectServer
中。
到此为止,我们就知道最重要使用哪个服务节点了。
接下来,我们看最终服务请求的发起和响应的接收。这部分逻辑实现在FeignLoadBalancer#execute
中:
@Override
public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
throws IOException {
Request.Options options;
if (configOverride != null) {
RibbonProperties override = RibbonProperties.from(configOverride);
options = new Request.Options(
override.connectTimeout(this.connectTimeout),
override.readTimeout(this.readTimeout));
}
else {
options = new Request.Options(this.connectTimeout, this.readTimeout);
}
Response response = request.client().execute(request.toRequest(), options);
return new RibbonResponse(request.getUri(), response);
}
3. 发起服务请求和接收响应
从上面代码可以看到,最终请求通过Response response = request.client().execute(request.toRequest(), options)
发起并接收响应。而这部分具体实现在feign.Client
接口的内部类Default
:
package feign;
/**
* Submits HTTP Request requests. Implementations are expected to be thread-safe.
*/
public interface Client {
/**
* Executes a request against its Request#url() url and returns a response.
*
* @param request safe to replay.
* @param options options to apply to this request.
* @return connected response, Response.Body is absent or unread.
* @throws IOException on a network error connecting to Request#url().
*/
Response execute(Request request, Options options) throws IOException;
public static class Default implements Client {
private final SSLSocketFactory sslContextFactory;
private final HostnameVerifier hostnameVerifier;
/**
* Null parameters imply platform defaults.
*/
public Default(SSLSocketFactory sslContextFactory, HostnameVerifier hostnameVerifier) {
this.sslContextFactory = sslContextFactory;
this.hostnameVerifier = hostnameVerifier;
}
@Override
public Response execute(Request request, Options options) throws IOException {
HttpURLConnection connection = convertAndSend(request, options);
return convertResponse(connection, request);
}
// 对服务节点发起`HTTP`请求
HttpURLConnection convertAndSend(Request request, Options options) throws IOException {
final HttpURLConnection connection =
(HttpURLConnection) new URL(request.url()).openConnection();
if (connection instanceof HttpsURLConnection) {
HttpsURLConnection sslCon = (HttpsURLConnection) connection;
if (sslContextFactory != null) {
sslCon.setSSLSocketFactory(sslContextFactory);
}
if (hostnameVerifier != null) {
sslCon.setHostnameVerifier(hostnameVerifier);
}
}
connection.setConnectTimeout(options.connectTimeoutMillis());
connection.setReadTimeout(options.readTimeoutMillis());
connection.setAllowUserInteraction(false);
connection.setInstanceFollowRedirects(options.isFollowRedirects());
connection.setRequestMethod(request.httpMethod().name());
Collection<String> contentEncodingValues = request.headers().get(CONTENT_ENCODING);
boolean gzipEncodedRequest =
contentEncodingValues != null && contentEncodingValues.contains(ENCODING_GZIP);
boolean deflateEncodedRequest =
contentEncodingValues != null && contentEncodingValues.contains(ENCODING_DEFLATE);
boolean hasAcceptHeader = false;
Integer contentLength = null;
for (String field : request.headers().keySet()) {
if (field.equalsIgnoreCase("Accept")) {
hasAcceptHeader = true;
}
for (String value : request.headers().get(field)) {
if (field.equals(CONTENT_LENGTH)) {
if (!gzipEncodedRequest && !deflateEncodedRequest) {
contentLength = Integer.valueOf(value);
connection.addRequestProperty(field, value);
}
} else {
connection.addRequestProperty(field, value);
}
}
}
// Some servers choke on the default accept string.
if (!hasAcceptHeader) {
connection.addRequestProperty("Accept", "*/*");
}
if (request.body() != null) {
if (contentLength != null) {
connection.setFixedLengthStreamingMode(contentLength);
} else {
connection.setChunkedStreamingMode(8196);
}
connection.setDoOutput(true);
OutputStream out = connection.getOutputStream();
if (gzipEncodedRequest) {
out = new GZIPOutputStream(out);
} else if (deflateEncodedRequest) {
out = new DeflaterOutputStream(out);
}
try {
out.write(request.body());
} finally {
try {
out.close();
} catch (IOException suppressed) { // NOPMD
}
}
}
return connection;
}
// 接收响应
Response convertResponse(HttpURLConnection connection, Request request) throws IOException {
int status = connection.getResponseCode();
String reason = connection.getResponseMessage();
if (status < 0) {
throw new IOException(format("Invalid status(%s) executing %s %s", status,
connection.getRequestMethod(), connection.getURL()));
}
Map<String, Collection<String>> headers = new LinkedHashMap<String, Collection<String>>();
for (Map.Entry<String, List<String>> field : connection.getHeaderFields().entrySet()) {
// response message
if (field.getKey() != null) {
headers.put(field.getKey(), field.getValue());
}
}
Integer length = connection.getContentLength();
if (length == -1) {
length = null;
}
InputStream stream;
if (status >= 400) {
stream = connection.getErrorStream();
} else {
stream = connection.getInputStream();
}
return Response.builder()
.status(status)
.reason(reason)
.headers(headers)
.request(request)
.body(stream, length)
.build();
}
}
}
4. 处理服务响应
缺省情况下,Spring Cloud
应用中feign
客户端使用一个ResponseEntityDecoder
来解析服务响应 :
package org.springframework.cloud.openfeign.support;
/**
* Decoder adds compatibility for Spring MVC's ResponseEntity to any other decoder via
* composition.
* @author chadjaros
*/
public class ResponseEntityDecoder implements Decoder {
private Decoder decoder;
public ResponseEntityDecoder(Decoder decoder) {
this.decoder = decoder;
}
@Override
public Object decode(final Response response, Type type) throws IOException,
FeignException {
if (isParameterizeHttpEntity(type)) {
type = ((ParameterizedType) type).getActualTypeArguments()[0];
Object decodedObject = decoder.decode(response, type);
return createResponse(decodedObject, response);
}
else if (isHttpEntity(type)) {
return createResponse(null, response);
}
else {
return decoder.decode(response, type);
}
}
private boolean isParameterizeHttpEntity(Type type) {
if (type instanceof ParameterizedType) {
return isHttpEntity(((ParameterizedType) type).getRawType());
}
return false;
}
private boolean isHttpEntity(Type type) {
if (type instanceof Class) {
Class c = (Class) type;
return HttpEntity.class.isAssignableFrom(c);
}
return false;
}
@SuppressWarnings("unchecked")
private <T> ResponseEntity<T> createResponse(Object instance, Response response) {
MultiValueMap<String, String> headers = new LinkedMultiValueMap<>();
for (String key : response.headers().keySet()) {
headers.put(key, new LinkedList<>(response.headers().get(key)));
}
return new ResponseEntity<>((T) instance, headers, HttpStatus.valueOf(response
.status()));
}
}
参考文章
更多推荐
所有评论(0)