一、线程池原理

线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。他的主要特点为:线程复用、控制最大并发数、管理线程

线程复用

每一个 Thread 的类都有一个 start 方法, 当调用 start 启动线程时 Java 虚拟机会调用该类的 run方法,那么该类的 run() 方法中就是调用了 Runnable 对象的 run() 方法; 我们可以继承重写Thread 类,在其 start 方法中添加不断循环调用传递过来的 Runnable 对象, 这就是线程池的实现原理。循环方法中不断获取 Runnable 是用 Queue 实现的,在获取下一个 Runnable 之前可以是阻塞的。

二、线程生命周期(状态)

当线程被创建并启动以后,它既不是一启动就进入了执行状态,也不是一直处于执行状态。在线程的生命周期中,它要经过新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)5 种状态。尤其是当线程启动以后,它不可能一直"霸占"着 CPU 独自运行,所以 CPU 需要在多条线程之间切换,于是线程状态也会多次在运行、阻塞之间切换

新建状态(NEW)

当程序使用 new 关键字创建了一个线程之后,该线程就处于新建状态,此时仅由 JVM 为其分配内存,并初始化其成员变量的值

就绪状态(RUNNABLE)

当线程对象调用了 start()方法之后,该线程处于就绪状态。Java 虚拟机会为其创建方法调用栈和程序计数器,等待调度运行。

运行状态(RUNNING)

如果处于就绪状态的线程获得了 CPU,开始执行 run()方法的线程执行体,则该线程处于运行状态。

阻塞状态(BLOCKED)

阻塞状态是指线程因为某种原因放弃了 cpu 使用权,也即让出了 cpu timeslice,暂时停止运行。直到线程进入可运行(runnable)状态,才有机会再次获得 cpu timeslice 转到运行(running)状态。阻塞的情况分三种:

  1. 等待阻塞(o.wait->等待对列):

运行(running)的线程执行 o.wait()方法,JVM 会把该线程放入等待队列(waitting queue) 中。

  1. 同步阻塞(lock->锁池) :

运行(running)的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则 JVM 会把该线程放入锁池(lock pool)中。

  1. 其他阻塞(sleep/join) :

运行(running)的线程执行 Thread.sleep(long ms)或 t.join()方法,或者发出了 I/O 请求时,JVM 会把该线程置为阻塞状态。当 sleep()状态超时、join()等待线程终止或者超时、或者 I/O处理完毕时,线程重新转入可运行(runnable)状态。

线程死亡(DEAD)

线程会以下面三种方式结束,结束后就是死亡状态。

  1. 正常结束

run()或 call()方法执行完成,线程正常结束。

  1. 异常结束

线程抛出一个未捕获的 Exception 或 Error。

  1. 调用 stop

直接调用该线程的 stop()方法来结束该线程—该方法通常容易导致死锁,不推荐使用。

三、线程池组成

一般线程池主要组成部分

  1. 线程池管理器:用于创建并管理线程池

  1. 工作线程:线程池中的线程

  1. 任务接口:每个任务必须实现的接口,用于工作线程调度其运行

  1. 任务队列:用于存放待处理的任务,提供一种缓冲机制

Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors, ExecutorService,ThreadPoolExecutor ,Callable 和 Future、FutureTask 这几个类。

ThreadPoolExecutor 构造方法

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,
                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
  1. corePoolSize:指定了线程池中的线程数量。

  1. maximumPoolSize:指定了线程池中的最大线程数量。

  1. keepAliveTime:当前线程池数量超过 corePoolSize 时,多余的空闲线程的存活时间,即多次时间内会被销毁。

  1. unit:keepAliveTime 的单位。

  1. workQueue:任务队列,被提交但尚未被执行的任务。

  1. threadFactory:线程工厂,用于创建线程,一般用默认的即可。

  1. handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。

拒绝策略

线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。

JDK 内置的拒绝策略如下:

AbortPolicy : 直接抛出异常,阻止系统正常运行。

CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。

DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。

以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际需要,完全可以自己扩展 RejectedExecutionHandler 接口。

JAVA线程池工作过程

  1. 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。

  1. 当调用 execute() 方法添加一个任务时,线程池会做如下判断:

  1. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;

  1. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;

  1. 如果这时候队列满了,而且正在运行的线程数量小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;

  1. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,则线程池会抛出异常RejectExecutionException。

  1. 当一个线程完成任务时,它会从队列中取下一个任务来执行。

  1. 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

四、JAVA 阻塞队列原理

阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:

  1. 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。

  1. 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

阻塞队列的主要方法

方法类型

抛出异常

特殊值

阻塞

超时

插入

add(e)

offer(e)

put(e)

offer(e,time,unit)

移除

remove()

poll()

take()

poll(time,unit)

检查

element()

peek()

不可用

不可用

 抛出异常:抛出一个异常;

 特殊值:返回一个特殊值(null 或 false,视情况而定)

 则塞:在成功操作之前,一直阻塞线程

 超时:放弃前只在最大的时间内阻塞

插入数据操作
  1. public abstract boolean add(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。如果该元素是 NULL,则会抛出 NullPointerException 异常。

  1. public abstract boolean offer(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。

  1. public abstract void put(E paramE) throws InterruptedException: 将指定元素插入此队列中,将等待可用的空间(如果有必要)

public void put(E paramE) throws InterruptedException {
     checkNotNull(paramE);
     ReentrantLock localReentrantLock = this.lock;
     localReentrantLock.lockInterruptibly();
     try {
         while (this.count == this.items.length)
         this.notFull.await();//如果队列满了,则线程阻塞等待
         enqueue(paramE);
         localReentrantLock.unlock();
     } finally {
         localReentrantLock.unlock();
     }
 }
  1. offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败。

获取数据操作
  1. poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null;

  1. poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。

  1. take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入。

  1. drainTo():一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

Java 中的阻塞队列

1.ArrayBlockingQueue :由数组结构组成的有界阻塞队列。

2.LinkedBlockingQueue :由链表结构组成的有界阻塞队列。

3.PriorityBlockingQueue :支持优先级排序的无界阻塞队列。

4.DelayQueue:使用优先级队列实现的无界阻塞队列。

5.SynchronousQueue:不存储元素的阻塞队列。

6.LinkedTransferQueue:由链表结构组成的无界阻塞队列。

7.LinkedBlockingDeque:由链表结构组成的双向阻塞队列。

ArrayBlockingQueue(公平、非公平)

用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

LinkedBlockingQueue(两个独立锁提高并发)

基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,此队列按照先进先出(FIFO)的原则对元素进行排序。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE)。

PriorityBlockingQueue(compareTo 排序实现优先)

是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。可以自定义实现compareTo()方法来指定元素进行排序规则,或者初始化 PriorityBlockingQueue 时,指定构造 参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

DelayQueue(缓存失效、定时任务 )

是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景:

  1. 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。

  1. 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从DelayQueue 中获取到任务就开始执行,从比如 TimerQueue 就是使用 DelayQueue 实现的。

SynchronousQueue(不存储数据、可用于传递数据)

是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。 SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线 程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给 另 外 一 个 线 程 使 用 , SynchronousQueue 的 吞 吐 量 高 于 LinkedBlockingQueue 和 ArrayBlockingQueue。

LinkedTransferQueue

是 一 个 由 链 表 结 构 组 成 的 无 界 阻 塞 TransferQueue 队 列 。 相 对 于 其 他 阻 塞 队 列 , LinkedTransferQueue 多了 tryTransfertransfer 方法。

  1. transfer 方法:如果当前有消费者正在等待接收元素(消费者使用 take()方法或带时间限制的poll()方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回

  1. tryTransfer 方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回 false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收,方法立即返回。而 transfer 方法是必须等到消费者消费了才返回。

对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。

LinkedBlockingDeque

是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法却等同于 takeFirst,不知道是不是 Jdk 的 bug,使用时还是用带有 First 和 Last 后缀的方法更清楚。在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。

五、线程池实现/创建方式

继承 Thread 类

Thread 类本质上是实现了 Runnable 接口的一个实例,代表一个线程的实例。启动线程的唯一方法就是通过 Thread 类的 start()实例方法。start()方法是一个 native 方法,它将启动一个新线程,并执行 run()方法。

public class ThreadTest extends Thread{

    @Override
    public void run() {
        System.out.println("ThreadTest run...");
    }

    public static void main(String[] args) {
        ThreadTest test = new ThreadTest();
        test.start();
    }
}

实现 Runnable 接口

如果自己的类已经 extends 另一个类,就无法直接 extends Thread,此时,可以实现一个Runnable 接口。(单继承多实现)

public class RunnableTest extends UserInfo implements Runnable {

    @Override
    public void run() {
        System.out.println("Runnable run...");
    }

    public static void main(String[] args) {
        //启动RunnableTest,需要首先实例化一个Thread,并传入自己的RunnableTest实例:
        RunnableTest runnableTest = new RunnableTest();
        Thread thread = new Thread(runnableTest);
        thread.start();

        //事实上,当传入一个 Runnable target 参数给 Thread 后,Thread 的 run()方法就会调用target.run()
        //public void run() {
        //    if (target != null) {
        //        target.run();
        //    }
        //}
    }
}

ExecutorService、Callable<Class>、Future 有返回值线程

有返回值的任务必须实现 Callable 接口,类似的,无返回值的任务必须 Runnable 接口。执行Callable 任务后,可以获取一个 Future 的对象,在该对象上调用 get 就可以获取到 Callable 任务返回的 Object 了,再结合线程池接口 ExecutorService 就可以实现传说中有返回结果的多线程了。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ExecutorServiceTest {

    private final static int taskSize = 2;

    static class MyCallable implements Callable<Integer>{
        private Integer i;

        public MyCallable(int i) {
            this.i = i;
        }

        @Override
        public Integer call() throws Exception {
            i = i+100;
            return i;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建一个线程池
        ExecutorService pool = Executors.newFixedThreadPool(taskSize);
        // 创建多个有返回值的任务
        List<Future> list = new ArrayList<Future>();
        for (int i = 0; i < taskSize; i++) {
            Callable c = new MyCallable(i);
            // 执行任务并获取 Future 对象
            Future f = pool.submit(c);
            list.add(f);
        }
        // 关闭线程池
        pool.shutdown();
        // 获取所有并发任务的运行结果
        for (Future f : list) {
            // 从 Future 对象上获取任务的返回值,并输出到控制台
            System.out.println("res:" + f.get().toString());
        }
    }
}

基于线程池的方式

线程和数据库连接这些资源都是非常宝贵的资源。那么每次需要的时候创建,不需要的时候销毁,是非常浪费资源的。那么我们就可以使用缓存的策略,也就是使用线程池。

Execute方式(无需返回值)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolTest {

    public static void main(String[] args) {
        // 创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 20; i++) {
            int j = i;
            threadPool.execute(new Runnable() { // 提交多个线程任务,并执行
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " is running .." + j);
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

实现结果:

Submit方式(需返回值)
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolTest {

    public static void main(String[] args) {
        // 创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        List<String> strings = new ArrayList<>();
        strings.add("1");
        strings.add("2");
        strings.add("3");
        strings.add("4");
        strings.add("5");
        strings.add("6");
        strings.add("7");
        strings.add("8");
        strings.add("9");
        ArrayList<Future<Object>> objects = new ArrayList<>(strings.size());
        for (String s : strings) {
            Future<Object> future = threadPool.submit(() -> Thread.currentThread().getName()  + s + 1);
            objects.add(future);
        }
        for (Future<Object> futrue : objects) {
            try {
                System.out.println(futrue.get());
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
        threadPool.shutdown();
    }
}

实现结果:

4种线程池实现方式

Java 里面线程池的顶级接口是 Executor,但是严格意义上讲 Executor 并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是 ExecutorService

newCachedThreadPool

创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。

newFixedThreadPool

创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。

newScheduledThreadPool

创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(3); 
scheduledThreadPool.schedule(newRunnable(){ 
	@Override 
	public void run() {
		System.out.println("延迟三秒");
	}
}, 3, TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(newRunnable(){ 
	@Override 
	public void run() {
		System.out.println("延迟 1 秒后每三秒执行一次");
	}
},1,3,TimeUnit.SECONDS);

newSingleThreadExecutor

Executors.newSingleThreadExecutor()返回一个线程池(这个线程池只有一个线程),这个线程池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去

希望本篇文章能给大家带来满满的收获!😆😆😆

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐