Java 并行任务执行框架实践:优雅处理多个接口并发调用并统一返回结果
前言
平时做业务开发的时候,经常会遇到这样一种场景:
一个接口里要同时处理多个任务,或者同时调用多个下游接口,等它们都执行完之后,再把结果统一组装返回。
这种场景如果全都串行执行,接口耗时通常会比较长。尤其是多个任务彼此之间没有依赖关系的时候,串行就会显得很浪费。
所以比较自然的思路就是:把独立任务并行执行,最后再统一收结果。
不过问题也来了,如果每次都手写线程池、提交任务、拿 Future、处理超时、处理异常、关闭线程池,代码很快就会变得很乱。
所以我在项目里把这一套逻辑单独封装了一层,做成了一个可以复用的并行执行组件。
这篇文章就结合 poolTest 下面的真实代码,聊一下这套方案是怎么设计的。
一、业务场景
这套代码对应的场景,其实挺典型的:
需要同时并行处理多个任务,最后统一返回结果。
在示例类 MetSmoothServiceExample 里,处理的不是普通的“查用户、查订单”,而是更贴近当前业务的一个例子:
对多个指标同时做平滑计算,比如:
- o2
- co2
- ch4
- nh4
- so2
- h2s
- wvp
这些指标之间互不依赖,所以非常适合并行执行。
如果按串行方式一个一个算,总耗时就是每个指标计算时间累加;但如果改成并行,总耗时通常就取决于最慢的那个任务。
这也是并行处理最直接的价值。
二、整体思路
我这里的做法并不复杂,核心就是把并行执行过程中那些重复代码统一收起来。
整体分成 4 步:
- 先把每个要执行的任务封装成统一对象
- 交给并行执行器统一提交到线程池
- 等所有任务执行完之后收集结果
- 如果中间有异常、超时或中断,就统一处理
这样一来,业务层就只需要关心一件事:
我要并发执行哪些任务。
至于线程池怎么建、任务怎么提、异常怎么包、超时怎么控,这些都交给底层组件处理。
三、代码结构说明
这套组件主要由下面几个类组成:
- QueueType
- RejectedPolicy
- ParallelExecutionException
- ThreadTaskException
- SmoothTask
- ParallelSmoothExecutor
- ThreadPoolBuilder
- ThreadPoolUtil
- MetSmoothServiceExample
下面按顺序简单说一下每个类是干什么的。
1. QueueType:定义线程池队列类型
这个枚举类主要是把线程池用什么阻塞队列抽出来了,当前支持:
- LINKED_BLOCKING_QUEUE
- ARRAY_BLOCKING_QUEUE
- SYNCHRONOUS_QUEUE
这么做的好处是,线程池的行为就不是写死的,而是可以根据业务场景调整。
比如:
- 想让任务先排队,可以用阻塞队列
- 想严格限制堆积量,可以用固定容量队列
- 想直接交给线程处理,不做缓存,也可以单独配
示例:
package com.ruoyi.system.poolTest.Enum;
/**
* 线程池可选的队列类型。
*/
public enum QueueType {
/**
* 默认队列,任务先入队,再等空闲线程处理。
*/
LINKED_BLOCKING_QUEUE,
/**
* 固定容量队列,适合希望控制积压量的场景。
*/
ARRAY_BLOCKING_QUEUE,
/**
* 不排队,任务提交后直接交给线程处理。
*/
SYNCHRONOUS_QUEUE
}
2. RejectedPolicy:定义线程池拒绝策略
线程池满了以后,后续任务怎么处理,这块也很关键。
这里统一封装了 4 种拒绝策略:
- ABORT
- CALLER_RUNS
- DISCARD
- DISCARD_OLDEST
当前这套代码默认用的是 CALLER_RUNS,也就是任务不直接丢掉,而是回退到提交任务的线程自己执行。
这种策略在很多业务场景里都比较稳,因为它至少不会悄悄吞任务。
package com.ruoyi.system.poolTest.Enum;
/**
* 线程池任务被拒绝时的处理方式。
*/
public enum RejectedPolicy {
/**
* 直接拒绝,并抛出异常。
*/
ABORT,
/**
* 由提交任务的线程自己执行。
*/
CALLER_RUNS,
/**
* 直接丢掉当前任务。
*/
DISCARD,
/**
* 丢掉队列里最早的任务,再尝试提交当前任务。
*/
DISCARD_OLDEST
}
3. ParallelExecutionException:并行执行统一异常
这个异常类主要负责兜住“整批任务执行过程中的异常”。
比如:
- 某个任务执行失败
- 某个任务执行超时
- 等待结果时线程被中断
最终都会包装成 ParallelExecutionException 往外抛。
这样业务层就不用关心底层到底抛了多少种并发异常,统一接这一种就够了。
package com.ruoyi.system.poolTest.exception;
/**
* 并行执行过程中抛出的统一异常。
* @author 吴顺杰
*/
public class ParallelExecutionException extends RuntimeException{
/**
* 带上原始异常一起抛出,方便排查。
*/
public ParallelExecutionException(String message, Throwable cause) {
super(message, cause);
}
}
4. ThreadTaskException:单任务异常包装
这个异常类是更细一层的封装,它针对的是“单个线程任务执行失败”。
在 ThreadPoolUtil 里,不管你提交的是 Runnable 还是 Callable,最终都会先包一层,如果任务内部抛异常,就统一转成 ThreadTaskException。
这样做主要有两个好处:
- 异常语义统一
- 不容易把线程里的原始异常漏掉
package com.ruoyi.system.poolTest.exception;
/**
* 线程任务执行时抛出的运行时异常。
*/
public class ThreadTaskException extends RuntimeException {
/**
* 用原始异常包装线程任务失败原因。
*/
public ThreadTaskException(String message, Throwable cause) {
super(message, cause);
}
}
5. SmoothTask:任务抽象
这个类我觉得是整个设计里很实用的一层。
它把一个任务统一抽象成两部分:
- 任务名
- 执行逻辑
定义方式也很简单:
SmoothTask.of("o2", () -> smoothPolynomial(data.getO2List(), 19, 1))
相比直接传一堆匿名线程任务,这种写法的好处很明显:
- 每个任务都有名字,结果更好对应
- 代码可读性更好
- 后面如果要扩展任务属性,也更方便
尤其这套代码最终返回的是 Map<String, T>,所以任务名不只是“好看”,它本身就是结果映射的 key。
package com.ruoyi.system.poolTest.executor;
import java.util.Objects;
import java.util.concurrent.Callable;
/**
* 单个平滑任务,包含任务名和执行逻辑。
*/
public final class SmoothTask<T> {
private final String name;
private final Callable<T> action;
/**
* 保存任务名和任务逻辑。
*/
private SmoothTask(String name, Callable<T> action) {
this.name = Objects.requireNonNull(name, "任务名称不能为空");
this.action = Objects.requireNonNull(action, "任务执行逻辑不能为空");
}
/**
* 创建一个新的平滑任务。
*/
public static <T> SmoothTask<T> of(String name, Callable<T> action) {
return new SmoothTask<T>(name, action);
}
/**
* 返回任务名。
*/
public String getName() {
return name;
}
/**
* 运行当前任务。
*/
public T execute() throws Exception {
return action.call();
}
}
6. ParallelSmoothExecutor:并行执行核心类
这个类就是整套方案的核心。
它主要干了几件事:
- 校验任务是否合法
- 校验任务名是否为空、是否重复
- 把所有任务提交到线程池
- 按任务名收集结果
- 如果有任务失败、超时或中断,就取消整批任务
- 最后统一关闭线程池
这里有几个细节挺值得说一下。
第一,任务结果是按 LinkedHashMap 收集的,这样结果顺序和任务定义顺序一致,排查问题的时候会更舒服。
第二,任务名做了重复校验,这样可以避免两个任务用了同一个名字,最后结果被覆盖。
第三,只要中间有一个任务出问题,就会触发 cancelAll(...),把剩余任务取消掉,避免继续空耗资源。
第四,执行结束后会在 finally 里关闭线程池,这样不会留下线程资源泄漏问题。
如果用一句话概括这个类的作用,可以这么说:
它负责把一批任务平稳地并行跑完,并且把异常、超时、取消和关闭这些事都处理掉。
package com.ruoyi.system.poolTest.executor;
import com.ruoyi.system.poolTest.Enum.RejectedPolicy;
import com.ruoyi.system.poolTest.exception.ParallelExecutionException;
import com.ruoyi.system.poolTest.util.ThreadPoolUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* 并行执行多个平滑任务,并按任务名收集结果。
*/
public final class ParallelSmoothExecutor<T> {
private final ThreadPoolExecutor executor;
private final long timeout;
private final TimeUnit timeoutUnit;
/**
* 按线程数直接创建一个执行器。
*/
public ParallelSmoothExecutor(String poolName, int threadCount, long timeout, TimeUnit timeoutUnit) {
this(
ThreadPoolUtil.newBuilder(poolName)
.corePoolSize(threadCount)
.maxPoolSize(threadCount)
.queueCapacity(Math.max(threadCount * 4, 16))
.rejectionPolicy(RejectedPolicy.CALLER_RUNS)
.build(),
timeout,
timeoutUnit
);
}
/**
* 使用现成线程池创建执行器。
* 调用 execute 后会一并关闭这个线程池。
*/
public ParallelSmoothExecutor(ThreadPoolExecutor executor, long timeout, TimeUnit timeoutUnit) {
this.executor = Objects.requireNonNull(executor, "线程池不能为空");
this.timeout = timeout;
this.timeoutUnit = Objects.requireNonNull(timeoutUnit, "超时单位不能为空");
}
/**
* 并行跑完所有任务,并把结果按任务名装回 Map。
*/
public Map<String, T> execute(List<SmoothTask<T>> tasks) {
if (tasks == null || tasks.isEmpty()) {
return Collections.emptyMap();
}
validateTaskNames(tasks);
Map<String, Future<T>> futureMap = new LinkedHashMap<String, Future<T>>(tasks.size());
try {
for (SmoothTask<T> task : tasks) {
futureMap.put(task.getName(), ThreadPoolUtil.submit(executor, task::execute));
}
Map<String, T> resultMap = new LinkedHashMap<String, T>(tasks.size());
for (Map.Entry<String, Future<T>> entry : futureMap.entrySet()) {
try {
resultMap.put(entry.getKey(), entry.getValue().get(timeout, timeoutUnit));
} catch (InterruptedException ex) {
cancelAll(futureMap);
Thread.currentThread().interrupt();
throw new ParallelExecutionException("并行平滑任务被中断", ex);
} catch (ExecutionException ex) {
cancelAll(futureMap);
throw new ParallelExecutionException("并行平滑任务执行失败,任务名:" + entry.getKey(), ex.getCause());
} catch (TimeoutException ex) {
cancelAll(futureMap);
throw new ParallelExecutionException("并行平滑任务执行超时,任务名:" + entry.getKey(), ex);
}
}
return resultMap;
} finally {
ThreadPoolUtil.shutdownGracefully(executor);
}
}
/**
* 检查任务对象和任务名,顺手挡掉空值和重名。
*/
private void validateTaskNames(List<SmoothTask<T>> tasks) {
List<String> names = new ArrayList<String>(tasks.size());
for (SmoothTask<T> task : tasks) {
if (task == null) {
throw new IllegalArgumentException("平滑任务不能为空");
}
String name = task.getName();
if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("平滑任务名称不能为空");
}
if (names.contains(name)) {
throw new IllegalArgumentException("平滑任务名称重复:" + name);
}
names.add(name);
}
}
/**
* 出错时取消所有还没结束的任务。
*/
private void cancelAll(Map<String, Future<T>> futureMap) {
for (Future<T> future : futureMap.values()) {
future.cancel(true);
}
}
}
7. ThreadPoolBuilder:线程池构建器
如果直接手写 new ThreadPoolExecutor(...),参数一多,可读性会很差。
所以这里又单独封装了一个 ThreadPoolBuilder,支持链式设置参数,比如:
- 核心线程数
- 最大线程数
- 存活时间
- 队列类型
- 队列容量
- 拒绝策略
- 是否守护线程
- 是否允许核心线程超时
- 是否预热线程
这种方式最大的好处就是清楚。
尤其在线程池这种参数稍微多一点就容易写乱的地方,Builder 模式会舒服很多。
另外它还做了参数校验和线程命名,比如线程名会长成:
met-smooth-1 met-smooth-2 met-smooth-3
线上排查问题的时候,这种命名非常有帮助。
package com.ruoyi.system.poolTest.util;
import com.ruoyi.system.poolTest.Enum.QueueType;
import com.ruoyi.system.poolTest.Enum.RejectedPolicy;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程池构建器,用来统一组装线程池参数。
*/
public class ThreadPoolBuilder {
private final String poolName;
private int corePoolSize = ThreadPoolUtil.CPU_CORES;
private int maxPoolSize = ThreadPoolUtil.CPU_CORES;
private long keepAliveSeconds = ThreadPoolUtil.DEFAULT_KEEP_ALIVE_SECONDS;
private boolean daemon = false;
private boolean allowCoreThreadTimeOut = false;
private boolean prestartAllCoreThreads = true;
private int queueCapacity = ThreadPoolUtil.DEFAULT_QUEUE_CAPACITY;
private QueueType queueType = QueueType.LINKED_BLOCKING_QUEUE;
private RejectedPolicy rejectedPolicy = RejectedPolicy.CALLER_RUNS;
/**
* 指定线程池名称,后面创建线程时会用到这个前缀。
*/
public ThreadPoolBuilder(String poolName) {
if (poolName == null || poolName.trim().isEmpty()) {
throw new IllegalArgumentException("线程池名称不能为空");
}
this.poolName = poolName.trim();
}
/**
* 设置核心线程数。
*/
public ThreadPoolBuilder corePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
return this;
}
/**
* 设置最大线程数。
*/
public ThreadPoolBuilder maxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
return this;
}
/**
* 设置非核心线程空闲后的存活时间,单位为秒。
*/
public ThreadPoolBuilder keepAliveSeconds(long keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
return this;
}
/**
* 设置核心线程是否允许超时回收。
*/
public ThreadPoolBuilder allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
return this;
}
/**
* 设置创建完成后是否预热全部核心线程。
*/
public ThreadPoolBuilder prestartAllCoreThreads(boolean prestartAllCoreThreads) {
this.prestartAllCoreThreads = prestartAllCoreThreads;
return this;
}
/**
* 设置任务队列类型。
*/
public ThreadPoolBuilder queueType(QueueType queueType) {
this.queueType = Objects.requireNonNull(queueType, "队列类型不能为空");
return this;
}
/**
/**
* 设置线程是否按守护线程创建。
*/
public ThreadPoolBuilder daemon(boolean daemon) {
this.daemon = daemon;
return this;
}
/**
* 设置任务队列容量。
*/
public ThreadPoolBuilder queueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
return this;
}
/**
* 设置任务拒绝策略。
*/
public ThreadPoolBuilder rejectionPolicy(RejectedPolicy rejectedPolicy) {
this.rejectedPolicy = Objects.requireNonNull(rejectedPolicy, "拒绝策略不能为空");
return this;
}
/**
* 根据当前配置创建线程池实例。
*/
public ThreadPoolExecutor build() {
validate();
ThreadFactory threadFactory = new NamedThreadFactory(poolName, daemon);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveSeconds,
TimeUnit.SECONDS,
buildQueue(),
threadFactory,
buildRejectedHandler(rejectedPolicy)
);
executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
if (prestartAllCoreThreads) {
executor.prestartAllCoreThreads();
}
return executor;
}
/**
* 创建前先把参数检查一遍,避免带着非法配置往下跑。
*/
private void validate() {
if (corePoolSize <= 0) {
throw new IllegalArgumentException("核心线程数必须大于 0");
}
if (maxPoolSize <= 0) {
throw new IllegalArgumentException("最大线程数必须大于 0");
}
if (maxPoolSize < corePoolSize) {
throw new IllegalArgumentException("最大线程数不能小于核心线程数");
}
if (keepAliveSeconds < 0) {
throw new IllegalArgumentException("线程存活时间不能为负数");
}
if (queueType != QueueType.SYNCHRONOUS_QUEUE && queueCapacity <= 0) {
throw new IllegalArgumentException("队列容量必须大于 0");
}
}
/**
* 按配置生成对应的阻塞队列。
*/
private BlockingQueue<Runnable> buildQueue() {
switch (queueType) {
case ARRAY_BLOCKING_QUEUE:
return new ArrayBlockingQueue<Runnable>(queueCapacity);
case SYNCHRONOUS_QUEUE:
return new SynchronousQueue<Runnable>();
case LINKED_BLOCKING_QUEUE:
default:
return new LinkedBlockingQueue<Runnable>(queueCapacity);
}
}
/**
* 把自定义拒绝策略转换成 JDK 自带的处理器。
*/
private RejectedExecutionHandler buildRejectedHandler(RejectedPolicy policy) {
switch (policy) {
case ABORT:
return new ThreadPoolExecutor.AbortPolicy();
case DISCARD:
return new ThreadPoolExecutor.DiscardPolicy();
case DISCARD_OLDEST:
return new ThreadPoolExecutor.DiscardOldestPolicy();
case CALLER_RUNS:
default:
return new ThreadPoolExecutor.CallerRunsPolicy();
}
}
/**
* 统一给线程命名,排查问题时更容易定位。
*/
private static class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger threadIndex = new AtomicInteger(1);
private final String poolName;
private final boolean daemon;
/**
* 记录线程名前缀和守护线程标记。
*/
private NamedThreadFactory(String poolName, boolean daemon) {
this.poolName = poolName;
this.daemon = daemon;
}
/**
* 创建带编号的新线程。
*/
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, poolName + "-" + threadIndex.getAndIncrement());
thread.setDaemon(daemon);
return thread;
}
}
}
8. ThreadPoolUtil:线程池工具类
这个类主要是把线程池的常用操作再收一层。
包括:
- 快速创建固定线程池
- 快速创建 IO 型线程池
- 批量提交任务
- 批量执行任务
- 优雅关闭线程池
里面我觉得比较实用的是两点。
第一,统一包装 Runnable 和 Callable 的异常,避免业务层重复写 try-catch。
第二,shutdownGracefully(...) 做了比较标准的优雅关闭处理:
- 先正常关闭
- 超时后再强制中断
这比随手一个 shutdownNow() 更稳一些。
package com.ruoyi.system.poolTest.util;
import com.ruoyi.system.poolTest.Enum.RejectedPolicy;
import com.ruoyi.system.poolTest.exception.ThreadTaskException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* 线程池工具类,封装了常用的创建、提交和关闭操作。
*/
public final class ThreadPoolUtil {
// CPU 核数
static final int CPU_CORES = Math.max(1, Runtime.getRuntime().availableProcessors());
// 默认线程空闲超时时间
static final long DEFAULT_KEEP_ALIVE_SECONDS = 60L;
// 默认队列容量
static final int DEFAULT_QUEUE_CAPACITY = 1024;
// 默认关闭超时时间
static final long DEFAULT_SHUTDOWN_TIMEOUT_SECONDS = 30L;
/**
* 工具类不需要创建实例。
*/
private ThreadPoolUtil() {
}
/**
* 创建固定大小的线程池,适合 CPU 密集型任务。
*/
public static ThreadPoolExecutor newFixedPool(String poolName, int threadCount) {
return newBuilder(poolName)
.corePoolSize(threadCount)
.maxPoolSize(threadCount)
.queueCapacity(DEFAULT_QUEUE_CAPACITY)
.rejectionPolicy(RejectedPolicy.CALLER_RUNS)
.build();
}
/**
* 创建偏向 IO 场景的线程池。
*/
public static ThreadPoolExecutor newIoPool(String poolName) {
return newBuilder(poolName)
.corePoolSize(CPU_CORES)
.maxPoolSize(CPU_CORES * 2)
.queueCapacity(DEFAULT_QUEUE_CAPACITY)
.keepAliveSeconds(DEFAULT_KEEP_ALIVE_SECONDS)
.allowCoreThreadTimeOut(true)
.rejectionPolicy(RejectedPolicy.CALLER_RUNS)
.build();
}
/**
* 返回一个新的线程池构建器,方便按需继续配置。
*/
public static ThreadPoolBuilder newBuilder(String poolName) {
return new ThreadPoolBuilder(poolName);
}
/**
* 提交无返回值任务。
*/
public static void execute(ExecutorService executor, Runnable task) {
Objects.requireNonNull(executor, "线程池不能为空");
Objects.requireNonNull(task, "任务不能为空");
executor.execute(wrapRunnable(task));
}
/**
* 提交有返回值任务。
*/
public static <T> Future<T> submit(ExecutorService executor, Callable<T> task) {
Objects.requireNonNull(executor, "线程池不能为空");
Objects.requireNonNull(task, "任务不能为空");
return executor.submit(wrapCallable(task));
}
/**
* 提交无返回值任务,并返回 Future 方便后续等待。
*/
public static Future<?> submit(ExecutorService executor, Runnable task) {
Objects.requireNonNull(executor, "线程池不能为空");
Objects.requireNonNull(task, "任务不能为空");
return executor.submit(wrapRunnable(task));
}
/**
* 批量提交任务,并返回对应的 Future 列表。
*/
public static <T> List<Future<T>> submitAll(ExecutorService executor, Collection<? extends Callable<T>> tasks) {
Objects.requireNonNull(executor, "线程池不能为空");
if (tasks == null || tasks.isEmpty()) {
return Collections.emptyList();
}
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
for (Callable<T> task : tasks) {
futures.add(submit(executor, task));
}
return futures;
}
/**
* 批量执行任务,并在超时前收集全部结果。
*/
public static <T> List<T> invokeAll(
ExecutorService executor,
Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit
) throws InterruptedException, ExecutionException, TimeoutException {
Objects.requireNonNull(executor, "线程池不能为空");
Objects.requireNonNull(unit, "时间单位不能为空");
if (tasks == null || tasks.isEmpty()) {
return Collections.emptyList();
}
long timeoutNanos = unit.toNanos(timeout);
long deadline = System.nanoTime() + timeoutNanos;
ExecutorCompletionService<T> completionService = new ExecutorCompletionService<T>(executor);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
for (Callable<T> task : tasks) {
futures.add(completionService.submit(wrapCallable(task)));
}
List<T> results = new ArrayList<T>(tasks.size());
try {
for (int i = 0; i < tasks.size(); i++) {
long remain = deadline - System.nanoTime();
if (remain <= 0) {
cancelAll(futures);
throw new TimeoutException("任务执行超时");
}
Future<T> completed = completionService.poll(remain, TimeUnit.NANOSECONDS);
if (completed == null) {
cancelAll(futures);
throw new TimeoutException("任务执行超时");
}
results.add(completed.get());
}
return results;
} catch (InterruptedException ex) {
cancelAll(futures);
Thread.currentThread().interrupt();
throw ex;
} catch (ExecutionException ex) {
cancelAll(futures);
throw ex;
} catch (RuntimeException ex) {
cancelAll(futures);
throw ex;
}
}
/**
* 按默认超时时间关闭线程池。
*/
public static boolean shutdownGracefully(ExecutorService executor) {
return shutdownGracefully(executor, DEFAULT_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
/**
* 先尝试正常关闭,超时后再中断剩余任务。
*/
public static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
Objects.requireNonNull(unit, "时间单位不能为空");
if (executor == null) {
return true;
}
executor.shutdown();
try {
if (executor.awaitTermination(timeout, unit)) {
return true;
}
executor.shutdownNow();
return executor.awaitTermination(timeout, unit);
} catch (InterruptedException ex) {
executor.shutdownNow();
Thread.currentThread().interrupt();
return false;
}
}
/**
* 统一取消还没结束的任务。
*/
private static <T> void cancelAll(List<Future<T>> futures) {
for (Future<T> future : futures) {
future.cancel(true);
}
}
/**
* 给 Runnable 包一层异常转换,避免原始异常被吞掉。
*/
private static Runnable wrapRunnable(final Runnable task) {
return () -> {
try {
task.run();
} catch (Throwable ex) {
throw new ThreadTaskException("线程任务执行失败", ex);
}
};
}
/**
* 给 Callable 包一层异常转换,统一异常出口。
*/
private static <T> Callable<T> wrapCallable(final Callable<T> task) {
return () -> {
try {
return task.call();
} catch (Throwable ex) {
throw new ThreadTaskException("线程任务执行失败", ex);
}
};
}
}
9. MetSmoothServiceExample:业务示例类
这个类就是整套方案的实际用法展示。
它先把多个指标的平滑计算包装成 SmoothTask:
//创建并行处理任务。
List<SmoothTask<Integer>> tasks = Arrays.asList(
SmoothTask.of("o2", () -> smoothPolynomial(data.getO2List(), 19, 1)),
SmoothTask.of("co2", () -> smoothPolynomial(data.getCo2List(), 13, 1)),
SmoothTask.of("ch4", () -> smoothPolynomial(data.getCh4List(), 13, 1)),
SmoothTask.of("nh4", () -> smoothPolynomial(data.getNh4List(), 13, 1)),
SmoothTask.of("so2", () -> smoothPolynomial(data.getSo2List(), 13, 1)),
SmoothTask.of("h2s", () -> smoothPolynomial(data.getH2sList(), 13, 1)),
SmoothTask.of("wvp", () -> smoothPolynomial(data.getWvpList(), 13, 1))
);
然后创建执行器:
//创建执行器-处理并行任务 9个线程 超时60秒
ParallelSmoothExecutor<Integer> executor =
new ParallelSmoothExecutor<Integer>("met-smooth", 9, 60, TimeUnit.SECONDS);
最后执行任务并回填结果:
//执行任务-结果
Map<String, Integer> resultMap = executor.execute(tasks);
data.setO2Result(resultMap.get("o2"));
data.setCo2Result(resultMap.get("co2"));
data.setCh4Result(resultMap.get("ch4"));
data.setNh4Result(resultMap.get("nh4"));
data.setSo2Result(resultMap.get("so2"));
data.setH2sResult(resultMap.get("h2s"));
data.setWvpResult(resultMap.get("wvp"));
return data;
整个流程其实已经很清楚了:
- 任务怎么定义
- 执行器怎么创建
- 结果怎么收
- 业务对象怎么回填
package com.ruoyi.system.poolTest;
import com.ruoyi.system.poolTest.executor.ParallelSmoothExecutor;
import com.ruoyi.system.poolTest.executor.SmoothTask;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 并行平滑处理示例。
*/
public class MetSmoothServiceExample {
/**
* 并行处理各项指标的平滑计算。
*/
public MetSample smooth(MetSample data) {
if (data == null || data.getTimeIndex() == null || data.getTimeIndex().isEmpty()) {
return data;
}
//创建并行处理任务。
List<SmoothTask<Integer>> tasks = Arrays.asList(
SmoothTask.of("o2", () -> smoothPolynomial(data.getO2List(), 19, 1)),
SmoothTask.of("co2", () -> smoothPolynomial(data.getCo2List(), 13, 1)),
SmoothTask.of("ch4", () -> smoothPolynomial(data.getCh4List(), 13, 1)),
SmoothTask.of("nh4", () -> smoothPolynomial(data.getNh4List(), 13, 1)),
SmoothTask.of("so2", () -> smoothPolynomial(data.getSo2List(), 13, 1)),
SmoothTask.of("h2s", () -> smoothPolynomial(data.getH2sList(), 13, 1)),
SmoothTask.of("wvp", () -> smoothPolynomial(data.getWvpList(), 13, 1))
);
//创建执行器-处理并行任务 9个线程 超时60秒
ParallelSmoothExecutor<Integer> executor =
new ParallelSmoothExecutor<Integer>("met-smooth", 9, 60, TimeUnit.SECONDS);
//执行任务-结果
Map<String, Integer> resultMap = executor.execute(tasks);
data.setO2Result(resultMap.get("o2"));
data.setCo2Result(resultMap.get("co2"));
data.setCh4Result(resultMap.get("ch4"));
data.setNh4Result(resultMap.get("nh4"));
data.setSo2Result(resultMap.get("so2"));
data.setH2sResult(resultMap.get("h2s"));
data.setWvpResult(resultMap.get("wvp"));
return data;
}
/**
* 具体方法
*/
private Integer smoothPolynomial(DoubleList arrayList, int width, int repeat) {
if (arrayList == null || arrayList.isEmpty()) {
return 0;
}
return 1;
}
/**
* MetSample对象
*/
public static class MetSample {
private List<String> timeIndex;
private DoubleList o2List;
private DoubleList co2List;
private DoubleList ch4List;
private DoubleList nh4List;
private DoubleList so2List;
private DoubleList h2sList;
private DoubleList wvpList;
private Integer o2Result;
private Integer co2Result;
private Integer ch4Result;
private Integer nh4Result;
private Integer so2Result;
private Integer h2sResult;
private Integer wvpResult;
/**
* 返回时间轴。
*/
public List<String> getTimeIndex() {
return timeIndex;
}
/**
* 设置时间轴。
*/
public void setTimeIndex(List<String> timeIndex) {
this.timeIndex = timeIndex;
}
/**
* 返回 O2 原始序列。
*/
public DoubleList getO2List() {
return o2List;
}
/**
* 设置 O2 原始序列。
*/
public void setO2List(DoubleList o2List) {
this.o2List = o2List;
}
/**
* 返回 CO2 原始序列。
*/
public DoubleList getCo2List() {
return co2List;
}
/**
* 设置 CO2 原始序列。
*/
public void setCo2List(DoubleList co2List) {
this.co2List = co2List;
}
/**
* 返回 CH4 原始序列。
*/
public DoubleList getCh4List() {
return ch4List;
}
/**
* 设置 CH4 原始序列。
*/
public void setCh4List(DoubleList ch4List) {
this.ch4List = ch4List;
}
/**
* 返回 NH4 原始序列。
*/
public DoubleList getNh4List() {
return nh4List;
}
/**
* 设置 NH4 原始序列。
*/
public void setNh4List(DoubleList nh4List) {
this.nh4List = nh4List;
}
/**
* 返回 SO2 原始序列。
*/
public DoubleList getSo2List() {
return so2List;
}
/**
* 设置 SO2 原始序列。
*/
public void setSo2List(DoubleList so2List) {
this.so2List = so2List;
}
/**
* 返回 H2S 原始序列。
*/
public DoubleList getH2sList() {
return h2sList;
}
/**
* 设置 H2S 原始序列。
*/
public void setH2sList(DoubleList h2sList) {
this.h2sList = h2sList;
}
/**
* 返回 WVP 原始序列。
*/
public DoubleList getWvpList() {
return wvpList;
}
/**
* 设置 WVP 原始序列。
*/
public void setWvpList(DoubleList wvpList) {
this.wvpList = wvpList;
}
/**
* 返回 O2 平滑结果。
*/
public Integer getO2Result() {
return o2Result;
}
/**
* 设置 O2 平滑结果。
*/
public void setO2Result(Integer o2Result) {
this.o2Result = o2Result;
}
/**
* 返回 CO2 平滑结果。
*/
public Integer getCo2Result() {
return co2Result;
}
/**
* 设置 CO2 平滑结果。
*/
public void setCo2Result(Integer co2Result) {
this.co2Result = co2Result;
}
/**
* 返回 CH4 平滑结果。
*/
public Integer getCh4Result() {
return ch4Result;
}
/**
* 设置 CH4 平滑结果。
*/
public void setCh4Result(Integer ch4Result) {
this.ch4Result = ch4Result;
}
/**
* 返回 NH4 平滑结果。
*/
public Integer getNh4Result() {
return nh4Result;
}
/**
* 设置 NH4 平滑结果。
*/
public void setNh4Result(Integer nh4Result) {
this.nh4Result = nh4Result;
}
/**
* 返回 SO2 平滑结果。
*/
public Integer getSo2Result() {
return so2Result;
}
/**
* 设置 SO2 平滑结果。
*/
public void setSo2Result(Integer so2Result) {
this.so2Result = so2Result;
}
/**
* 返回 H2S 平滑结果。
*/
public Integer getH2sResult() {
return h2sResult;
}
/**
* 设置 H2S 平滑结果。
*/
public void setH2sResult(Integer h2sResult) {
this.h2sResult = h2sResult;
}
/**
* 返回 WVP 平滑结果。
*/
public Integer getWvpResult() {
return wvpResult;
}
/**
* 设置 WVP 平滑结果。
*/
public void setWvpResult(Integer wvpResult) {
this.wvpResult = wvpResult;
}
}
/**
* 简单的 double 列表包装,方便示例里传参。
*/
public static class DoubleList {
private final List<Double> values = new ArrayList<Double>();
/**
* 用一组 double 快速创建列表。
*/
public static DoubleList of(double... numbers) {
DoubleList list = new DoubleList();
if (numbers != null) {
for (double number : numbers) {
list.add(number);
}
}
return list;
}
/**
* 追加一个值。
*/
public void add(double value) {
values.add(value);
}
/**
* 读取指定位置的值。
*/
public double get(int index) {
return values.get(index);
}
/**
* 返回列表长度。
*/
public int size() {
return values.size();
}
/**
* 判断列表是否为空。
*/
public boolean isEmpty() {
return values.isEmpty();
}
/**
* 复制一份当前列表。
*/
public DoubleList copy() {
DoubleList copy = new DoubleList();
copy.values.addAll(this.values);
return copy;
}
/**
* 直接按列表格式输出。
*/
@Override
public String toString() {
return values.toString();
}
}
}
四、这套封装的价值
我个人觉得,这种封装在项目里主要有下面几个好处。
1. 业务代码更清爽
调用方不用反复写线程池提交、结果等待和异常处理。
2. 更适合多个独立任务并发执行
像多个指标计算、多个接口聚合、多个统计任务并发跑,这类场景都很适合。
3. 异常更统一
单任务异常和整批执行异常都做了统一包装,排查问题时思路会更清楚。
4. 线程池可配置
队列、容量、拒绝策略、线程数这些都能按业务情况调,不是写死的。
5. 后续扩展空间比较大
比如后面要加:
- 任务耗时统计
- 失败降级
- 默认值兜底
- TraceId 透传
- 监控埋点
都可以继续往这套结构上加。
五、使用时要注意的地方
虽然这种封装很好用,但也不是所有场景都适合直接套。
1. 有依赖关系的任务不适合硬并发
如果任务 B 一定要等任务 A 的结果,那就不适合简单拆成并行任务。
2. 线程池参数别盲目放大
线程数不是越大越好,还是要结合任务类型来调。
- CPU 型任务,线程太多反而会增加切换开销
- IO 型任务,可以适当多一些,但也不能无限放大
3. 当前执行器更适合“一次执行一批任务”
因为 execute(...) 结束后会关闭线程池,所以它更适合这种模式:
- 创建执行器
- 执行一批任务
- 收结果
- 关闭资源
如果你的系统里这个逻辑会被高频调用,那后面可以考虑把线程池抽出来统一复用。
4. 超时时间要结合业务来定
示例里用了 60 秒,这只是当前业务的一种配置,不一定适合所有场景。
六、总结
这套代码的核心,其实不是“写了一个线程池工具类”,而是把并行执行过程中最容易重复、最容易出错的部分统一封装起来了。
它做了几件很关键的事:
- 统一任务抽象
- 统一线程池构建
- 统一异常处理
- 统一超时取消
- 统一结果收集
- 统一资源关闭
放到业务里看,这种设计最大的好处就是:
业务层只需要定义任务,底层组件负责把这批任务平稳地并行执行完。
如果你的项目里也有“多个独立任务并行执行,最后统一返回结果”的需求,这套思路还是挺值得参考的。
更多推荐



所有评论(0)