Java 异步编程之 Thread、Runnable、Callable、CompletableFuture 与线程池实战
一、为什么需要异步编程?
在后端开发中,一个接口里经常会做很多事情。
比如用户下单:
1. 创建订单
2. 扣减库存
3. 扣减余额
4. 发送短信
5. 写操作日志
6. 通知第三方系统
其中:创建订单、扣库存、扣余额 属于核心流程,通常需要同步执行。
但是:发送短信、写日志、通知第三方 不一定要阻塞主流程,可以考虑异步执行。
异步编程的价值是:
提升接口响应速度
提高系统吞吐量
减少主线程等待时间
更好地利用 CPU 和 IO 资源
但是异步也不是万能的,它会带来:
异常处理复杂
事务边界复杂
线程安全问题
线程池资源管理问题
所以学习异步编程,不能只会写代码,还要理解它的底层逻辑和适用场景。
二、Java 线程基础:Thread
Thread 是 Java 中表示线程的类。
最基础的创建线程方式是继承 Thread,重写 run() 方法。
1. 继承 Thread 创建线程
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("子线程执行:" + Thread.currentThread().getName());
}
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start();
System.out.println("主线程执行:" + Thread.currentThread().getName());
}
}
可能输出:
主线程执行:main
子线程执行:Thread-0
线程执行顺序不是固定的,具体由操作系统调度决定。
2. start() 和 run() 的区别
thread.start(); 表示真正启动一个新线程。
thread.run(); 只是普通方法调用,不会创建新线程。
3. 为什么不推荐直接继承 Thread?
虽然继承 Thread 可以创建线程,但实际开发中不推荐。
原因有三个:
1. Java 是单继承,继承 Thread 后就不能继承其他类。
2. 线程和任务耦合,不利于代码复用。
3. 生产环境更推荐使用线程池,而不是频繁 new Thread。
三、任务模型一:Runnable
Runnable 是一个任务接口,表示一个没有返回值的任务。
@FunctionalInterface
public interface Runnable {
void run();
}
它只有一个 run() 方法。
1. 实现 Runnable
public class SendSmsTask implements Runnable {
@Override
public void run() {
System.out.println("发送短信:" + Thread.currentThread().getName());
}
public static void main(String[] args) {
SendSmsTask task = new SendSmsTask();
Thread thread = new Thread(task);
thread.start();
}
}
这里的结构比继承 Thread 更清晰:
SendSmsTask 负责描述任务
Thread 负责执行任务。
2. 匿名内部类写法
public class RunnableDemo {
public static void main(String[] args) {
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("异步写日志:" + Thread.currentThread().getName());
}
};
new Thread(task).start();
}
}
3. Lambda 写法
因为 Runnable 是函数式接口,所以可以用 Lambda 简化:
public class RunnableLambdaDemo {
public static void main(String[] args) {
new Thread(() -> {
System.out.println("异步任务执行:" + Thread.currentThread().getName());
}).start();
}
}
Lambda 只是简化写法,本质还是实现 Runnable 的 run() 方法。
4. Runnable 的特点
没有返回值
不能直接抛出受检异常
可以交给 Thread 执行
可以交给线程池执行
四、任务模型二:Callable
Runnable 没有返回值,如果我们希望异步任务执行完成后返回一个结果,就可以使用 Callable。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callable 有两个特点:
有返回值
可以抛出异常
1. Callable 示例
import java.util.concurrent.Callable;
public class QueryUserTask implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("查询用户信息:" + Thread.currentThread().getName());
Thread.sleep(1000);
return "用户信息:张三";
}
}
但是注意,Thread 不能直接执行 Callable。
下面这种写法是错误的:
FutureTask<String> task = new FutureTask(new QueryUserTask());
Thread thread = new Thread(task); // 编译错误
原因是 thread 的构造方法接收 Runnable,不接收 Callable。
所以 Callable 如果想交给 Thread 执行,需要借助 FutureTask。
FutureTask<String> futureTask = new QueryUserTask<>();
Thread thread = new Thread(futureTask);
thread.start();
六、FutureTask:连接 Callable 和 Thread 的桥梁
FutureTask 很重要。
它既是 Runnable,又是 Future。
源码关系大概是:
public class FutureTask<V> implements RunnableFuture<V> {
}
而:
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
所以:
FutureTask 可以被 Thread 执行。
FutureTask 也可以通过 get() 获取结果。
1. Callable + FutureTask + Thread
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class FutureTaskDemo {
public static void main(String[] args) throws Exception {
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("子线程开始查询用户:" + Thread.currentThread().getName());
Thread.sleep(2000);
return "用户信息:张三";
}
};
FutureTask<String> futureTask = new FutureTask<>(callable);
Thread thread = new Thread(futureTask);
thread.start();
System.out.println("主线程继续执行:" + Thread.currentThread().getName());
String result = futureTask.get(); //同步阻塞,等待获取异步结果
System.out.println("获取异步结果:" + result);
}
}
2. FutureTask 的缺点
FutureTask 适合简单异步任务,但缺点也明显:
get() 会阻塞
不方便做回调
不方便组合多个任务
异常处理不够优雅
任务编排能力弱
七、CompletableFuture:更强大的异步编排工具
FutureTask 能解决简单异步任务,但是它不适合复杂任务编排。
例如商品详情页需要同时查:
商品信息
库存信息
价格信息
优惠券信息
评价信息
如果串行查:
商品 200ms
库存 300ms
价格 200ms
优惠券 400ms
评价 300ms
总耗时约 1400ms
如果并行查:
多个任务同时执行
总耗时约等于最慢的任务,也就是 400ms 左右
这类场景就适合使用 CompletableFuture。
1. CompletableFuture 的定位
CompletableFuture 可以理解为:
Future 的增强版 + 异步任务编排工具。
它支持:
异步执行
任务回调
任务串联
任务合并
多个任务并行
异常兜底
八、CompletableFuture 常用方法详解
1. runAsync:执行无返回值任务
CompletableFuture<Void> future =
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("异步发送短信:" + Thread.currentThread().getName());
}
}, executor);
future.join();
适合:
发送短信
写日志
清理缓存
异步通知
因为没有返回值,所以类型是:CompletableFuture<Void>
2. supplyAsync:执行有返回值任务
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
System.out.println("查询用户:" + Thread.currentThread().getName());
return "用户信息:张三";
}, executor);
String result = future.join();
System.out.println(result);
supplyAsync 适合有返回结果的任务。
3. thenApply:接收结果,并返回新结果
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
return "用户1001";
}, executor).thenApply(userInfo -> {
return userInfo + " 的订单信息";
});
String result = future.join();
System.out.println(result);
输出:用户1001 的订单信息
特点:接收上一步结果 返回一个新结果
4. thenAccept:接收结果,不返回新结果
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(() -> {
return "订单创建成功";
}, executor).thenAccept(result -> {
System.out.println("发送通知:" + result);
});
future.join();
特点:接收上一步结果 没有返回值
5. thenRun:不接收结果,也不返回结果
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(() -> {
return "订单结果";
}, executor).thenRun(() -> {
System.out.println("订单流程结束,记录日志");
});
future.join();
特点:
不关心上一步结果
只是在上一步完成后继续执行
6. thenApply 和 thenApplyAsync 的区别
这是重点。
thenApply() 不一定开启新线程,通常由上一步任务完成的线程继续执行。
thenApplyAsync() 会把下一步重新提交到线程池执行。
示例:
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
System.out.println("第一步:" + Thread.currentThread().getName());
return "用户信息";
}, executor).thenApplyAsync(result -> {
System.out.println("第二步:" + Thread.currentThread().getName());
return result + " + 订单信息";
}, executor);
选择建议:
7. thenCombine:合并两个任务结果
如果两个任务互不依赖,可以并行执行,最后合并结果。
CompletableFuture<UserDTO> userFuture =
CompletableFuture.supplyAsync(() -> {
return userService.queryUserInfo(1001L);
}, executor);
CompletableFuture<OrderDTO> orderFuture =
CompletableFuture.supplyAsync(() -> {
return orderService.queryOrderInfo(1001L);
}, executor);
CompletableFuture<UserOrderDTO> resultFuture =
userFuture.thenCombine(orderFuture, (UserDTO user, OrderDTO order) -> {
UserOrderDTO dto = new UserOrderDTO();
dto.setUser(user);
dto.setOrder(order);
return dto;
});
UserOrderDTO result = resultFuture.join();
thenCombine 的意思是:
等两个任务都完成后,拿到两个任务结果,再合并成一个新结果。
8. allOf:等待多个任务全部完成
CompletableFuture<UserDTO> userFuture =
CompletableFuture.supplyAsync(() -> userService.queryUserInfo(1001L), executor);
CompletableFuture<OrderDTO> orderFuture =
CompletableFuture.supplyAsync(() -> orderService.queryOrderInfo(1001L), executor);
CompletableFuture<CouponDTO> couponFuture =
CompletableFuture.supplyAsync(() -> couponService.queryCoupon(1001L), executor);
CompletableFuture.allOf(userFuture, orderFuture, couponFuture).join();
UserDTO user = userFuture.join();
OrderDTO order = orderFuture.join();
CouponDTO coupon = couponFuture.join();
注意:
CompletableFuture.allOf(...)
返回的是:
CompletableFuture<Void>
每个任务结果还需要分别:
userFuture.join();
orderFuture.join();
couponFuture.join();
9. anyOf:任意一个任务完成即可
CompletableFuture<String> future1 =
CompletableFuture.supplyAsync(() -> {
sleep(3000);
return "服务A返回结果";
}, executor);
CompletableFuture<String> future2 =
CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "服务B返回结果";
}, executor);
Object result = CompletableFuture.anyOf(future1, future2).join();
System.out.println(result);
输出大概率是: 服务B返回结果
适合场景:
多个服务查同一份数据
谁先返回就用谁
容灾查询
九、CompletableFuture 异常处理
异步任务也可能失败。
如果不处理异常:
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return "success";
}, executor);
String result = future.join();
join() 会抛出:CompletionException
1. exceptionally:异常兜底
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return "success";
}, executor).exceptionally(ex -> {
System.out.println("异步任务异常:" + ex.getMessage());
return "默认结果";
});
String result = future.join();
System.out.println(result);
输出: 默认结果
exceptionally 只在异常时执行。
2. handle:成功失败都处理
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
return "正常结果";
}, executor).handle((result, ex) -> {
if (ex != null) {
return "失败兜底结果";
}
return result;
});
String result = future.join();
System.out.println(result);
handle 的特点:
成功也执行 失败也执行 可以返回新的结果
3. whenComplete:只做观察,不改变结果
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
return "订单创建成功";
}, executor).whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("任务失败:" + ex.getMessage());
} else {
System.out.println("任务成功:" + result);
}
});
String result = future.join();
whenComplete 适合:
打印日志
监控埋点
记录任务状态
十、线程池:生产环境不建议直接 new Thread
前面的代码里,我们经常写:new Thread(task).start();
但是生产环境不建议频繁这么写。
原因是:
线程创建销毁有成本
线程数量不可控
高并发下可能创建大量线程,压垮服务器
不方便统一管理线程
所以实际开发中更推荐使用线程池。
1. ExecutorService 执行 Runnable
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorRunnableDemo {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(3);
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("执行 Runnable 任务:" + Thread.currentThread().getName());
}
});
pool.shutdown();
}
}
execute() 用于执行没有返回值的任务。
2. ExecutorService 执行 Callable
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorCallableDemo {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(3);
Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("执行 Callable 任务:" + Thread.currentThread().getName());
Thread.sleep(1000);
return "订单查询结果";
}
});
String result = future.get();
System.out.println("获取结果:" + result);
pool.shutdown();
}
}
这里:pool.submit(callable);
返回的是: Future<String>
然后通过:future.get(); 获取结果。
3. execute 和 submit 的区别

异常表现也不同:
execute() 执行任务异常,通常会直接打印异常。
submit() 执行任务异常,异常会封装进 Future,调用 get() 时才抛出。
4. ThreadPoolExecutor:推荐的线程池创建方式
虽然下面写法简单:
ExecutorService pool = Executors.newFixedThreadPool(10);
但是生产环境更推荐手动创建 ThreadPoolExecutor。
原因是 Executors 一些方法底层使用无界队列,任务过多时可能造成内存压力。
推荐写法
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolConfigDemo {
public static ThreadPoolExecutor buildExecutor() {
return new ThreadPoolExecutor(
10,
20,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
核心参数说明
new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler
);

线程池执行流程
假设:
corePoolSize = 10
maximumPoolSize = 20
queue = 100
执行流程:

常见拒绝策略

十一、CompletableFuture + 线程池实战:商品详情页聚合查
1. 业务场景
商品详情页需要查询:
商品基本信息
库存信息
价格信息
优惠券信息
评价信息
这些查询之间没有强依赖,可以并行执行。
2. 线程池配置
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AsyncExecutorConfig {
public static ThreadPoolExecutor buildExecutor() {
return new ThreadPoolExecutor(
10,
20,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
3. DTO 示例
public class ProductDetailDTO {
private ProductDTO product;
private StockDTO stock;
private PriceDTO price;
private CouponDTO coupon;
private CommentDTO comment;
public ProductDTO getProduct() {
return product;
}
public void setProduct(ProductDTO product) {
this.product = product;
}
public StockDTO getStock() {
return stock;
}
public void setStock(StockDTO stock) {
this.stock = stock;
}
public PriceDTO getPrice() {
return price;
}
public void setPrice(PriceDTO price) {
this.price = price;
}
public CouponDTO getCoupon() {
return coupon;
}
public void setCoupon(CouponDTO coupon) {
this.coupon = coupon;
}
public CommentDTO getComment() {
return comment;
}
public void setComment(CommentDTO comment) {
this.comment = comment;
}
}
简单 DTO:
public class ProductDTO {
private Long productId;
private String productName;
public ProductDTO(Long productId, String productName) {
this.productId = productId;
this.productName = productName;
}
}
public class StockDTO {
private Integer stock;
public StockDTO(Integer stock) {
this.stock = stock;
}
}
public class StockDTO {
private Integer stock;
public StockDTO(Integer stock) {
this.stock = stock;
}
}
public class CouponDTO {
private String couponName;
public CouponDTO(String couponName) {
this.couponName = couponName;
}
}
public class CommentDTO {
private Integer commentCount;
public CommentDTO(Integer commentCount) {
this.commentCount = commentCount;
}
}
4. 模拟 Service
public class ProductService {
public ProductDTO queryProduct(Long productId) {
sleep(200);
return new ProductDTO(productId, "iPhone 15");
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class StockService {
public StockDTO queryStock(Long productId) {
sleep(300);
return new StockDTO(100);
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class PriceService {
public PriceDTO queryPrice(Long productId) {
sleep(200);
return new PriceDTO(5999);
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class CouponService {
public CouponDTO queryCoupon(Long productId) {
sleep(400);
return new CouponDTO("满5000减300");
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class CouponService {
public CouponDTO queryCoupon(Long productId) {
sleep(400);
return new CouponDTO("满5000减300");
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
5. 聚合查询实现
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
public class ProductDetailService {
private final ProductService productService = new ProductService();
private final StockService stockService = new StockService();
private final PriceService priceService = new PriceService();
private final CouponService couponService = new CouponService();
private final CommentService commentService = new CommentService();
private final ThreadPoolExecutor executor = AsyncExecutorConfig.buildExecutor();
public ProductDetailDTO queryProductDetail(Long productId) {
CompletableFuture<ProductDTO> productFuture =
CompletableFuture.supplyAsync(() -> {
return productService.queryProduct(productId);
}, executor);
CompletableFuture<StockDTO> stockFuture =
CompletableFuture.supplyAsync(() -> {
return stockService.queryStock(productId);
}, executor);
CompletableFuture<PriceDTO> priceFuture =
CompletableFuture.supplyAsync(() -> {
return priceService.queryPrice(productId);
}, executor);
CompletableFuture<CouponDTO> couponFuture =
CompletableFuture.supplyAsync(() -> {
return couponService.queryCoupon(productId);
}, executor).exceptionally(ex -> {
System.out.println("优惠券查询失败:" + ex.getMessage());
return null;
});
CompletableFuture<CommentDTO> commentFuture =
CompletableFuture.supplyAsync(() -> {
return commentService.queryComment(productId);
}, executor).exceptionally(ex -> {
System.out.println("评价查询失败:" + ex.getMessage());
return new CommentDTO(0);
});
CompletableFuture.allOf(
productFuture,
stockFuture,
priceFuture,
couponFuture,
commentFuture
).join();
ProductDetailDTO detailDTO = new ProductDetailDTO();
detailDTO.setProduct(productFuture.join());
detailDTO.setStock(stockFuture.join());
detailDTO.setPrice(priceFuture.join());
detailDTO.setCoupon(couponFuture.join());
detailDTO.setComment(commentFuture.join());
return detailDTO;
}
}
6. 测试代码
public class ProductDetailTest {
public static void main(String[] args) {
ProductDetailService productDetailService = new ProductDetailService();
long start = System.currentTimeMillis();
ProductDetailDTO detailDTO = productDetailService.queryProductDetail(1001L);
long end = System.currentTimeMillis();
System.out.println("查询完成,耗时:" + (end - start) + "ms");
System.out.println(detailDTO);
}
}
串行查询大约需要:200 + 300 + 200 + 400 + 300 = 1400ms
并行查询大约需要:取最慢任务耗时,大约 400ms
这就是 CompletableFuture + 线程池 的典型应用场景。
十二、异步编程中的事务问题
异步编程一定要注意事务。
错误示例:
@Transactional
public void createOrder(Long userId, Long productId) {
orderMapper.insertOrder(userId, productId);
CompletableFuture.runAsync(() -> {
stockMapper.deductStock(productId);
}, executor);
accountMapper.deductBalance(userId);
}
很多人以为:
createOrder() 方法加了 @Transactional,
所以里面所有数据库操作都在一个事务里。
但这是错误的。
因为:
Spring 事务默认绑定当前线程。
CompletableFuture 开启的是新线程。
新线程不会自动加入外层 @Transactional 事务。
也就是说:stockMapper.deductStock(productId); 通常不在外层事务中
如果外层事务回滚,异步线程里的库存扣减不一定回滚,可能造成数据不一致。
核心写流程建议同步事务执行
@Transactional
public void createOrder(Long userId, Long productId) {
orderMapper.insertOrder(userId, productId);
stockMapper.deductStock(productId);
accountMapper.deductBalance(userId);
}
如果是跨服务场景,要考虑:
可靠消息
本地消息表
TCC
Saga
补偿机制
幂等控制
最终一致性
十三、异步编程最佳实践
1. 不要所有任务都异步
适合异步的任务:
耗时任务
非核心任务
可以延迟完成的任务
失败后可以补偿的任务
多个互不依赖的查询任务
2. 一定要使用自定义线程池
不要大量使用:CompletableFuture.supplyAsync(() -> queryData());
因为默认使用的是:ForkJoinPool.commonPool()
这是公共线程池,不适合承载大量业务任务。
推荐:
ompletableFuture.supplyAsync(() -> queryData(), executor);
3. 一定要处理异常
不要写成:
CompletableFuture.supplyAsync(() -> {
return remoteService.query();
}, executor);
推荐加异常处理:
CompletableFuture.supplyAsync(() -> {
return remoteService.query();
}, executor).exceptionally(ex -> {
System.out.println("查询失败:" + ex.getMessage());
return defaultValue;
});
4. join 也会阻塞
虽然 CompletableFuture 是异步的,但是 future.join();
仍然会阻塞当前线程。
如果任务还没有完成,当前线程会等待。
所以: 异步的是任务执行。 阻塞的是最终等待结果的 join。
5. 注意线程池隔离
不同业务最好使用不同线程池。
比如:
订单线程池
短信线程池
报表线程池
文件处理线程池
不要所有异步任务共用一个线程池。
否则某个慢任务堆积,可能拖垮其他业务。
十四、核心知识点总结


更多推荐
所有评论(0)