RxJava 入门指南

目录

  1. RxJava 是什么
  2. 核心概念:观察者模式
  3. RxJava 四种响应类型
  4. 创建操作符
  5. 转换操作符
  6. 过滤操作符
  7. 组合操作符
  8. 错误处理
  9. 线程调度
  10. 背压问题
  11. 实战例子
  12. RxJava 与 Retrofit 结合
  13. 项目中的 RxJava 使用解析

1. RxJava 是什么

定义

RxJava 是 Reactive Extensions 的 Java 实现,是一个响应式编程库。

官方定义

RxJava = Reactive Extensions for Java
A library for composing asynchronous and event-based programs using observable sequences for the Java VM.

核心特点

  • 异步编程:轻松处理网络请求、数据库操作等耗时任务
  • 链式操作:用操作符组合数据转换逻辑
  • 声明式:描述"做什么"而不是"怎么做"
  • 事件流:处理一系列事件而不是单个请求

解决的问题

  1. 避免回调地狱(Callback Hell)
  2. 简化异步代码编写
  3. 轻松处理多线程切换
  4. 统一的事件处理方式

2. 核心概念:观察者模式

观察者模式四要素

被观察者 (Observable)  →  发射数据  →  观察者 (Observer) 接收数据
     │                                        │
     └── 订阅 (subscribe) ────────────────────┘

RxJava 中的对应关系

角色 传统观察者模式 RxJava
被观察者 Subject Observable / Single / Maybe
观察者 Observer Observer / Subscriber
订阅 attach() subscribe()
通知 update() onNext() / onComplete() / onError()

数据流向

Observable 发射数据:
    onNext(1) → onNext(2) → onNext(3) → onComplete()

Observer 接收数据:
    收到 1 → 收到 2 → 收到 3 → 完成

3. RxJava 四种响应类型

完整对比表

类型 发射数量 发射完成 发射错误 使用场景
Observable 0~N 个 onComplete onError 流式数据、多个结果
Single 1 个 内置 onError 单次请求、一个结果
Maybe 0~1 个 可选 onError 可能为空的结果
Completable 0 个 必须调用 onError 只关心完成与否

3.1 Observable

最通用的类型,可以发射零到多个数据。

// 创建 Observable
Observable<String> observable = Observable.create(emitter -> {
    emitter.onNext("第一");
    emitter.onNext("第二");
    emitter.onNext("第三");
    emitter.onComplete();
});

// 订阅
observable.subscribe(
    item -> System.out.println("收到: " + item),      // onNext
    error -> System.out.println("错误: " + error),     // onError
    () -> System.out.println("完成")                   // onComplete
);

// 输出:
// 收到: 第一
// 收到: 第二
// 收到: 第三
// 完成

3.2 Single

只发射一个数据或错误,类似 Promise。

Single<String> single = Single.create(emitter -> {
    // 只发射一次
    emitter.onSuccess("Hello");
});

// 订阅
single.subscribe(
    item -> System.out.println("收到: " + item),      // onSuccess
    error -> System.out.println("错误: " + error)      // onError
);

// 输出: 收到: Hello

3.3 Maybe

发射零个或一个数据。

// 有结果
Maybe<String> hasResult = Maybe.create(emitter -> {
    emitter.onSuccess("有结果");
});

// 无结果
Maybe<String> noResult = Maybe.create(emitter -> {
    emitter.onComplete();  // 没有发射数据,直接完成
});

hasResult.subscribe(
    item -> System.out.println("收到: " + item),
    error -> System.out.println("错误: " + error),
    () -> System.out.println("完成但没数据")
);

3.4 Completable

不发射任何数据,只关心操作是否完成。

Completable completable = Completable.create(emitter -> {
    // 执行操作
    saveToDatabase();
    // 标记完成
    emitter.onComplete();
});

completable.subscribe(
    () -> System.out.println("操作完成"),
    error -> System.out.println("错误: " + error)
);

3.5 类型转换

// Single → Observable
Single.just("Hello")
    .toObservable()  // 转为 Observable

// Observable → Single (取第一个)
observable
    .firstOrError()  // 转为 Single

4. 创建操作符

4.1 just()

直接发射指定的数据,最多 10 个。

// 发射单个数据
Single.just("Hello")

// 发射多个数据 (会按顺序依次发射)
Observable.just(1, 2, 3, 4, 5)
    .subscribe(System.out::println);

// 输出: 1 2 3 4 5

4.2 fromArray / fromIterable()

从数组或列表创建。

String[] array = {"A", "B", "C"};
Observable.fromArray(array)
    .subscribe(System.out::println);

List<String> list = Arrays.asList("X", "Y", "Z");
Observable.fromIterable(list)
    .subscribe(System.out::println);

4.3 range()

发射范围内的整数。

// 发射 1 到 10
Observable.range(1, 10)
    .subscribe(System.out::println);

// 输出: 1 2 3 4 5 6 7 8 9 10

4.4 interval()

定时发射。

// 每秒发射一个递增的整数
Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(System.out::println);

// 注意:这个 Observable 会无限发射,需要及时取消订阅

4.5 empty / never / error()

// empty: 立即完成,不发射数据
Observable.empty()
    .subscribe(
        item -> System.out.println("收到"),
        err -> System.out.println("错误"),
        () -> System.out.println("完成")
    );
// 输出: 完成

// never: 不发射任何数据,也不终止
Observable.never()
    .subscribe(
        item -> System.out.println("收到"),
        err -> System.out.println("错误"),
        () -> System.out.println("完成")
    );
//什么都不输出

// error: 发射一个错误
Observable.error(new RuntimeException("出错了"))
    .subscribe(
        item -> System.out.println("收到"),
        err -> System.out.println("错误: " + err.getMessage())
    );
// 输出: 错误: 出错了

5. 转换操作符

5.1 map()

对每个数据项进行转换。

// 示例:将整数转换为字符串
Observable.just(1, 2, 3)
    .map(i -> "数字: " + i)
    .subscribe(System.out::println);

// 输出:
// 数字: 1
// 数字: 2
// 数字: 3

5.2 flatMap()

将每个数据项转换为另一个 Observable,然后合并所有 Observable。

// 示例:获取用户列表,再获取每个用户的订单
Observable.just(1, 2, 3)  // 用户ID列表
    .flatMap(userId -> api.getUser(userId))  // 转为用户 Observable
    .subscribe(user -> System.out.println(user.getName()));

// 场景:查询数据库
Observable.just("table1", "table2")
    .flatMap(tableName -> db.query(tableName))
    .subscribe(row -> System.out.println(row));

5.3 concatMap()

类似 flatMap,但保证顺序(串行)。

// flatMap:多个请求同时发出,无顺序
Observable.just(1, 2, 3)
    .flatMap(id -> api.getUser(id))  // 并行
    .subscribe(System.out::println);

// concatMap:一个请求完成后再发下一个,有顺序
Observable.just(1, 2, 3)
    .concatMap(id -> api.getUser(id))  // 串行
    .subscribe(System.out::println);

5.4 scan()

对数据做累积计算。

// 累加
Observable.just(1, 2, 3, 4, 5)
    .scan((sum, item) -> sum + item)
    .subscribe(System.out::println);

// 输出: 1 3 6 10 15
//       ↑  ↑  ↑  ↑  ↑
//       1 1+2 3+3 6+4 10+5

5.5 groupBy()

分组。

// 按成绩分组
Observable.just(95, 45, 78, 88, 55)
    .groupBy(score -> score >= 60 ? "及格" : "不及格")
    .subscribe(group -> group.subscribe(item ->
        System.out.println(group.getKey() + ": " + item)
    ));

// 输出:
// 不及格: 45
// 不及格: 55
// 及格: 95
// 及格: 78
// 及格: 88

6. 过滤操作符

6.1 filter()

过滤数据。

Observable.just(1, 2, 3, 4, 5, 6)
    .filter(i -> i % 2 == 0)  // 只保留偶数
    .subscribe(System.out::println);

// 输出: 2 4 6

6.2 distinct()

去重。

Observable.just(1, 2, 2, 3, 3, 3, 4)
    .distinct()
    .subscribe(System.out::println);

// 输出: 1 2 3 4

6.3 take()

只取前 N 个。

Observable.just(1, 2, 3, 4, 5)
    .take(3)
    .subscribe(System.out::println);

// 输出: 1 2 3

6.4 skip()

跳过前 N 个。

Observable.just(1, 2, 3, 4, 5)
    .skip(2)
    .subscribe(System.out::println);

// 输出: 3 4 5

6.5 first() / last()

// first: 只取第一个
Observable.just(1, 2, 3, 4, 5)
    .first(0)  // 默认值为0
    .subscribe(System.out::println);

// 输出: 1

// last: 只取最后一个
Observable.just(1, 2, 3, 4, 5)
    .last(0)   // 默认值为0
    .subscribe(System.out::println);

// 输出: 5

6.6 debounce()

防抖,延迟发射。

// 搜索场景:用户输入停止 300ms 后才发送请求
searchInput
    .debounce(300, TimeUnit.MILLISECONDS)
    .flatMap(query -> api.search(query))
    .subscribe(results -> display(results));

7. 组合操作符

7.1 merge()

合并多个 Observable(并行)。

Observable<Integer> obs1 = Observable.just(1, 2, 3);
Observable<Integer> obs2 = Observable.just(4, 5, 6);

Observable.merge(obs1, obs2)
    .subscribe(System.out::println);

// 输出: 1 2 3 4 5 6 (顺序取决于谁先完成)

7.2 concat()

连接多个 Observable(串行)。

Observable<Integer> obs1 = Observable.just(1, 2, 3);
Observable<Integer> obs2 = Observable.just(4, 5, 6);

Observable.concat(obs1, obs2)
    .subscribe(System.out::println);

// 输出: 1 2 3 4 5 6 (严格按顺序)

7.3 zip()

配对合并,将多个 Observable 的数据一一配对。

Observable<String> names = Observable.just("张三", "李四", "王五");
Observable<Integer> ages = Observable.just(20, 25, 30);

Observable.zip(names, ages, (name, age) -> name + ":" + age)
    .subscribe(System.out::println);

// 输出:
// 张三:20
// 李四:25
// 王五:30

7.4 combineLatest()

取各 Observable 最后一个最新值组合。

Observable<String> names = Observable.just("张三", "李四", "王五");
Observable<Integer> ages = Observable.just(20, 25);

Observable.combineLatest(names, ages, (name, age) -> name + ":" + age)
    .subscribe(System.out::println);

// 输出:
// 李四:20  (names发射到李四时,ages最新是20)
// 王五:25  (names发射到王五时,ages最新是25)

8. 错误处理

8.1 onErrorReturn()

发生错误时返回一个默认值。

api.getUser(1)
    .onErrorReturn(error -> {
        System.out.println("发生错误: " + error.getMessage());
        return new User("默认用户");
    })
    .subscribe(user -> System.out.println(user.getName()));

// 如果 API 调用失败,输出: 默认用户

8.2 onErrorResumeNext()

发生错误时切换到另一个 Observable。

api.getUser(1)
    .onErrorResumeNext(error -> {
        System.out.println("切换到备用数据源");
        return localDatabase.getUser(1);  // 返回本地数据库的数据
    })
    .subscribe(user -> System.out.println(user.getName()));

8.3 retry()

重试。

// 重试 3 次
api.getUser(1)
    .retry(3)
    .subscribe(
        user -> System.out.println(user.getName()),
        error -> System.out.println("重试3次后仍失败: " + error.getMessage())
    );

// 无限重试
api.getUser(1)
    .retry()
    .subscribe(...);

// 条件重试
api.getUser(1)
    .retry((retryCount, error) -> retryCount < 3 && error instanceof TimeoutException)
    .subscribe(...);

8.4 retryWhen()

更复杂的重试策略。

api.getUser(1)
    .retryWhen(errors -> errors
        .zipWith(Observable.range(1, 3), (error, retryCount) -> retryCount)
        .flatMap(retryCount -> Observable.timer(retryCount * 1000, TimeUnit.MILLISECONDS)))
    .subscribe(...);
// 指数退避: 1秒后重试 → 2秒后重试 → 3秒后重试

9. 线程调度

9.1 常用调度器

调度器 说明 使用场景
Schedulers.io() I/O 线程池 网络请求、文件读写、数据库操作
Schedulers.computation() 计算线程池 CPU 密集型计算
Schedulers.newThread() 每次新建线程 耗时一次性任务
Schedulers.single() 单线程 需要顺序执行的任务
AndroidSchedulers.mainThread() 主线程 Android UI 更新

9.2 subscribeOn() 和 observeOn()

// subscribeOn(): 指定上游(数据发射)的线程
// observeOn():  指定下游(接收处理)的线程

Observable.just(1, 2, 3)
    .map(this::processInBackground)     // 在 io 线程
    .observeOn(Schedulers.io())
    .map(this::processInBackground)     // 在 io 线程
    .observeOn(AndroidSchedulers.mainThread())  // 切换到主线程
    .subscribe(result -> updateUI(result));  // 在主线程更新 UI

9.3 线程切换示例

// 完整示例:网络请求 + 数据处理 + UI 更新

Observable.just("url1", "url2", "url3")
    .subscribeOn(Schedulers.io())           // 网络请求在 IO 线程
    .observeOn(Schedulers.io())             // 切换到 IO 线程处理数据
    .map(url -> downloadImage(url))         // 下载图片
    .map(bytes -> compressImage(bytes))      // 压缩图片
    .observeOn(AndroidSchedulers.mainThread()) // 切换到主线程
    .subscribe(
        bitmap -> imageView.setImageBitmap(bitmap),  // 更新 UI
        error -> Toast.makeText(context, "加载失败", Toast.LENGTH_SHORT).show()
    );

10. 背压问题

10.1 什么是背压

数据发射速度 > 处理速度 时,会导致数据积压、内存溢出。

发射端: → → → → → → → → → → →  (快速发射)
处理端: →   →   →   →             (处理慢)
        ↓   ↓   ↓   ↓
      [积压数据越来越多...]

10.2 解决方案

方案一:换用支持背压的 Flowable
// Observable 不支持背压,会积压数据
// Flowable 支持背压,有背压策略

Flowable<Integer> flowable = Flowable.create(emitter -> {
    for (int i = 0; i < 10000; i++) {
        emitter.onNext(i);
    }
    emitter.onComplete();
}, BackpressureStrategy.DROP);  // 背压策略:丢弃溢出的数据
方案二:使用操作符限流
// window: 分批处理
observable
    .window(100)  // 每100个一批
    .subscribe(batch -> process(batch));

// buffer: 缓冲
observable
    .buffer(100)  // 缓冲100个再处理
    .subscribe(list -> process(list));

// throttle: 节流
observable
    .throttleLatest(100, TimeUnit.MILLISECONDS)  // 100ms 内只取最新
    .subscribe(...);

10.3 背压策略

策略 说明
BackpressureStrategy.DROP 丢弃溢出数据
BackpressureStrategy.LATEST 只保留最新数据
BackpressureStrategy.BUFFER 缓冲所有数据(可能内存溢出)
BackpressureStrategy.ERROR 抛出 MissingBackpressureException

11. 实战例子

11.1 网络请求链

// 场景:搜索 → 获取详情 → 获取评论

api.search(keyword)
    .flatMap(searchResults -> {
        if (searchResults.isEmpty()) {
            return Observable.empty();
        }
        return api.getDetail(searchResults.get(0).getId());
    })
    .flatMap(detail -> api.getComments(detail.getId()))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        comments -> displayComments(comments),
        error -> showError(error)
    );

11.2 合并请求

// 场景:同时获取用户信息和文章列表

Observable<User> userObs = api.getUser(userId);
Observable<List<Article>> articlesObs = api.getArticles(userId);

Observable.zip(userObs, articlesObs, (user, articles) ->
    new UserProfile(user, articles)
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(profile -> displayProfile(profile));

11.3 搜索防抖

// 场景:实时搜索,用户停止输入 300ms 后才发送请求

searchInput
    .debounce(300, TimeUnit.MILLISECONDS)      // 防抖
    .distinctUntilChanged()                      // 过滤重复输入
    .filter(text -> text.length() >= 2)        // 至少2个字符
    .flatMap(text -> api.search(text))         // 发送请求
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        results -> displayResults(results),
        error -> showError(error)
    );

11.4 倒计时

// 场景:发送验证码后倒计时 60 秒

Observable.interval(1, TimeUnit.SECONDS)
    .map(i -> 60 - i)
    .take(60)              // 0 时停止
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        seconds -> button.setText(seconds + "秒后可重发"),
        error -> {},
        () -> button.setText("发送验证码")
    );

12. RxJava 与 Retrofit 结合

12.1 添加依赖

// build.gradle

dependencies {
    // Retrofit
    implementation 'com.squareup.retrofit2:retrofit:2.9.0'

    // Retrofit 默认的 Gson 转换器
    implementation 'com.squareup.retrofit2:converter-gson:2.9.0'

    // RxJava 适配器
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.9.0'

    // RxJava
    implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
}

12.2 定义 API 接口

public interface GitHubApi {

    @GET("users/{user}/repos")
    Single<List<Repo>> getRepos(@Path("user") String user);

    @GET("repos/{owner}/{repo}")
    Single<Repo> getRepo(
        @Path("owner") String owner,
        @Path("repo") String repo
    );
}

12.3 配置 Retrofit

Retrofit retrofit = new Retrofit.Builder()
    .baseUrl("https://api.github.com/")
    .client(okHttpClient)
    .addConverterFactory(GsonConverterFactory.create())
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())  // 关键!
    .build();

GitHubApi api = retrofit.create(GitHubApi.class);

12.4 调用

// 获取用户仓库
api.getRepos("square")
    .subscribeOn(Schedulers.io())                // 网络请求在 IO 线程
    .observeOn(AndroidSchedulers.mainThread())   // 结果切换到主线程
    .subscribe(
        repos -> listView.setAdapter(new RepoAdapter(repos)),
        error -> Toast.makeText(this, "加载失败", Toast.LENGTH_SHORT).show()
    );

12.5 链式请求

// 获取仓库详情和它的前 10 个 issue

api.getRepo("square", "okhttp")
    .flatMap(repo -> api.getIssues("square", "okhttp")
        .map(issues -> new RepoDetail(repo, issues)))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        detail -> display(detail),
        error -> showError(error)
    );

13. 项目中的 RxJava 使用解析

本章节结合你的 ChatGPT SDK 项目,解析 RxJava 在实际项目中的使用

13.1 项目结构

src/main/java/cn/bugstack/chatgpt/
├── IOpenAiApi.java          # API 接口定义(核心)
├── session/
│   ├── OpenAiSession.java   # 会话接口
│   └── defaults/
│       └── DefaultOpenAiSession.java  # 会话实现
└── domain/
    └── chat/
        ├── ChatCompletionRequest.java
        └── ChatCompletionResponse.java

13.2 API 接口定义 - Single 的使用

文件: IOpenAiApi.java

public interface IOpenAiApi {

    String v1_chat_completions = "v1/chat/completions";

    @POST(v1_chat_completions)
    Single<ChatCompletionResponse> completions(@Body ChatCompletionRequest chatCompletionRequest);
}

解析

  • 使用 Single<ChatCompletionResponse> 作为返回类型
  • Single 表示单次请求、一个响应,适合 API 调用场景
  • 相比 ObservableSingle 更简洁,明确表示"只发射一次"

13.3 Retrofit 配置 - RxJava 适配器

文件: HttpClientTest.java

IOpenAiApi openAiApi = new Retrofit.Builder()
    .baseUrl("https://api.openai-proxy.org/")
    .client(okHttpClient)
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())  // RxJava 适配器
    .addConverterFactory(JacksonConverterFactory.create())
    .build()
    .create(IOpenAiApi.class);

解析

  • RxJava2CallAdapterFactory.create() 是关键,它让 Retrofit 返回 Single 而不是 Call
  • 这样定义的方法返回 Single<T> 而不是 Call<T>

13.4 调用方式 - blockingGet vs subscribe

文件: HttpClientTest.java

// 方式一:blockingGet() - 同步阻塞(项目中使用的)
Single<ChatCompletionResponse> responseSingle = openAiApi.completions(chatCompletion);
ChatCompletionResponse response = responseSingle.blockingGet();

// 方式二:subscribe() - 纯异步(非阻塞)
openAiApi.completions(chatCompletion)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        response -> System.out.println("成功:" + response),
        error -> System.out.println("失败:" + error)
    );

解析

  • blockingGet() 会阻塞等待结果,是"伪异步"
  • subscribe() 是真正的异步回调写法
  • 项目中用 blockingGet() 是为了简化测试,实际项目建议用 subscribe()

13.5 完整调用示例

文件: ApiTest.java

@Test
public void test_chat_completions() {
    // 1. 创建请求参数
    ChatCompletionRequest chatCompletion = ChatCompletionRequest
        .builder()
        .messages(Collections.singletonList(
            Message.builder()
                .role(Constants.Role.USER)
                .content("1+1")
                .build()
        ))
        .model(ChatCompletionRequest.Model.GPT_4.getCode())
        .build();

    // 2. 发起请求(返回 Single)
    ChatCompletionResponse chatCompletionResponse = openAiSession.completions(chatCompletion);

    // 3. 处理结果
    chatCompletionResponse.getChoices().forEach(e -> {
        log.info("测试结果:{}", e.getMessage());
    });
}

解析

  • openAiSession.completions() 内部调用了 openAiApi.completions()
  • 返回 Single<ChatCompletionResponse>,自动转换为 ChatCompletionResponse

13.6 流式响应 - SSE 与 RxJava

文件: ApiTest.java

@Test
public void test_chat_completions_stream() throws JsonProcessingException, InterruptedException {
    // 1. 创建流式请求
    ChatCompletionRequest chatCompletion = ChatCompletionRequest
        .builder()
        .stream(true)  // 开启流式
        .messages(Collections.singletonList(
            Message.builder()
                .role(Constants.Role.USER)
                .content("写一个java冒泡排序")
                .build()
        ))
        .model(ChatCompletionRequest.Model.GPT_4.getCode())
        .build();

    // 2. 发起流式请求
    EventSource eventSource = openAiSession.chatCompletions(chatCompletion, new EventSourceListener() {
        @Override
        public void onEvent(EventSource eventSource, String id, String type, String data) {
            log.info("测试结果:{}", data);  // 流式数据陆续到达
        }
    });

    // 等待
    new CountDownLatch(1).await();
}

解析

  • 流式响应使用 EventSource(来自 OkHttp 的 SSE 支持)
  • EventSourceListeneronEvent 会被多次调用,每次收到一块数据
  • 这时 RxJava 的 Single 已经不够用了,需要用 Observable 或原生 SSE

13.7 核心流程图

┌─────────────────────────────────────────────────────────┐
│                      调用端                              │
│  openAiSession.completions(chatCompletion)              │
└─────────────────────┬───────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────┐
│                 DefaultOpenAiSession                     │
│  return openAiApi.completions(chatCompletion)          │
│  返回 Single<ChatCompletionResponse>                    │
└─────────────────────┬───────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────┐
│                      IOpenAiApi                         │
│  @POST("v1/chat/completions")                          │
│  Single<ChatCompletionResponse> completions(...)       │
└─────────────────────┬───────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────┐
│              Retrofit + RxJava2CallAdapter             │
│  OkHttp 执行网络请求                                    │
│  Jackson 解析 JSON                                     │
│  转换为 Single<ChatCompletionResponse>                  │
└─────────────────────┬───────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────┐
│                    OkHttpClient                         │
│  实际发送 HTTP 请求到 OpenAI API                       │
└─────────────────────────────────────────────────────────┘

13.8 为什么选择 Single?

选择 原因
Single 而非 Observable API 调用只返回一次结果,不需要发射多个数据
Single 而非 Call 可以用 RxJava 操作符(map、flatMap 等)
blockingGet() 简化测试代码,实际项目应用 subscribe()

13.9 扩展:如何改成纯异步

如果想让项目使用更标准的异步写法,可以这样改:

// 修改 IOpenAiSession 接口
public interface OpenAiSession {
    Observable<ChatCompletionResponse> completions(ChatCompletionRequest request);
}

// 使用
openAiSession.completions(request)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        response -> display(response),
        error -> handleError(error)
    );

附录:常用操作符速查表

创建类

操作符 说明
just() 直接发射指定数据
fromArray/Iterable() 从数组/列表创建
range() 发射范围整数
interval() 定时发射
empty/never/error() 特殊 Observable

转换类

操作符 说明
map() 转换每个数据
flatMap() 展开并合并
concatMap() 展开并串行合并
scan() 累积计算

过滤类

操作符 说明
filter() 过滤
distinct() 去重
take() 取前 N 个
skip() 跳过前 N 个
debounce() 防抖

组合类

操作符 说明
merge() 并行合并
concat() 串行连接
zip() 配对合并
combineLatest() 最新值组合

学习建议

  1. 先理解概念:观察者模式、发布-订阅模式
  2. 从简单开始:先学会 just()、subscribe()、map()
  3. 多动手实践:用 RxJava 改写现有代码
  4. 理解线程:subscribeOn() 和 observeOn() 是关键
  5. 进阶学习:背压、操作符原理

更多推荐