概述

应用程序初始化之后,开发人员定义的每个feign客户端对应一个 ReflectiveFeign$FeignInvocationHandler实例。实例具体创建的过程可以参考文章"注解 @EnableFeignClients 工作原理"。本文仅分析一个方法请求发生时,一个feign客户端如何将其转换成最终的远程服务调用并处理响应,涉及以下几个问题点:

  1. feign方法和服务的对应
  2. 选用哪个服务节点
  3. 发起服务请求和接收响应
  4. 处理服务响应

源代码分析

1. feign方法和服务的对应

实际上,在初始化过程中创建feign客户端实例时,已经完成了feign方法和服务对应,从最终生成的实例所包含的信息就可以看得出来。

比如feign客户端定义如下 :

package xxx;

import com.google.common.collect.Maps;
import xxx.TestModel;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(name = "test-service", path = "/test")
public interface TestService {
    @GetMapping(value = "/echo")
    TestModel echoGet(@RequestParam("parameter") String parameter);

    @PostMapping(value = "/echo/post",consumes = {"application/x-www-form-urlencoded"})
    TestModel echoPostForm(Map<String, ?> formParams);

    @PostMapping(value = "/echo/post")
    TestModel echoPost(@RequestParam("parameter") String parameter);
}

则最终生成的客户端实例会如下所示 :

this = {ReflectiveFeign$FeignInvocationHandler@5249} 
 target = {Target$HardCodedTarget@5257} 
 	"HardCodedTarget(type=TestService, name=test-service, url=http://test-service/test)"
 dispatch = {LinkedHashMap@5258}  size = 3
  0 = {LinkedHashMap$Entry@5290} "public abstract xxx.TestModel xxx.TestService.echoPostForm(java.util.Map)" 
  1 = {LinkedHashMap$Entry@5291} "public abstract xxx.TestModel xxx.TestService.echoGet(java.lang.String)"
  2 = {LinkedHashMap$Entry@5292} "public abstract xxx.TestModel xxx.TestService.echoPost(java.lang.String)"

这里feign客户端实例的属性target指向了目标服务http://test-service/test,每个feign客户端方法的注解也指明了更进一步的url路径,比如方法echoGet最终会对应到http://test-service/test/echo,而方法echoPostForm/echoPost最终会对应到http://test-service/test/echo/post。而dispatch是一个Map<Method, MethodHandler>,keyfeign客户端的方法,value实现类则是SynchronousMethodHandler,它也包含了和属性target一样的信息以及对应方法上的元数据。

有了上述信息,一旦某个客户端方法被调用,就相当于调用了dispatch中对应方法的SynchronousMethodHandler:

 // 内部类 ReflectiveFeign$FeignInvocationHandler 方法
 @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      if ("equals".equals(method.getName())) {
        try {
          Object otherHandler =
              args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
          return equals(otherHandler);
        } catch (IllegalArgumentException e) {
          return false;
        }
      } else if ("hashCode".equals(method.getName())) {
        return hashCode();
      } else if ("toString".equals(method.getName())) {
        return toString();
      }

	  // 对 feign client 方法的调用最终转换成dispatch中对应方法的SynchronousMethodHandler的调用
      return dispatch.get(method).invoke(args);
    }

由此可见,当feign客户端的某个服务方法被调用时,对应的SynchronousMethodHandler#invoke会被调用。而SynchronousMethodHandler已经掌握了定位目标服务所需要的几乎所有的信息。目前尚不清楚的是,如果使用了负载均衡,test-service会最终对应哪个服务节点呢?

2. 选用哪个服务节点

带着上述问题,我们继续来看 SynchronousMethodHandler#invoke方法的执行 :

  1. 构建RequestTemplate

    这里构造RequestTemplate使用了一个RequestTemplate.Factory buildTemplateFromArgs,该对象是通过SynchronousMethodHandler构造函数传入的 ,所构造的RequestTemplate对象会包含来自接口方法定义上的path属性,http method属性,参数等,但是还不包含目标服务节点信息。

    对于上面例子中提到的三个方法,生成的RequestTemplate的请求分别是 :

    echoGet :
    	URL : http://test-service/test/echo?parameter=GET%20request
    	Body : []
    echoPost : 
    	URL : http://test-service/test/echo/post?parameter=POST%20request
    	Body : []
    echoPostForm : 
    	URL : http://test-service/test/echo/post
    	Content Type : application/x-www-form-urlencoded
    	Body : [parameter=POST+FORM+request]
    
  2. #executeAndDecode上面所构建的RequestTemplate
    1. 应用请求拦截器RequestInterceptor和应用目标到RestTemplate形成一个Request对象

      此时RestTemplate已经拥有除了具体服务节点之外所有的请求信息了。

    2. 调用LoadBalancerFeignClient#execute

      这里LoadBalancerFeignClient 是通过SynchronousMethodHandler构造函数传入的

  3. RetryableException异常的话进行一次重试处理

上面流程 SynchronousMethodHandler#invoke最终调用LoadBalancerFeignClient#execute

  1. 构建请求对象FeignLoadBalancer.RibbonRequest ribbonRequest
  2. 获取请求配置 IClientConfig requestConfig
     IClientConfig requestConfig = getClientConfig(options, clientName)
    
  3. 获取针对当前服务名称的负载均衡器FeignLoadBalancer (注意这里使用了缓存机制)
  4. 调用所构建负载均衡器FeignLoadBalancer 的方法executeWithLoadBalancer

FeignLoadBalancer#executeWithLoadBalancer方法实现如下 :

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) 
    	throws ClientException {
    	//  构建一个LoadBalancerCommand对象
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
        	// 构建一个ServerOperation并提交到所构建的LoadBalancerCommand
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        // 例子输入 : 
                        //  server : localhost:8080 所选定的服务节点
                        // request.uri : http://test/echo?parameter=value 请求URL含参数,
                        // 不含主机端口部分
                        // 例子输出 :  
                        //  最终可以用于发起请求的完整的URL finalUri :
                        //  http://localhost:8080/test/echo?parameter=value                    
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                        	// 发起最终的服务请求和处理相应
                        	// 实现在FeignLoadBalancer基类中,GET/POST等各种方法都会在这里被处理
                            return Observable.just(
                            AbstractLoadBalancerAwareClient.this.execute(
                            					requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
        
    }

上面的方法FeignLoadBalancer#executeWithLoadBalancer实现中,通过#buildLoadBalancerCommand方法构建了一个LoadBalancerCommand,它的#submit方法会从多个负载均衡服务节点中选出一个供这次调用使用,具体的选择逻辑实现在方法LoadBalancerCommand#selectServer中。

到此为止,我们就知道最重要使用哪个服务节点了。

接下来,我们看最终服务请求的发起和响应的接收。这部分逻辑实现在FeignLoadBalancer#execute中:


	@Override
	public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
			throws IOException {
		Request.Options options;
		if (configOverride != null) {
			RibbonProperties override = RibbonProperties.from(configOverride);
			options = new Request.Options(
					override.connectTimeout(this.connectTimeout),
					override.readTimeout(this.readTimeout));
		}
		else {
			options = new Request.Options(this.connectTimeout, this.readTimeout);
		}
		Response response = request.client().execute(request.toRequest(), options);
		return new RibbonResponse(request.getUri(), response);
	}

3. 发起服务请求和接收响应

从上面代码可以看到,最终请求通过Response response = request.client().execute(request.toRequest(), options)发起并接收响应。而这部分具体实现在feign.Client接口的内部类Default:

package feign;


/**
 * Submits HTTP Request requests. Implementations are expected to be thread-safe.
 */
public interface Client {

  /**
   * Executes a request against its Request#url() url and returns a response.
   *
   * @param request safe to replay.
   * @param options options to apply to this request.
   * @return connected response, Response.Body is absent or unread.
   * @throws IOException on a network error connecting to Request#url().
   */
  Response execute(Request request, Options options) throws IOException;

  public static class Default implements Client {

    private final SSLSocketFactory sslContextFactory;
    private final HostnameVerifier hostnameVerifier;

    /**
     * Null parameters imply platform defaults.
     */
    public Default(SSLSocketFactory sslContextFactory, HostnameVerifier hostnameVerifier) {
      this.sslContextFactory = sslContextFactory;
      this.hostnameVerifier = hostnameVerifier;
    }

    @Override
    public Response execute(Request request, Options options) throws IOException {
      HttpURLConnection connection = convertAndSend(request, options);
      return convertResponse(connection, request);
    }

	// 对服务节点发起`HTTP`请求
    HttpURLConnection convertAndSend(Request request, Options options) throws IOException {
      final HttpURLConnection connection =
          (HttpURLConnection) new URL(request.url()).openConnection();
      if (connection instanceof HttpsURLConnection) {
        HttpsURLConnection sslCon = (HttpsURLConnection) connection;
        if (sslContextFactory != null) {
          sslCon.setSSLSocketFactory(sslContextFactory);
        }
        if (hostnameVerifier != null) {
          sslCon.setHostnameVerifier(hostnameVerifier);
        }
      }
      connection.setConnectTimeout(options.connectTimeoutMillis());
      connection.setReadTimeout(options.readTimeoutMillis());
      connection.setAllowUserInteraction(false);
      connection.setInstanceFollowRedirects(options.isFollowRedirects());
      connection.setRequestMethod(request.httpMethod().name());

      Collection<String> contentEncodingValues = request.headers().get(CONTENT_ENCODING);
      boolean gzipEncodedRequest =
          contentEncodingValues != null && contentEncodingValues.contains(ENCODING_GZIP);
      boolean deflateEncodedRequest =
          contentEncodingValues != null && contentEncodingValues.contains(ENCODING_DEFLATE);

      boolean hasAcceptHeader = false;
      Integer contentLength = null;
      for (String field : request.headers().keySet()) {
        if (field.equalsIgnoreCase("Accept")) {
          hasAcceptHeader = true;
        }
        for (String value : request.headers().get(field)) {
          if (field.equals(CONTENT_LENGTH)) {
            if (!gzipEncodedRequest && !deflateEncodedRequest) {
              contentLength = Integer.valueOf(value);
              connection.addRequestProperty(field, value);
            }
          } else {
            connection.addRequestProperty(field, value);
          }
        }
      }
      // Some servers choke on the default accept string.
      if (!hasAcceptHeader) {
        connection.addRequestProperty("Accept", "*/*");
      }

      if (request.body() != null) {
        if (contentLength != null) {
          connection.setFixedLengthStreamingMode(contentLength);
        } else {
          connection.setChunkedStreamingMode(8196);
        }
        connection.setDoOutput(true);
        OutputStream out = connection.getOutputStream();
        if (gzipEncodedRequest) {
          out = new GZIPOutputStream(out);
        } else if (deflateEncodedRequest) {
          out = new DeflaterOutputStream(out);
        }
        try {
          out.write(request.body());
        } finally {
          try {
            out.close();
          } catch (IOException suppressed) { // NOPMD
          }
        }
      }
      return connection;
    }

	// 接收响应
    Response convertResponse(HttpURLConnection connection, Request request) throws IOException {
      int status = connection.getResponseCode();
      String reason = connection.getResponseMessage();

      if (status < 0) {
        throw new IOException(format("Invalid status(%s) executing %s %s", status,
            connection.getRequestMethod(), connection.getURL()));
      }

      Map<String, Collection<String>> headers = new LinkedHashMap<String, Collection<String>>();
      for (Map.Entry<String, List<String>> field : connection.getHeaderFields().entrySet()) {
        // response message
        if (field.getKey() != null) {
          headers.put(field.getKey(), field.getValue());
        }
      }

      Integer length = connection.getContentLength();
      if (length == -1) {
        length = null;
      }
      InputStream stream;
      if (status >= 400) {
        stream = connection.getErrorStream();
      } else {
        stream = connection.getInputStream();
      }
      return Response.builder()
          .status(status)
          .reason(reason)
          .headers(headers)
          .request(request)
          .body(stream, length)
          .build();
    }
  }
}

4. 处理服务响应

缺省情况下,Spring Cloud应用中feign客户端使用一个ResponseEntityDecoder来解析服务响应 :

package org.springframework.cloud.openfeign.support;

/**
 * Decoder adds compatibility for Spring MVC's ResponseEntity to any other decoder via
 * composition.
 * @author chadjaros
 */
public class ResponseEntityDecoder implements Decoder {

	private Decoder decoder;

	public ResponseEntityDecoder(Decoder decoder) {
		this.decoder = decoder;
	}

	@Override
	public Object decode(final Response response, Type type) throws IOException,
			FeignException {

		if (isParameterizeHttpEntity(type)) {
			type = ((ParameterizedType) type).getActualTypeArguments()[0];
			Object decodedObject = decoder.decode(response, type);

			return createResponse(decodedObject, response);
		}
		else if (isHttpEntity(type)) {
			return createResponse(null, response);
		}
		else {
			return decoder.decode(response, type);
		}
	}

	private boolean isParameterizeHttpEntity(Type type) {
		if (type instanceof ParameterizedType) {
			return isHttpEntity(((ParameterizedType) type).getRawType());
		}
		return false;
	}

	private boolean isHttpEntity(Type type) {
		if (type instanceof Class) {
			Class c = (Class) type;
			return HttpEntity.class.isAssignableFrom(c);
		}
		return false;
	}

	@SuppressWarnings("unchecked")
	private <T> ResponseEntity<T> createResponse(Object instance, Response response) {

		MultiValueMap<String, String> headers = new LinkedMultiValueMap<>();
		for (String key : response.headers().keySet()) {
			headers.put(key, new LinkedList<>(response.headers().get(key)));
		}

		return new ResponseEntity<>((T) instance, headers, HttpStatus.valueOf(response
				.status()));
	}
}

参考文章

注解 @EnableFeignClients 工作原理

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐