0. 环境

  本文所示案例为SpringBoot 2.2.2.RELEASE所搭载的maven web项目。pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>org.example</groupId>
    <artifactId>JavaLearn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>RELEASE</version>
        </dependency>
   </dependencies>
</project>

  spring-boot-starter-parent父pom会引入一个artifactId为spring-context的jar包。此jar包为Spring项目的核心Core包之一,搭建Spring或SpringBoot项目都会引入此jar包。

  而我们这一节的重点ThreadPoolTaskExecutor:,就在此jar包的org.springframework.scheduling.concurrent路径下

在这里插入图片描述


1. Spring自带线程池源码分析

  下面是SpringBoot 2.2.2.RELEASE在启动时的输出日志。可以看到Spring容器启动了一个name为applicationTaskExecutor的bean。
在这里插入图片描述

  定位此bean的创建过程,其代码放在spring-boot-autoconfigure模块内。创建此bean的@Configuration类如下:

//整个Configuration类有条件的加载,当Spring容器内缺失ThreadPoolTaskExecutor对象bean才生效
@ConditionalOnClass({ThreadPoolTaskExecutor.class})
@Configuration(
    proxyBeanMethods = false
)
@EnableConfigurationProperties({TaskExecutionProperties.class})
public class TaskExecutionAutoConfiguration {
    public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

    public TaskExecutionAutoConfiguration() {
    }

	//创建Builder构建类
    @Bean
    @ConditionalOnMissingBean
    public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) {
        Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
        Stream var10001 = taskExecutorCustomizers.orderedStream();
        var10001.getClass();
        builder = builder.customizers(var10001::iterator);
        builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique());
        return builder;
    }
    
	//向容器内注入ThreadPoolTaskExecutor实例对象bean
    @Lazy
    @Bean(
        name = {"applicationTaskExecutor", "taskExecutor"}
    )
    @ConditionalOnMissingBean({Executor.class})
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
        return builder.build();
    }
}

可以看出,当Spring容器中没有ThreadPoolTaskExecutorExecutor的bean实例加载时,Spring会默认创建一个ThreadPoolTaskExecutor类的实例,并注入Spring容器内。

2. 自带线程池配置项

  从上面代码看到,在创建ThreadPoolTaskExecutor时,用到了TaskExecutorBuilder的bean实例:builder,此bean正好在上面的@bean方法public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator)中有定义。
  builder使用到了TaskExecutionProperties 配置项,点击打开可见:

@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {

	private final Pool pool = new Pool();

	private final Shutdown shutdown = new Shutdown();

	/**
	 * Prefix to use for the names of newly created threads.
	 */
	private String threadNamePrefix = "task-";
	//省略若干setters、getters
	......

	public static class Pool {

		/**
		 * Queue capacity. An unbounded capacity does not increase the pool and therefore
		 * ignores the "max-size" property.
		 */
		private int queueCapacity = Integer.MAX_VALUE;

		/**
		 * Core number of threads.
		 */
		private int coreSize = 8;

		/**
		 * Maximum allowed number of threads. If tasks are filling up the queue, the pool
		 * can expand up to that size to accommodate the load. Ignored if the queue is
		 * unbounded.
		 */
		private int maxSize = Integer.MAX_VALUE;

		/**
		 * Whether core threads are allowed to time out. This enables dynamic growing and
		 * shrinking of the pool.
		 */
		private boolean allowCoreThreadTimeout = true;

		/**
		 * Time limit for which threads may remain idle before being terminated.
		 */
		private Duration keepAlive = Duration.ofSeconds(60);
		
		//省略若干setters、getters
		......
	}
	//省略class ShutDown
	......
}

由源码可知:Spring创建自带的线程池ThreadPoolTaskExecutor对象时,会读取配置项:spring.task.execution.pool.* 并创建线程池。

(这也就是Spring Boot可以在application.properties文件中使用这些配置,来指定线程池参数的原因)。

具体参数如下:

  • spring.task.execution.pool.core-size # 核心线程数,默认为8
  • spring.task.execution.pool.queue-capacity # 队列容量,默认为无限大
  • spring.task.execution.pool.max-size # 最大线程数,默认为无限大
  • spring.task.execution.pool.allow-core-thread-timeout #
    是否允许回收空闲的线程,默认为true
  • spring.task.execution.pool.keep-alive #空闲的线程可以保留多少秒,默认为60。如果超过这个时间没有任务调度,则线程会被回收
  • spring.task.execution.thread-name-prefix # 线程名前缀,默认为thread-

具体逻辑为:

1. 如果当前要执行的任务数超过core-size,则任务会放到队列里面等待执行,等核心线程中有任务执行完成之后,再取出队列中的任务进行调度执行。
2. 如果等待队列已经满了,再收到新任务时,则核心线程会自动扩容,最大扩展到max-size。
  1. 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

  2. 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

  3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maxPoolSize,建新的线程来处理被添加的任务。

  4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maxPoolSize,那么通过handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程
    maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

  5. 当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

3. 自定义线程池ThreadPoolTaskExecutor

  由上面的源码可以看出,如果用户自定义ThreadPoolTaskExecutor的bean,则可以替换Spring的原生TaskExecutor类,起到自定义线程池的效果

@Configuration
public class TaskConfiguration {

	//定义自己的线程池,name为myThreadPoolTaskExecutor
    @Bean
    public ThreadPoolTaskExecutor myThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        return executor;
    }
}

重新启动Spring Boot,日志如下:
在这里插入图片描述
可以看出不再启动Spring自带的bean:applicationTaskExecutor,而是启动了我们重新定义的bean:myThreadPoolTaskExecutor

4. TaskExecutor实例

Spring自带的TaskExecutor实例很多,看基本继承结构如下:
在这里插入图片描述
常用的TaskExecutor实例用法可参考:《Spring+TaskExecutor实例

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐