Feign接口 多线程问题
Spring Cloud Feign传输Header,并保证多线程情况下也适用一、现象微服务在生产中,常遇到需要把 header 传递到下一子服务的情况(如服务A访问服务B的接口,需要传递header),网上大多数的方案是实现 RequestInterceptor 接口,在重写方法中,把 header 填进 Feign 的请求中。我们先按这种方式,简单实现代码如下:1、继承RequestInter
Spring Cloud Feign传输Header,并保证多线程情况下也适用
一、现象
微服务在生产中,常遇到需要把 header 传递到下一子服务的情况(如服务A访问服务B的接口,需要传递header),网上大多数的方案是实现 RequestInterceptor 接口,在重写方法中,把 header 填进 Feign 的请求中。我们先按这种方式,简单实现代码如下:
1、继承RequestInterceptor
服务A新建类,实现 RequestInterceptor 接口,把 header 设置到请求中。注意:HTTP 的 header 名称本身不区分大小写,key 若是大写,请求中一般会被转为小写,所以建议 header 的 key 统一设置为小写。
package com.he.feign.config;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
/**
 * Feign {@link RequestInterceptor} that forwards the {@code feignheader} header of the
 * servlet request bound to the current thread to the outgoing Feign request.
 *
 * <p>Only this single header is forwarded; copying every incoming header downstream
 * can cause other failures.
 *
 * <p>Author: hesh, 2020/6/21
 */
@Configuration
public class FeignConfig implements RequestInterceptor {

    /** Name of the only header that is propagated to the downstream service. */
    private static final String FORWARDED_HEADER = "feignheader";

    /**
     * Copies {@code feignheader} from the current servlet request, if any, onto the Feign request.
     *
     * @param requestTemplate the outgoing Feign request being built
     */
    @Override
    public void apply(RequestTemplate requestTemplate) {
        // RequestContextHolder is thread-bound: on a thread without a bound servlet request
        // (or once the request has completed) there is nothing to read, so skip silently.
        Object attributes = RequestContextHolder.getRequestAttributes();
        if (!(attributes instanceof ServletRequestAttributes)) {
            return;
        }
        HttpServletRequest request = ((ServletRequestAttributes) attributes).getRequest();

        // getHeader() matches the name case-insensitively, so there is no need to walk
        // getHeaderNames() (which may be null on some containers) and compare by hand.
        String value = request.getHeader(FORWARDED_HEADER);
        if (value != null) {
            requestTemplate.header(FORWARDED_HEADER, value);
        }
    }
}
2、修改 hystrix 的隔离策略为 semaphore
RequestContextHolder.getRequestAttributes()方法,实际上是从ThreadLocal变量中取得相应信息的。hystrix断路器的默认隔离策略为THREAD,该策略是无法取得ThreadLocal值的,所以需要修改hystrix的隔离策略,一般是改为[semaphore],在服务A中的 yml 新增配置如下#2、hystrix 的隔离策略改为 SEMAPHORE
# Hystrix: run commands with SEMAPHORE isolation instead of the default THREAD isolation,
# so thread-bound request state (RequestContextHolder) is visible to the Feign interceptor.
hystrix:
  command:
    default:
      execution:
        timeout:
          # property name is "enabled" (the original "enable" is not a Hystrix property)
          enabled: true
        isolation:
          strategy: SEMAPHORE
          thread:
            # property name is "timeoutInMilliseconds" (the original spelling was a typo)
            timeoutInMilliseconds: 60000
3、客户端A的测试代码
3.1、服务A的controller接口
package com.he.feign.controller;
import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* <b>@Desc</b>: 测试
* <b>@Author</b>: hesh
* <b>@Date</b>: 2020/6/21
* <b>@Modify</b>:
*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {
@Autowired
private HeaderFeign headerFeign;
@Autowired
private HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test
@GetMapping("/main_thread")
public String mainThread() {
String resp = headerFeign.test();
log.info("resp: {}", resp);
return resp;
}
@GetMapping("/sub_thread")
public void subThread() {
new Thread(() -> {
String resp = headerFeign.test();
log.info("resp: {}", resp);
}).start();
}
@GetMapping("/sub_thread/block")
public String subThreadBlock() {
//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet容器不会销毁HttpServletRequest,
//所以请求属性还保存在请求链路中,能被传递下去
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());
String resp = null;
try {
resp = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
log.info("resp: ", resp);
return resp;
}
}
3.2、Feign类
feignclient的注解可以省略configuration配置,即configuration = FeignConfig.class可不声明
package com.he.feign.feign;
import com.he.feign.feign.hystrix.HeaderFeignFallback;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;/**
* <b>@Desc</b>: TODO
* <b>@Author</b>: hesh
* <b>@Date</b>: 2020/6/21
* <b>@Modify</b>:
*/
// Alternative declaration that names the interceptor configuration explicitly; the configuration attribute can be omitted:
//@FeignClient(value = "eureka-client",path = "/header",fallback = HeaderFeignFallback.class,configuration = FeignConfig.class)
// Feign client for the "eureka-client" service under the /header path; HeaderFeignFallback is the declared fallback.
@FeignClient(value = "eureka-client",path = "/header",fallback = HeaderFeignFallback.class)
public interface HeaderFeign {
// Issues GET /header/test against the target service and returns the response body as a String.
@GetMapping("/test")
String test();
}
package com.he.feign.feign.hystrix;
import com.he.feign.feign.HeaderFeign;
import org.springframework.stereotype.Component;
/**
 * Fallback implementation of {@link HeaderFeign}, registered as a Spring component.
 *
 * <p>Author: hesh, 2020/6/1
 */
@Component
public class HeaderFeignFallback implements HeaderFeign {
/**
 * Fallback result for {@link HeaderFeign#test()}.
 *
 * @return always {@code null}
 */
@Override
public String test() {
return null;
}
}
4、服务端B的接口代码
package com.he.eurekaclient.controller;
import com.he.eurekaclient.feign.HelloFeign;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
/**
 * Endpoint of service B used to verify header propagation: it echoes the request URL
 * and every received header, and reports whether {@code feignheader} arrived.
 *
 * <p>Author: hesh, 2020/6/21
 */
@RequestMapping("/header")
@RestController
public class HeaderController {

    /** Header that the caller is expected to forward. */
    private static final String REQUIRED_HEADER = "feignheader";

    @Value("${spring.application.name}")
    private String appName;

    // Incoming request; expected to carry the header feignheader with the value "test".
    @Autowired
    private HttpServletRequest servletRequest;

    /**
     * Builds a plain-text report of the incoming request.
     *
     * @return the application name, request URL and one {@code name-value} line per header,
     *         followed by an error marker when {@code feignheader} is missing
     */
    @GetMapping("/test")
    public String test() {
        // StringBuilder: the buffer never leaves this method, so StringBuffer's locking is unnecessary.
        StringBuilder report = new StringBuilder("hello from ").append(appName).append("\n");
        report.append("requestURL: ").append(servletRequest.getRequestURL()).append("\n");
        report.append("headers: \n");

        boolean containsRequired = false;
        Enumeration<String> headerNames = servletRequest.getHeaderNames();
        // getHeaderNames() may return null when the container does not expose header names.
        while (headerNames != null && headerNames.hasMoreElements()) {
            String headerName = headerNames.nextElement();
            String headerValue = servletRequest.getHeader(headerName);
            report.append(headerName).append("-").append(headerValue).append("\n");
            // Header names are case-insensitive; do not rely on the container lower-casing them.
            if (REQUIRED_HEADER.equalsIgnoreCase(headerName)) {
                containsRequired = true;
            }
        }

        if (!containsRequired) {
            report.append("--error--").append("not contain required header!");
        }
        return report.toString();
    }
}
5、启动服务,在postman中测试如下
5.1、调用接口 http://localhost:8060/test_header/main_thread,结果如下
5.2、调用接口 http://localhost:8060/test_header/sub_thread ,结果如下
5.3、调用 http://localhost:8060/test_header/sub_thread/block,结果如下
从5.1 – 5.3的查询结果,可以得到结论
经过上述的配置后,在用户线程(主线程)中调用feign请求,可把header传递到服务B中;
若在用户线程(主线程)中启动子线程,并在子线程中调用feign请求,header传递不到服务B中;
即使子线程最终由异步转为同步、由主线程阻塞等待结果,header仍传递不到服务B中。
二、网络上大多数的解决方案
出现上面的原因, 主要是 RequestAttributes 默认不是线程共享的;主线程调用子线程时,没把 RequestAttributes 共享给子线程。因此,只要在主线程调用其他线程前将RequestAttributes对象设置为子线程共享,就能把header等信息传递下去。
1、因此,网络上大多数的解决方案如下,在主线程调用子线程前,增加下面配置
RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承,线程共享
1
修改后的代码如下
package com.he.feign.controller;
import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestContextHolder;
import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* <b>@Desc</b>: 测试
* <b>@Author</b>: hesh
* <b>@Date</b>: 2020/6/21
* <b>@Modify</b>:
*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {
@Autowired
private HeaderFeign headerFeign;
@Autowired
private HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test
@GetMapping("/main_thread")
public String mainThread() {
String resp = headerFeign.test();
log.info("resp: {}", resp);
return resp;
}
@GetMapping("/sub_thread")
public void subThread() {
RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承
new Thread(() -> {
String resp = headerFeign.test();
log.info("resp: {}", resp);
}).start();
}
@GetMapping("/sub_thread/block")
public String subThreadBlock() {
//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去
RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());
String resp = null;
try {
resp = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
log.info("resp: ", resp);
return resp;
}
}
2、重新启动服务A,再次调用两个带子线程的接口,现象如下
调用 http://localhost:8060/test_header/sub_thread/block,结果如下
调用接口 http://localhost:8060/test_header/sub_thread ,结果如下
测试结果,有以下两种现象
在主线程get()阻塞等待子线程执行完毕时,每次请求都成功;
主线程直接启动子线程,且执行完自己逻辑后便结束不需理会子线程结果的,请求偶尔成功, 偶尔失败;
这是为什么呢,作者认为主要是以下原因
Servlet容器中Servlet属性生命周期与接收请求的用户线程(父线程)同步, 随着父线程执行完destroy()而销毁;
子线程虽然可以从父线程共享信息中获得了请求属性,但这个属性由父线程维护
当父线程比子线程执行完慢时,请求属性还在,子线程请求成功;当快时,请求属性随着父线程结束而销毁,子线程的请求属性变为null,请求失败。
由此可见,简单的设置 RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);
在多线程情况下, 并非是一劳永逸的。
三、作者的解决方案
针对上面的问题,及问题根本原因,我们团队的解决方案仍是使用 ThreadLocal,进行线程间的变量共享通信。
1、新建 ThreadLocalUtil
package com.he.feign.thread;
import java.util.HashMap;
import java.util.Map;
/**
 * Per-thread store of headers to forward, inherited by threads started from the owning thread.
 *
 * <p>Each thread owns its own map. A thread created by a thread that already holds a map
 * starts with a <em>copy</em> of the creator's entries, taken at thread construction;
 * later changes on either side are not visible to the other. Handing the same
 * {@code HashMap} instance to child threads, as the default {@code childValue} would,
 * lets several threads mutate one unsynchronized map.
 *
 * <p>Threads that are reused (pooled) keep their map until {@link #clear()} is called.
 *
 * <p>Author: hesh, 2020/6/22
 */
public class ThreadLocalUtil {

    // InheritableThreadLocal so that values set on a thread are seen by threads it creates.
    private static final InheritableThreadLocal<Map<String, String>> headerMap =
            new InheritableThreadLocal<Map<String, String>>() {
                @Override
                protected Map<String, String> initialValue() {
                    return new HashMap<>();
                }

                @Override
                protected Map<String, String> childValue(Map<String, String> parentValue) {
                    // Snapshot instead of sharing the parent's mutable map with the child.
                    return parentValue == null ? new HashMap<>() : new HashMap<>(parentValue);
                }
            };

    /**
     * Returns the current thread's live header map.
     *
     * @return the mutable map owned by the calling thread, never {@code null}
     */
    public static Map<String, String> get() {
        return headerMap.get();
    }

    /**
     * Looks up one header on the current thread.
     *
     * @param key header name
     * @return the stored value, or {@code null} if absent
     */
    public static String get(String key) {
        return headerMap.get().get(key);
    }

    /**
     * Stores one header on the current thread.
     *
     * @param key   header name
     * @param value header value
     */
    public static void set(String key, String value) {
        headerMap.get().put(key, value);
    }

    /**
     * Removes one header from the current thread's map.
     *
     * @param key header name
     */
    public static void remove(String key) {
        headerMap.get().remove(key);
    }

    /**
     * Drops the current thread's map entirely; the next access starts from an empty map.
     * Call this when a pooled thread finishes a unit of work.
     */
    public static void clear() {
        headerMap.remove();
    }
}
2、修改服务A 的接口 TestHeaderController
package com.he.feign.controller;
import com.he.feign.feign.HeaderFeign;
import com.he.feign.thread.ThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* <b>@Desc</b>: 测试
* <b>@Author</b>: hesh
* <b>@Date</b>: 2020/6/21
* <b>@Modify</b>:
*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {
@Autowired
private HeaderFeign headerFeign;
@Autowired
private HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test
@GetMapping("/main_thread")
public String mainThread() {
String resp = headerFeign.test();
log.info("resp: {}", resp);
return resp;
}
@GetMapping("/sub_thread")
public void subThread() {
// RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承
Enumeration<String> headerNames = servletRequest.getHeaderNames();
while (headerNames.hasMoreElements()){
String name = headerNames.nextElement();
if (Objects.equals(name,"feignheader")){
ThreadLocalUtil.set(name,servletRequest.getHeader(name));
}
}
new Thread(() -> {
new Thread(() -> {
new Thread(() -> {
String resp = headerFeign.test();
log.info("resp: {}", resp);
}).start();
}).start();
}).start();
}
@GetMapping("/sub_thread/block")
public String subThreadBlock() {
//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去
// RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承
Enumeration<String> headerNames = servletRequest.getHeaderNames();
while (headerNames.hasMoreElements()){
String name = headerNames.nextElement();
if (Objects.equals(name,"feignheader")){
ThreadLocalUtil.set(name,servletRequest.getHeader(name));
}
}
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());
String resp = null;
try {
resp = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
log.info("resp: ", resp);
return resp;
}
}
3、修改服务A的 FeignConfig
package com.he.feign.config;
import com.he.feign.thread.ThreadLocalUtil;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
/**
 * Feign {@link RequestInterceptor} that adds every entry held by {@link ThreadLocalUtil}
 * for the current thread as a header on the outgoing Feign request.
 *
 * <p>Author: hesh, 2020/6/21
 */
@Slf4j
@Configuration
public class FeignConfig implements RequestInterceptor {

    /**
     * Logs and copies each thread-local header onto the outgoing request.
     *
     * @param requestTemplate the outgoing Feign request being built
     */
    @Override
    public void apply(RequestTemplate requestTemplate) {
        // Read the headers stored for this thread and pass them on to the next service.
        ThreadLocalUtil.get().forEach((name, value) -> {
            log.info("--从ThreadLocal获取消息头传递到下一个服务:key-[{}],value-[{}]", name, value);
            requestTemplate.header(name, value);
        });
    }
}
4、重启服务A,测试结果如下
4.1、连续调用 http://localhost:8060/test_header/sub_thread 接口,日志打印如下
2020-06-22 23:18:23.658 INFO 18236 --- [ Thread-131] com.he.feign.config.FeignConfig : --从ThreadLocal获取消息头传递到下一个服务:key-[feignheader],value-[test]
2020-06-22 23:18:23.662 INFO 18236 --- [ Thread-131] c.h.f.controller.TestHeaderController : resp: hello from eureka-client
requestURL: http://192.168.56.1:8200/header/test
headers:
feignheader-test
accept-*/*
user-agent-Java/1.8.0_162
host-192.168.56.1:8200
connection-keep-alive
结合执行日志可知,header信息通过feign成功传递到下一个服务,而且不再出现偶尔失败的情况!
4.2、连续调用接口 http://localhost:8060/test_header/sub_thread/block
综上可见,真正解决从网关或者上层链路,把header经过feign传递到另一个服务,既要配置feign,也需要结合threadlocal。
下一步的优化,可设置拦截器或者切面,把header信息统一设置到threadlocal中。
package com.he.feign.config;
import com.he.feign.thread.ThreadLocalUtil;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Enumeration;
import java.util.Objects;
/**
 * MVC interceptor that publishes the {@code feignheader} header of each incoming request
 * through {@link ThreadLocalUtil}, so controllers do not have to copy it themselves.
 *
 * <p>Author: hesh, 2020/6/22
 */
public class MyInterceptor implements HandlerInterceptor {

    /** Name of the header that is handed over to the thread-local store. */
    private static final String FORWARDED_HEADER = "feignheader";

    /**
     * Stores the request's {@code feignheader} value for the handling thread, or removes a
     * previously stored value when the request does not carry the header.
     *
     * @param request  current request
     * @param response current response (unused)
     * @param handler  chosen handler (unused)
     * @return always {@code true}, so handling continues
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // getHeader() matches the header name case-insensitively.
        String value = request.getHeader(FORWARDED_HEADER);
        if (value != null) {
            ThreadLocalUtil.set(FORWARDED_HEADER, value);
        } else {
            // A thread-local value outlives the request that stored it. If this thread is
            // reused for a request without the header, drop the stale value instead of
            // forwarding the previous caller's header.
            ThreadLocalUtil.get().remove(FORWARDED_HEADER);
        }
        return true;
    }
}
package com.he.feign.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
/**
 * Web MVC configuration that registers {@link MyInterceptor} for all request paths.
 *
 * <p>NOTE(review): in a Spring Boot application, extending WebMvcConfigurationSupport is
 * commonly reported to switch off Boot's MVC auto-configuration; confirm that this is
 * intended, otherwise consider implementing WebMvcConfigurer instead.
 *
 * <p>Author: hesh, 2020/6/25
 */
@Configuration
public class WebConfig extends WebMvcConfigurationSupport {

    /**
     * Adds the custom header interceptor to the registry.
     *
     * @param registry interceptor registry supplied by Spring MVC
     */
    @Override
    protected void addInterceptors(InterceptorRegistry registry) {
        // Apply the custom interceptor to every path.
        MyInterceptor headerInterceptor = new MyInterceptor();
        registry.addInterceptor(headerInterceptor).addPathPatterns("/**");
    }
}
TestHeaderController修改如下
package com.he.feign.controller;
import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* <b>@Desc</b>: 测试
* <b>@Author</b>: hesh
* <b>@Date</b>: 2020/6/21
* <b>@Modify</b>:
*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {
@Autowired
private HeaderFeign headerFeign;
@Value("${server.port}")
private String port;
@Autowired
private HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test
@GetMapping("/main_thread")
public String mainThread() {
String resp = headerFeign.test();
log.info("resp: {}", resp);
return resp;
}
@GetMapping("/sub_thread")
public void subThread() {
// RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承
// Enumeration<String> headerNames = servletRequest.getHeaderNames();
// while (headerNames.hasMoreElements()){
// String name = headerNames.nextElement();
// if (Objects.equals(name,"feignheader")){
// ThreadLocalUtil.set(name,servletRequest.getHeader(name));
// }
// }
new Thread(() -> {
new Thread(() -> {
new Thread(() -> {
String resp = headerFeign.test();
log.info("resp: {}", resp);
}).start();
}).start();
}).start();
}
@GetMapping("/sub_thread/block")
public String subThreadBlock() {
//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去
// RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承
// Enumeration<String> headerNames = servletRequest.getHeaderNames();
// while (headerNames.hasMoreElements()){
// String name = headerNames.nextElement();
// if (Objects.equals(name,"feignheader")){
// ThreadLocalUtil.set(name,servletRequest.getHeader(name));
// }
// }
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());
String resp = null;
try {
resp = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
log.info("resp: {}", resp);
return resp;
}
}
以上,便是作者针对spring cloud feign 传递 header 信息在多线程情况下失败问题的解决方式,若有错误请指正,欢迎交流指导。
————————————————
版权声明:本文为CSDN博主「HE-RUNNING」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
以上是最初参考并使用的文章。但改用线程池之后就出现了问题:线程池中的线程是复用的,而不是每次新建,InheritableThreadLocal 只在线程创建时从父线程继承一次变量,因此线程被复用时,取到的仍是上一次(创建该线程的)父线程留下的共享变量,导致变量错乱。经过研究,找到了阿里开源的解决方案 transmittable-thread-local。
加入以下pom依赖:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.2.0</version>
</dependency>
转载改造hystrix线程池方法:
改造线程池方式
上面介绍了改造线程的方式,并且通过建一个同样的Java类来覆盖Jar包中的实现,感觉有点投机取巧,其实不用这么麻烦,Hystrix默认提供了HystrixPlugins类,可以让用户自定义线程池,下面来看看怎么使用:
在启动之前调用进行注册自定义实现的逻辑:
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());
ThreadLocalHystrixConcurrencyStrategy就是我们自定义的创建线程池的类,需要继承HystrixConcurrencyStrategy,前面也有讲到通过调试代码发现最终获取线程池的代码就在HystrixConcurrencyStrategy中。
我们只需要重写getThreadPool方法即可完成对线程池的改造,由于TtlExecutors只能修饰ExecutorService和Executor,而HystrixConcurrencyStrategy中返回的是ThreadPoolExecutor,我们需要对ThreadPoolExecutor进行包装一层,最终在execute方法中对线程修饰,也就相当于改造了线程池。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import com.netflix.hystrix.util.PlatformSpecific;
/**
 * Hystrix concurrency strategy whose thread pools are {@link ThreadLocalThreadPoolExecutor}
 * instances instead of plain {@link ThreadPoolExecutor}s.
 */
public class ThreadLocalHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    private final static Logger logger = LoggerFactory.getLogger(ThreadLocalHystrixConcurrencyStrategy.class);

    /**
     * Builds the pool from individually supplied properties.
     *
     * @return a pool whose maximum size is never smaller than its core size
     */
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,
            HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        final ThreadFactory factory = getThreadFactory(threadPoolKey);
        final int coreSize = corePoolSize.get();
        final int requestedMax = maximumPoolSize.get();
        final int effectiveMax = effectiveMaximumSize(threadPoolKey, coreSize, requestedMax);
        return new ThreadLocalThreadPoolExecutor(coreSize, effectiveMax, keepAliveTime.get(), unit,
                workQueue, factory);
    }

    /**
     * Builds the pool from a {@link HystrixThreadPoolProperties} bundle; keep-alive is in minutes.
     *
     * @return a pool whose maximum size equals the core size unless divergence is allowed
     */
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
            HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory factory = getThreadFactory(threadPoolKey);
        final boolean maxMayDiverge = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int coreSize = threadPoolProperties.coreSize().get();
        final int keepAliveMinutes = threadPoolProperties.keepAliveTimeMinutes().get();
        final int queueCapacity = threadPoolProperties.maxQueueSize().get();
        final BlockingQueue<Runnable> queue = getBlockingQueue(queueCapacity);

        // The configured maximum is only read when it is allowed to differ from the core size.
        final int effectiveMax;
        if (maxMayDiverge) {
            effectiveMax = effectiveMaximumSize(threadPoolKey, coreSize, threadPoolProperties.maximumSize().get());
        } else {
            effectiveMax = coreSize;
        }
        return new ThreadLocalThreadPoolExecutor(coreSize, effectiveMax, keepAliveMinutes, TimeUnit.MINUTES,
                queue, factory);
    }

    /** Returns the maximum size to use: the configured one, or the core size (with an error log) if that is larger. */
    private static int effectiveMaximumSize(HystrixThreadPoolKey threadPoolKey, int dynamicCoreSize,
            int dynamicMaximumSize) {
        if (dynamicCoreSize <= dynamicMaximumSize) {
            return dynamicMaximumSize;
        }
        logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name()
                + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize
                + ". Maximum size will be set to " + dynamicCoreSize
                + ", the coreSize value, since it must be equal to or greater than the coreSize value");
        return dynamicCoreSize;
    }

    /** Creates daemon threads named {@code hystrix-<key>-<n>}, or defers to the App Engine factory. */
    private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
        if (PlatformSpecific.isAppEngineStandardEnvironment()) {
            return PlatformSpecific.getAppEngineThreadFactory();
        }
        final AtomicInteger sequence = new AtomicInteger(0);
        return task -> {
            Thread worker = new Thread(task, "hystrix-" + threadPoolKey.name() + "-" + sequence.incrementAndGet());
            worker.setDaemon(true);
            return worker;
        };
    }
}
ThreadLocalThreadPoolExecutor的代码:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;
/**
 * {@link ThreadPoolExecutor} that wraps every executed task with {@link TtlRunnable},
 * so that {@link TransmittableThreadLocal} values of the submitting thread are
 * available to the task even though the worker thread is reused.
 */
public class ThreadLocalThreadPoolExecutor extends ThreadPoolExecutor {

    // Used by the constructors that take no explicit rejection handler.
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

    /** Value handed from the submitting thread to the task; set it before submitting work. */
    public static TransmittableThreadLocal<Long> THREAD_LOCAL = new TransmittableThreadLocal<Long>();

    /** Same as the corresponding {@link ThreadPoolExecutor} constructor. */
    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /** Uses the given thread factory and the abort rejection policy. */
    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
    }

    /** Uses the default thread factory and the given rejection handler. */
    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    }

    /** Uses the given thread factory and rejection handler. */
    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        // Pass corePoolSize through: it was previously replaced by maximumPoolSize,
        // silently ignoring the configured core size.
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Wraps the task with {@link TtlRunnable} on the calling thread before delegating.
     *
     * @param command the task to run
     */
    @Override
    public void execute(Runnable command) {
        super.execute(TtlRunnable.get(command));
    }
}
启动时加入插件
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());
使用方法:调用feign client服务之前,设置线程变量
ThreadLocalThreadPoolExecutor.THREAD_LOCAL.set(10086L);
在FeignAuthConfiguration里,调用appTokenHolder.get();之前加入设置租户id
Long tenantId = ThreadLocalThreadPoolExecutor.THREAD_LOCAL.get();
DefaultAppTokenHolder.TENANT_FOR_NO_SESSION.set(tenantId);
使用线程变量三种方式测试:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;
/**
 * Console demo comparing three ways of handing a per-iteration value to a task that runs
 * on a single reused pool thread: {@link InheritableThreadLocal}, an explicit constructor
 * argument, and {@link TransmittableThreadLocal} with {@link TtlRunnable}.
 */
public class Test {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // testThreadLocal1();
        // testThreadLocal2();
        testThreadLocal3();
    }

    /**
     * InheritableThreadLocal with a one-thread pool: the worker reads the thread-local it
     * inherited when it was created and then sets it to null, while the submitter changes
     * its own value on every iteration.
     */
    private static void testThreadLocal1() throws InterruptedException, ExecutionException {
        final ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        try {
            for (int i = 0; i < 20; i++) {
                local.set(i + "");
                System.out.println(local.get());
                Future<?> future = executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName() + ":" + local.get());
                        local.set(null);
                    }
                });
                future.get();
                System.out.println(local.get());
                local.set(null);
            }
        } finally {
            // The pool's worker is a non-daemon thread; without shutdown the JVM never exits.
            executorService.shutdown();
        }
    }

    /**
     * Passes the value to the task explicitly through {@link ParamRunnable}; the
     * thread-local is only read and written on the submitting thread.
     */
    private static void testThreadLocal2() throws InterruptedException, ExecutionException {
        ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        try {
            for (int i = 0; i < 20; i++) {
                local.set(i + "");
                System.out.println(local.get());
                Future<?> future = executorService.submit(new ParamRunnable(i + ""));
                future.get();
                System.out.println(local.get());
                local.set(null);
            }
        } finally {
            // The pool's worker is a non-daemon thread; without shutdown the JVM never exits.
            executorService.shutdown();
        }
    }

    /**
     * TransmittableThreadLocal with a one-thread pool: each task is wrapped with
     * {@link TtlRunnable} at submit time before it runs on the reused worker.
     */
    private static void testThreadLocal3() throws InterruptedException, ExecutionException {
        final TransmittableThreadLocal<String> context = new TransmittableThreadLocal<String>();
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        try {
            for (int i = 0; i < 20; i++) {
                context.set(i + "");
                System.out.println(context.get());
                Future<?> future = executorService.submit(TtlRunnable.get(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName() + ":" + context.get());
                        context.set(null);
                    }
                }));
                future.get();
                System.out.println(context.get());
                context.set(null);
            }
        } finally {
            // The pool's worker is a non-daemon thread; without shutdown the JVM never exits.
            executorService.shutdown();
        }
    }

    /** Task that prints the worker thread's name and the value given at construction. */
    private static class ParamRunnable implements Runnable {

        private final String param;

        public ParamRunnable(String param) {
            this.param = param;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ":" + param);
        }
    }
}
原文链接:https://blog.csdn.net/weishaoqi2/article/details/106964787
更多推荐
所有评论(0)