线程池的正确使用
什么是线程池?为什么要用线程池?1、降低资源的消耗。降低线程创建和销毁的资源消耗;2、提高响应速度:线程的创建时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间3、提高线程的可管理性。实现一个我们自己的线程池1、线程必须在池子已经创建好了,并且可以保持住,要有容器保存多个线程;2、线程还要能够接受外部的任务,运行这个任务。容器保持这个来不及运行的任务.MyThrea...
什么是线程池?为什么要用线程池?
1、降低资源的消耗。降低线程创建和销毁的资源消耗;
2、提高响应速度:线程的创建时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间
3、提高线程的可管理性。
实现一个我们自己的线程池
1、线程必须在池子已经创建好了,并且可以保持住,要有容器保存多个线程;
2、线程还要能够接受外部的任务,运行这个任务。容器保持这个来不及运行的任务.
MyThreadPool
package com.enjoy.demo.p1.ch6.mypool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @Author: BillYu
* @Description:自己线程池的实现
* @Date: Created in 15:19 2019-03-12.
*/
public class MyThreadPool {
/**
* 线程池中默认线程的个数为5
*/
private static int WORK_NUM = 5;
/**
* 队列默认任务个数为100
*/
private static int TASK_COUNT = 100;
/**
* 工作线程
*/
private WorkThread[] workThreads;
/**
* 任务队列 作为一个缓冲
*/
private final BlockingQueue<Runnable> taskQueue;
/**
* 用户构造这个池,希望的启动的线程数
*/
private final int worker_num;
/**
* 创建具有默认线程个数的线程池
*/
public MyThreadPool(){
this(WORK_NUM,TASK_COUNT);
}
/**
* 创建线程池 ,worker_num为线程池中工作线程的个数
* @param worker_num
* @param taskCount
*/
public MyThreadPool( int worker_num,int taskCount) {
if(worker_num<=0){
worker_num = WORK_NUM;
}
if(taskCount<=0){
taskCount=TASK_COUNT;
}
this.worker_num = worker_num;
taskQueue = new ArrayBlockingQueue<>(taskCount);
workThreads = new WorkThread[worker_num];
for (int i =0 ;i<worker_num;i++){
workThreads[i] = new WorkThread();
workThreads[i].start();
}
}
/**
* 执行任务,其实只是任务加入任务队列,什么时候执行有线程池管理器决定
*/
public void execute(Runnable task){
try {
taskQueue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 销毁线程池,该方法保证所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
*/
public void destory(){
//工作线程停止工作,且置为null
System.out.println("ready close pool...");
for (int i =0 ;i<worker_num;i++){
//help gc
workThreads[i].stopWorker();
workThreads[i] = null;
}
//清空任务队列
taskQueue.clear();
}
@Override
public String toString() {
return "workThread number" + worker_num +
"wait task number:" + taskQueue.size();
}
/**
* 内部类,工作线程
*/
private class WorkThread extends Thread{
@Override
public void run() {
Runnable r = null;
while (!isInterrupted()){
try {
r = taskQueue.take();
if(r!=null){
System.out.println(getId()+" ready exec :"+r);
r.run();
}
//help gc
r = null;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stopWorker(){
interrupt();
}
}
}
测试类
package com.enjoy.demo.p1.ch6.mypool;
import java.util.Random;
/**
* @Author: BillYu
* @Description:
* @Date: Created in 16:07 2019-03-12.
*/
public class TestMyThreadPool {
public static void main(String[] args) throws InterruptedException{
// 创建3个线程
MyThreadPool pool = new MyThreadPool(3,0);
pool.execute(new MyTask("testA"));
pool.execute(new MyTask("testB"));
pool.execute(new MyTask("testC"));
pool.execute(new MyTask("testD"));
pool.execute(new MyTask("testE"));
System.out.println(pool);
Thread.sleep(10000);
//所有任务执行完成才destory
pool.destory();
System.out.println(pool);
//机器的cpu核心数
System.out.println(Runtime.getRuntime().availableProcessors());
}
/**
* 任务类
*/
static class MyTask implements Runnable{
private String name;
private Random r = new Random();
public MyTask(String name){
this.name = name;
}
public String getName(){
return name;
}
@Override
public void run() {
try {
Thread.sleep(r.nextInt(1000)+2000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
+Thread.currentThread().isInterrupted());
}
System.out.println("任务 " + name + " 完成");
}
}
}
JDK中的线程池和工作机制
1.线程池的创建
ThreadPoolExecutor,jdk所有线程池实现的父类
2.各个参数含义
int corePoolSize :线程池中核心线程数,< corePoolSize ,就会创建新线程,= corePoolSize ,这个任务就会保存到BlockingQueue,如果调用prestartAllCoreThreads()方法就会一次性的启动corePoolSize 个数的线程。
int maximumPoolSize, 允许的最大线程数,BlockingQueue也满了,< maximumPoolSize时候就会再次创建新的线程
long keepAliveTime, 线程空闲下来后,存活的时间,这个参数只在> corePoolSize才有用
TimeUnit unit, 存活时间的单位值
BlockingQueue<Runnable> workQueue, 保存任务的阻塞队列
ThreadFactory threadFactory, 创建线程的工厂,给新建的线程赋予名字
RejectedExecutionHandler handler :饱和策略
AbortPolicy :直接抛出异常,默认;
CallerRunsPolicy:用调用者所在的线程来执行任务
DiscardOldestPolicy:丢弃阻塞队列里最老的任务,队列里最靠前的任务
DiscardPolicy :当前任务直接丢弃
3.实现自己的饱和策略,实现RejectedExecutionHandler接口即可
package com.enjoy.demo.p1.ch6;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Author: BillYu
* @Description:测试有界队列线程池,实现自定义拒绝策略
* @Date: Created in 11:11 2019-03-14.
* 参考:https://blog.csdn.net/liangzelei/article/details/80693729
*/
public class MyPoolRejectHandle {
static class MyTask implements Runnable{
private int taskId;
private String taskName;
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public MyTask(int taskId, String taskName) {
super();
this.taskId = taskId;
this.taskName = taskName;
}
@Override
public String toString() {
return Integer.toString(this.taskId);
}
@Override
public void run() {
try {
System.out.println("run taskId = " + this.taskId);
Thread.sleep(5*1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
static class MyRejected implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// TODO Auto-generated method stub
System.out.println("自定义处理...");
System.out.println("当前被拒绝任务为:"+ r.toString());
}
}
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, // coresize
2, // maxsize
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(3)
,new MyRejected()
);
MyTask mt1 = new MyTask(1, "任务1");
MyTask mt2 = new MyTask(2, "任务2");
MyTask mt3 = new MyTask(3, "任务3");
MyTask mt4 = new MyTask(4, "任务4");
MyTask mt5 = new MyTask(5, "任务5");
MyTask mt6 = new MyTask(6, "任务6");
MyTask mt7 = new MyTask(7, "任务7");
pool.execute(mt1);
pool.execute(mt2);
pool.execute(mt3);
pool.execute(mt4);
pool.execute(mt5);
pool.execute(mt6);
pool.execute(mt7);
pool.shutdown();
}
}
4.提交任务
execute(Runnable command) 不需要返回
Future<T> submit(Callable<T> task) 需要返回
4.关闭线程池
shutdown(),shutdownNow();
shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程
shutdown()设置线程池的状态,只会中断所有没有执行任务的线程
工作机制
合理配置线程池
根据任务的性质来:计算密集型(CPU),IO密集型,混合型
计算密集型:加密,大数分解,正则……., 线程数适当小一点,最大推荐:机器的Cpu核心数+1,为什么+1,防止页缺失,(机器的Cpu核心=Runtime.getRuntime().availableProcessors();)
IO密集型:读取文件,数据库连接,网络通讯, 线程数适当大一点,机器的Cpu核心数*2,
混合型:尽量拆分,IO密集型>>计算密集型,拆分意义不大,IO密集型~计算密集型
队列的选择上,应该使用有界,无界队列可能会导致内存溢出,OOM
预定义的线程池
1.FixedThreadPool
创建固定线程数量的,适用于负载较重的服务器,使用了无界队列
2.SingleThreadExecutor
创建单个线程,需要顺序保证执行任务,不会有多个线程活动,使用了无界队列
3.CachedThreadPool
会根据需要来创建新线程的,执行很多短期异步任务的程序,使用了SynchronousQueue
4.WorkStealingPool(JDK7以后)
基于ForkJoinPool实现
5.ScheduledThreadPoolExecutor
需要定期执行周期任务,Timer不建议使用了。
newSingleThreadScheduledExecutor:只包含一个线程,只需要单个线程执行周期任务,保证顺序的执行各个任务
newScheduledThreadPool 可以包含多个线程的,线程执行周期任务,适度控制后台线程数量的时候
方法说明:
schedule:只执行一次,任务还可以延时执行
scheduleAtFixedRate:提交固定时间间隔的任务
scheduleWithFixedDelay:提交固定延时间隔执行的任务
使用方式
package com.enjoy.demo.p1.ch6;
import com.enjoy.demo.p1.ch1.class1.SleepTools;
import java.util.Random;
import java.util.concurrent.*;
/**
* @Author: BillYu
* @Description:线程池的使用
* @Date: Created in 15:19 2019-03-12.
*/
public class UserThreadPool {
/**
* 工作线程
*/
static class Worker implements Runnable{
private String taskName;
private Random r = new Random();
public Worker(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" process the task : "+taskName);
SleepTools.ms(r.nextInt(100)*5);
}
}
static class CallWorker implements Callable<String> {
private String taskName;
private Random r = new Random();
public CallWorker(String taskName) {
this.taskName = taskName;
}
public String getTaskName() {
return taskName;
}
@Override
public String call() {
System.out.println(Thread.currentThread().getName()+" process the task : "+taskName);
return Thread.currentThread().getName()+":"+r.nextInt(100)*5;
}
}
public static void main(String[] args) throws InterruptedException,ExecutionException {
ExecutorService pool = new ThreadPoolExecutor(2,4,3, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.DiscardOldestPolicy());
// ExecutorService pool = Executors.newCachedThreadPool();
for (int i=0;i<6;i++){
Worker worker = new Worker("worker_"+i);
pool.execute(worker);
}
for (int i=0;i<6;i++){
CallWorker callWorker = new CallWorker("callWorker"+i);
Future<String> result = pool.submit(callWorker);
System.out.println(result.get());
}
}
}
CompletionService
CompletionService实际上可以看做是Executor和BlockingQueue的结合体。CompletionService在接收到要执行的任务时,通过类似BlockingQueue的put和take获得任务执行的结果。CompletionService的一个实现是ExecutorCompletionService,ExecutorCompletionService把具体的计算任务交给Executor完成。
在实现上,ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。
使用CompletionService 可以取出队列中的先完成任务的结果
任务类WorkTask 随机休眠时长并返回
package com.enjoy.demo.p1.ch6.comps;
import java.util.Random;
import java.util.concurrent.Callable;
/**
* @Author: BillYu
* @Description:任务类 随机休眠时长并返回
* @Date: Created in 15:38 2019-03-14.
*/
public class WorkTask implements Callable<Integer> {
private String name;
public WorkTask(String name) {
this.name = name;
}
@Override
public Integer call() throws Exception {
int sleepTime = new Random().nextInt(1000);
try{
Thread.sleep(sleepTime);
}catch (InterruptedException e){
e.printStackTrace();
}
//返回给调用者的值
return sleepTime;
}
}
两种处理方式的对比
package com.enjoy.demo.p1.ch6.comps;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: BillYu
* @Description:CompletionService 可以取出队列中的先完成任务的结果
* @Date: Created in 15:42 2019-03-14.
*/
public class CompletionCase {
private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors()*10;
/**
* 方法一 自己写集合实现获取线程池中任务的返回结果
* @throws Exception
*/
public void testByQueue()throws Exception{
long start = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
//创建线程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
//容器存放提交给线程池的任务
BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
//向里面扔任务
for (int i=0;i<TOTAL_TASK;i++){
Future<Integer> future = pool.submit(new WorkTask("ExecTasj"+i));
queue.add(future);
}
//检查线程池任务执行结果
for (int i =0;i<TOTAL_TASK;i++){
int sleptTime = queue.take().get();
System.out.println(" slept "+sleptTime+" ms ...");
count.addAndGet(sleptTime);
}
//关闭线程池
pool.shutdown();
System.out.println("-----------------take sleep time "+count.get()
+"ms,and spend time "
+(System.currentTimeMillis()-start)+" ms");
}
/**
* 方法二,通过CompletionService来实现获取线程池中任务的返回结果
*/
public void testByCompletion()throws Exception{
long start = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
//创建线程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
CompletionService<Integer> cService = new ExecutorCompletionService<>(pool);
//向里面扔任务
for (int i=0;i<TOTAL_TASK;i++){
cService.submit(new WorkTask("ExecTask"+i));
}
//检查线程池任务执行结果
for (int i=0;i<TOTAL_TASK;i++){
int sleptTime = cService.take().get();
System.out.println(" slept "+sleptTime+" ms ...");
count.addAndGet(sleptTime);
}
//关闭线程池
pool.shutdown();
System.out.println("-------------------takes sleep time "+count.get()
+"ms ,and spend time "+(System.currentTimeMillis()-start)+" ms");
}
public static void main(String[] args)throws Exception {
CompletionCase t = new CompletionCase();
t.testByQueue();
t.testByCompletion();
}
}
更多推荐
所有评论(0)