一、为什么需要异步编程?

在后端开发中,一个接口里经常会做很多事情。

比如用户下单:

1. 创建订单
2. 扣减库存
3. 扣减余额
4. 发送短信
5. 写操作日志
6. 通知第三方系统

其中:创建订单、扣库存、扣余额 属于核心流程,通常需要同步执行。

但是:发送短信、写日志、通知第三方 不一定要阻塞主流程,可以考虑异步执行。

异步编程的价值是:

提升接口响应速度
提高系统吞吐量
减少主线程等待时间
更好地利用 CPU 和 IO 资源

但是异步也不是万能的,它会带来:

异常处理复杂
事务边界复杂
线程安全问题
线程池资源管理问题

所以学习异步编程,不能只会写代码,还要理解它的底层逻辑和适用场景。

二、Java 线程基础:Thread

Thread 是 Java 中表示线程的类。

最基础的创建线程方式是继承 Thread,重写 run() 方法。

1. 继承 Thread 创建线程

public class MyThread extends Thread {

    @Override
    public void run() {
        System.out.println("子线程执行:" + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        MyThread thread = new MyThread();

        thread.start();

        System.out.println("主线程执行:" + Thread.currentThread().getName());
    }
}

可能输出:

主线程执行:main
子线程执行:Thread-0

线程执行顺序不是固定的,具体由操作系统调度决定。

2. start() 和 run() 的区别

thread.start(); 表示真正启动一个新线程。

thread.run();  只是普通方法调用,不会创建新线程。

3. 为什么不推荐直接继承 Thread?

虽然继承 Thread 可以创建线程,但实际开发中不推荐。

原因有三个:

1. Java 是单继承,继承 Thread 后就不能继承其他类。
2. 线程和任务耦合,不利于代码复用。
3. 生产环境更推荐使用线程池,而不是频繁 new Thread。

三、任务模型一:Runnable

Runnable 是一个任务接口,表示一个没有返回值的任务。

@FunctionalInterface
public interface Runnable {
    void run();
}

它只有一个 run() 方法。

1. 实现 Runnable

public class SendSmsTask implements Runnable {

    @Override
    public void run() {
        System.out.println("发送短信:" + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        SendSmsTask task = new SendSmsTask();

        Thread thread = new Thread(task);

        thread.start();
    }
}

这里的结构比继承 Thread 更清晰:

SendSmsTask 负责描述任务
Thread 负责执行任务。

2. 匿名内部类写法

public class RunnableDemo {

    public static void main(String[] args) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println("异步写日志:" + Thread.currentThread().getName());
            }
        };

        new Thread(task).start();
    }
}

3. Lambda 写法

因为 Runnable 是函数式接口,所以可以用 Lambda 简化:

public class RunnableLambdaDemo {

    public static void main(String[] args) {
        new Thread(() -> {
            System.out.println("异步任务执行:" + Thread.currentThread().getName());
        }).start();
    }
}

Lambda 只是简化写法,本质还是实现 Runnablerun() 方法。

4. Runnable 的特点

没有返回值
不能直接抛出受检异常
可以交给 Thread 执行
可以交给线程池执行

四、任务模型二:Callable

Runnable 没有返回值,如果我们希望异步任务执行完成后返回一个结果,就可以使用 Callable

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Callable 有两个特点:

有返回值
可以抛出异常

1. Callable 示例

import java.util.concurrent.Callable;

public class QueryUserTask implements Callable<String> {

    @Override
    public String call() throws Exception {
        System.out.println("查询用户信息:" + Thread.currentThread().getName());
        Thread.sleep(1000);
        return "用户信息:张三";
    }
}

但是注意,Thread 不能直接执行 Callable

下面这种写法是错误的:

FutureTask<String> task = new FutureTask(new QueryUserTask());

Thread thread = new Thread(task); // 编译错误

原因是 thread 的构造方法接收 Runnable,不接收 Callable。

所以 Callable 如果想交给 Thread 执行,需要借助 FutureTask

FutureTask<String> futureTask = new QueryUserTask<>();
Thread thread = new Thread(futureTask);

thread.start();

六、FutureTask:连接 Callable 和 Thread 的桥梁

FutureTask 很重要。

它既是 Runnable,又是 Future

源码关系大概是:

public class FutureTask<V> implements RunnableFuture<V> {
}

而:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

所以:

FutureTask 可以被 Thread 执行。
FutureTask 也可以通过 get() 获取结果。

1. Callable + FutureTask + Thread

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

    public static void main(String[] args) throws Exception {

        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("子线程开始查询用户:" + Thread.currentThread().getName());
                Thread.sleep(2000);
                return "用户信息:张三";
            }
        };

        FutureTask<String> futureTask = new FutureTask<>(callable);

        Thread thread = new Thread(futureTask);

        thread.start();

        System.out.println("主线程继续执行:" + Thread.currentThread().getName());

        String result = futureTask.get(); //同步阻塞,等待获取异步结果

        System.out.println("获取异步结果:" + result);
    }
}

2. FutureTask 的缺点

FutureTask 适合简单异步任务,但缺点也明显:

get() 会阻塞
不方便做回调
不方便组合多个任务
异常处理不够优雅
任务编排能力弱

七、CompletableFuture:更强大的异步编排工具

FutureTask 能解决简单异步任务,但是它不适合复杂任务编排。

例如商品详情页需要同时查:

商品信息
库存信息
价格信息
优惠券信息
评价信息

如果串行查:

商品 200ms
库存 300ms
价格 200ms
优惠券 400ms
评价 300ms

总耗时约 1400ms

如果并行查:

多个任务同时执行
总耗时约等于最慢的任务,也就是 400ms 左右

这类场景就适合使用 CompletableFuture

1. CompletableFuture 的定位

CompletableFuture 可以理解为:

Future 的增强版 + 异步任务编排工具。

它支持:

异步执行
任务回调
任务串联
任务合并
多个任务并行
异常兜底

八、CompletableFuture 常用方法详解

1. runAsync:执行无返回值任务

CompletableFuture<Void> future =
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                System.out.println("异步发送短信:" + Thread.currentThread().getName());
            }
        }, executor);

future.join();

适合:

发送短信
写日志
清理缓存
异步通知

因为没有返回值,所以类型是:CompletableFuture<Void>

2. supplyAsync:执行有返回值任务

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            System.out.println("查询用户:" + Thread.currentThread().getName());
            return "用户信息:张三";
        }, executor);

String result = future.join();

System.out.println(result);

supplyAsync 适合有返回结果的任务。

3. thenApply:接收结果,并返回新结果

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            return "用户1001";
        }, executor).thenApply(userInfo -> {
            return userInfo + " 的订单信息";
        });

String result = future.join();

System.out.println(result);

输出:用户1001 的订单信息

特点:接收上一步结果  返回一个新结果

4. thenAccept:接收结果,不返回新结果

CompletableFuture<Void> future =
        CompletableFuture.supplyAsync(() -> {
            return "订单创建成功";
        }, executor).thenAccept(result -> {
            System.out.println("发送通知:" + result);
        });

future.join();

特点:接收上一步结果 没有返回值

5. thenRun:不接收结果,也不返回结果

CompletableFuture<Void> future =
        CompletableFuture.supplyAsync(() -> {
            return "订单结果";
        }, executor).thenRun(() -> {
            System.out.println("订单流程结束,记录日志");
        });

future.join();

特点:

不关心上一步结果
只是在上一步完成后继续执行

6. thenApply 和 thenApplyAsync 的区别

这是重点。

thenApply()  不一定开启新线程,通常由上一步任务完成的线程继续执行。

thenApplyAsync() 会把下一步重新提交到线程池执行。

示例:

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            System.out.println("第一步:" + Thread.currentThread().getName());
            return "用户信息";
        }, executor).thenApplyAsync(result -> {
            System.out.println("第二步:" + Thread.currentThread().getName());
            return result + " + 订单信息";
        }, executor);

选择建议:

7. thenCombine:合并两个任务结果

如果两个任务互不依赖,可以并行执行,最后合并结果。

CompletableFuture<UserDTO> userFuture =
        CompletableFuture.supplyAsync(() -> {
            return userService.queryUserInfo(1001L);
        }, executor);

CompletableFuture<OrderDTO> orderFuture =
        CompletableFuture.supplyAsync(() -> {
            return orderService.queryOrderInfo(1001L);
        }, executor);

CompletableFuture<UserOrderDTO> resultFuture =
        userFuture.thenCombine(orderFuture, (UserDTO user, OrderDTO order) -> {
            UserOrderDTO dto = new UserOrderDTO();
            dto.setUser(user);
            dto.setOrder(order);
            return dto;
        });
UserOrderDTO result = resultFuture.join();

thenCombine 的意思是:

等两个任务都完成后,拿到两个任务结果,再合并成一个新结果。

8. allOf:等待多个任务全部完成

CompletableFuture<UserDTO> userFuture =
        CompletableFuture.supplyAsync(() -> userService.queryUserInfo(1001L), executor);

CompletableFuture<OrderDTO> orderFuture =
        CompletableFuture.supplyAsync(() -> orderService.queryOrderInfo(1001L), executor);

CompletableFuture<CouponDTO> couponFuture =
        CompletableFuture.supplyAsync(() -> couponService.queryCoupon(1001L), executor);

CompletableFuture.allOf(userFuture, orderFuture, couponFuture).join();

UserDTO user = userFuture.join();
OrderDTO order = orderFuture.join();
CouponDTO coupon = couponFuture.join();

注意:

CompletableFuture.allOf(...)

返回的是:

CompletableFuture<Void>

每个任务结果还需要分别:

userFuture.join();
orderFuture.join();
couponFuture.join();

9. anyOf:任意一个任务完成即可

CompletableFuture<String> future1 =
        CompletableFuture.supplyAsync(() -> {
            sleep(3000);
            return "服务A返回结果";
        }, executor);

CompletableFuture<String> future2 =
        CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "服务B返回结果";
        }, executor);

Object result = CompletableFuture.anyOf(future1, future2).join();

System.out.println(result);

输出大概率是: 服务B返回结果

适合场景:

多个服务查同一份数据
谁先返回就用谁
容灾查询

九、CompletableFuture 异常处理

异步任务也可能失败。

如果不处理异常:

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            int i = 1 / 0;
            return "success";
        }, executor);

String result = future.join();

join() 会抛出:CompletionException

1. exceptionally:异常兜底

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            int i = 1 / 0;
            return "success";
        }, executor).exceptionally(ex -> {
            System.out.println("异步任务异常:" + ex.getMessage());
            return "默认结果";
        });

String result = future.join();

System.out.println(result);

输出: 默认结果 

exceptionally 只在异常时执行。

2. handle:成功失败都处理

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            return "正常结果";
        }, executor).handle((result, ex) -> {
            if (ex != null) {
                return "失败兜底结果";
            }
            return result;
        });

String result = future.join();

System.out.println(result);

handle 的特点:

成功也执行
失败也执行
可以返回新的结果
3. whenComplete:只做观察,不改变结果
CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            return "订单创建成功";
        }, executor).whenComplete((result, ex) -> {
            if (ex != null) {
                System.out.println("任务失败:" + ex.getMessage());
            } else {
                System.out.println("任务成功:" + result);
            }
        });

String result = future.join();

whenComplete 适合:

打印日志
监控埋点
记录任务状态

十、线程池:生产环境不建议直接 new Thread

前面的代码里,我们经常写:new Thread(task).start();

但是生产环境不建议频繁这么写。

原因是:

线程创建销毁有成本
线程数量不可控
高并发下可能创建大量线程,压垮服务器
不方便统一管理线程

所以实际开发中更推荐使用线程池。

1. ExecutorService 执行 Runnable

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorRunnableDemo {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        pool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("执行 Runnable 任务:" + Thread.currentThread().getName());
            }
        });

        pool.shutdown();
    }
}

execute() 用于执行没有返回值的任务。

2. ExecutorService 执行 Callable

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorCallableDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        Future<String> future = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("执行 Callable 任务:" + Thread.currentThread().getName());
                Thread.sleep(1000);
                return "订单查询结果";
            }
        });

        String result = future.get();

        System.out.println("获取结果:" + result);

        pool.shutdown();
    }
}

这里:pool.submit(callable);

返回的是: Future<String>

然后通过:future.get(); 获取结果。

3. execute 和 submit 的区别

异常表现也不同:

execute() 执行任务异常,通常会直接打印异常。
submit() 执行任务异常,异常会封装进 Future,调用 get() 时才抛出。

4. ThreadPoolExecutor:推荐的线程池创建方式

虽然下面写法简单:

ExecutorService pool = Executors.newFixedThreadPool(10);

但是生产环境更推荐手动创建 ThreadPoolExecutor

原因是 Executors 一些方法底层使用无界队列,任务过多时可能造成内存压力。

推荐写法

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolConfigDemo {

    public static ThreadPoolExecutor buildExecutor() {
        return new ThreadPoolExecutor(
                10,
                20,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

 核心参数说明

new ThreadPoolExecutor(
        corePoolSize,
        maximumPoolSize,
        keepAliveTime,
        unit,
        workQueue,
        threadFactory,
        handler
);

线程池执行流程

假设:

corePoolSize = 10
maximumPoolSize = 20
queue = 100

执行流程:

常见拒绝策略

十一、CompletableFuture + 线程池实战:商品详情页聚合查

1. 业务场景

商品详情页需要查询:

商品基本信息
库存信息
价格信息
优惠券信息
评价信息

这些查询之间没有强依赖,可以并行执行。

2. 线程池配置

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AsyncExecutorConfig {

    public static ThreadPoolExecutor buildExecutor() {
        return new ThreadPoolExecutor(
                10,
                20,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(200),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

3. DTO 示例

public class ProductDetailDTO {

    private ProductDTO product;
    private StockDTO stock;
    private PriceDTO price;
    private CouponDTO coupon;
    private CommentDTO comment;

    public ProductDTO getProduct() {
        return product;
    }

    public void setProduct(ProductDTO product) {
        this.product = product;
    }

    public StockDTO getStock() {
        return stock;
    }

    public void setStock(StockDTO stock) {
        this.stock = stock;
    }

    public PriceDTO getPrice() {
        return price;
    }

    public void setPrice(PriceDTO price) {
        this.price = price;
    }

    public CouponDTO getCoupon() {
        return coupon;
    }

    public void setCoupon(CouponDTO coupon) {
        this.coupon = coupon;
    }

    public CommentDTO getComment() {
        return comment;
    }

    public void setComment(CommentDTO comment) {
        this.comment = comment;
    }
}

简单 DTO:

public class ProductDTO {
    private Long productId;
    private String productName;

    public ProductDTO(Long productId, String productName) {
        this.productId = productId;
        this.productName = productName;
    }
}
public class StockDTO {
    private Integer stock;

    public StockDTO(Integer stock) {
        this.stock = stock;
    }
}
public class StockDTO {
    private Integer stock;

    public StockDTO(Integer stock) {
        this.stock = stock;
    }
}
public class CouponDTO {
    private String couponName;

    public CouponDTO(String couponName) {
        this.couponName = couponName;
    }
}
public class CommentDTO {
    private Integer commentCount;

    public CommentDTO(Integer commentCount) {
        this.commentCount = commentCount;
    }
}

4. 模拟 Service

public class ProductService {

    public ProductDTO queryProduct(Long productId) {
        sleep(200);
        return new ProductDTO(productId, "iPhone 15");
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
public class StockService {

    public StockDTO queryStock(Long productId) {
        sleep(300);
        return new StockDTO(100);
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
public class PriceService {

    public PriceDTO queryPrice(Long productId) {
        sleep(200);
        return new PriceDTO(5999);
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
public class CouponService {

    public CouponDTO queryCoupon(Long productId) {
        sleep(400);
        return new CouponDTO("满5000减300");
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
public class CouponService {

    public CouponDTO queryCoupon(Long productId) {
        sleep(400);
        return new CouponDTO("满5000减300");
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

5. 聚合查询实现

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;

public class ProductDetailService {

    private final ProductService productService = new ProductService();
    private final StockService stockService = new StockService();
    private final PriceService priceService = new PriceService();
    private final CouponService couponService = new CouponService();
    private final CommentService commentService = new CommentService();

    private final ThreadPoolExecutor executor = AsyncExecutorConfig.buildExecutor();

    public ProductDetailDTO queryProductDetail(Long productId) {

        CompletableFuture<ProductDTO> productFuture =
                CompletableFuture.supplyAsync(() -> {
                    return productService.queryProduct(productId);
                }, executor);

        CompletableFuture<StockDTO> stockFuture =
                CompletableFuture.supplyAsync(() -> {
                    return stockService.queryStock(productId);
                }, executor);

        CompletableFuture<PriceDTO> priceFuture =
                CompletableFuture.supplyAsync(() -> {
                    return priceService.queryPrice(productId);
                }, executor);

        CompletableFuture<CouponDTO> couponFuture =
                CompletableFuture.supplyAsync(() -> {
                    return couponService.queryCoupon(productId);
                }, executor).exceptionally(ex -> {
                    System.out.println("优惠券查询失败:" + ex.getMessage());
                    return null;
                });

        CompletableFuture<CommentDTO> commentFuture =
                CompletableFuture.supplyAsync(() -> {
                    return commentService.queryComment(productId);
                }, executor).exceptionally(ex -> {
                    System.out.println("评价查询失败:" + ex.getMessage());
                    return new CommentDTO(0);
                });

        CompletableFuture.allOf(
                productFuture,
                stockFuture,
                priceFuture,
                couponFuture,
                commentFuture
        ).join();

        ProductDetailDTO detailDTO = new ProductDetailDTO();
        detailDTO.setProduct(productFuture.join());
        detailDTO.setStock(stockFuture.join());
        detailDTO.setPrice(priceFuture.join());
        detailDTO.setCoupon(couponFuture.join());
        detailDTO.setComment(commentFuture.join());

        return detailDTO;
    }
}

6. 测试代码

public class ProductDetailTest {

    public static void main(String[] args) {
        ProductDetailService productDetailService = new ProductDetailService();

        long start = System.currentTimeMillis();

        ProductDetailDTO detailDTO = productDetailService.queryProductDetail(1001L);

        long end = System.currentTimeMillis();

        System.out.println("查询完成,耗时:" + (end - start) + "ms");
        System.out.println(detailDTO);
    }
}

串行查询大约需要:200 + 300 + 200 + 400 + 300 = 1400ms

并行查询大约需要:取最慢任务耗时,大约 400ms

这就是 CompletableFuture + 线程池 的典型应用场景。

十二、异步编程中的事务问题

异步编程一定要注意事务。

错误示例:

@Transactional
public void createOrder(Long userId, Long productId) {

    orderMapper.insertOrder(userId, productId);

    CompletableFuture.runAsync(() -> {
        stockMapper.deductStock(productId);
    }, executor);

    accountMapper.deductBalance(userId);
}

很多人以为:

createOrder() 方法加了 @Transactional,
所以里面所有数据库操作都在一个事务里。

但这是错误的。

因为:

Spring 事务默认绑定当前线程。
CompletableFuture 开启的是新线程。
新线程不会自动加入外层 @Transactional 事务。

也就是说:stockMapper.deductStock(productId); 通常不在外层事务中

如果外层事务回滚,异步线程里的库存扣减不一定回滚,可能造成数据不一致。

核心写流程建议同步事务执行

@Transactional
public void createOrder(Long userId, Long productId) {
    orderMapper.insertOrder(userId, productId);
    stockMapper.deductStock(productId);
    accountMapper.deductBalance(userId);
}

如果是跨服务场景,要考虑:

可靠消息
本地消息表
TCC
Saga
补偿机制
幂等控制
最终一致性

十三、异步编程最佳实践

1. 不要所有任务都异步

适合异步的任务:

耗时任务
非核心任务
可以延迟完成的任务
失败后可以补偿的任务
多个互不依赖的查询任务

2. 一定要使用自定义线程池

不要大量使用:CompletableFuture.supplyAsync(() -> queryData());

因为默认使用的是:ForkJoinPool.commonPool()

这是公共线程池,不适合承载大量业务任务。

推荐:

ompletableFuture.supplyAsync(() -> queryData(), executor);

3. 一定要处理异常

不要写成:

CompletableFuture.supplyAsync(() -> {
    return remoteService.query();
}, executor);

推荐加异常处理:

CompletableFuture.supplyAsync(() -> {
    return remoteService.query();
}, executor).exceptionally(ex -> {
    System.out.println("查询失败:" + ex.getMessage());
    return defaultValue;
});

4. join 也会阻塞

虽然 CompletableFuture 是异步的,但是 future.join();

仍然会阻塞当前线程。

如果任务还没有完成,当前线程会等待。

所以: 异步的是任务执行。 阻塞的是最终等待结果的 join。

5. 注意线程池隔离

不同业务最好使用不同线程池。

比如:

订单线程池
短信线程池
报表线程池
文件处理线程池

不要所有异步任务共用一个线程池。

否则某个慢任务堆积,可能拖垮其他业务。

十四、核心知识点总结

更多推荐