前言

平时做业务开发的时候,经常会遇到这样一种场景:

一个接口里要同时处理多个任务,或者同时调用多个下游接口,等它们都执行完之后,再把结果统一组装返回。

这种场景如果全都串行执行,接口耗时通常会比较长。尤其是多个任务彼此之间没有依赖关系的时候,串行就会显得很浪费。

所以比较自然的思路就是:把独立任务并行执行,最后再统一收结果。

不过问题也来了,如果每次都手写线程池、提交任务、拿 Future、处理超时、处理异常、关闭线程池,代码很快就会变得很乱。

所以我在项目里把这一套逻辑单独封装了一层,做成了一个可以复用的并行执行组件。

这篇文章就结合 poolTest 下面的真实代码,聊一下这套方案是怎么设计的。


一、业务场景

这套代码对应的场景,其实挺典型的:

需要同时并行处理多个任务,最后统一返回结果。

在示例类 MetSmoothServiceExample 里,处理的不是普通的“查用户、查订单”,而是更贴近当前业务的一个例子:

对多个指标同时做平滑计算,比如:

  • o2
  • co2
  • ch4
  • nh4
  • so2
  • h2s
  • wvp

这些指标之间互不依赖,所以非常适合并行执行。

如果按串行方式一个一个算,总耗时就是每个指标计算时间累加;但如果改成并行,总耗时通常就取决于最慢的那个任务。

这也是并行处理最直接的价值。


二、整体思路

我这里的做法并不复杂,核心就是把并行执行过程中那些重复代码统一收起来。

整体分成 4 步:

  1. 先把每个要执行的任务封装成统一对象
  2. 交给并行执行器统一提交到线程池
  3. 等所有任务执行完之后收集结果
  4. 如果中间有异常、超时或中断,就统一处理

这样一来,业务层就只需要关心一件事:

我要并发执行哪些任务。

至于线程池怎么建、任务怎么提、异常怎么包、超时怎么控,这些都交给底层组件处理。


三、代码结构说明

这套组件主要由下面几个类组成:

  • QueueType
  • RejectedPolicy
  • ParallelExecutionException
  • ThreadTaskException
  • SmoothTask
  • ParallelSmoothExecutor
  • ThreadPoolBuilder
  • ThreadPoolUtil
  • MetSmoothServiceExample

下面按顺序简单说一下每个类是干什么的。

1. QueueType:定义线程池队列类型

这个枚举类主要是把线程池用什么阻塞队列抽出来了,当前支持:

  • LINKED_BLOCKING_QUEUE
  • ARRAY_BLOCKING_QUEUE
  • SYNCHRONOUS_QUEUE

这么做的好处是,线程池的行为就不是写死的,而是可以根据业务场景调整。

比如:

  • 想让任务先排队,可以用阻塞队列
  • 想严格限制堆积量,可以用固定容量队列
  • 想直接交给线程处理,不做缓存,也可以单独配

示例:

package com.ruoyi.system.poolTest.Enum;

/**
 * 线程池可选的队列类型。
 */
public enum QueueType {

    /**
     * 默认队列,任务先入队,再等空闲线程处理。
     */
    LINKED_BLOCKING_QUEUE,

    /**
     * 固定容量队列,适合希望控制积压量的场景。
     */
    ARRAY_BLOCKING_QUEUE,

    /**
     * 不排队,任务提交后直接交给线程处理。
     */
    SYNCHRONOUS_QUEUE
}

2. RejectedPolicy:定义线程池拒绝策略

线程池满了以后,后续任务怎么处理,这块也很关键。

这里统一封装了 4 种拒绝策略:

  • ABORT
  • CALLER_RUNS
  • DISCARD
  • DISCARD_OLDEST

当前这套代码默认用的是 CALLER_RUNS,也就是任务不直接丢掉,而是回退到提交任务的线程自己执行。

这种策略在很多业务场景里都比较稳,因为它至少不会悄悄吞任务。

package com.ruoyi.system.poolTest.Enum;

/**
 * 线程池任务被拒绝时的处理方式。
 */
public enum RejectedPolicy {

    /**
     * 直接拒绝,并抛出异常。
     */
    ABORT,

    /**
     * 由提交任务的线程自己执行。
     */
    CALLER_RUNS,

    /**
     * 直接丢掉当前任务。
     */
    DISCARD,

    /**
     * 丢掉队列里最早的任务,再尝试提交当前任务。
     */
    DISCARD_OLDEST
}

3. ParallelExecutionException:并行执行统一异常

这个异常类主要负责兜住“整批任务执行过程中的异常”。

比如:

  • 某个任务执行失败
  • 某个任务执行超时
  • 等待结果时线程被中断

最终都会包装成 ParallelExecutionException 往外抛。

这样业务层就不用关心底层到底抛了多少种并发异常,统一接这一种就够了。

package com.ruoyi.system.poolTest.exception;

/**
 * 并行执行过程中抛出的统一异常。
 * @author 吴顺杰
 */
public class ParallelExecutionException extends RuntimeException{

    /**
     * 带上原始异常一起抛出,方便排查。
     */
    public ParallelExecutionException(String message, Throwable cause) {
        super(message, cause);
    }
}

4. ThreadTaskException:单任务异常包装

这个异常类是更细一层的封装,它针对的是“单个线程任务执行失败”。

在 ThreadPoolUtil 里,不管你提交的是 Runnable 还是 Callable,最终都会先包一层,如果任务内部抛异常,就统一转成 ThreadTaskException。

这样做主要有两个好处:

  • 异常语义统一
  • 不容易把线程里的原始异常漏掉
package com.ruoyi.system.poolTest.exception;

/**
 * 线程任务执行时抛出的运行时异常。
 */
public class ThreadTaskException extends RuntimeException {

    /**
     * 用原始异常包装线程任务失败原因。
     */
    public ThreadTaskException(String message, Throwable cause) {
        super(message, cause);
    }
}

5. SmoothTask:任务抽象

这个类我觉得是整个设计里很实用的一层。

它把一个任务统一抽象成两部分:

  • 任务名
  • 执行逻辑

定义方式也很简单:

SmoothTask.of("o2", () -> smoothPolynomial(data.getO2List(), 19, 1))

相比直接传一堆匿名线程任务,这种写法的好处很明显:

  • 每个任务都有名字,结果更好对应
  • 代码可读性更好
  • 后面如果要扩展任务属性,也更方便

尤其这套代码最终返回的是 Map<String, T>,所以任务名不只是“好看”,它本身就是结果映射的 key。

package com.ruoyi.system.poolTest.executor;

import java.util.Objects;
import java.util.concurrent.Callable;

/**
 * 单个平滑任务,包含任务名和执行逻辑。
 */
public final class SmoothTask<T> {

    private final String name;
    private final Callable<T> action;

    /**
     * 保存任务名和任务逻辑。
     */
    private SmoothTask(String name, Callable<T> action) {
        this.name = Objects.requireNonNull(name, "任务名称不能为空");
        this.action = Objects.requireNonNull(action, "任务执行逻辑不能为空");
    }

    /**
     * 创建一个新的平滑任务。
     */
    public static <T> SmoothTask<T> of(String name, Callable<T> action) {
        return new SmoothTask<T>(name, action);
    }

    /**
     * 返回任务名。
     */
    public String getName() {
        return name;
    }

    /**
     * 运行当前任务。
     */
    public T execute() throws Exception {
        return action.call();
    }
}

6. ParallelSmoothExecutor:并行执行核心类

这个类就是整套方案的核心。

它主要干了几件事:

  • 校验任务是否合法
  • 校验任务名是否为空、是否重复
  • 把所有任务提交到线程池
  • 按任务名收集结果
  • 如果有任务失败、超时或中断,就取消整批任务
  • 最后统一关闭线程池

这里有几个细节挺值得说一下。

第一,任务结果是按 LinkedHashMap 收集的,这样结果顺序和任务定义顺序一致,排查问题的时候会更舒服。

第二,任务名做了重复校验,这样可以避免两个任务用了同一个名字,最后结果被覆盖。

第三,只要中间有一个任务出问题,就会触发 cancelAll(...),把剩余任务取消掉,避免继续空耗资源。

第四,执行结束后会在 finally 里关闭线程池,这样不会留下线程资源泄漏问题。

如果用一句话概括这个类的作用,可以这么说:

它负责把一批任务平稳地并行跑完,并且把异常、超时、取消和关闭这些事都处理掉。

package com.ruoyi.system.poolTest.executor;

import com.ruoyi.system.poolTest.Enum.RejectedPolicy;
import com.ruoyi.system.poolTest.exception.ParallelExecutionException;
import com.ruoyi.system.poolTest.util.ThreadPoolUtil;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * 并行执行多个平滑任务,并按任务名收集结果。
 */
public final class ParallelSmoothExecutor<T> {

    private final ThreadPoolExecutor executor;
    private final long timeout;
    private final TimeUnit timeoutUnit;

    /**
     * 按线程数直接创建一个执行器。
     */
    public ParallelSmoothExecutor(String poolName, int threadCount, long timeout, TimeUnit timeoutUnit) {
        this(
                ThreadPoolUtil.newBuilder(poolName)
                        .corePoolSize(threadCount)
                        .maxPoolSize(threadCount)
                        .queueCapacity(Math.max(threadCount * 4, 16))
                        .rejectionPolicy(RejectedPolicy.CALLER_RUNS)
                        .build(),
                timeout,
                timeoutUnit
        );
    }

    /**
     * 使用现成线程池创建执行器。
     * 调用 execute 后会一并关闭这个线程池。
     */
    public ParallelSmoothExecutor(ThreadPoolExecutor executor, long timeout, TimeUnit timeoutUnit) {
        this.executor = Objects.requireNonNull(executor, "线程池不能为空");
        this.timeout = timeout;
        this.timeoutUnit = Objects.requireNonNull(timeoutUnit, "超时单位不能为空");
    }

    /**
     * 并行跑完所有任务,并把结果按任务名装回 Map。
     */
    public Map<String, T> execute(List<SmoothTask<T>> tasks) {
        if (tasks == null || tasks.isEmpty()) {
            return Collections.emptyMap();
        }

        validateTaskNames(tasks);

        Map<String, Future<T>> futureMap = new LinkedHashMap<String, Future<T>>(tasks.size());
        try {
            for (SmoothTask<T> task : tasks) {
                futureMap.put(task.getName(), ThreadPoolUtil.submit(executor, task::execute));
            }

            Map<String, T> resultMap = new LinkedHashMap<String, T>(tasks.size());
            for (Map.Entry<String, Future<T>> entry : futureMap.entrySet()) {
                try {
                    resultMap.put(entry.getKey(), entry.getValue().get(timeout, timeoutUnit));
                } catch (InterruptedException ex) {
                    cancelAll(futureMap);
                    Thread.currentThread().interrupt();
                    throw new ParallelExecutionException("并行平滑任务被中断", ex);
                } catch (ExecutionException ex) {
                    cancelAll(futureMap);
                    throw new ParallelExecutionException("并行平滑任务执行失败,任务名:" + entry.getKey(), ex.getCause());
                } catch (TimeoutException ex) {
                    cancelAll(futureMap);
                    throw new ParallelExecutionException("并行平滑任务执行超时,任务名:" + entry.getKey(), ex);
                }
            }
            return resultMap;
        } finally {
            ThreadPoolUtil.shutdownGracefully(executor);
        }
    }

    /**
     * 检查任务对象和任务名,顺手挡掉空值和重名。
     */
    private void validateTaskNames(List<SmoothTask<T>> tasks) {
        List<String> names = new ArrayList<String>(tasks.size());
        for (SmoothTask<T> task : tasks) {
            if (task == null) {
                throw new IllegalArgumentException("平滑任务不能为空");
            }
            String name = task.getName();
            if (name == null || name.trim().isEmpty()) {
                throw new IllegalArgumentException("平滑任务名称不能为空");
            }
            if (names.contains(name)) {
                throw new IllegalArgumentException("平滑任务名称重复:" + name);
            }
            names.add(name);
        }
    }

    /**
     * 出错时取消所有还没结束的任务。
     */
    private void cancelAll(Map<String, Future<T>> futureMap) {
        for (Future<T> future : futureMap.values()) {
            future.cancel(true);
        }
    }
}

7. ThreadPoolBuilder:线程池构建器

如果直接手写 new ThreadPoolExecutor(...),参数一多,可读性会很差。

所以这里又单独封装了一个 ThreadPoolBuilder,支持链式设置参数,比如:

  • 核心线程数
  • 最大线程数
  • 存活时间
  • 队列类型
  • 队列容量
  • 拒绝策略
  • 是否守护线程
  • 是否允许核心线程超时
  • 是否预热线程

这种方式最大的好处就是清楚。

尤其在线程池这种参数稍微多一点就容易写乱的地方,Builder 模式会舒服很多。

另外它还做了参数校验和线程命名,比如线程名会长成:

met-smooth-1 met-smooth-2 met-smooth-3

线上排查问题的时候,这种命名非常有帮助。

package com.ruoyi.system.poolTest.util;

import com.ruoyi.system.poolTest.Enum.QueueType;
import com.ruoyi.system.poolTest.Enum.RejectedPolicy;

import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程池构建器,用来统一组装线程池参数。
 */
public class ThreadPoolBuilder {

    private final String poolName;
    private int corePoolSize = ThreadPoolUtil.CPU_CORES;
    private int maxPoolSize = ThreadPoolUtil.CPU_CORES;
    private long keepAliveSeconds = ThreadPoolUtil.DEFAULT_KEEP_ALIVE_SECONDS;
    private boolean daemon = false;
    private boolean allowCoreThreadTimeOut = false;
    private boolean prestartAllCoreThreads = true;
    private int queueCapacity = ThreadPoolUtil.DEFAULT_QUEUE_CAPACITY;
    private QueueType queueType = QueueType.LINKED_BLOCKING_QUEUE;
    private RejectedPolicy rejectedPolicy = RejectedPolicy.CALLER_RUNS;

    /**
     * 指定线程池名称,后面创建线程时会用到这个前缀。
     */
    public ThreadPoolBuilder(String poolName) {
        if (poolName == null || poolName.trim().isEmpty()) {
            throw new IllegalArgumentException("线程池名称不能为空");
        }
        this.poolName = poolName.trim();
    }

    /**
     * 设置核心线程数。
     */
    public ThreadPoolBuilder corePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
        return this;
    }

    /**
     * 设置最大线程数。
     */
    public ThreadPoolBuilder maxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
        return this;
    }

    /**
     * 设置非核心线程空闲后的存活时间,单位为秒。
     */
    public ThreadPoolBuilder keepAliveSeconds(long keepAliveSeconds) {
        this.keepAliveSeconds = keepAliveSeconds;
        return this;
    }



    /**
     * 设置核心线程是否允许超时回收。
     */
    public ThreadPoolBuilder allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
        this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
        return this;
    }

    /**
     * 设置创建完成后是否预热全部核心线程。
     */
    public ThreadPoolBuilder prestartAllCoreThreads(boolean prestartAllCoreThreads) {
        this.prestartAllCoreThreads = prestartAllCoreThreads;
        return this;
    }


    /**
     * 设置任务队列类型。
     */
    public ThreadPoolBuilder queueType(QueueType queueType) {
        this.queueType = Objects.requireNonNull(queueType, "队列类型不能为空");
        return this;
    }

    /**
    /**
     * 设置线程是否按守护线程创建。
     */
    public ThreadPoolBuilder daemon(boolean daemon) {
        this.daemon = daemon;
        return this;
    }

    /**
     * 设置任务队列容量。
     */
    public ThreadPoolBuilder queueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
        return this;
    }


    /**
     * 设置任务拒绝策略。
     */
    public ThreadPoolBuilder rejectionPolicy(RejectedPolicy rejectedPolicy) {
        this.rejectedPolicy = Objects.requireNonNull(rejectedPolicy, "拒绝策略不能为空");
        return this;
    }

    /**
     * 根据当前配置创建线程池实例。
     */
    public ThreadPoolExecutor build() {
        validate();

        ThreadFactory threadFactory = new NamedThreadFactory(poolName, daemon);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveSeconds,
                TimeUnit.SECONDS,
                buildQueue(),
                threadFactory,
                buildRejectedHandler(rejectedPolicy)
        );

        executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
        if (prestartAllCoreThreads) {
            executor.prestartAllCoreThreads();
        }
        return executor;
    }

    /**
     * 创建前先把参数检查一遍,避免带着非法配置往下跑。
     */
    private void validate() {
        if (corePoolSize <= 0) {
            throw new IllegalArgumentException("核心线程数必须大于 0");
        }
        if (maxPoolSize <= 0) {
            throw new IllegalArgumentException("最大线程数必须大于 0");
        }
        if (maxPoolSize < corePoolSize) {
            throw new IllegalArgumentException("最大线程数不能小于核心线程数");
        }
        if (keepAliveSeconds < 0) {
            throw new IllegalArgumentException("线程存活时间不能为负数");
        }
        if (queueType != QueueType.SYNCHRONOUS_QUEUE && queueCapacity <= 0) {
            throw new IllegalArgumentException("队列容量必须大于 0");
        }
    }

    /**
     * 按配置生成对应的阻塞队列。
     */
    private BlockingQueue<Runnable> buildQueue() {
        switch (queueType) {
            case ARRAY_BLOCKING_QUEUE:
                return new ArrayBlockingQueue<Runnable>(queueCapacity);
            case SYNCHRONOUS_QUEUE:
                return new SynchronousQueue<Runnable>();
            case LINKED_BLOCKING_QUEUE:
            default:
                return new LinkedBlockingQueue<Runnable>(queueCapacity);
        }
    }

    /**
     * 把自定义拒绝策略转换成 JDK 自带的处理器。
     */
    private RejectedExecutionHandler buildRejectedHandler(RejectedPolicy policy) {
        switch (policy) {
            case ABORT:
                return new ThreadPoolExecutor.AbortPolicy();
            case DISCARD:
                return new ThreadPoolExecutor.DiscardPolicy();
            case DISCARD_OLDEST:
                return new ThreadPoolExecutor.DiscardOldestPolicy();
            case CALLER_RUNS:
            default:
                return new ThreadPoolExecutor.CallerRunsPolicy();
        }
    }

    /**
     * 统一给线程命名,排查问题时更容易定位。
     */
    private static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger threadIndex = new AtomicInteger(1);
        private final String poolName;
        private final boolean daemon;

        /**
         * 记录线程名前缀和守护线程标记。
         */
        private NamedThreadFactory(String poolName, boolean daemon) {
            this.poolName = poolName;
            this.daemon = daemon;
        }

        /**
         * 创建带编号的新线程。
         */
        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, poolName + "-" + threadIndex.getAndIncrement());
            thread.setDaemon(daemon);
            return thread;
        }
    }
}

8. ThreadPoolUtil:线程池工具类

这个类主要是把线程池的常用操作再收一层。

包括:

  • 快速创建固定线程池
  • 快速创建 IO 型线程池
  • 批量提交任务
  • 批量执行任务
  • 优雅关闭线程池

里面我觉得比较实用的是两点。

第一,统一包装 Runnable 和 Callable 的异常,避免业务层重复写 try-catch。

第二,shutdownGracefully(...) 做了比较标准的优雅关闭处理:

  • 先正常关闭
  • 超时后再强制中断

这比随手一个 shutdownNow() 更稳一些。

package com.ruoyi.system.poolTest.util;

import com.ruoyi.system.poolTest.Enum.RejectedPolicy;
import com.ruoyi.system.poolTest.exception.ThreadTaskException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * 线程池工具类,封装了常用的创建、提交和关闭操作。
 */
public final class ThreadPoolUtil {

    // CPU 核数
    static final int CPU_CORES = Math.max(1, Runtime.getRuntime().availableProcessors());
    // 默认线程空闲超时时间
    static final long DEFAULT_KEEP_ALIVE_SECONDS = 60L;
    // 默认队列容量
    static final int DEFAULT_QUEUE_CAPACITY = 1024;
    // 默认关闭超时时间
    static final long DEFAULT_SHUTDOWN_TIMEOUT_SECONDS = 30L;

    /**
     * 工具类不需要创建实例。
     */
    private ThreadPoolUtil() {
    }

    /**
     * 创建固定大小的线程池,适合 CPU 密集型任务。
     */
    public static ThreadPoolExecutor newFixedPool(String poolName, int threadCount) {
        return newBuilder(poolName)
                .corePoolSize(threadCount)
                .maxPoolSize(threadCount)
                .queueCapacity(DEFAULT_QUEUE_CAPACITY)
                .rejectionPolicy(RejectedPolicy.CALLER_RUNS)
                .build();
    }

    /**
     * 创建偏向 IO 场景的线程池。
     */
    public static ThreadPoolExecutor newIoPool(String poolName) {
        return newBuilder(poolName)
                .corePoolSize(CPU_CORES)
                .maxPoolSize(CPU_CORES * 2)
                .queueCapacity(DEFAULT_QUEUE_CAPACITY)
                .keepAliveSeconds(DEFAULT_KEEP_ALIVE_SECONDS)
                .allowCoreThreadTimeOut(true)
                .rejectionPolicy(RejectedPolicy.CALLER_RUNS)
                .build();
    }

    /**
     * 返回一个新的线程池构建器,方便按需继续配置。
     */
    public static ThreadPoolBuilder newBuilder(String poolName) {
        return new ThreadPoolBuilder(poolName);
    }

    /**
     * 提交无返回值任务。
     */
    public static void execute(ExecutorService executor, Runnable task) {
        Objects.requireNonNull(executor, "线程池不能为空");
        Objects.requireNonNull(task, "任务不能为空");
        executor.execute(wrapRunnable(task));
    }

    /**
     * 提交有返回值任务。
     */
    public static <T> Future<T> submit(ExecutorService executor, Callable<T> task) {
        Objects.requireNonNull(executor, "线程池不能为空");
        Objects.requireNonNull(task, "任务不能为空");
        return executor.submit(wrapCallable(task));
    }

    /**
     * 提交无返回值任务,并返回 Future 方便后续等待。
     */
    public static Future<?> submit(ExecutorService executor, Runnable task) {
        Objects.requireNonNull(executor, "线程池不能为空");
        Objects.requireNonNull(task, "任务不能为空");
        return executor.submit(wrapRunnable(task));
    }

    /**
     * 批量提交任务,并返回对应的 Future 列表。
     */
    public static <T> List<Future<T>> submitAll(ExecutorService executor, Collection<? extends Callable<T>> tasks) {
        Objects.requireNonNull(executor, "线程池不能为空");
        if (tasks == null || tasks.isEmpty()) {
            return Collections.emptyList();
        }

        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        for (Callable<T> task : tasks) {
            futures.add(submit(executor, task));
        }
        return futures;
    }

    /**
     * 批量执行任务,并在超时前收集全部结果。
     */
    public static <T> List<T> invokeAll(
            ExecutorService executor,
            Collection<? extends Callable<T>> tasks,
            long timeout,
            TimeUnit unit
    ) throws InterruptedException, ExecutionException, TimeoutException {
        Objects.requireNonNull(executor, "线程池不能为空");
        Objects.requireNonNull(unit, "时间单位不能为空");

        if (tasks == null || tasks.isEmpty()) {
            return Collections.emptyList();
        }

        long timeoutNanos = unit.toNanos(timeout);
        long deadline = System.nanoTime() + timeoutNanos;
        ExecutorCompletionService<T> completionService = new ExecutorCompletionService<T>(executor);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());

        for (Callable<T> task : tasks) {
            futures.add(completionService.submit(wrapCallable(task)));
        }

        List<T> results = new ArrayList<T>(tasks.size());
        try {
            for (int i = 0; i < tasks.size(); i++) {
                long remain = deadline - System.nanoTime();
                if (remain <= 0) {
                    cancelAll(futures);
                    throw new TimeoutException("任务执行超时");
                }

                Future<T> completed = completionService.poll(remain, TimeUnit.NANOSECONDS);
                if (completed == null) {
                    cancelAll(futures);
                    throw new TimeoutException("任务执行超时");
                }
                results.add(completed.get());
            }
            return results;
        } catch (InterruptedException ex) {
            cancelAll(futures);
            Thread.currentThread().interrupt();
            throw ex;
        } catch (ExecutionException ex) {
            cancelAll(futures);
            throw ex;
        } catch (RuntimeException ex) {
            cancelAll(futures);
            throw ex;
        }
    }

    /**
     * 按默认超时时间关闭线程池。
     */
    public static boolean shutdownGracefully(ExecutorService executor) {
        return shutdownGracefully(executor, DEFAULT_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * 先尝试正常关闭,超时后再中断剩余任务。
     */
    public static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        Objects.requireNonNull(unit, "时间单位不能为空");
        if (executor == null) {
            return true;
        }

        executor.shutdown();
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }

            executor.shutdownNow();
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException ex) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * 统一取消还没结束的任务。
     */
    private static <T> void cancelAll(List<Future<T>> futures) {
        for (Future<T> future : futures) {
            future.cancel(true);
        }
    }

    /**
     * 给 Runnable 包一层异常转换,避免原始异常被吞掉。
     */
    private static Runnable wrapRunnable(final Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable ex) {
                throw new ThreadTaskException("线程任务执行失败", ex);
            }
        };
    }

    /**
     * 给 Callable 包一层异常转换,统一异常出口。
     */
    private static <T> Callable<T> wrapCallable(final Callable<T> task) {
        return () -> {
            try {
                return task.call();
            } catch (Throwable ex) {
                throw new ThreadTaskException("线程任务执行失败", ex);
            }
        };
    }
}

9. MetSmoothServiceExample:业务示例类

这个类就是整套方案的实际用法展示。

它先把多个指标的平滑计算包装成 SmoothTask:

        //创建并行处理任务。
        List<SmoothTask<Integer>> tasks = Arrays.asList(
                SmoothTask.of("o2", () -> smoothPolynomial(data.getO2List(), 19, 1)),
                SmoothTask.of("co2", () -> smoothPolynomial(data.getCo2List(), 13, 1)),
                SmoothTask.of("ch4", () -> smoothPolynomial(data.getCh4List(), 13, 1)),
                SmoothTask.of("nh4", () -> smoothPolynomial(data.getNh4List(), 13, 1)),
                SmoothTask.of("so2", () -> smoothPolynomial(data.getSo2List(), 13, 1)),
                SmoothTask.of("h2s", () -> smoothPolynomial(data.getH2sList(), 13, 1)),
                SmoothTask.of("wvp", () -> smoothPolynomial(data.getWvpList(), 13, 1))
        );

然后创建执行器:


        //创建执行器-处理并行任务 9个线程  超时60秒
        ParallelSmoothExecutor<Integer> executor =
                new ParallelSmoothExecutor<Integer>("met-smooth", 9, 60, TimeUnit.SECONDS);

最后执行任务并回填结果:

      //执行任务-结果
        Map<String, Integer> resultMap = executor.execute(tasks);
        data.setO2Result(resultMap.get("o2"));
        data.setCo2Result(resultMap.get("co2"));
        data.setCh4Result(resultMap.get("ch4"));
        data.setNh4Result(resultMap.get("nh4"));
        data.setSo2Result(resultMap.get("so2"));
        data.setH2sResult(resultMap.get("h2s"));
        data.setWvpResult(resultMap.get("wvp"));
        return data;

整个流程其实已经很清楚了:

  • 任务怎么定义
  • 执行器怎么创建
  • 结果怎么收
  • 业务对象怎么回填
package com.ruoyi.system.poolTest;

import com.ruoyi.system.poolTest.executor.ParallelSmoothExecutor;
import com.ruoyi.system.poolTest.executor.SmoothTask;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * 并行平滑处理示例。
 */
public class MetSmoothServiceExample {

    /**
     * 并行处理各项指标的平滑计算。
     */
    public MetSample smooth(MetSample data) {
        if (data == null || data.getTimeIndex() == null || data.getTimeIndex().isEmpty()) {
            return data;
        }

        //创建并行处理任务。
        List<SmoothTask<Integer>> tasks = Arrays.asList(
                SmoothTask.of("o2", () -> smoothPolynomial(data.getO2List(), 19, 1)),
                SmoothTask.of("co2", () -> smoothPolynomial(data.getCo2List(), 13, 1)),
                SmoothTask.of("ch4", () -> smoothPolynomial(data.getCh4List(), 13, 1)),
                SmoothTask.of("nh4", () -> smoothPolynomial(data.getNh4List(), 13, 1)),
                SmoothTask.of("so2", () -> smoothPolynomial(data.getSo2List(), 13, 1)),
                SmoothTask.of("h2s", () -> smoothPolynomial(data.getH2sList(), 13, 1)),
                SmoothTask.of("wvp", () -> smoothPolynomial(data.getWvpList(), 13, 1))
        );

        //创建执行器-处理并行任务 9个线程  超时60秒
        ParallelSmoothExecutor<Integer> executor =
                new ParallelSmoothExecutor<Integer>("met-smooth", 9, 60, TimeUnit.SECONDS);

        //执行任务-结果
        Map<String, Integer> resultMap = executor.execute(tasks);
        data.setO2Result(resultMap.get("o2"));
        data.setCo2Result(resultMap.get("co2"));
        data.setCh4Result(resultMap.get("ch4"));
        data.setNh4Result(resultMap.get("nh4"));
        data.setSo2Result(resultMap.get("so2"));
        data.setH2sResult(resultMap.get("h2s"));
        data.setWvpResult(resultMap.get("wvp"));
        return data;
    }

    /**
     * 具体方法
     */
    private Integer smoothPolynomial(DoubleList arrayList, int width, int repeat) {
        if (arrayList == null || arrayList.isEmpty()) {
            return 0;
        }
        return 1;
    }

    /**
     * MetSample对象
     */
    public static class MetSample {
        private List<String> timeIndex;
        private DoubleList o2List;
        private DoubleList co2List;
        private DoubleList ch4List;
        private DoubleList nh4List;
        private DoubleList so2List;
        private DoubleList h2sList;
        private DoubleList wvpList;
        private Integer o2Result;
        private Integer co2Result;
        private Integer ch4Result;
        private Integer nh4Result;
        private Integer so2Result;
        private Integer h2sResult;
        private Integer wvpResult;

        /**
         * 返回时间轴。
         */
        public List<String> getTimeIndex() {
            return timeIndex;
        }

        /**
         * 设置时间轴。
         */
        public void setTimeIndex(List<String> timeIndex) {
            this.timeIndex = timeIndex;
        }

        /**
         * 返回 O2 原始序列。
         */
        public DoubleList getO2List() {
            return o2List;
        }

        /**
         * 设置 O2 原始序列。
         */
        public void setO2List(DoubleList o2List) {
            this.o2List = o2List;
        }

        /**
         * 返回 CO2 原始序列。
         */
        public DoubleList getCo2List() {
            return co2List;
        }

        /**
         * 设置 CO2 原始序列。
         */
        public void setCo2List(DoubleList co2List) {
            this.co2List = co2List;
        }

        /**
         * 返回 CH4 原始序列。
         */
        public DoubleList getCh4List() {
            return ch4List;
        }

        /**
         * 设置 CH4 原始序列。
         */
        public void setCh4List(DoubleList ch4List) {
            this.ch4List = ch4List;
        }

        /**
         * 返回 NH4 原始序列。
         */
        public DoubleList getNh4List() {
            return nh4List;
        }

        /**
         * 设置 NH4 原始序列。
         */
        public void setNh4List(DoubleList nh4List) {
            this.nh4List = nh4List;
        }

        /**
         * 返回 SO2 原始序列。
         */
        public DoubleList getSo2List() {
            return so2List;
        }

        /**
         * 设置 SO2 原始序列。
         */
        public void setSo2List(DoubleList so2List) {
            this.so2List = so2List;
        }

        /**
         * 返回 H2S 原始序列。
         */
        public DoubleList getH2sList() {
            return h2sList;
        }

        /**
         * 设置 H2S 原始序列。
         */
        public void setH2sList(DoubleList h2sList) {
            this.h2sList = h2sList;
        }

        /**
         * 返回 WVP 原始序列。
         */
        public DoubleList getWvpList() {
            return wvpList;
        }

        /**
         * 设置 WVP 原始序列。
         */
        public void setWvpList(DoubleList wvpList) {
            this.wvpList = wvpList;
        }

        /**
         * 返回 O2 平滑结果。
         */
        public Integer getO2Result() {
            return o2Result;
        }

        /**
         * 设置 O2 平滑结果。
         */
        public void setO2Result(Integer o2Result) {
            this.o2Result = o2Result;
        }

        /**
         * 返回 CO2 平滑结果。
         */
        public Integer getCo2Result() {
            return co2Result;
        }

        /**
         * 设置 CO2 平滑结果。
         */
        public void setCo2Result(Integer co2Result) {
            this.co2Result = co2Result;
        }

        /**
         * 返回 CH4 平滑结果。
         */
        public Integer getCh4Result() {
            return ch4Result;
        }

        /**
         * 设置 CH4 平滑结果。
         */
        public void setCh4Result(Integer ch4Result) {
            this.ch4Result = ch4Result;
        }

        /**
         * 返回 NH4 平滑结果。
         */
        public Integer getNh4Result() {
            return nh4Result;
        }

        /**
         * 设置 NH4 平滑结果。
         */
        public void setNh4Result(Integer nh4Result) {
            this.nh4Result = nh4Result;
        }

        /**
         * 返回 SO2 平滑结果。
         */
        public Integer getSo2Result() {
            return so2Result;
        }

        /**
         * 设置 SO2 平滑结果。
         */
        public void setSo2Result(Integer so2Result) {
            this.so2Result = so2Result;
        }

        /**
         * 返回 H2S 平滑结果。
         */
        public Integer getH2sResult() {
            return h2sResult;
        }

        /**
         * 设置 H2S 平滑结果。
         */
        public void setH2sResult(Integer h2sResult) {
            this.h2sResult = h2sResult;
        }

        /**
         * 返回 WVP 平滑结果。
         */
        public Integer getWvpResult() {
            return wvpResult;
        }

        /**
         * 设置 WVP 平滑结果。
         */
        public void setWvpResult(Integer wvpResult) {
            this.wvpResult = wvpResult;
        }
    }

    /**
     * 简单的 double 列表包装,方便示例里传参。
     */
    public static class DoubleList {
        private final List<Double> values = new ArrayList<Double>();

        /**
         * 用一组 double 快速创建列表。
         */
        public static DoubleList of(double... numbers) {
            DoubleList list = new DoubleList();
            if (numbers != null) {
                for (double number : numbers) {
                    list.add(number);
                }
            }
            return list;
        }

        /**
         * 追加一个值。
         */
        public void add(double value) {
            values.add(value);
        }

        /**
         * 读取指定位置的值。
         */
        public double get(int index) {
            return values.get(index);
        }

        /**
         * 返回列表长度。
         */
        public int size() {
            return values.size();
        }

        /**
         * 判断列表是否为空。
         */
        public boolean isEmpty() {
            return values.isEmpty();
        }

        /**
         * 复制一份当前列表。
         */
        public DoubleList copy() {
            DoubleList copy = new DoubleList();
            copy.values.addAll(this.values);
            return copy;
        }

        /**
         * 直接按列表格式输出。
         */
        @Override
        public String toString() {
            return values.toString();
        }
    }
}

四、这套封装的价值

我个人觉得,这种封装在项目里主要有下面几个好处。

1. 业务代码更清爽

调用方不用反复写线程池提交、结果等待和异常处理。

2. 更适合多个独立任务并发执行

像多个指标计算、多个接口聚合、多个统计任务并发跑,这类场景都很适合。

3. 异常更统一

单任务异常和整批执行异常都做了统一包装,排查问题时思路会更清楚。

4. 线程池可配置

队列、容量、拒绝策略、线程数这些都能按业务情况调,不是写死的。

5. 后续扩展空间比较大

比如后面要加:

  • 任务耗时统计
  • 失败降级
  • 默认值兜底
  • TraceId 透传
  • 监控埋点

都可以继续往这套结构上加。


五、使用时要注意的地方

虽然这种封装很好用,但也不是所有场景都适合直接套。

1. 有依赖关系的任务不适合硬并发

如果任务 B 一定要等任务 A 的结果,那就不适合简单拆成并行任务。

2. 线程池参数别盲目放大

线程数不是越大越好,还是要结合任务类型来调。

  • CPU 型任务,线程太多反而会增加切换开销
  • IO 型任务,可以适当多一些,但也不能无限放大

3. 当前执行器更适合“一次执行一批任务”

因为 execute(...) 结束后会关闭线程池,所以它更适合这种模式:

  • 创建执行器
  • 执行一批任务
  • 收结果
  • 关闭资源

如果你的系统里这个逻辑会被高频调用,那后面可以考虑把线程池抽出来统一复用。

4. 超时时间要结合业务来定

示例里用了 60 秒,这只是当前业务的一种配置,不一定适合所有场景。


六、总结

这套代码的核心,其实不是“写了一个线程池工具类”,而是把并行执行过程中最容易重复、最容易出错的部分统一封装起来了。

它做了几件很关键的事:

  • 统一任务抽象
  • 统一线程池构建
  • 统一异常处理
  • 统一超时取消
  • 统一结果收集
  • 统一资源关闭

放到业务里看,这种设计最大的好处就是:

业务层只需要定义任务,底层组件负责把这批任务平稳地并行执行完。

如果你的项目里也有“多个独立任务并行执行,最后统一返回结果”的需求,这套思路还是挺值得参考的。

更多推荐