Java中的异步编程
我们先来看一个小的项目:
一套完整可运行的 Java 示例(Spring Boot + Redis),实现:
✅ 模块 A 调用模块 B
✅ B 异步执行、写进度到 Redis
✅ A 提供接口给前端
✅ 前端轮询展示进度条
我会给你 最小但完整 的代码结构,你本地跑一遍就能看到效果。
一.项目的引入
1.整体结构
task-demo
├── controller
│ └── TaskController.java
├── service
│ └── TaskService.java
├── TaskDemoApplication.java
└── resources
└── templates
└── index.html
Redis Key 设计:
task:{taskId}:progress // 进度 0~100
task:{taskId}:status // RUNNING / DONE
2.Maven 依赖(pom.xml)
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Thymeleaf(前端页面) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
</dependencies>
3.Redis 配置(application.yml)
spring:
redis:
host: localhost
port: 6379
server:
port: 8080
4.Controller(模块 A)
@RestController
@RequestMapping("/task")
public class TaskController {
@Autowired
private TaskService taskService;
/**
* 前端调用:启动任务
*/
@PostMapping("/start")
public Map<String, String> startTask() {
String taskId = taskService.startTask();
return Map.of("taskId", taskId);
}
/**
* 前端轮询:查询进度
*/
@GetMapping("/progress")
public Map<String, Object> getProgress(@RequestParam String taskId) {
return taskService.getProgress(taskId);
}
}
5.Service(模块 B + Redis 操作)
@Service
public class TaskService {
@Autowired
private StringRedisTemplate redisTemplate;
public String startTask() {
String taskId = UUID.randomUUID().toString();
// 初始化进度
redisTemplate.opsForValue().set(
"task:" + taskId + ":progress", "0");
redisTemplate.opsForValue().set(
"task:" + taskId + ":status", "RUNNING");
// 异步执行任务
CompletableFuture.runAsync(() -> doWork(taskId));
return taskId;
}
/**
* 模拟模块 B 的耗时任务
*/
private void doWork(String taskId) {
try {
for (int i = 1; i <= 10; i++) {
Thread.sleep(1000); // 模拟慢执行
int progress = i * 10;
redisTemplate.opsForValue().set(
"task:" + taskId + ":progress",
String.valueOf(progress));
}
redisTemplate.opsForValue().set(
"task:" + taskId + ":status", "DONE");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public Map<String, Object> getProgress(String taskId) {
String progress = redisTemplate.opsForValue()
.get("task:" + taskId + ":progress");
String status = redisTemplate.opsForValue()
.get("task:" + taskId + ":status");
return Map.of(
"taskId", taskId,
"progress", Integer.parseInt(progress),
"status", status
);
}
}
6.前端页面(index.html)
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>任务进度</title>
<style>
.bar {
width: 300px;
height: 20px;
border: 1px solid #ccc;
}
.inner {
height: 100%;
background: green;
width: 0%;
}
</style>
</head>
<body>
<h2>Java 异步任务进度演示</h2>
<button onclick="startTask()">启动任务</button>
<div class="bar">
<div id="progressBar" class="inner"></div>
</div>
<p id="status"></p>
<script>
let taskId = null;
function startTask() {
fetch("/task/start", { method: "POST" })
.then(res => res.json())
.then(data => {
taskId = data.taskId;
document.getElementById("status").innerText =
"任务已启动,taskId=" + taskId;
pollProgress();
});
}
function pollProgress() {
if (!taskId) return;
fetch(`/task/progress?taskId=${taskId}`)
.then(res => res.json())
.then(data => {
document.getElementById("progressBar").style.width =
data.progress + "%";
document.getElementById("status").innerText =
`进度:${data.progress}% 状态:${data.status}`;
if (data.status !== "DONE") {
setTimeout(pollProgress, 1000);
} else {
alert("任务完成!");
}
});
}
</script>
</body>
</html>
7.运行效果
-
启动 Redis
-
启动 Spring Boot
-
浏览器访问:
http://localhost:8080 -
点击「启动任务」
-
看到进度条从 0% → 100%
二、异步编程核心概念
-
定义:异步(Asynchronous)指任务发起后,无需阻塞等待其完成,主线程可立即返回处理其他逻辑,结果通过回调、轮询或通知机制获取。目的是提高系统吞吐量和响应速度。
-
与线程的关系:异步是编程模型(思想),线程是执行载体(资源)。异步通常依赖多线程或线程池实现复用,但现代异步(如 NIO/Reactor)也可以在少量线程上处理大量并发 I/O。
-
适用场景:耗时的非核心链路操作(发邮件、记录日志、生成报表)、高并发 I/O 密集型任务、前端进度条/轮询展示(如你之前的 Redis 进度方案)。
三、基石:线程池 ThreadPoolExecutor
手动创建线程(new Thread())代价高且难管理,生产环境必须通过线程池实现异步。
1. 核心构造参数
使用 ThreadPoolExecutor手动构建(阿里规约不推荐 Executors工具类):
ExecutorService pool = new ThreadPoolExecutor(
int corePoolSize, // 核心线程数(常驻,即使空闲也不销毁)
int maximumPoolSize, // 最大线程数(队列满时扩容上限)
long keepAliveTime, TimeUnit unit, // 非核心线程空闲存活时间
BlockingQueue<Runnable> workQueue, // 任务队列(如 new ArrayBlockingQueue<>(100))
ThreadFactory threadFactory, // 线程工厂(建议自定义线程名)
RejectedExecutionHandler handler // 拒绝策略(队列满且达最大线程时的兜底)
);
2. 任务调度流程
-
核心线程未满 → 新建线程执行。
-
核心线程全忙 → 放入 工作队列 等待。
-
队列已满且未达
maximumPoolSize→ 创建非核心线程执行。 -
队列满且达最大线程 → 触发 拒绝策略(如
AbortPolicy抛异常,CallerRunsPolicy调用者线程执行)。
四、主流异步实现方式与代码
1. 基础线程与 ExecutorService
-
Runnable(无返回值):
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
fixedPool.execute(() -> System.out.println("Task running in: " + Thread.currentThread().getName()));
-
Callable + Future(有返回值,但获取结果
future.get()会阻塞):
ExecutorService pool = Executors.newCachedThreadPool();
Future<String> future = pool.submit(() -> {
Thread.sleep(1000);
return "Async Result";
});
// String result = future.get(); // 阻塞等待
2. CompletableFuture(Java 8+ 推荐)
支持链式调用、任务组合、非阻塞回调,适合复杂异步编排(如你的进度任务后台执行)。
// 无返回值异步执行(默认 ForkJoinPool)
CompletableFuture.runAsync(() -> System.out.println("Background task"));
// 有返回值 + 回调(thenAccept 是非阻塞的)
CompletableFuture.supplyAsync(() -> {
// 模拟耗时计算
return "DataLoaded";
}).thenAccept(result -> {
// 计算完成后自动回调,不阻塞主线程
System.out.println("Deal with: " + result);
}).exceptionally(ex -> {
System.out.println("Error: " + ex.getMessage());
return null;
});
3. Spring @Async(企业级开发最常用)
声明式异步,与 Spring 生态无缝整合,适合 Service 层解耦。
-
启用与配置(启动类 + 自定义线程池):
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("Async-Task-");
// 拒绝策略:由调用者线程执行,减缓提交速度
executor.setRejectedExecutionHandler(new ThreadPoolTaskExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
-
Service 层使用(对应之前的改造需求):
@Service
public class TaskService {
// 指定使用自定义线程池
@Async("taskExecutor")
public void asyncDoWork(String taskId) {
// 这里的逻辑会在异步线程中执行
// 例如:更新 Redis 进度、执行业务计算
}
}
4. 响应式编程 (Spring WebFlux / Reactor)
基于 Reactor 模型(事件循环),使用少量线程处理海量连接,适合超高并发 I/O。
Mono.fromSupplier(() -> queryDb())
.subscribeOn(Schedulers.io())
.map(data -> process(data))
.subscribe(result -> System.out.println("Done"));
五、 关键坑点与生产实践
-
@Async 同类调用失效:在同一个 Service 类中,方法 A 直接调用
this.asyncMethod()不会走代理,仍是同步执行。必须将异步方法拆分到另一个 Bean 中注入调用。 -
默认线程池风险:若不自定义且 Bean 名不为
taskExecutor,Spring 默认使用SimpleAsyncTaskExecutor,每次请求新建线程,无上限,极易导致 OOM。必须自定义ThreadPoolTaskExecutor。 -
异常吞噬:
@Async返回void时,内部异常默认只打印日志,调用方无感知。解决:① 返回CompletableFuture并在exceptionally处理;② 实现AsyncUncaughtExceptionHandler全局捕获。 -
事务上下文丢失:异步线程不共享主线程事务,
@Async方法内的数据库操作需新建事务(Propagation.REQUIRES_NEW),否则事务可能不提交。 -
上下文传递:用户登录信息(Session/TraceId)不会自动传给子线程,需借助
TransmittableThreadLocal或 Spring 的TaskDecorator传播。
六、方案选型对比
|
方式 |
适用场景 |
优点 |
缺点 |
|---|---|---|---|
|
ExecutorService |
简单并发任务 |
JDK 原生,轻量 |
Future.get() 易阻塞,无链式编排 |
|
CompletableFuture |
复杂任务编排 |
强大的链式/组合/回调,非阻塞 |
代码稍复杂,需手动传线程池 |
|
@Async |
Spring 业务解耦 |
声明式,代码简洁,与 Spring 集成好 |
需注意代理失效、事务边界问题 |
|
WebFlux |
超高并发网关/接口 |
资源占用极低,高吞吐 |
学习曲线陡,全链路需响应式驱动 |
更多推荐

所有评论(0)