什么是线程池?为什么要用线程池?

1、降低资源的消耗。降低线程创建和销毁的资源消耗;
2、提高响应速度:线程的创建时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间
3、提高线程的可管理性。

实现一个我们自己的线程池

1、线程必须在池子已经创建好了,并且可以保持住,要有容器保存多个线程;
2、线程还要能够接受外部的任务,运行这个任务。容器保持这个来不及运行的任务.

MyThreadPool

package com.enjoy.demo.p1.ch6.mypool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @Author: BillYu
 * @Description:自己线程池的实现
 * @Date: Created in 15:19 2019-03-12.
 */
public class MyThreadPool {
    /**
     * 线程池中默认线程的个数为5
     */
    private static int WORK_NUM = 5;

    /**
     * 队列默认任务个数为100
     */
    private static int TASK_COUNT = 100;

    /**
     * 工作线程
     */
    private WorkThread[] workThreads;
    /**
     * 任务队列 作为一个缓冲
     */
    private final BlockingQueue<Runnable> taskQueue;

    /**
     * 用户构造这个池,希望的启动的线程数
     */
    private final int worker_num;

    /**
     * 创建具有默认线程个数的线程池
     */
    public MyThreadPool(){
        this(WORK_NUM,TASK_COUNT);
    }

    /**
     * 创建线程池 ,worker_num为线程池中工作线程的个数
     * @param worker_num
     * @param taskCount
     */
    public MyThreadPool( int worker_num,int taskCount) {
        if(worker_num<=0){
            worker_num = WORK_NUM;
        }
        if(taskCount<=0){
            taskCount=TASK_COUNT;
        }

        this.worker_num = worker_num;
        taskQueue = new ArrayBlockingQueue<>(taskCount);

        workThreads = new WorkThread[worker_num];
        for (int i =0 ;i<worker_num;i++){
            workThreads[i] = new WorkThread();
            workThreads[i].start();
        }
    }

    /**
     * 执行任务,其实只是任务加入任务队列,什么时候执行有线程池管理器决定
     */
    public void execute(Runnable task){
        try {
            taskQueue.put(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 销毁线程池,该方法保证所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
     */
    public void destory(){
        //工作线程停止工作,且置为null
        System.out.println("ready close pool...");
        for (int i =0 ;i<worker_num;i++){
            //help gc
            workThreads[i].stopWorker();
            workThreads[i] = null;
        }
        //清空任务队列
        taskQueue.clear();
    }

    @Override
    public String toString() {
        return "workThread number" + worker_num +
                "wait task number:" + taskQueue.size();
    }

    /**
     * 内部类,工作线程
     */
    private class WorkThread extends Thread{
        @Override
        public void run() {
            Runnable r = null;
            while (!isInterrupted()){
                try {
                    r = taskQueue.take();
                    if(r!=null){
                        System.out.println(getId()+" ready exec :"+r);
                        r.run();
                    }
                    //help gc
                    r = null;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public void stopWorker(){
            interrupt();
        }
    }
}

测试类

package com.enjoy.demo.p1.ch6.mypool;

import java.util.Random;

/**
 * @Author: BillYu
 * @Description:
 * @Date: Created in 16:07 2019-03-12.
 */
public class TestMyThreadPool {
    public static void main(String[] args) throws InterruptedException{
//        创建3个线程
        MyThreadPool pool = new MyThreadPool(3,0);
        pool.execute(new MyTask("testA"));
        pool.execute(new MyTask("testB"));
        pool.execute(new MyTask("testC"));
        pool.execute(new MyTask("testD"));
        pool.execute(new MyTask("testE"));
        System.out.println(pool);
        Thread.sleep(10000);
        //所有任务执行完成才destory
        pool.destory();
        System.out.println(pool);
        //机器的cpu核心数
        System.out.println(Runtime.getRuntime().availableProcessors());

    }

    /**
     * 任务类
     */
    static class MyTask implements Runnable{
        private String name;
        private Random r = new Random();
        public MyTask(String name){
            this.name = name;
        }

        public String getName(){
            return name;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(r.nextInt(1000)+2000);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
                        +Thread.currentThread().isInterrupted());
            }
            System.out.println("任务 " + name + " 完成");
        }
    }

}

JDK中的线程池和工作机制

1.线程池的创建
ThreadPoolExecutor,jdk所有线程池实现的父类
2.各个参数含义
int corePoolSize :线程池中核心线程数,< corePoolSize ,就会创建新线程,= corePoolSize ,这个任务就会保存到BlockingQueue,如果调用prestartAllCoreThreads()方法就会一次性的启动corePoolSize 个数的线程。
int maximumPoolSize, 允许的最大线程数,BlockingQueue也满了,< maximumPoolSize时候就会再次创建新的线程
long keepAliveTime, 线程空闲下来后,存活的时间,这个参数只在> corePoolSize才有用
TimeUnit unit, 存活时间的单位值
BlockingQueue<Runnable> workQueue, 保存任务的阻塞队列
ThreadFactory threadFactory, 创建线程的工厂,给新建的线程赋予名字
RejectedExecutionHandler handler :饱和策略
AbortPolicy :直接抛出异常,默认;
CallerRunsPolicy:用调用者所在的线程来执行任务
DiscardOldestPolicy:丢弃阻塞队列里最老的任务,队列里最靠前的任务
DiscardPolicy :当前任务直接丢弃
3.实现自己的饱和策略,实现RejectedExecutionHandler接口即可

package com.enjoy.demo.p1.ch6;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Author: BillYu
 * @Description:测试有界队列线程池,实现自定义拒绝策略
 * @Date: Created in 11:11 2019-03-14.
 * 参考:https://blog.csdn.net/liangzelei/article/details/80693729
 */
public class MyPoolRejectHandle {
    static class MyTask implements Runnable{

        private int taskId;
        private String taskName;

        public int getTaskId() {
            return taskId;
        }

        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }

        public String getTaskName() {
            return taskName;
        }

        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }

        public MyTask(int taskId, String taskName) {
            super();
            this.taskId = taskId;
            this.taskName = taskName;
        }
        @Override
        public String toString() {
            return Integer.toString(this.taskId);
        }

        @Override
        public void run() {
            try {
                System.out.println("run taskId = " + this.taskId);
                Thread.sleep(5*1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }



    static class MyRejected implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // TODO Auto-generated method stub
            System.out.println("自定义处理...");
            System.out.println("当前被拒绝任务为:"+ r.toString());
        }
    }


    public static void main(String[] args) {

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,  // coresize
                2,  // maxsize
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3)
                ,new MyRejected()
        );
        MyTask mt1 = new MyTask(1, "任务1");
        MyTask mt2 = new MyTask(2, "任务2");
        MyTask mt3 = new MyTask(3, "任务3");
        MyTask mt4 = new MyTask(4, "任务4");
        MyTask mt5 = new MyTask(5, "任务5");
        MyTask mt6 = new MyTask(6, "任务6");
        MyTask mt7 = new MyTask(7, "任务7");

        pool.execute(mt1);
        pool.execute(mt2);
        pool.execute(mt3);
        pool.execute(mt4);
        pool.execute(mt5);
        pool.execute(mt6);
        pool.execute(mt7);

        pool.shutdown();
    }
}

4.提交任务
execute(Runnable command) 不需要返回
Future<T> submit(Callable<T> task) 需要返回
4.关闭线程池
shutdown(),shutdownNow();
shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程
shutdown()设置线程池的状态,只会中断所有没有执行任务的线程

工作机制

7E2C6399-1592-4C19-A734-0897ACCDB377.png

合理配置线程池

根据任务的性质来:计算密集型(CPU),IO密集型,混合型
计算密集型:加密,大数分解,正则……., 线程数适当小一点,最大推荐:机器的Cpu核心数+1,为什么+1,防止页缺失,(机器的Cpu核心=Runtime.getRuntime().availableProcessors();)
IO密集型:读取文件,数据库连接,网络通讯, 线程数适当大一点,机器的Cpu核心数*2,
混合型:尽量拆分,IO密集型>>计算密集型,拆分意义不大,IO密集型~计算密集型
队列的选择上,应该使用有界,无界队列可能会导致内存溢出,OOM

预定义的线程池

1.FixedThreadPool
创建固定线程数量的,适用于负载较重的服务器,使用了无界队列
2.SingleThreadExecutor
创建单个线程,需要顺序保证执行任务,不会有多个线程活动,使用了无界队列
3.CachedThreadPool
会根据需要来创建新线程的,执行很多短期异步任务的程序,使用了SynchronousQueue
4.WorkStealingPool(JDK7以后) 
基于ForkJoinPool实现
5.ScheduledThreadPoolExecutor 
需要定期执行周期任务,Timer不建议使用了。
newSingleThreadScheduledExecutor:只包含一个线程,只需要单个线程执行周期任务,保证顺序的执行各个任务
newScheduledThreadPool 可以包含多个线程的,线程执行周期任务,适度控制后台线程数量的时候
方法说明:
schedule:只执行一次,任务还可以延时执行
scheduleAtFixedRate:提交固定时间间隔的任务
scheduleWithFixedDelay:提交固定延时间隔执行的任务

使用方式

package com.enjoy.demo.p1.ch6;

import com.enjoy.demo.p1.ch1.class1.SleepTools;

import java.util.Random;
import java.util.concurrent.*;

/**
 * @Author: BillYu
 * @Description:线程池的使用
 * @Date: Created in 15:19 2019-03-12.
 */
public class UserThreadPool {
    /**
     * 工作线程
     */
    static class Worker implements Runnable{
        private String taskName;

        private Random r = new Random();

        public Worker(String taskName) {
            this.taskName = taskName;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+" process the task : "+taskName);
            SleepTools.ms(r.nextInt(100)*5);
        }
    }

    static class CallWorker implements Callable<String> {
        private String taskName;

        private Random r = new Random();

        public CallWorker(String taskName) {
            this.taskName = taskName;
        }

        public String getTaskName() {
            return taskName;
        }
        @Override
        public String call() {
            System.out.println(Thread.currentThread().getName()+" process the task : "+taskName);
            return Thread.currentThread().getName()+":"+r.nextInt(100)*5;
        }
    }

    public static void main(String[] args) throws InterruptedException,ExecutionException {
        ExecutorService pool = new ThreadPoolExecutor(2,4,3, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.DiscardOldestPolicy());
//        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i=0;i<6;i++){
            Worker worker = new Worker("worker_"+i);
            pool.execute(worker);
        }
        for (int i=0;i<6;i++){
            CallWorker callWorker = new CallWorker("callWorker"+i);
            Future<String> result = pool.submit(callWorker);
            System.out.println(result.get());
        }
    }
}

CompletionService

CompletionService实际上可以看做是Executor和BlockingQueue的结合体。CompletionService在接收到要执行的任务时,通过类似BlockingQueue的put和take获得任务执行的结果。CompletionService的一个实现是ExecutorCompletionService,ExecutorCompletionService把具体的计算任务交给Executor完成。

在实现上,ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。

使用CompletionService 可以取出队列中的先完成任务的结果
任务类WorkTask 随机休眠时长并返回

package com.enjoy.demo.p1.ch6.comps;

import java.util.Random;
import java.util.concurrent.Callable;

/**
 * @Author: BillYu
 * @Description:任务类 随机休眠时长并返回
 * @Date: Created in 15:38 2019-03-14.
 */
public class WorkTask implements Callable<Integer> {
    private String name;

    public WorkTask(String name) {
        this.name = name;
    }

    @Override
    public Integer call() throws Exception {
        int sleepTime = new Random().nextInt(1000);
        try{
            Thread.sleep(sleepTime);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        //返回给调用者的值
        return sleepTime;

    }
}

两种处理方式的对比

package com.enjoy.demo.p1.ch6.comps;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author: BillYu
 * @Description:CompletionService 可以取出队列中的先完成任务的结果
 * @Date: Created in 15:42 2019-03-14.
 */
public class CompletionCase {
    private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();

    private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors()*10;

    /**
     * 方法一 自己写集合实现获取线程池中任务的返回结果
     * @throws Exception
     */
    public void testByQueue()throws Exception{
        long start = System.currentTimeMillis();
        AtomicInteger count = new AtomicInteger(0);
        //创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        //容器存放提交给线程池的任务
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
        //向里面扔任务
        for (int i=0;i<TOTAL_TASK;i++){
            Future<Integer> future = pool.submit(new WorkTask("ExecTasj"+i));
            queue.add(future);
        }

        //检查线程池任务执行结果
        for (int i =0;i<TOTAL_TASK;i++){
            int sleptTime = queue.take().get();
            System.out.println(" slept "+sleptTime+" ms ...");
            count.addAndGet(sleptTime);
        }
        //关闭线程池
        pool.shutdown();
        System.out.println("-----------------take sleep time "+count.get()
            +"ms,and spend time "
            +(System.currentTimeMillis()-start)+" ms");
    }

    /**
     * 方法二,通过CompletionService来实现获取线程池中任务的返回结果
     */
    public void testByCompletion()throws Exception{
        long start = System.currentTimeMillis();
        AtomicInteger count = new AtomicInteger(0);
        //创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        CompletionService<Integer> cService = new ExecutorCompletionService<>(pool);

        //向里面扔任务
        for (int i=0;i<TOTAL_TASK;i++){
            cService.submit(new WorkTask("ExecTask"+i));
        }

        //检查线程池任务执行结果
        for (int i=0;i<TOTAL_TASK;i++){
            int sleptTime = cService.take().get();
            System.out.println(" slept "+sleptTime+" ms ...");
            count.addAndGet(sleptTime);
        }

        //关闭线程池
        pool.shutdown();
        System.out.println("-------------------takes sleep time "+count.get()
        +"ms ,and spend time "+(System.currentTimeMillis()-start)+" ms");
    }

    public static void main(String[] args)throws Exception {
        CompletionCase t = new CompletionCase();
        t.testByQueue();
        t.testByCompletion();

    }
}
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐