RxJava 入门指南
RxJava 入门指南
目录
- RxJava 是什么
- 核心概念:观察者模式
- RxJava 四种响应类型
- 创建操作符
- 转换操作符
- 过滤操作符
- 组合操作符
- 错误处理
- 线程调度
- 背压问题
- 实战例子
- RxJava 与 Retrofit 结合
- 项目中的 RxJava 使用解析
1. RxJava 是什么
定义
RxJava 是 Reactive Extensions 的 Java 实现,是一个响应式编程库。
官方定义
RxJava = Reactive Extensions for Java
A library for composing asynchronous and event-based programs using observable sequences for the Java VM.
核心特点
- 异步编程:轻松处理网络请求、数据库操作等耗时任务
- 链式操作:用操作符组合数据转换逻辑
- 声明式:描述"做什么"而不是"怎么做"
- 事件流:处理一系列事件而不是单个请求
解决的问题
- 避免回调地狱(Callback Hell)
- 简化异步代码编写
- 轻松处理多线程切换
- 统一的事件处理方式
2. 核心概念:观察者模式
观察者模式四要素
被观察者 (Observable) → 发射数据 → 观察者 (Observer) 接收数据
│ │
└── 订阅 (subscribe) ────────────────────┘
RxJava 中的对应关系
| 角色 | 传统观察者模式 | RxJava |
|---|---|---|
| 被观察者 | Subject | Observable / Single / Maybe |
| 观察者 | Observer | Observer / Subscriber |
| 订阅 | attach() | subscribe() |
| 通知 | update() | onNext() / onComplete() / onError() |
数据流向
Observable 发射数据:
onNext(1) → onNext(2) → onNext(3) → onComplete()
Observer 接收数据:
收到 1 → 收到 2 → 收到 3 → 完成
3. RxJava 四种响应类型
完整对比表
| 类型 | 发射数量 | 发射完成 | 发射错误 | 使用场景 |
|---|---|---|---|---|
| Observable | 0~N 个 | onComplete | onError | 流式数据、多个结果 |
| Single | 1 个 | 内置 | onError | 单次请求、一个结果 |
| Maybe | 0~1 个 | 可选 | onError | 可能为空的结果 |
| Completable | 0 个 | 必须调用 | onError | 只关心完成与否 |
3.1 Observable
最通用的类型,可以发射零到多个数据。
// 创建 Observable
Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("第一");
emitter.onNext("第二");
emitter.onNext("第三");
emitter.onComplete();
});
// 订阅
observable.subscribe(
item -> System.out.println("收到: " + item), // onNext
error -> System.out.println("错误: " + error), // onError
() -> System.out.println("完成") // onComplete
);
// 输出:
// 收到: 第一
// 收到: 第二
// 收到: 第三
// 完成
3.2 Single
只发射一个数据或错误,类似 Promise。
Single<String> single = Single.create(emitter -> {
// 只发射一次
emitter.onSuccess("Hello");
});
// 订阅
single.subscribe(
item -> System.out.println("收到: " + item), // onSuccess
error -> System.out.println("错误: " + error) // onError
);
// 输出: 收到: Hello
3.3 Maybe
发射零个或一个数据。
// 有结果
Maybe<String> hasResult = Maybe.create(emitter -> {
emitter.onSuccess("有结果");
});
// 无结果
Maybe<String> noResult = Maybe.create(emitter -> {
emitter.onComplete(); // 没有发射数据,直接完成
});
hasResult.subscribe(
item -> System.out.println("收到: " + item),
error -> System.out.println("错误: " + error),
() -> System.out.println("完成但没数据")
);
3.4 Completable
不发射任何数据,只关心操作是否完成。
Completable completable = Completable.create(emitter -> {
// 执行操作
saveToDatabase();
// 标记完成
emitter.onComplete();
});
completable.subscribe(
() -> System.out.println("操作完成"),
error -> System.out.println("错误: " + error)
);
3.5 类型转换
// Single → Observable
Single.just("Hello")
.toObservable() // 转为 Observable
// Observable → Single (取第一个)
observable
.firstOrError() // 转为 Single
4. 创建操作符
4.1 just()
直接发射指定的数据,最多 10 个。
// 发射单个数据
Single.just("Hello")
// 发射多个数据 (会按顺序依次发射)
Observable.just(1, 2, 3, 4, 5)
.subscribe(System.out::println);
// 输出: 1 2 3 4 5
4.2 fromArray / fromIterable()
从数组或列表创建。
String[] array = {"A", "B", "C"};
Observable.fromArray(array)
.subscribe(System.out::println);
List<String> list = Arrays.asList("X", "Y", "Z");
Observable.fromIterable(list)
.subscribe(System.out::println);
4.3 range()
发射范围内的整数。
// 发射 1 到 10
Observable.range(1, 10)
.subscribe(System.out::println);
// 输出: 1 2 3 4 5 6 7 8 9 10
4.4 interval()
定时发射。
// 每秒发射一个递增的整数
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
// 注意:这个 Observable 会无限发射,需要及时取消订阅
4.5 empty / never / error()
// empty: 立即完成,不发射数据
Observable.empty()
.subscribe(
item -> System.out.println("收到"),
err -> System.out.println("错误"),
() -> System.out.println("完成")
);
// 输出: 完成
// never: 不发射任何数据,也不终止
Observable.never()
.subscribe(
item -> System.out.println("收到"),
err -> System.out.println("错误"),
() -> System.out.println("完成")
);
//什么都不输出
// error: 发射一个错误
Observable.error(new RuntimeException("出错了"))
.subscribe(
item -> System.out.println("收到"),
err -> System.out.println("错误: " + err.getMessage())
);
// 输出: 错误: 出错了
5. 转换操作符
5.1 map()
对每个数据项进行转换。
// 示例:将整数转换为字符串
Observable.just(1, 2, 3)
.map(i -> "数字: " + i)
.subscribe(System.out::println);
// 输出:
// 数字: 1
// 数字: 2
// 数字: 3
5.2 flatMap()
将每个数据项转换为另一个 Observable,然后合并所有 Observable。
// 示例:获取用户列表,再获取每个用户的订单
Observable.just(1, 2, 3) // 用户ID列表
.flatMap(userId -> api.getUser(userId)) // 转为用户 Observable
.subscribe(user -> System.out.println(user.getName()));
// 场景:查询数据库
Observable.just("table1", "table2")
.flatMap(tableName -> db.query(tableName))
.subscribe(row -> System.out.println(row));
5.3 concatMap()
类似 flatMap,但保证顺序(串行)。
// flatMap:多个请求同时发出,无顺序
Observable.just(1, 2, 3)
.flatMap(id -> api.getUser(id)) // 并行
.subscribe(System.out::println);
// concatMap:一个请求完成后再发下一个,有顺序
Observable.just(1, 2, 3)
.concatMap(id -> api.getUser(id)) // 串行
.subscribe(System.out::println);
5.4 scan()
对数据做累积计算。
// 累加
Observable.just(1, 2, 3, 4, 5)
.scan((sum, item) -> sum + item)
.subscribe(System.out::println);
// 输出: 1 3 6 10 15
// ↑ ↑ ↑ ↑ ↑
// 1 1+2 3+3 6+4 10+5
5.5 groupBy()
分组。
// 按成绩分组
Observable.just(95, 45, 78, 88, 55)
.groupBy(score -> score >= 60 ? "及格" : "不及格")
.subscribe(group -> group.subscribe(item ->
System.out.println(group.getKey() + ": " + item)
));
// 输出:
// 不及格: 45
// 不及格: 55
// 及格: 95
// 及格: 78
// 及格: 88
6. 过滤操作符
6.1 filter()
过滤数据。
Observable.just(1, 2, 3, 4, 5, 6)
.filter(i -> i % 2 == 0) // 只保留偶数
.subscribe(System.out::println);
// 输出: 2 4 6
6.2 distinct()
去重。
Observable.just(1, 2, 2, 3, 3, 3, 4)
.distinct()
.subscribe(System.out::println);
// 输出: 1 2 3 4
6.3 take()
只取前 N 个。
Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe(System.out::println);
// 输出: 1 2 3
6.4 skip()
跳过前 N 个。
Observable.just(1, 2, 3, 4, 5)
.skip(2)
.subscribe(System.out::println);
// 输出: 3 4 5
6.5 first() / last()
// first: 只取第一个
Observable.just(1, 2, 3, 4, 5)
.first(0) // 默认值为0
.subscribe(System.out::println);
// 输出: 1
// last: 只取最后一个
Observable.just(1, 2, 3, 4, 5)
.last(0) // 默认值为0
.subscribe(System.out::println);
// 输出: 5
6.6 debounce()
防抖,延迟发射。
// 搜索场景:用户输入停止 300ms 后才发送请求
searchInput
.debounce(300, TimeUnit.MILLISECONDS)
.flatMap(query -> api.search(query))
.subscribe(results -> display(results));
7. 组合操作符
7.1 merge()
合并多个 Observable(并行)。
Observable<Integer> obs1 = Observable.just(1, 2, 3);
Observable<Integer> obs2 = Observable.just(4, 5, 6);
Observable.merge(obs1, obs2)
.subscribe(System.out::println);
// 输出: 1 2 3 4 5 6 (顺序取决于谁先完成)
7.2 concat()
连接多个 Observable(串行)。
Observable<Integer> obs1 = Observable.just(1, 2, 3);
Observable<Integer> obs2 = Observable.just(4, 5, 6);
Observable.concat(obs1, obs2)
.subscribe(System.out::println);
// 输出: 1 2 3 4 5 6 (严格按顺序)
7.3 zip()
配对合并,将多个 Observable 的数据一一配对。
Observable<String> names = Observable.just("张三", "李四", "王五");
Observable<Integer> ages = Observable.just(20, 25, 30);
Observable.zip(names, ages, (name, age) -> name + ":" + age)
.subscribe(System.out::println);
// 输出:
// 张三:20
// 李四:25
// 王五:30
7.4 combineLatest()
取各 Observable 最后一个最新值组合。
Observable<String> names = Observable.just("张三", "李四", "王五");
Observable<Integer> ages = Observable.just(20, 25);
Observable.combineLatest(names, ages, (name, age) -> name + ":" + age)
.subscribe(System.out::println);
// 输出:
// 李四:20 (names发射到李四时,ages最新是20)
// 王五:25 (names发射到王五时,ages最新是25)
8. 错误处理
8.1 onErrorReturn()
发生错误时返回一个默认值。
api.getUser(1)
.onErrorReturn(error -> {
System.out.println("发生错误: " + error.getMessage());
return new User("默认用户");
})
.subscribe(user -> System.out.println(user.getName()));
// 如果 API 调用失败,输出: 默认用户
8.2 onErrorResumeNext()
发生错误时切换到另一个 Observable。
api.getUser(1)
.onErrorResumeNext(error -> {
System.out.println("切换到备用数据源");
return localDatabase.getUser(1); // 返回本地数据库的数据
})
.subscribe(user -> System.out.println(user.getName()));
8.3 retry()
重试。
// 重试 3 次
api.getUser(1)
.retry(3)
.subscribe(
user -> System.out.println(user.getName()),
error -> System.out.println("重试3次后仍失败: " + error.getMessage())
);
// 无限重试
api.getUser(1)
.retry()
.subscribe(...);
// 条件重试
api.getUser(1)
.retry((retryCount, error) -> retryCount < 3 && error instanceof TimeoutException)
.subscribe(...);
8.4 retryWhen()
更复杂的重试策略。
api.getUser(1)
.retryWhen(errors -> errors
.zipWith(Observable.range(1, 3), (error, retryCount) -> retryCount)
.flatMap(retryCount -> Observable.timer(retryCount * 1000, TimeUnit.MILLISECONDS)))
.subscribe(...);
// 指数退避: 1秒后重试 → 2秒后重试 → 3秒后重试
9. 线程调度
9.1 常用调度器
| 调度器 | 说明 | 使用场景 |
|---|---|---|
| Schedulers.io() | I/O 线程池 | 网络请求、文件读写、数据库操作 |
| Schedulers.computation() | 计算线程池 | CPU 密集型计算 |
| Schedulers.newThread() | 每次新建线程 | 耗时一次性任务 |
| Schedulers.single() | 单线程 | 需要顺序执行的任务 |
| AndroidSchedulers.mainThread() | 主线程 | Android UI 更新 |
9.2 subscribeOn() 和 observeOn()
// subscribeOn(): 指定上游(数据发射)的线程
// observeOn(): 指定下游(接收处理)的线程
Observable.just(1, 2, 3)
.map(this::processInBackground) // 在 io 线程
.observeOn(Schedulers.io())
.map(this::processInBackground) // 在 io 线程
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程
.subscribe(result -> updateUI(result)); // 在主线程更新 UI
9.3 线程切换示例
// 完整示例:网络请求 + 数据处理 + UI 更新
Observable.just("url1", "url2", "url3")
.subscribeOn(Schedulers.io()) // 网络请求在 IO 线程
.observeOn(Schedulers.io()) // 切换到 IO 线程处理数据
.map(url -> downloadImage(url)) // 下载图片
.map(bytes -> compressImage(bytes)) // 压缩图片
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程
.subscribe(
bitmap -> imageView.setImageBitmap(bitmap), // 更新 UI
error -> Toast.makeText(context, "加载失败", Toast.LENGTH_SHORT).show()
);
10. 背压问题
10.1 什么是背压
当数据发射速度 > 处理速度 时,会导致数据积压、内存溢出。
发射端: → → → → → → → → → → → (快速发射)
处理端: → → → → (处理慢)
↓ ↓ ↓ ↓
[积压数据越来越多...]
10.2 解决方案
方案一:换用支持背压的 Flowable
// Observable 不支持背压,会积压数据
// Flowable 支持背压,有背压策略
Flowable<Integer> flowable = Flowable.create(emitter -> {
for (int i = 0; i < 10000; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}, BackpressureStrategy.DROP); // 背压策略:丢弃溢出的数据
方案二:使用操作符限流
// window: 分批处理
observable
.window(100) // 每100个一批
.subscribe(batch -> process(batch));
// buffer: 缓冲
observable
.buffer(100) // 缓冲100个再处理
.subscribe(list -> process(list));
// throttle: 节流
observable
.throttleLatest(100, TimeUnit.MILLISECONDS) // 100ms 内只取最新
.subscribe(...);
10.3 背压策略
| 策略 | 说明 |
|---|---|
| BackpressureStrategy.DROP | 丢弃溢出数据 |
| BackpressureStrategy.LATEST | 只保留最新数据 |
| BackpressureStrategy.BUFFER | 缓冲所有数据(可能内存溢出) |
| BackpressureStrategy.ERROR | 抛出 MissingBackpressureException |
11. 实战例子
11.1 网络请求链
// 场景:搜索 → 获取详情 → 获取评论
api.search(keyword)
.flatMap(searchResults -> {
if (searchResults.isEmpty()) {
return Observable.empty();
}
return api.getDetail(searchResults.get(0).getId());
})
.flatMap(detail -> api.getComments(detail.getId()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
comments -> displayComments(comments),
error -> showError(error)
);
11.2 合并请求
// 场景:同时获取用户信息和文章列表
Observable<User> userObs = api.getUser(userId);
Observable<List<Article>> articlesObs = api.getArticles(userId);
Observable.zip(userObs, articlesObs, (user, articles) ->
new UserProfile(user, articles)
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(profile -> displayProfile(profile));
11.3 搜索防抖
// 场景:实时搜索,用户停止输入 300ms 后才发送请求
searchInput
.debounce(300, TimeUnit.MILLISECONDS) // 防抖
.distinctUntilChanged() // 过滤重复输入
.filter(text -> text.length() >= 2) // 至少2个字符
.flatMap(text -> api.search(text)) // 发送请求
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
results -> displayResults(results),
error -> showError(error)
);
11.4 倒计时
// 场景:发送验证码后倒计时 60 秒
Observable.interval(1, TimeUnit.SECONDS)
.map(i -> 60 - i)
.take(60) // 0 时停止
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
seconds -> button.setText(seconds + "秒后可重发"),
error -> {},
() -> button.setText("发送验证码")
);
12. RxJava 与 Retrofit 结合
12.1 添加依赖
// build.gradle
dependencies {
// Retrofit
implementation 'com.squareup.retrofit2:retrofit:2.9.0'
// Retrofit 默认的 Gson 转换器
implementation 'com.squareup.retrofit2:converter-gson:2.9.0'
// RxJava 适配器
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.9.0'
// RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
}
12.2 定义 API 接口
public interface GitHubApi {
@GET("users/{user}/repos")
Single<List<Repo>> getRepos(@Path("user") String user);
@GET("repos/{owner}/{repo}")
Single<Repo> getRepo(
@Path("owner") String owner,
@Path("repo") String repo
);
}
12.3 配置 Retrofit
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.github.com/")
.client(okHttpClient)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 关键!
.build();
GitHubApi api = retrofit.create(GitHubApi.class);
12.4 调用
// 获取用户仓库
api.getRepos("square")
.subscribeOn(Schedulers.io()) // 网络请求在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 结果切换到主线程
.subscribe(
repos -> listView.setAdapter(new RepoAdapter(repos)),
error -> Toast.makeText(this, "加载失败", Toast.LENGTH_SHORT).show()
);
12.5 链式请求
// 获取仓库详情和它的前 10 个 issue
api.getRepo("square", "okhttp")
.flatMap(repo -> api.getIssues("square", "okhttp")
.map(issues -> new RepoDetail(repo, issues)))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
detail -> display(detail),
error -> showError(error)
);
13. 项目中的 RxJava 使用解析
本章节结合你的 ChatGPT SDK 项目,解析 RxJava 在实际项目中的使用
13.1 项目结构
src/main/java/cn/bugstack/chatgpt/
├── IOpenAiApi.java # API 接口定义(核心)
├── session/
│ ├── OpenAiSession.java # 会话接口
│ └── defaults/
│ └── DefaultOpenAiSession.java # 会话实现
└── domain/
└── chat/
├── ChatCompletionRequest.java
└── ChatCompletionResponse.java
13.2 API 接口定义 - Single 的使用
文件: IOpenAiApi.java
public interface IOpenAiApi {
String v1_chat_completions = "v1/chat/completions";
@POST(v1_chat_completions)
Single<ChatCompletionResponse> completions(@Body ChatCompletionRequest chatCompletionRequest);
}
解析:
- 使用
Single<ChatCompletionResponse>作为返回类型 Single表示单次请求、一个响应,适合 API 调用场景- 相比
Observable,Single更简洁,明确表示"只发射一次"
13.3 Retrofit 配置 - RxJava 适配器
文件: HttpClientTest.java
IOpenAiApi openAiApi = new Retrofit.Builder()
.baseUrl("https://api.openai-proxy.org/")
.client(okHttpClient)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // RxJava 适配器
.addConverterFactory(JacksonConverterFactory.create())
.build()
.create(IOpenAiApi.class);
解析:
RxJava2CallAdapterFactory.create()是关键,它让 Retrofit 返回Single而不是Call- 这样定义的方法返回
Single<T>而不是Call<T>
13.4 调用方式 - blockingGet vs subscribe
文件: HttpClientTest.java
// 方式一:blockingGet() - 同步阻塞(项目中使用的)
Single<ChatCompletionResponse> responseSingle = openAiApi.completions(chatCompletion);
ChatCompletionResponse response = responseSingle.blockingGet();
// 方式二:subscribe() - 纯异步(非阻塞)
openAiApi.completions(chatCompletion)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
response -> System.out.println("成功:" + response),
error -> System.out.println("失败:" + error)
);
解析:
blockingGet()会阻塞等待结果,是"伪异步"subscribe()是真正的异步回调写法- 项目中用
blockingGet()是为了简化测试,实际项目建议用subscribe()
13.5 完整调用示例
文件: ApiTest.java
@Test
public void test_chat_completions() {
// 1. 创建请求参数
ChatCompletionRequest chatCompletion = ChatCompletionRequest
.builder()
.messages(Collections.singletonList(
Message.builder()
.role(Constants.Role.USER)
.content("1+1")
.build()
))
.model(ChatCompletionRequest.Model.GPT_4.getCode())
.build();
// 2. 发起请求(返回 Single)
ChatCompletionResponse chatCompletionResponse = openAiSession.completions(chatCompletion);
// 3. 处理结果
chatCompletionResponse.getChoices().forEach(e -> {
log.info("测试结果:{}", e.getMessage());
});
}
解析:
openAiSession.completions()内部调用了openAiApi.completions()- 返回
Single<ChatCompletionResponse>,自动转换为ChatCompletionResponse
13.6 流式响应 - SSE 与 RxJava
文件: ApiTest.java
@Test
public void test_chat_completions_stream() throws JsonProcessingException, InterruptedException {
// 1. 创建流式请求
ChatCompletionRequest chatCompletion = ChatCompletionRequest
.builder()
.stream(true) // 开启流式
.messages(Collections.singletonList(
Message.builder()
.role(Constants.Role.USER)
.content("写一个java冒泡排序")
.build()
))
.model(ChatCompletionRequest.Model.GPT_4.getCode())
.build();
// 2. 发起流式请求
EventSource eventSource = openAiSession.chatCompletions(chatCompletion, new EventSourceListener() {
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {
log.info("测试结果:{}", data); // 流式数据陆续到达
}
});
// 等待
new CountDownLatch(1).await();
}
解析:
- 流式响应使用
EventSource(来自 OkHttp 的 SSE 支持) EventSourceListener的onEvent会被多次调用,每次收到一块数据- 这时 RxJava 的
Single已经不够用了,需要用Observable或原生 SSE
13.7 核心流程图
┌─────────────────────────────────────────────────────────┐
│ 调用端 │
│ openAiSession.completions(chatCompletion) │
└─────────────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ DefaultOpenAiSession │
│ return openAiApi.completions(chatCompletion) │
│ 返回 Single<ChatCompletionResponse> │
└─────────────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ IOpenAiApi │
│ @POST("v1/chat/completions") │
│ Single<ChatCompletionResponse> completions(...) │
└─────────────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Retrofit + RxJava2CallAdapter │
│ OkHttp 执行网络请求 │
│ Jackson 解析 JSON │
│ 转换为 Single<ChatCompletionResponse> │
└─────────────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ OkHttpClient │
│ 实际发送 HTTP 请求到 OpenAI API │
└─────────────────────────────────────────────────────────┘
13.8 为什么选择 Single?
| 选择 | 原因 |
|---|---|
Single 而非 Observable |
API 调用只返回一次结果,不需要发射多个数据 |
Single 而非 Call |
可以用 RxJava 操作符(map、flatMap 等) |
blockingGet() |
简化测试代码,实际项目应用 subscribe() |
13.9 扩展:如何改成纯异步
如果想让项目使用更标准的异步写法,可以这样改:
// 修改 IOpenAiSession 接口
public interface OpenAiSession {
Observable<ChatCompletionResponse> completions(ChatCompletionRequest request);
}
// 使用
openAiSession.completions(request)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
response -> display(response),
error -> handleError(error)
);
附录:常用操作符速查表
创建类
| 操作符 | 说明 |
|---|---|
| just() | 直接发射指定数据 |
| fromArray/Iterable() | 从数组/列表创建 |
| range() | 发射范围整数 |
| interval() | 定时发射 |
| empty/never/error() | 特殊 Observable |
转换类
| 操作符 | 说明 |
|---|---|
| map() | 转换每个数据 |
| flatMap() | 展开并合并 |
| concatMap() | 展开并串行合并 |
| scan() | 累积计算 |
过滤类
| 操作符 | 说明 |
|---|---|
| filter() | 过滤 |
| distinct() | 去重 |
| take() | 取前 N 个 |
| skip() | 跳过前 N 个 |
| debounce() | 防抖 |
组合类
| 操作符 | 说明 |
|---|---|
| merge() | 并行合并 |
| concat() | 串行连接 |
| zip() | 配对合并 |
| combineLatest() | 最新值组合 |
学习建议
- 先理解概念:观察者模式、发布-订阅模式
- 从简单开始:先学会 just()、subscribe()、map()
- 多动手实践:用 RxJava 改写现有代码
- 理解线程:subscribeOn() 和 observeOn() 是关键
- 进阶学习:背压、操作符原理
更多推荐


所有评论(0)