Java高并发系列5-线程池
Java高并发系列5-线程池接上一篇Java并发系列4-并发容器我们继续在编程中经常会使用线程来异步处理任务,但是每个线程的创建和销毁都需要一定的开销。如果每次执行一个任务都需要开个新线程去执行,则这些线程的创建和销毁将消耗大量的资源;并且很难对其单个线程进行控制,更何况有一堆的线程在执行。这时就需要线程池来对线程进行管理。在线程池的管理下,线程分为启动,执行,空闲状态, 如果新来任务则...
Java高并发系列5-线程池
接上一篇Java并发系列4-并发容器我们继续
在编程中经常会使用线程来异步处理任务,但是每个线程的创建和销毁都需要一定的开销。如果每次执行一个任务都需要开个新线程去执行,则这些线程的创建和销毁将消耗大量的资源;并且很难对其单个线程进行控制,更何况有一堆的线程在执行。这时就需要线程池来对线程进行管理。
在线程池的管理下,线程分为启动,执行,空闲状态, 如果新来任务则将任务交给空闲线程执行即可。 先看一条程序来了解一下线程池
mport java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(5); //execute submit
for (int i = 0; i < 6; i++) {
service.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(service);
service.shutdown(); // 关闭线程, 但是不会立即终止线程池中的线程。 而是先标记Stop状态,停止接受线程,直到所有线程均执行完毕。
/// service.shutdownNow(); /// 立刻关闭线程池, 尝试打断所有在执行的线程。 并将状态标记为Shutdown 。
System.out.println(service.isTerminated()); /// CPU是否空闲, 只要线程池中有任务在执行 就是false
System.out.println(service.isShutdown()); /// 是否是只要调用上边任意一个shutdown 方法 就返回true
System.out.println(service);
TimeUnit.SECONDS.sleep(5);
System.out.println(service.isTerminated());
System.out.println(service.isShutdown());
System.out.println(service);
}
}
Executor 和 ExecutorService 和 Executors
看一下 Executor
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
这个类是Executor ,实现的方法 也只有 execute 方法 。接受一个Runnable ,实现可以参考该类的注释。
*
直接执行
* <pre> {@code
* class DirectExecutor implements Executor {
* public void execute(Runnable r) {
* r.run();
* }
* }}</pre>
*
* More typically, tasks are executed in some thread other than the
* caller's thread. The executor below spawns a new thread for each
* task.
* 创建一个新的线程来执行runable的r
* <pre> {@code
* class ThreadPerTaskExecutor implements Executor {
* public void execute(Runnable r) {
* new Thread(r).start();
* }
* }}</pre>
*
继续看, 精简一下 ,列出了主要的方法
public interface ExecutorService extends Executor {
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
...
}
我们大概知道了 该方法就是线程池的大概的框架接口, 提供的 提交任务,结束任务,池状态类。 仔细看就会发现 submit 不仅可以接受Runable ,也可以接受Callable 为参数 。
现在大概说一下 Runnable 与Callable 与 FutureTask 的区别
主要区别
Callable提供了泛型T作为任务执行的结果 。
而Runnable没有任务返回值。
FutureTask 看类结构 代码这里就不再粘了, 不作为本文的重点展开描述。
FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口继承自 Runnable , Future . 用大腿想一下 大概也就知道了, 这个类就拥有了 Runnable 和Future 共同的特性,并且可以接收Callable为参数 构建FutureTask 来执行。
再说一下 Future , Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
-
cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
-
isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
-
get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
-
get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
举个生活场景来描述这一部分应用。
早晨到蛋糕店买蛋糕(有目的性的任务),告诉店员蛋糕需求(长宽,需求,味道等),店员告诉你要下午才能做好,并给你了个取蛋糕收条(Future<蛋糕>) , 结果下午你提前到了,拿着蛋糕收条取蛋糕 调用 get() ,由于提前到了,所以人还没做好,只能扣手机了。 这时你是阻塞在这里,等待蛋糕做好取走。 当然这时可以设置等待时间,比如我只等30分钟,超时没做好 ,太慢了没做好 大爷的不要了(土豪)。 这就是提交submit ,执行任务, 等待获取get到蛋糕。
再来看一下Executors ,看一下Executors 的使用方式, newFixedThreadPool, newWorkStealingPool,newFixedThreadPool,newSingleThreadExecutor ,newCachedThreadPool 等创建线程池的方式。 大概就明白了这玩意儿和Arrays差不多, Arrays 对数组的各种操作, 集合。
常见的线程池使用大概有四种 看了实现调用方法, 都是从ThreadPoolExecutor 的构造函数中配置不同的参数而构造的线程池。
看一下 ThreadPoolExecutor 共有四个构造函数, 我们看一下最全的参数最多的构造函数 , 瞜一眼代码
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
///核心线程数。默认情况下线程池是空的,只有任务提交时才会创建线程。如果当前运
行的线程数少于corePoolSize,则创建新线程来处理任务;如果等于或者多于corePoolSize,则不再创建。如
果调用线程池的 prestartAllcoreThread方法,线程池会提前创建并启动所有的核心线程来等待任务。
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
线程池允许创建的最大线程数。如果任务队列满了并且线程数小于
maximumPoolSize时,则线程池仍旧会创建新的线程来处理任务
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
//非核心线程闲置的超时时间。超过这个时间则回收。
* @param unit the time unit for the {@code keepAliveTime} argument
/// 单位 时,分,秒,毫秒
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
如果当前线程数大于corePoolSize,则将任务添加到此任务队列中。该任务
队列是BlockingQueue类型的,也就是阻塞队列。
* @param threadFactory the factory to use when the executor
* creates a new thread , 线程工厂,
可以用线程工厂给每个创建出来的线程设置名字。一般情况下无须设置
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
线程饱和策略, 这是当任务队列和线程池都满了时所采取的应对策略,默认
是AbordPolicy,表示无法处理新任务,并抛出RejectedExecutionException异常。
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor 构造函数的意义都已明白,再看一下线程池的工作流程。
- 提交任务到线程池, 提交任务后,线程池先判断线程数是否达到了核心线程数(corePoolSize)。如果没有达到核心线程数,则创建核心线程来执行该任务。 如果达到了核心线程数,转2 。
- 则判断任务队列是否已满,没有满则将任务添加至任务列表等待。 如果满了转3.
- 如果满了则判断是否达到了最大线程数。 如果没有达到最大线程数则创建非核心线程来执行此任务。 如果达到了最大线程数则执行handler ,即饱和策略。
还有一个类
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService{
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
}
这个类 虽然起了个新名字,但结构也简单 ,实现继承自ThreadPoolExecutor , 构造函数使用的也是父类的构造函数, 值得注意的一点是,使用的任务队列是DelayQueue ,简单说一下 DelayQueue。
DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现
Delayed 接口。创建元素时,可以指定元素到期的时间,只有在元素到期时才能从队列中取走。
使用这样的队列就给ScheduledThreadPoolExecutor 的Scheduled 起到了关键性作用。
接下来大概看一下这几种常见的线程池。
- FixedThreadPool
FixedThreadPool 是可重用固定线程数的线程池。我们看到核心线程数和最大线程数是一样的,意味着该方式创建的线程池只有核心线程。没有非核心线程。keepAliveTime设置为0L意味着多余的线程会被立即终止。因为不会产生多余的线程,所以keepAliveTime是无效的参数。另外,任务队列采用了无界的阻塞队列LinkedBlockingQueue。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
执行任务流程大概是
如果当前运行的线程未达到corePoolSize(核心线程数)时就创建核心线程来处理任务,如果达到了核心线程数则将任务添加到LinkedBlockingQueue中。FixedThreadPool就是一个有固定数量核心线程的线程池,并且这些核心线程不会被回收。当线程数超过corePoolSize 时,就将任务存储在任务队列中;当线程池有空闲线程时,则从任务队列中去取任务执行。同时无界的LinkedBlockingQueue保证新任务都能够放入队列,不会被拒绝。添加到不能添加为止。
- SingleThreadExecutor
SingleThreadExecutor是使用单个工作线程的线程池,构造函数调用方法如下代码。
corePoolSize和maximumPoolSize都为1,意味着SingleThreadExecutor只有一个核心线程,其他的参数都
和FixedThreadPool一样,这里就不赘述了。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
值得一提的是因为这种创建方式是,核心线程和总线程数量都为1,如果当前运行的线程数未达到核心线程数,也就是当前没有运行的线程,则创建一个新线程来处理任务。如果当前有运行的线程,则将任务添加到阻塞队列LinkedBlockingQueue中。因此,SingleThreadExecutor能确保所有的任务在一个线程, 所以所有提交的线程只能按照队列顺序依次执行。
SingleThreadExecutor 适用于在逻辑上需要单线程处理任务的场景,同时无界的LinkedBlockingQueue保证新任务都能够放入队列,不会被拒绝;缺点和FixedThreadPool相同,当处理任务无限等待的时候会造成内存问题。
- CachedThreadPool
CachedThreadPool是一个根据需要创建线程的线程池,构建线程池的构造函数如下。corePoolSize为0,maximumPoolSize设置为Integer.MAX_VALUE,这意味着CachedThreadPool没有核心线程,非核心线程是无界的。keepAliveTime设置为60L,则空闲线程等待新任务的最长时间为 60s。同时使用的任务队列是SynchronousQueue同步队列,这就意味着线程池的数量无限大,新任务会直接分配或者创建一个线程进行执行。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
首先会执行SynchronousQueue的offer方法来提交任务,并且查询线程池中是否有空闲的线程执行SynchronousQueue的poll方法来移除任务。如果有则配对成功,将任务交给这个空闲的线程处理;如果没有则配对失败,创建新的线程去处理任务。当线程池中的线程空闲时,它会执行SynchronousQueue的poll方法,等待SynchronousQueue中新提交的任务。如果超过 60s 没有新任务提交到
SynchronousQueue,则这个空闲线程将终止。因为maximumPoolSize 是无界的,所以如果提交的任务大于线
程池中线程处理任务的速度就会不断地创建新线程。另外,每次提交任务都会立即有线程去处理。
所以 CachedThreadPool 比较适于大量的需要立即处理并且耗时较少的任务。 例如搞个棋牌室游戏,后台需要对每个人打的牌进行立刻转发。 使用CachedThreadPool比较高效。
- ScheduledThreadPool
ScheduledThreadPool是一个能实现定时和周期性任务的线程池,构造函数参数传递,实际就是调用了ThreadPoolExecutor的构造函数。
corePoolSize是传进来的固定数值,maximumPoolSize的值是Integer.MAX_VALUE。因为采用的DelayedWorkQueue是无界的,所以maximumPoolSize这个参数是无效的。
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
当执行 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate 或者 scheduleWithFixedDelay方法时,会向DelayedWorkQueue 添加一个 实现 RunnableScheduledFuture 接口的ScheduledFutureTask(任务的包装类),并会检查运行的线程是否达到 corePoolSize。如果没有则新建线程并启动它,但并不是立即去执行任务,而是去 DelayedWorkQueue 中取ScheduledFutureTask,然后去执行任务。如果运行的线程达到了corePoolSize时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务放在队列的前面。
其跟此前介绍的线程池不同的是,当执行完任务后,会将ScheduledFutureTask中的time变量改为下次要执行的时间并放回到DelayedWorkQueue中。
/**
* Returns the nanoTime-based trigger time of a delayed action.
*/
long triggerTime(long delay) {
return System.nanoTime() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
// 将设置好delay时间的task ,添加至任务列表。
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
最后再看一下饱和策略 RejectedExecutionHandler 。这是当任务队列和线程池都满了时所采取的应对策略,默认
是AbordPolicy,表示无法处理新任务,并抛出RejectedExecutionException异常。此外还有3种策略,它们分
别如下。
(1)CallerRunsPolicy:用调用者所在的线程来处理任务。此策略提供简单的反馈控制机制,能够减缓
新任务的提交速度。
(2)DiscardPolicy:不能执行的任务,并将该任务删除。
(3)DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务
当然我们也可以自定义饱和策略。
我们也可以通过修改 ThreadPoolExecutor的构造函数来自定义任务处理策略。例如面对的业务是将数据异步写入HBase,当HBase严重超时的时候允许写入失败并记录日志以便事后补写。对于这种应用场景,如果使用FixedThreadPool,在HBase服务严重超时的时候会导致队列无限增长,引发内存问题;如果使用CachedThreadPool,会导致线程数量无限增长。对于这种场景,我们可以设置ExecutorService使用带有长度限制的队列以及限定最大线程个数的线程池,同时通过设置RejectedExecutionHandler处理任务被拒绝的情况。
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 处理任务被拒绝的情况,例如记录日志等
}
}
再来几种不太常用的
- ForkJoinPool是一种支持任务分解的线程池,当提交给他的任务“过大”,他就会按照预先定义的规则将大任务分解成小任务,多线程并发执行。一般要配合可分解任务接口ForkJoinTask来使用,ForkJoinTask有两个实现它的抽象类:RecursiveAction和RecursiveTask,其区别是前者没有返回值,后者有返回值。
来段demo 看下
public class ForkJoinPool {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();
static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextInt(100);
}
System.out.println(Arrays.stream(nums).sum()); //stream api
}
/*
static class AddTask extends RecursiveAction {
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected void compute() {
if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else {
int middle = start + (end-start)/2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}
}
}
*/
static class AddTask extends RecursiveTask<Long> {
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected Long compute() {
/// 当分配的数值区间,小于分配最大阈值 则进行任务fork ,
if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}
int middle = start + (end-start)/2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
/// 将执行结果join在一起
return subTask1.join() + subTask2.join();
}
}
public static void main(String[] args) throws IOException {
ForkJoinPool fjp = new ForkJoinPool();
AddTask task = new AddTask(0, nums.length);
fjp.execute(task);
fjp.awaitTermination(20, TimeUnit.SECONDS);//等待20s,观察结果
long result = task.join();
System.out.println(result);
fjp.shutdown();
//System.in.read();
}
}
代码胜千言,简单明了。不再赘述。
- WorkStealingPool
(1)Steal,翻译为偷窃,窃取。这里的意思是,如果当前工作线程处理完自己本地任务队列中的任务时,就会去全局队列或者其他工程线程的队列里面查找工作任务,帮助它们完成。
(2)利用Work Staling,可以更好实现负载均衡。因为每个工作线程的任务都是不一样的,完成的时间也不一样。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
来一段demo
public class WorkStealingPool {
public static void main(String[] args) throws IOException {
ExecutorService service = Executors.newWorkStealingPool();
System.out.println(Runtime.getRuntime().availableProcessors());
service.execute(new R(1000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000)); //daemon
service.execute(new R(2000));
//由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
System.in.read();
}
static class R implements Runnable {
int time;
R(int t) {
this.time = t;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(time + " " + Thread.currentThread().getName());
}
}
}
-
这个线程池的优点:
(1)当添加任务时,唤醒一个空闲的工作线程,而不是一群线程,所以不会产生惊群现象。
(2)Work stealing pool,每个工作线程有自己的任务队列,当前完成自己本地的队列的任务时,会自动去全局队列里面获取任务来工作,或者去”偷“其他线程的队列里面的任务。
(3)当添加任务时,没有直接就把任务集中放在全局队列里面,避免工作线程集中去全局队列里面获取任务而造成频繁的锁开销。 -
这个线程的缺点:
这个线程池有一个很明显的缺陷,就是,如果线程池里只有一个线程时,所添加的工作任务不支持任务递归,什么意思呢?就是说,在线程所要执行的工作任务,不能再添加新的工作任务到线程池中,否则,会造成死锁。
为什么会有这个问题呢?
其实,跟这个线程池的实现有很大关系(这不是废话嘛),线程在执行任务时,用了加锁操作,而且只有在当前任务执行完成后才通过信号量的方式通知主线程(等待结果的线程)计算结果已经完成了,所以,如果在任务中递归执行添加新的任务在线程池中,就会造成死锁,因为第一个在执行第一个任务之前就锁住了线程。
一些可能的解决办法:
要怎么解决这个问题呢?一个可能性的解决方法是,对应这种内部的任务,另外开一个线程去执行。不过,因为时间的关系,我还没有试过。
总结:
- 对于希望提交的任务尽快分配线程执行,使用CachedThreadPool
- 对于需要保证所有提交的任务都要被执行的情况,使用FixedThreadPool
- 如果限定只能使用一个线程进行任务处理,使用SingleThreadExecutor
如果业务上允许任务执行失败,或者任务执行过程可能出现执行时间过长进而影响其他业务的应用场景,可以通过使用限定线程数量的线程池以及限定长度的队列进行容错处理。
参考资料:
https://blog.csdn.net/xhjcehust/article/details/45844901
https://www.cnblogs.com/ok-wolf/p/7761755.html
更多推荐
所有评论(0)