一、解决多线程异步Feign调用请求头丢失问题

问题
在微服务中,多线程异步+Feign调用会出现请求头丢失

解决
在主线程中先获取请求头参数
传入子线程中
由子线程将请求头参数设置到上下文中
最后在Feign转发处理中拿到子线程设置的上下文的请求头数据,转发到下游。
获取上下文请求参数工具类

@Slf4j
public class RequestContextUtil {

    /**
     * 获取请求头数据
     *
     * @return key->请求头名称 value->请求头值
     * @author zhengqingya
     * @date 2021/6/30 9:39 下午
     */
    public static Map<String, String> getHeaderMap() {
        Map<String, String> headerMap = Maps.newLinkedHashMap();
        try {
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
            if (requestAttributes == null) {
                return headerMap;
            }
            HttpServletRequest request = requestAttributes.getRequest();
            Enumeration<String> enumeration = request.getHeaderNames();
            while (enumeration.hasMoreElements()) {
                String key = enumeration.nextElement();
                String value = request.getHeader(key);
                headerMap.put(key, value);
            }
        } catch (Exception e) {
            log.error("《RequestContextUtil》 获取请求头参数失败:", e);
        }
        return headerMap;
    }

}

请求头上下文

@Slf4j
public class RequestHeaderHandler {

    public static final ThreadLocal<Map<String, String>> THREAD_LOCAL = new ThreadLocal<>();

    public static void setHeaderMap(Map<String, String> headerMap) {
        THREAD_LOCAL.set(headerMap);
    }

    public static Map<String, String> getHeaderMap() {
        return THREAD_LOCAL.get();
    }

    public static void remove() {
        THREAD_LOCAL.remove();
    }

}

Feign转发处理rpc调用传参

 */
@Slf4j
@Configuration
public class FeignRequestInterceptor implements RequestInterceptor {

    @Override
    @SneakyThrows
    public void apply(RequestTemplate requestTemplate) {
        log.debug("========================== ↓↓↓↓↓↓ 《FeignRequestInterceptor》 Start... ↓↓↓↓↓↓ ==========================");
        Map<String, String> threadHeaderNameMap = RequestHeaderHandler.getHeaderMap();
        if (!CollectionUtils.isEmpty(threadHeaderNameMap)) {
            threadHeaderNameMap.forEach((headerName, headerValue) -> {
                log.debug("《FeignRequestInterceptor》 多线程 headerName:【{}】 headerValue:【{}】", headerName, headerValue);
                requestTemplate.header(headerName, headerValue);
            });
        }
        Map<String, String> headerMap = RequestContextUtil.getHeaderMap();
        headerMap.forEach((headerName, headerValue) -> {
            log.debug("《FeignRequestInterceptor》 headerName:【{}】 headerValue:【{}】", headerName, headerValue);
            requestTemplate.header(headerName, headerValue);
        });
        log.debug("========================== ↑↑↑↑↑↑ 《FeignRequestInterceptor》 End... ↑↑↑↑↑↑ ==========================");
    }

}

使用案例

@Slf4j
@RestController
@RequestMapping("/web/api/demo/test")
@Api(tags = "测试api")
@AllArgsConstructor
public class RpcController extends BaseController {

    private SystemTaskThread systemTaskThread;

    @GetMapping("getContextUserId")
    @ApiOperation("rpc调用测试 - Async")
    public void getContextUserId() {
        Map<String, String> headerMap = RequestContextUtil.getHeaderMap();
        log.info("主线程请求头值: {}", headerMap.get("userId"));
        this.systemTaskThread.getRequestHeaderUserId(RequestContextUtil.getHeaderMap());
    }

}

@Slf4j
@Component
@AllArgsConstructor
public class SystemTaskThread {

    private ISystemClient systemClient;

    @SneakyThrows
    @Async(ThreadPoolConstant.SMALL_TOOLS_THREAD_POOL)
    public void getRequestHeaderUserId(Map<String, String> headerMap) {
        RequestHeaderHandler.setHeaderMap(headerMap);
        log.info("子线程请求头值: {}", this.systemClient.getRequestHeaderUserId());
    }

}

注:网上也有资料提到在主线程获取请求参数RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();传到子线程中,再重新赋值RequestContextHolder.setRequestAttributes(requestAttributes); 但是这种方式小编尝试无效,顺便记录在这里吧~

本文案例demo源码
https://gitee.com/zhengqingya/small-tools
————————————————
版权声明:本文为CSDN博主「郑清」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_38225558/article/details/118372177

二、Feign客户端异常Incomplete output stream解决方案

1. 加feign-httpclient依赖
没办法,只能用Apache HttpClient替换掉原生HttpURLConnection。加入依赖:

<!-- 使用Apache HttpClient替换Feign原生httpclient -->
<!-- feign-httpclient内含Apache HttpClient -->
        <dependency>
            <groupId>com.netflix.feign</groupId>
            <artifactId>feign-httpclient</artifactId>
            <version>8.17.0</version>
        </dependency>

2. 开启Feign支持httpclient
然后在application.yml中添加如下:

feign:
  httpclient:
    enabled: true

再次重新调用,一切恢复正常。
————————————————
版权声明:本文为CSDN博主「Thinkingcao」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Thinkingcao/article/details/109161139

三、SpringBoot+Vue 后端异步多线程加前端实时更新的实现

SpringBoot+Vue 后端异步多线程加前端实时更新的实现
Spring Boot在一些场景下需要用户发起请求服务器执行耗时任务(例如批量发送邮件),此时如果让用户一直等待任务执行,明显不合理,而且会很容易超时,这个时候正确的做法是后端采取异步多线程的方式执行任务,并保持任务执行的进度状态,前端使用定时器获取任务执行状态进行持续更新直到任务完成。具体的实现方法

1、后端异步多线程实现
一、springboot开启异步支持
在application入口类添加@EnableAsync注解

使用配置类为springboot配置一个线程池,此处必须实现AsyncConfigurer接口,并使用@Configuration进行注解

@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
    /**
     * 核心线程数
     */
//    @Value("${threadPool.corePoolSize}")
    private int corePoolSize = 10;
    /**
     * 最大线程数
     */
    private int maxPoolSize = 50;
    /**
     * 线程池缓冲队列容量
     */
    private int queueCapacity = 10;
    /**
     * 空闲线程销毁前等待时长
     */
    private int awaitTerminationSeconds = 10;
    /**
     * 线程名前缀
     */
    private String threadNamePrefix = "txzmap-tile-download";

    /**
     * ThreadPoolTaskExcutor运行原理
     * 当线程池的线程数小于corePoolSize,则新建线程入池处理请求
     * 当线程池的线程数等于corePoolSize,则将任务放入Queue中,线程池中的空闲线程会从Queue中获取任务并处理
     * 当Queue中的任务数超过queueCapacity,则新建线程入池处理请求,但如果线程池线程数达到maxPoolSize,将会通过RejectedExecutionHandler做拒绝处理
     * 当线程池的线程数大于corePoolSize时,空闲线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
     */

    @Override
    @Bean
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(corePoolSize);
        threadPool.setMaxPoolSize(maxPoolSize);
        threadPool.setQueueCapacity(queueCapacity);
        threadPool.setAwaitTerminationSeconds(awaitTerminationSeconds);
        threadPool.setThreadNamePrefix(threadNamePrefix);

        //关机时,是否等待任务执行完
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        //设置拒绝策略
        //CALLER_RUNS:由调用者所在的线程执行该任务
        threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        //初始化线程
        threadPool.initialize();
        return threadPool.getThreadPoolExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}

二、编写具体的任务方法
在具体的任务方法上使用@Async进行注解,表示该方法需要异步执行,这里没有指定线程池则为上面已经配置的默认线程池,如果有多个线程池可以通过@Async的value属性指定线程池

@Service
@Scope("prototype")
public class MailSendService {
    @Autowired
    TaskInfoList taskInfoList;
    @Async
    public void sendMail(String[] mails) throws InterruptedException {
        System.out.println("任务开始");        
        TaskInfoList.TaskInfo task = new TaskInfoList.TaskInfo();
        int size = taskInfoList.addNewTask(task);
        int process=0;
        int total=mails.length;
        for (int i = 0; i <= total; i++) {
           //发送邮件
                process++;
                doSendMail();
                System.out.println(Thread.currentThread().getName() + "Mission is running at " + process + "/" + total);
                task.setProcess(process);
                task.setStatus(TaskInfoList.TaskStatus.RUNNING);
                taskInfoList.update(task);            
        }
        System.out.println("mission completed!");
        //更新任务状态信息
        task.setStatus(TaskInfoList.TaskStatus.FINISHED);
        task.setEndTime(System.currentTimeMillis());
        task.setUrl("finished");
        taskInfoList.update(task);
    }
}

三、任务状态保持
为了记录当前任务的执行状态(耗时,完成进度,状态)等信息,就需要一个全局的方法对任务的状态进行维护,这里可以选用redis或者是一个简单的管理类来实现。由于项目较小,就没有必要为了这个功能而上Redis,如果项目已经有Redis了那么可考虑使用Redis,这里我们采用后者,编写一个简单的任务状态维护类。上代码

/**
 * 任务信息管理
 */
@Component
public class TaskInfoList {
    public enum TaskStatus {
        START, RUNNING, FINISHED
    }
    //任务信息类
    public static class TaskInfo {
        String uuid;
        String name = "";
        Long startTime;
        Long endTime;
        TaskStatus status;
        Integer total = 0;
        Integer process = 0;
        Integer size = 0;
        //最后生成的文件存放的地址
        String url = "";
        //省略getter setter
    }

    public static Map<String, TaskInfo> taskList = new HashMap<>();

    /**
     * 添加新任务并返回当前所有任务的总数
     *
     * @param task
     * @return
     */
    public Integer addNewTask(TaskInfo task) {
        taskList.put(task.getUuid(), task);
        return taskList.size();
    }

    /**
     * 更新任务状态
     *
     * @param task
     */
    public void update(TaskInfo task) {
        taskList.put(task.getUuid(), task);
    }
    /**
     * 根据uuid移除相关的任务信息
     *
     * @param task
     */
    public void remove(TaskInfo task) {
        taskList.remove(task.getUuid());
    }
    /**
     * 根据uuid查询
     *
     * @param uuid
     * @return
     */
    public TaskInfo search(String uuid) {
        return taskList.get(uuid);
    }

    //系统初始化的时候
    @PostConstruct
    private void init() {
    }

    //系统结束运行的时候
    @PreDestroy
    public void destroy() {
        //系统运行结束
        taskList.clear();
    }
}


具体状态维护业务代码可见上文中的业务代码。

四、Controller业务实现
这里需要实现两个接口

用于用户提交任务信息的接口

    @PostMapping("/sendMail")
    public MyResult sendMail(Integer cid) {
        //生成taskId
        String taskId = UUID.randomUUID().toString();
        String[] mails=mailService.getByCid(cid);
        try {
            MailSendService.sendMail(String[] mails);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return new MyResult(0, "创建失败" + e.getMessage());
        }
        //返回rediskey 然后用户可以循环查询
        return new MyResult(1, "任务添加成功!", taskId);
    }

最后将该任务的uuid返回给用户便于用户查询

提供任务状态查询的接口

@GetMapping("/ps")
    public TaskInfoList.TaskInfo updateProcessStatus(@RequestParam(value = "id", required = true) String taskId) {
        return taskInfoList.search(taskId);
    }

2、vue前端进度更新的实现
用一个map来维护当前的多任务状态

 data: {taskMap:new Map()}
1
任务添加提交

 this.post(url, p, data => {
                    if (data.code == 1) {
                        this.$message.success('任务添加成功!');
                        //获取到任务id
                        let id = data.data;
                        this.taskMap.set('id',data);
           }
   });

开启定时器持续获取所有任务的状态直到任务完成

this.timer = setInterval(() => {
                        this.taskMap.forEach((value, key, map) => {
                            //如果该任务还在进行中 则继续获取状态 
                            if (value.status != 'FINISHED') {
                                this.get(url + key, data => {
                                    console.log(data);
                                    //更新
                                    map.set(key, data);
                                    this.$forceUpdate();
                                });
                            }
                        });
                    }, 1000);

剩下的就是相关的view层的更新,具体实现很简单,大家根据项目情况进行修改就行,这里不赘述。
————————————————
版权声明:本文为CSDN博主「35岁还能战几年」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/jayhuang44754019/article/details/124236284

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐