今天学习了java的并发,线程池,同一时间执行一个操作。

报错:java.util.concurrent.RejectedExecutionException,排查发现是等待队列设小了,导致

拒绝策略,当队列满时,处理策略报错异常。

上代码:

package aqs;

import java.util.concurrent.*;

/**
 * @author WHM
 * 实现指定时间内做一定事情
 * @date 2021年08月06日 16:27
 */
public class CountDownLatchTest {
    public static int clientTotal = 2000;
    // 核心线程数,当线程池空闲时保留的线程数
    static int corePoolSize = 2;
    // 线程池最大线程数,线程池繁忙时能够扩容到的最大线程数
    static int maximumPoolSize = 5;
    // 线程活跃时间,当线程数大于核心数时,并且线程开始空闲,此时多余的线程经过活跃时间后自动关闭
    static int keepAliveTime = 1;
    // 线程活跃时间单位
    static TimeUnit unit = TimeUnit.SECONDS;
    static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(20); // 等待队列,ArrayBlockingQueue为有界阻塞队列,当队列满时进行阻塞
    static RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略,当队列满时,处理策略

    static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, handler);

    public static void main(String[] args) throws  Exception{

        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for(int i = 0;i < clientTotal; i++) {
            final int now = i;
            executor.execute(()->{
                try{
                    test(now);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        System.out.println("thread finish");
        executor.shutdown();
    }

    private static void test(final int threadNum) throws Exception {
        Thread.sleep(3000);
        System.out.println("thread: {}" + threadNum);
    }
}

本来以为写的还不错的代码执行报错了:

分析
通过对ThreadPoolExecutor类分析,引发java.util.concurrent.RejectedExecutionException主要有两种原因:
1. 线程池显示的调用了shutdown()之后,再向线程池提交任务的时候,如果你配置的拒绝策略是ThreadPoolExecutor.AbortPolicy的话,这个异常就被会抛出来。
2. 当你的排队策略为有界队列,并且配置的拒绝策略是ThreadPoolExecutor.AbortPolicy,当线程池的线程数量已经达到了maximumPoolSize的时候,你再向它提交任务,就会抛出ThreadPoolExecutor.AbortPolicy异常。 (我们设定了)

static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(20); 

但是

线程有2000个

所以,被拒绝啦。

对于分析的第一个我们可以做个例子:

这一点很好理解。比如说,你向一个仓库去存放货物,一开始,仓库管理员把门给你打开了,你放了第一件商品到仓库里,但是当你放好出去后,有人把仓库门关了,那你下次再来存放物品时,你就会被拒绝。示例代码如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
 
public class TextExecutor {
	public ExecutorService fixedExecutorService = Executors.newFixedThreadPool(5);
	public ExecutorService cachedExecutorService = Executors.newCachedThreadPool();
	public ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
	
	public void testExecutorException() {
		for (int i = 0; i < 10; i ++) {
			fixedExecutorService.execute(new SayHelloRunnable());
			fixedExecutorService.shutdown();
		}
	}
	
	private class SayHelloRunnable implements Runnable {
 
		@Override
		public void run() {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} finally {
				System.out.println("hello world!");
			}
			
		}
	}
	
	public static void main(String[] args) {
		TextExecutor testExecutor = new TextExecutor();
		testExecutor.testExecutorException();
	}
}

解决方案
1. 不要显示的调用shutdown方法,例如Android里,只有你在Destory方法里cancel掉AsyncTask,则线程池里没有活跃线程会自己回收自己。
2. 调用线程池时,判断是否已经shutdown,通过API方法isShutDown方法判断,示例代码:
 

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
 
public class TextExecutor {
	public ExecutorService fixedExecutorService = Executors.newFixedThreadPool(5);
	public ExecutorService cachedExecutorService = Executors.newCachedThreadPool();
	public ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
	
	public void testExecutorException() {
		for (int i = 0; i < 10; i ++) {
			// 增加isShutdown()判断
			if (!fixedExecutorService.isShutdown()) {
				fixedExecutorService.execute(new SayHelloRunnable());
			}
			fixedExecutorService.shutdown();
		}
	}
	
	private class SayHelloRunnable implements Runnable {
 
		@Override
		public void run() {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} finally {
				System.out.println("hello world!");
			}
			
		}
	}
	
	public static void main(String[] args) {
		TextExecutor testExecutor = new TextExecutor();
		testExecutor.testExecutorException();
	}
}

第二种报错代码已经给出:

我们看如何解决:

1.提大排队队列

static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2000); 

 2.使用LinkedBlockingQueue

 static BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); 

问题延伸

1.不建议使用Executors创建线程

较为方便的Executors工厂方法Executors.newCachedThreadPool() (无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),但是通过源码我们可以发现最后他们均调用了ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 方法,因此我们在分析java.util.concurrent.RejectedExecutionException之前,需要深入学习一下ThreadPoolExecutor的使用。

 2.了解一下:TreadPoolExecutor

核心池和最大池的大小
TreadPoolExecutor将根据corePoolSize和maximumPoolSize设置的边界自动调整池大小。当新任务在方法execute(java.lang.Runnable)中提交时,如果运行的线程少于corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于corePoolSize而少于maximumPoolSize,则仅当队列满时才创建新的线程。如果设置的corePoolSize和maximumPoolSize相同,则创建了固定大小的线程池。如果将maximumPoolSize设置为基本的无界值(如Integer.MAX_VALUE),则允许线程池适应任意数量的并发任务。

3. BlockingQueue/LinkedBlockingQueue我们通过异同快速了解一下:

相同:

1、LinkedBlockingQueue和ArrayBlockingQueue都实现了BlockingQueue接口;

2、LinkedBlockingQueue和ArrayBlockingQueue都是可阻塞的队列

  内部都是使用ReentrantLock和Condition来保证生产和消费的同步;

  当队列为空,消费者线程被阻塞;当队列装满,生产者线程被阻塞;

使用Condition的方法来同步和通信:await()和signal()

不同:

1、由上图可以看出,他们的锁机制不同

  LinkedBlockingQueue中的锁是分离的,生产者的锁PutLock,消费者的锁takeLock

  而ArrayBlockingQueue生产者和消费者使用的是同一把锁;

2、他们的底层实现机制也不同

  LinkedBlockingQueue内部维护的是一个链表结构

在生产和消费的时候,需要创建Node对象进行插入或移除,大批量数据的系统中,其对于GC的压力会比较大

  而ArrayBlockingQueue内部维护了一个数组

 

 在生产和消费的时候,是直接将枚举对象插入或移除的,不会产生或销毁任何额外的对象实例

 3、构造时候的区别

  LinkedBlockingQueue有默认的容量大小为:Integer.MAX_VALUE,当然也可以传入指定的容量大小

ArrayBlockingQueue在初始化的时候,必须传入一个容量大小的值

  看其提供的构造方法就能知道 (ideaALT+7 可以查看类方法)

 

4、执行clear()方法

  LinkedBlockingQueue执行clear方法时,会加上两把锁

 要问什么锁,想想,肯定是生产/消费锁

 ArrayBlockingQueue是添加一把锁

 5、统计元素的个数

  LinkedBlockingQueue中使用了一个AtomicInteger对象来统计元素的个数

       毕竟有2个锁,所以保障count的原子性,需要使用AtomicInteger来控制,底层使用CAS来控制同步。  好烦是不是又要看CAS (之前有写过一篇哦)

      ArrayBlockingQueue则使用int类型来统计元素

作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐