RxJava:JVM 上的响应式编程利器
文章目录
RxJava:JVM 上的响应式编程利器
RxJava 是 JVM 平台上实现 Reactive Extensions 的开源库,用于编写异步和基于事件的程序,目前斩获了 48,000+ 的 Star。

RxJava 的核心思路是把数据和事件封装为可观察序列(Observable Sequence),开发者通过链式调用各种操作符来组合、转换、过滤这些序列,而不用手动管理线程同步和并发细节。
它扩展了观察者模式,支持对数据/事件序列进行声明式组合,底层的线程调度、同步、线程安全等问题由库本身处理。

五种基础类型
RxJava 4 提供了五种核心响应式类型,各自适用不同场景:
Flowable:0 到 N 个元素,支持背压(Backpressure),基于 Reactive Streams 规范Observable:0 到 N 个元素,不支持背压,适合短序列和 GUI 交互Single:只发出一个元素或一个错误Completable:不发出元素,只关心完成或出错Maybe:可能发出零个、一个元素,或一个错误
4.x 版本更新
RxJava 4.x 是当前主力开发版本,带来了一批重要变化:
- 原生支持 Java 26,运行时无需任何第三方依赖
- 基于
java.util.concurrent.Flow实现,与 JDK 标准对齐 - 新增虚拟线程(Virtual Thread)支持,提供
Schedulers.virtual()等调度器 - 新增
Streamable<T>类型,围绕虚拟线程构建异步流式操作 - 引入 Java Cleaner API 检测资源泄漏并自动清理
- 完整支持 JPMS 模块系统和 OSGi
线程调度
RxJava 通过 Scheduler 抽象来管理并发,不直接操作 Thread 或 ExecutorService。内置的调度器包括:
Schedulers.computation():计算密集型任务,固定线程池Schedulers.io():IO 和阻塞操作,动态线程池Schedulers.single():单线程顺序执行Schedulers.trampoline():当前线程排队执行,常用于测试
使用 subscribeOn 指定上游数据产生的线程,observeOn 指定下游消费的线程,一个典型的后台请求切前台的写法如下:
Flowable.fromCallable(() -> {
Thread.sleep(1000);
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
背压处理
当上游产生数据的速度超过下游消费速度时,背压机制可以避免内存溢出。Flowable 专门用于支持背压场景,提供了 onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest 等策略。Observable 和其他类型不涉及背压,因为它们要么处理短序列,要么缓冲开销可控。
并行处理
RxJava 中的并行意味着把一个流拆成多个独立子流分别执行,再将结果合并回单一序列。flatMap 可以把每个元素映射为独立的 Flowable 并发运行;Flowable.parallel() 配合 runOn 则是另一种并行写法:
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
接入方式
Gradle 引用一行即可:
implementation "io.reactivex.rxjava4:rxjava:4.x.y"
Maven 用户:
<dependency>
<groupId>io.reactivex.rxjava4</groupId>
<artifactId>rxjava</artifactId>
<version>x.y.z</version>
</dependency>
RxJava 在 Android 开发、后端微服务、消息中间件等领域有大量实践。对于需要处理异步数据流和复杂并发逻辑的 Java 项目,它是一个经过生产验证的选择。
RxJava 在 Android 开发、后端微服务、消息中间件等领域有大量实践。对于需要处理异步数据流和复杂并发逻辑的 Java 项目,它是一个经过生产验证的选择。
更多推荐
所有评论(0)