从基础 API 到复杂管道组合,一份文档覆盖 RAC 全部实战用法。


目录

从基础 API 到复杂管道组合,一份文档覆盖 RAC 全部实战用法。

第一章:基础 API

1.1 信号监听(RACObserve / KVO 替代)

1.2 通知(NSNotification)

1.3 代理(Delegate)

1.4 UI 控件绑定

UIButton 点击事件

UITextField 输入监听

手势识别

1.5 序列(Sequence)

1.6 生命周期信号

rac_willDeallocSignal — 对象释放时自动取消

rac_prepareForReuseSignal — Cell 复用时自动取消

1.7 定时器(Timer)

第二章:高阶函数

2.1 信号映射

map — 值变换(返回对象)

flattenMap — 值变换(返回信号)

map vs flattenMap

2.2 信号过滤

filter — 条件过滤

ignore — 忽略指定值

distinctUntilChanged — 去重(值不变不触发)

2.3 信号合并

combineLatest — 所有信号的最新值组合

combineLatest + reduce — 聚合成单一值

merge — 合流(谁发谁触发)

zip — 配对触发

合并方式对比

2.4 信号连接

concat — 顺序连接

then — 只要后续信号的结果

concat vs then

2.5 信号时间操作

timeout — 超时

delay — 延迟发送

throttle — 防抖

时间操作对比

2.6 取消与截断

takeUntil — 直到某信号发出时停止

take — 只取前 N 个值

takeLast — 只取最后 N 个值

截断方式对比

第三章:工作流场景

场景 1:串行依赖(flattenMap)

核心问题

流程图

代码

一句话总结

场景 2:并行请求 + 全部完成后合并(zip)

核心问题

流程图

代码

一句话总结

场景 3:竞速请求(merge + take:1)

核心问题

流程图

代码

一句话总结

场景 4:多条件联动(combineLatest + filter)

核心问题

流程图

代码

一句话总结

场景 5:错误重试 + 降级(retry + catch)

核心问题

流程图

代码

retry 注意事项

一句话总结

场景 6:节流 + 去重(throttle + distinctUntilChanged)

核心问题

流程图

代码

一句话总结

场景 7:超时处理(timeout + catch)

核心问题

流程图

代码

一句话总结

场景 8:事件合流(merge)

核心问题

流程图

代码

一句话总结

场景 9:状态机(scanWithStart:reduce:)

核心问题

流程图

代码

scan vs map

一句话总结

场景 10:复杂管道(串行 + 并行 + 降级 组合)

核心问题

流程图

代码

一句话总结

场景 11:冷热转换(multicast)

核心问题

解决方案

为什么用 RACReplaySubject?

一句话总结

场景 12:取消订阅(takeUntil)

核心问题

流程图

代码

一句话总结

场景 13:延迟与线程调度(delay + subscribeOn + deliverOn)

核心问题

流程图

代码

subscribeOn vs deliverOn

一句话总结

场景 14:信号开关(switchToLatest)

核心问题

流程图

代码

switchToLatest vs flattenMap

一句话总结

场景 15:顺序追加(concat)

核心问题

流程图

代码

concat vs merge vs zip

一句话总结

第四章:复杂管道组合

组合 A:串行 + 并行 + 聚合渲染

组合 B:并行 + 各自降级 + 聚合

组合 C:串行 + 超时 + 重试 + 最终降级

组合 D:并行竞速 + 超时兜底

组合 E:串行依赖 + 并行分支 + 条件降级 + 线程调度

组合 F:多级串行 + 中途分叉并行 + 合流继续串行

组合速查表

第五章:速查索引

按需求查算子

按分类查算子



第一章:基础 API

1.1 信号监听(RACObserve / KVO 替代)

一句代码替代 KVO,代码和功能在一起,简单易读。

// 监听 myString 属性变化
[RACObserve(self, myString) subscribeNext:^(id _Nullable x) {
    NSLog(@"myString 新值:%@", x);
}];
  • RACObserve(TARGET, KEYPATH):RAC 提供的宏定义,监听属性变化,返回 RACSignal 类型信号
  • subscribeNext::订阅信号,只有订阅后才能收到值的变化

1.2 通知(NSNotification)

// 监听通知
[[[NSNotificationCenter defaultCenter]
    rac_addObserverForName:@"NotificationName" object:nil]
    subscribeNext:^(NSNotification * _Nullable x) {
        NSLog(@"%@", x);
    }];

// 发送通知
[[NSNotificationCenter defaultCenter]
    postNotificationName:@"NotificationName"
                  object:@"999"
                userInfo:@{@"key": @"6666"}];

1.3 代理(Delegate)

// 将 delegate 方法转为信号
[[self rac_signalForSelector:@selector(textFieldDidBeginEditing:)
                fromProtocol:@protocol(UITextFieldDelegate)]
    subscribeNext:^(RACTuple * _Nullable x) {
        UITextField *field = [x first]; // RACTuple 元组取值
        NSLog(@"开始编辑:%@", field.text);
    }];
  • rac_signalForSelector:fromProtocol::将代理方法转为信号
  • 回调参数是 RACTuple 元组类型,用 [x first][x second] 等取值

1.4 UI 控件绑定

UIButton 点击事件
@weakify(self);
[[button rac_signalForControlEvents:UIControlEventTouchUpInside]
    subscribeNext:^(id x) {
        @strongify(self);
        NSLog(@"按钮被点击了");
    }];

MVVM 中的关键:通过 rac_signalForControlEvents 实现 UI 层向 ViewModel 层的数据传输,配合 RACObserve 实现双向绑定。

UITextField 输入监听
[self.textField.rac_textSignal subscribeNext:^(NSString * _Nullable x) {
    NSLog(@"当前输入:%@", x);
}];
手势识别
UITapGestureRecognizer *tap = [[UITapGestureRecognizer alloc] init];
[label addGestureRecognizer:tap];

[[tap rac_gestureSignal] subscribeNext:^(id x) {
    NSLog(@"Label 被点击");
}];

1.5 序列(Sequence)

集合类型通过 rac_sequence 属性转为序列后遍历。

// 数组遍历
NSArray *names = @[@"Niki", @"Pan", @"Ding"];
[names.rac_sequence.signal subscribeNext:^(id _Nullable x) {
    NSLog(@"%@", x); // 依次打印每个元素
}];

// 字典遍历
NSDictionary *dict = @{@"name": @"Niki", @"age": @"18"};
[dict.rac_sequence.signal subscribeNext:^(id _Nullable x) {
    RACTwoTuple *tuple = (RACTwoTuple *)x;
    NSLog(@"key = %@, value = %@", tuple[0], tuple[1]);
}];

1.6 生命周期信号

rac_willDeallocSignal — 对象释放时自动取消
// 通知监听绑定页面生命周期,dealloc 时自动停止
[[[[[NSNotificationCenter defaultCenter]
    rac_addObserverForName:TCSheetViewBlockUserNotification object:nil]
    takeUntil:self.rac_willDeallocSignal]
    deliverOnMainThread]
    subscribeNext:^(NSNotification *x) {
        // 处理通知
    }];
rac_prepareForReuseSignal — Cell 复用时自动取消
// Cell 中监听 model 变化,复用时自动取消旧的订阅
[[RACObserve(cellModel.data, title)
    takeUntil:self.rac_prepareForReuseSignal]
    subscribeNext:^(NSString *x) {
        self.label.text = x;
    }];

为什么需要:Cell 被复用时旧数据的监听不会自动停止,可能导致数据残留或内存泄漏。rac_prepareForReuseSignal 在 Cell 被复用时自动发出信号,配合 takeUntil 清理旧订阅。


1.7 定时器(Timer)

// 启动 60 秒倒计时
- (void)startTimerWithDuration:(NSInteger)seconds {
    [self.timerDisposable dispose]; // 释放之前的定时器
    __block NSInteger count = 0;
    @weakify(self);
    self.timerDisposable = [[[RACSignal
        interval:1.0 onScheduler:[RACScheduler mainThreadScheduler]]
        take:seconds] // 限定次数
        subscribeNext:^(NSDate * _Nullable x) {
            @strongify(self);
            count++;
            if (count >= seconds) {
                [self.timerDisposable dispose];
            }
        }];
}

// 取消定时器
- (void)cancelTimer {
    [self.timerDisposable dispose];
    self.timerDisposable = nil;
}

第二章:高阶函数

2.1 信号映射

map — 值变换(返回对象)

将信号中的值转换为另一种类型。map 的 Block 必须返回对象类型

[[self.textField.rac_textSignal map:^id _Nullable(NSString * _Nullable value) {
    return @(value.length); // 字符串 → 长度
}] subscribeNext:^(id _Nullable x) {
    NSLog(@"字符串长度:%@", x);
}];
flattenMap — 值变换(返回信号)

将源信号的值映射成一个新信号,常用于串行依赖场景。

// 基础用法:和 map 类似,但返回的是信号
[[self.textField.rac_textSignal flattenMap:^RACSignal *(NSString * _Nullable value) {
    return [RACReturnSignal return:@(value.length)];
}] subscribeNext:^(id _Nullable x) {
    NSLog(@"字符串长度:%@", x);
}];

// 核心用法:串行依赖(前一步结果给下一步)
[[self login] flattenMap:^RACSignal *(NSString *token) {
    return [self fetchUserInfoWithToken:token]; // 用登录结果请求用户信息
}];
map vs flattenMap

API

返回值

场景

map

对象(id)

简单数据变换(格式转换、取字段)

flattenMap

信号(RACSignal)

串行依赖(A 结果 → B 请求)


2.2 信号过滤

filter — 条件过滤
// 只有输入长度 > 3 时才触发
[[self.textField.rac_textSignal filter:^BOOL(NSString *value) {
    return value.length > 3;
}] subscribeNext:^(id x) {
    NSLog(@"%@", x);
}];
ignore — 忽略指定值
// 忽略输入为 "n" 的信号
[[self.textField.rac_textSignal ignore:@"n"]
    subscribeNext:^(NSString *x) {
        NSLog(@"%@", x); // 输入 "n" 时这里不会执行
    }];
distinctUntilChanged — 去重(值不变不触发)
RACSubject *subject = [RACSubject subject];
[[subject distinctUntilChanged] subscribeNext:^(id x) {
    NSLog(@"%@", x); // 输出 a, b, c(第二次 b 被忽略)
}];

[subject sendNext:@"a"];
[subject sendNext:@"b"];
[subject sendNext:@"b"]; // 和上次相同,被忽略
[subject sendNext:@"c"];

常用场景:UI 刷新防抖——值发生变化时才刷新,避免重复渲染。


2.3 信号合并

combineLatest — 所有信号的最新值组合

必须每个信号都至少发送过一次 sendNext,才会触发合并信号。任一信号发新值时,重新组合所有信号的最新值。

RACSubject *s1 = [RACSubject subject];
RACSubject *s2 = [RACSubject subject];
RACSubject *s3 = [RACSubject subject];

[[RACSignal combineLatest:@[s1, s2, s3]]
    subscribeNext:^(RACTuple *x) {
        NSLog(@"%@", x); // 元组:(s1最新值, s2最新值, s3最新值)
    }];

[s1 sendNext:@"a"];
[s2 sendNext:@"1"];
[s3 sendNext:@"!"]; // 首次触发:(a, 1, !)
[s1 sendNext:@"b"]; // 触发:(b, 1, !)
combineLatest + reduce — 聚合成单一值
[[RACSignal combineLatest:@[s1, s2, s3]
    reduce:^(id v1, id v2, id v3) {
        return [NSString stringWithFormat:@"%@,%@,%@", v1, v2, v3];
    }]
    subscribeNext:^(id x) {
        NSLog(@"%@", x); // 输出:"a,1,!" "b,1,!" ...
    }];
merge — 合流(谁发谁触发)

多个信号合为一个,任意一个发值就触发,不做配对。

RACSignal *mergeSg = [RACSignal merge:@[s1, s2, s3]];

[mergeSg subscribeNext:^(id x) {
    NSLog(@"%@", x); // 每次 sendNext 都会触发,共 9 次
}];
zip — 配对触发

所有信号都发出值后才配对,按发送顺序一一配对。

RACSignal *zipSg = [RACSignal zip:@[s1, s2, s3]];

[zipSg subscribeNext:^(RACTuple *x) {
    NSLog(@"%@", x); // 三个信号各发一次后才触发
}];
合并方式对比

API

触发条件

输出

场景

combineLatest

每个信号至少发一次后,任一信号发新值

所有信号最新值的元组

多条件联动(表单校验)

merge

任一信号发值即触发

当前发值的那个值

多事件共享处理

zip

所有信号都发出值后配对

配对的元组

并行请求全部完成后合并


2.4 信号连接

concat — 顺序连接

前一个信号 completed 后,才订阅下一个信号。严格保证顺序

RACSignal *s1 = [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    [subscriber sendNext:@"信号1"];
    [subscriber sendCompleted]; // 必须 completed 才会触发下一个
    return nil;
}];

RACSignal *s2 = [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    [subscriber sendNext:@"信号2"];
    [subscriber sendCompleted];
    return nil;
}];

[[s1 concat:s2] subscribeNext:^(id x) {
    NSLog(@"%@", x); // 依次输出:信号1、信号2
}];
then — 只要后续信号的结果

前一个信号的值全部忽略,completed 后只返回 then 中的信号。

[[s2 then:^RACSignal *{
    return s1;
}] subscribeNext:^(id x) {
    NSLog(@"%@", x); // 只输出 "信号1",s2 的值被忽略
}];
concat vs then

API

前一个信号的值

后一个信号的值

场景

concat

✅ 保留

✅ 保留

分页加载(每页结果都要)

then

❌ 忽略

✅ 保留

只关心最终结果(登录→拿用户信息,只要用户信息)


2.5 信号时间操作

timeout — 超时

超过设定时间未收到值,自动发 error

RACSignal *signal = [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    // 延时 3 秒才发送(模拟慢接口)
    [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
        [subscriber sendNext:@"delay"];
        [subscriber sendCompleted];
    }];
    return nil;
}] timeout:2 onScheduler:[RACScheduler mainThreadScheduler]]; // 2 秒超时

[signal subscribeNext:^(id x) {
    NSLog(@"%@", x);
} error:^(NSError *error) {
    NSLog(@"超时了:%@", error); // 2 秒后触发(因为信号 3 秒才发)
}];
delay — 延迟发送

每个值延迟固定时间后再发出。

[[[self fetchData] delay:0.3] // 延迟 0.3 秒
    subscribeNext:^(id data) {
        [self showData:data];
    }];
throttle — 防抖

在一段"安静期"后才发出最新值,适合搜索输入场景。

[[self.searchField.rac_textSignal throttle:0.5] // 0.5 秒内无新输入才发出
    subscribeNext:^(NSString *keyword) {
        [self searchWithKeyword:keyword];
    }];
时间操作对比

API

行为

场景

timeout:

超时未收到值则 sendError

慢接口兜底

delay:

每个值延迟固定时间发出

等动画结束后刷新

throttle:

安静期后发出最新值

搜索防抖

sample:

采样信号发值时取源信号最新值

定时采样


2.6 取消与截断

takeUntil — 直到某信号发出时停止
RACSubject *cancel = [RACSubject subject];

[[someSignal takeUntil:cancel] subscribeNext:^(id x) {
    NSLog(@"%@", x);
}];

// 需要停止时
[cancel sendNext:@"stop"]; // 订阅立即结束
// 之后 someSignal 再发值也不会被接收
// 如需恢复,必须重新执行订阅代码
take — 只取前 N 个值
[[signal take:3] subscribeNext:^(id x) {
    NSLog(@"%@", x); // 只收前 3 个值,之后自动 completed
}];
takeLast — 只取最后 N 个值

注意:必须等信号 completed 后才能确定"最后 N 个"。

RACSignal *combined = [[loginSignal concat:detailSignal] takeLast:1];

[combined subscribeNext:^(id x) {
    NSLog(@"最后的结果:%@", x); // 只拿 detailSignal 的最后一个值
}];
截断方式对比

API

行为

场景

takeUntil:signal

收到指定信号时结束

页面生命周期绑定

take:N

收到 N 个值后结束

竞速取最快的(merge + take:1

takeLast:N

completed 后取最后 N 个值

只关心最终结果

takeUntilBlock:

满足条件时结束

条件停止


第三章:工作流场景

以下 15 个场景覆盖了 iOS 开发中最常见的 RAC 信号组合模式,每个场景包含:核心问题 → 流程图 → 代码 → 一句话总结


场景 1:串行依赖(flattenMap)

核心问题

登录成功后用 Token 请求用户信息,用户信息拿到后加载首页。三步必须按顺序串行,且前一步的结果是后一步的输入。

流程图
登录 ──token──→ 获取用户信息 ──user──→ 加载首页
     flattenMap           flattenMap
代码
@weakify(self);
[[[self login]
    flattenMap:^RACSignal *(NSString *token) {
        return [self fetchUserInfoWithToken:token]; // token → 用户信息
    }]
    flattenMap:^RACSignal *(TCUserProfile *user) {
        return [self loadHomePageWithUser:user]; // 用户信息 → 首页
    }]
    subscribeNext:^(id homeData) {
        @strongify(self);
        [self renderHomePage:homeData];
    } error:^(NSError *error) {
        @strongify(self);
        [self showError:error]; // 任一步失败都走这里
    }];
一句话总结

flattenMap 就是"流水线传送带"——前一步的产出自动喂给下一步,任一环节出错整条线停。


场景 2:并行请求 + 全部完成后合并(zip)

核心问题

首页需要同时请求用户信息和商品列表,两个请求独立并行,全部完成后才一次性渲染。

流程图
用户信息 ──────(1.2s)──→ ┐
                         ├── zip ──→ 渲染首页
商品列表 ──(0.8s)──────→ ┘
等较慢的那个完成后,打包结果一起给你
代码
RACSignal *userSignal = [self fetchUserInfo];
RACSignal *productsSignal = [self fetchProducts];

@weakify(self);
[[RACSignal zip:@[userSignal, productsSignal]]
    subscribeNext:^(RACTuple *tuple) {
        @strongify(self);
        RACTupleUnpack(TCUserProfile *user, NSArray *products) = tuple;
        [self renderWithUser:user products:products];
    }];
一句话总结

zip 就是"等齐了再走"——所有信号都完成后,把结果打包成元组一次性交付。


场景 3:竞速请求(merge + take:1)

核心问题

同时发起缓存读取和网络请求,谁先返回用谁,不用等另一个。

流程图
缓存 ──(50ms)──→ ✅ 先到,用这个!
网络 ──(800ms)──→ (忽略)
merge + take:1 = 取第一个到达的结果
代码
RACSignal *cache = [self readFromCache];
RACSignal *network = [self fetchFromNetwork];

@weakify(self);
[[[RACSignal merge:@[cache, network]] take:1]
    subscribeNext:^(id data) {
        @strongify(self);
        [self renderWithData:data]; // 谁先到用谁
    }];
一句话总结

merge + take:1 就是"赛跑取冠军"——多个信号同时出发,只要最快的那个结果。


场景 4:多条件联动(combineLatest + filter)

核心问题

首页加载依赖三个条件:登录完成 + 网络可用 + 权限获取。三个条件异步到达,全部满足时才触发加载,任一条件后续变化都重新检查。

流程图
登录状态:  ──❌──✅────────────✅──
网络状态:  ────────✅──❌──✅──────
权限状态:  ──────────✅──────✅──
组合判断:  ──────────────────✅──→ 加载首页
三个都亮绿灯的那一刻才触发
代码
@weakify(self);
[[[RACSignal
    combineLatest:@[
        RACObserve(self, isLoggedIn),
        RACObserve(self, isNetworkAvailable),
        RACObserve(self, isPermissionGranted)
    ]]
    filter:^BOOL(RACTuple *tuple) {
        return [tuple.first boolValue] && [tuple.second boolValue] && [tuple.third boolValue];
    }]
    flattenMap:^RACSignal *(RACTuple *tuple) {
        return [self loadHomePage];
    }]
    subscribeNext:^(id data) {
        @strongify(self);
        [self showHomePage:data];
    }];
一句话总结

combineLatest 就是"所有条件都亮绿灯才放行"——持续监听多个状态,全部满足时触发。


场景 5:错误重试 + 降级(retry + catch)

核心问题

网络请求可能偶发失败,需要自动重试;重试多次仍然失败,走降级方案(读缓存/默认值)。

流程图
请求 ──✗失败──→ 重试1 ──✗──→ 重试2 ──✗──→ 重试3
                                         ↓ 全部失败
                                    降级读缓存 ──→ 兜底结果
代码
[[[self fetchDataFromServer]
    retry:3]  // 失败后自动重新订阅,最多 3 次
    catch:^RACSignal *(NSError *error) {
        NSLog(@"主接口彻底失败: %@", error);
        return [self readFromCache]; // 兜底信号
    }]
    subscribeNext:^(id data) {
        [self renderWithData:data];
    }];
retry 注意事项
  • retry:N 表示最多重新订阅 N 次(总共执行 N 次)
  • 每次重试会重新执行信号创建闭包里的代码
  • 如果信号内部有状态(如请求计数),要注意闭包捕获
一句话总结

retry 是"再试几次",catch 是"实在不行就换个方案"——组合起来实现请求容错。


场景 6:节流 + 去重(throttle + distinctUntilChanged)

核心问题

搜索输入框:用户每敲一个字符就触发搜索太浪费。需要:

  1. 防抖:用户停止输入 0.5s 后才触发搜索
  2. 去重:最终文本和上次相同,不重复请求
流程图
输入: i─ip─iph─iphone─iphone─iphone16
       ↓(0.5s内无新输入才发出)
节流: ─────────────────iphone──iphone16
       ↓(和上次相同的值过滤掉)
去重: ─────────────────iphone──iphone16
代码
[[[self.searchField.rac_textSignal
    throttle:0.5]
    distinctUntilChanged]
    subscribeNext:^(NSString *keyword) {
        [self searchWithKeyword:keyword];
    }];
一句话总结

throttle + distinct 就是"等你说完了我再动,而且同样的话不重复做"——搜索输入的标准套路。


场景 7:超时处理(timeout + catch)

核心问题

某些接口可能响应很慢甚至无响应。设定时间上限,超时后自动走降级方案。

流程图
慢接口(5s) ──────────────────────→ (太晚了,没人要了)
超时(2s)   ────── ⏰timeout!
                       ↓
              降级快速请求 ──→ 兜底结果
代码
[[[self fetchSlowAPI]
    timeout:2.0 onScheduler:RACScheduler.mainThreadScheduler]
    catch:^RACSignal *(NSError *error) {
        if (error.code == RACSignalErrorTimedOut) {
            return [self fetchFallbackAPI];
        }
        return [RACSignal error:error];
    }]
    subscribeNext:^(id data) {
        [self renderWithData:data];
    }];
一句话总结

timeout 就是"我只等你 N 秒,过时不候"——给信号设截止日期,超时自动 sendError。


场景 8:事件合流(merge)

核心问题

登录、登出、Token 过期、强制刷新……多个不同事件都需要触发同一段逻辑

流程图
登录事件   ──🔔──────────────────────
Token过期  ────────🔔────────────────
登出事件   ──────────────🔔──────────
合流       ──🔄───🔄─────🔄──→ 刷新UI
代码
RACSignal *login = [[NSNotificationCenter defaultCenter]
    rac_addObserverForName:TCAccountLoginNotification object:nil];
RACSignal *logout = [[NSNotificationCenter defaultCenter]
    rac_addObserverForName:TCAccountLogoutNotification object:nil];
RACSignal *tokenExpired = [[NSNotificationCenter defaultCenter]
    rac_addObserverForName:TCTokenExpiredNotification object:nil];

@weakify(self);
[[RACSignal merge:@[login, logout, tokenExpired]]
    subscribeNext:^(id x) {
        @strongify(self);
        [self refreshPage];
    }];
一句话总结

merge 就是"多条水管汇入一个池子"——多个事件源共享同一个处理逻辑。


场景 9:状态机(scanWithStart:reduce:)

核心问题

每个新事件不是独立的,需要和之前的状态做运算后产生新状态(购物车累加、聊天消息列表追加、页码递增等)。

流程图
操作: +100  +50  -30  +200  -20
状态:  100  150  120   320  300
每个新事件 + 当前状态 → 输出新状态
代码
RACSubject *cartActions = [RACSubject subject];

[[cartActions
    scanWithStart:@(0) reduce:^id(NSNumber *total, NSString *action) {
        NSInteger delta = [action integerValue];
        return @(total.integerValue + delta);
    }]
    subscribeNext:^(NSNumber *newTotal) {
        self.totalPriceLabel.text = [NSString stringWithFormat:@"¥%@", newTotal];
    }];

[cartActions sendNext:@"+100"]; // 输出 100
[cartActions sendNext:@"+50"];  // 输出 150
[cartActions sendNext:@"-30"];  // 输出 120
scan vs map

API

有无状态

用途

map

无状态,每个值独立变换

数据格式转换

scanWithStart:reduce:

有状态,累积计算

购物车、计数器、列表追加

一句话总结

scanWithStart 就是 reduce 的流式版本——每来一个新值就和历史状态做运算,输出新状态。


场景 10:复杂管道(串行 + 并行 + 降级 组合)

核心问题

真实业务不会只有纯串行或纯并行,往往是混合的:先串行登录 → 然后并行请求多个接口(各自带超时/降级)→ 全部完成后串行渲染。

流程图
          ┌─用户信息(失败→降级为"游客")─┐
登录 ──→  ├                            ├──→ 渲染页面
          └─远程配置(2s超时)           ─┘
代码
@weakify(self);
RACSignal *pipeline = [[self login]
    flattenMap:^RACSignal *(NSString *token) {
        RACSignal *userSignal = [[self fetchUser]
            catch:^RACSignal *(NSError *e) {
                return [RACSignal return:@"游客"];
            }];
        
        RACSignal *configSignal = [[self fetchConfig]
            timeout:2.0 onScheduler:RACScheduler.mainThreadScheduler];
        
        return [RACSignal zip:@[userSignal, configSignal]];
    }];

[[pipeline
    flattenMap:^RACSignal *(RACTuple *tuple) {
        RACTupleUnpack(NSString *user, NSDictionary *config) = tuple;
        return [self renderPageWithUser:user config:config];
    }]
    subscribeNext:^(id result) {
        @strongify(self);
        [self showPage];
    } error:^(NSError *error) {
        [self showError:error];
    }];
一句话总结

复杂管道就是把 flattenMap(串行)、zip(并行)、catch(降级)、timeout(超时)像乐高一样组合——对应真实业务的完整请求链路。


场景 11:冷热转换(multicast)

核心问题

RAC 中信号默认是冷信号——每次被订阅都会重新执行信号内部代码。一个网络请求被 3 处订阅就会发 3 次请求。

// ⚠️ 冷信号:订阅几次就请求几次
[request subscribeNext:^(id x) { /* 更新UI */ }];   // 第1次请求
[request subscribeNext:^(id x) { /* 写日志 */ }];   // 第2次请求(重复!)
[request subscribeNext:^(id x) { /* 埋点 */ }];     // 第3次请求(重复!)
解决方案
RACMulticastConnection *conn = [request multicast:[RACReplaySubject subject]];

[conn.signal subscribeNext:^(id x) { /* 更新UI */ }];
[conn.signal subscribeNext:^(id x) { /* 写日志 */ }];
[conn.signal subscribeNext:^(id x) { /* 埋点 */ }];

[conn connect]; // 只执行 1 次网络请求,3 个订阅者共享结果
为什么用 RACReplaySubject?
时间线: ──[connect]──────[请求完成]──────────
订阅A:  subscribe ──────── 收到 ✅
订阅B:  subscribe ──────── 收到 ✅
订阅C:  ──────────── subscribe ── 也收到 ✅(Replay 回放)

普通 Subject 后来的订阅者会错过之前的值,ReplaySubject 会缓存并回放。

一句话总结

multicast 就是"一份工作多人看结果"——避免冷信号被多次订阅时重复执行副作用。


场景 12:取消订阅(takeUntil)

核心问题

长时间运行的订阅(轮询、监听通知、WebSocket)在页面退出时需要自动停止,否则会内存泄漏或产生野指针。

流程图
轮询: ──🔄──🔄──🔄──🔄──┐
取消信号:                🛑 → completed
takeUntil: 收到取消信号后自动 dispose
代码
// 方式1:页面 dealloc 时自动取消
[[[RACSignal interval:5.0 onScheduler:RACScheduler.mainThreadScheduler]
    takeUntil:self.rac_willDeallocSignal]
    subscribeNext:^(NSDate *date) {
        [self pollNewMessages];
    }];

// 方式2:手动发取消信号
RACSubject *cancelSignal = [RACSubject subject];

[[someSignal takeUntil:cancelSignal]
    subscribeNext:^(id x) { ... }];

// 需要取消时
[cancelSignal sendNext:@"stop"]; // 订阅立即结束
一句话总结

takeUntil 就是"直到某个信号出现就停"——最优雅的生命周期管理方式。


场景 13:延迟与线程调度(delay + subscribeOn + deliverOn)

核心问题
  • 耗时操作需要在子线程执行
  • UI 更新必须回到主线程
  • 某些操作需要延迟执行
流程图
子线程  ──[处理数据]──→ deliverOnMainThread
主线程                         ↓
         ←──delay(0.3s)──← [更新UI]
代码
[[[[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    // 子线程执行耗时操作
    UIImage *processed = [self heavyImageProcess:rawImage];
    [subscriber sendNext:processed];
    [subscriber sendCompleted];
    return nil;
}]
    subscribeOn:[RACScheduler schedulerWithPriority:RACSchedulerPriorityDefault]]
    deliverOnMainThread]
    subscribeNext:^(UIImage *image) {
        self.imageView.image = image; // 主线程更新 UI
    }];
subscribeOn vs deliverOn

API

影响范围

作用

subscribeOn:

信号创建侧(上游)

信号内部代码在哪个线程执行

deliverOn: / deliverOnMainThread

订阅侧(下游)

subscribeNext 回调在哪个线程

一句话总结

subscribeOn 管上游在哪干活,deliverOn 管下游在哪收货,delay 管什么时候送达。


场景 14:信号开关(switchToLatest)

核心问题

动态切换数据源。Tab 切换时,从"推荐"切到"关注",切换后旧源的数据应该被忽略。

流程图
开关: ──[缓存源]──────────[网络源]──────
缓存: ──商品A──商品B──商品C(忽略)───────
网络: ─────────────────最新X──最新Y────
输出: ──商品A──商品B────最新X──最新Y────
代码
RACSubject *dataSourceSwitch = [RACSubject subject];

[[dataSourceSwitch switchToLatest]
    subscribeNext:^(id data) {
        [self renderCell:data];
    }];

[dataSourceSwitch sendNext:self.recommendSignal]; // Tab 1
[dataSourceSwitch sendNext:self.followSignal];     // Tab 2(推荐的后续数据自动忽略)
switchToLatest vs flattenMap

API

行为

场景

switchToLatest

只监听最新发出的内部信号,旧的自动取消

Tab 切换

flattenMap + flatten

所有内部信号都监听(不取消旧的)

需要所有结果

一句话总结

switchToLatest 就是"电视换台"——切到新频道后,旧频道的内容自动不再接收。


场景 15:顺序追加(concat)

核心问题

分页加载:必须按顺序请求(第1页→第2页→第3页),前一页完成后才请求下一页。

流程图
第1页 ──completed──→ 第2页 ──completed──→ 第3页
  ↓next                ↓next                ↓next
[1,2,3]             [4,5,6]             [7,8,9]
concat: 顺序保证,前一个 completed 后才订阅下一个
代码
RACSignal *page1 = [self loadPage:1];
RACSignal *page2 = [self loadPage:2];
RACSignal *page3 = [self loadPage:3];

[[[page1 concat:page2] concat:page3]
    subscribeNext:^(NSArray *items) {
        [self.dataSource addObjectsFromArray:items];
        [self.tableView reloadData];
    } completed:^{
        NSLog(@"全部分页加载完成");
    }];
concat vs merge vs zip

API

顺序保证

触发时机

场景

concat

✅ 严格顺序

前一个 completed 后才订阅下一个

分页加载

merge

❌ 谁先到谁先

同时订阅所有

多事件合流

zip

❌ 配对触发

每个信号都发值后

并行请求合并

一句话总结

concat 就是"排队"——前一个信号 completed 后才轮到下一个,保证严格的先后顺序。


第四章:复杂管道组合

真实业务往往是串行、并行、降级、超时的混合体。以下是几种常见组合模式,可以像乐高一样拼接使用。


组合 A:串行 + 并行 + 聚合渲染

场景:先登录(串行),登录成功后同时拉取 3 个模块数据(并行),全部到齐后一次性渲染。

登录 ──→ ┌─ 商品列表 ─┐
         ├─ 购物车   ─┤──zip──→ 一次性渲染
         └─ 消息数   ─┘
@weakify(self);
[[[self login]
    flattenMap:^RACSignal *(NSString *token) {
        return [RACSignal zip:@[
            [self fetchProducts],
            [self fetchCart],
            [self fetchUnreadCount]
        ]];
    }]
    subscribeNext:^(RACTuple *tuple) {
        @strongify(self);
        RACTupleUnpack(NSArray *products, NSDictionary *cart, NSNumber *unread) = tuple;
        [self renderProducts:products cart:cart unreadCount:unread];
    }];

组合 B:并行 + 各自降级 + 聚合

场景:同时请求多个接口,每个接口独立降级(不因一个失败影响其他),全部到齐后渲染。

┌─ 用户信息(失败→"游客") ──────────────┐
├─ Banner(失败→默认Banner) ────────────┤──zip──→ 渲染
└─ 推荐(3s超时→热门兜底) ─────────────┘
RACSignal *userSignal = [[self fetchUserInfo]
    catch:^RACSignal *(NSError *e) {
        return [RACSignal return:@{@"name": @"游客", @"level": @"0"}];
    }];

RACSignal *bannerSignal = [[self fetchBanner]
    catch:^RACSignal *(NSError *e) {
        return [RACSignal return:[self defaultBannerData]];
    }];

RACSignal *recommendSignal = [[[self fetchRecommend]
    timeout:3.0 onScheduler:RACScheduler.mainThreadScheduler]
    catch:^RACSignal *(NSError *e) {
        return [self fetchHotItems];
    }];

@weakify(self);
[[RACSignal zip:@[userSignal, bannerSignal, recommendSignal]]
    subscribeNext:^(RACTuple *tuple) {
        @strongify(self);
        RACTupleUnpack(NSDictionary *user, NSArray *banners, NSArray *items) = tuple;
        [self renderHomeWithUser:user banners:banners items:items];
    }];

组合 C:串行 + 超时 + 重试 + 最终降级

场景:先获取 Token(串行),再用 Token 请求数据(带 2s 超时),失败后重试 2 次,仍失败走缓存兜底。

获取Token ──→ 请求数据(2s超时) ──✗→ 重试1 ──✗→ 重试2 ──✗→ 读缓存
@weakify(self);
[[[self fetchToken]
    flattenMap:^RACSignal *(NSString *token) {
        return [[[[self fetchDataWithToken:token]
            timeout:2.0 onScheduler:RACScheduler.mainThreadScheduler]
            retry:2]
            catch:^RACSignal *(NSError *error) {
                NSLog(@"全部失败,走缓存: %@", error);
                return [self readCachedData];
            }];
    }]
    subscribeNext:^(id data) {
        @strongify(self);
        [self renderWithData:data];
    }];

组合 D:并行竞速 + 超时兜底

场景:同时向 CDN 和源站发起请求,谁先回来用谁;但如果 1.5s 内都没回来,直接用本地缓存。

┌─ CDN请求  ──(0.3s)──→ ✅ 用这个
├─ 源站请求 ──(1.2s)──→ (忽略)
└─ 超时兜底 ──(1.5s)──→ (不需要了)
RACSignal *cdn = [self fetchFromCDN];
RACSignal *origin = [self fetchFromOrigin];

RACSignal *race = [[RACSignal merge:@[cdn, origin]] take:1];

@weakify(self);
[[[race timeout:1.5 onScheduler:RACScheduler.mainThreadScheduler]
    catch:^RACSignal *(NSError *error) {
        return [RACSignal return:[self localCacheData]];
    }]
    subscribeNext:^(id data) {
        @strongify(self);
        [self renderWithData:data];
    }];

组合 E:串行依赖 + 并行分支 + 条件降级 + 线程调度

场景:完整的首页加载链路——子线程登录 → 并行拉取 3 个接口(各自带独立超时和降级)→ 回主线程渲染 → 延迟 0.5s 上报埋点。

[子线程] 登录 ──→ ┌─ 用户(2s超时→游客) ────┐
                  ├─ 配置(1s超时→默认配置) ─┤──zip
                  └─ 商品(重试2次→空列表)  ─┘
                            ↓
              [主线程] 渲染页面 ──delay(0.5s)──→ 上报埋点
RACSignal *userSignal = [[[self fetchUserInfo]
    timeout:2.0 onScheduler:RACScheduler.mainThreadScheduler]
    catch:^(NSError *e) { return [RACSignal return:@{@"name": @"游客"}]; }];

RACSignal *configSignal = [[[self fetchConfig]
    timeout:1.0 onScheduler:RACScheduler.mainThreadScheduler]
    catch:^(NSError *e) { return [RACSignal return:[self defaultConfig]]; }];

RACSignal *productsSignal = [[[self fetchProducts]
    retry:2]
    catch:^(NSError *e) { return [RACSignal return:@[]]; }];

@weakify(self);
RACSignal *pipeline = [[[[[self login]
    subscribeOn:[RACScheduler schedulerWithPriority:RACSchedulerPriorityDefault]]
    flattenMap:^RACSignal *(NSString *token) {
        return [RACSignal zip:@[userSignal, configSignal, productsSignal]];
    }]
    deliverOnMainThread]
    flattenMap:^RACSignal *(RACTuple *tuple) {
        @strongify(self);
        RACTupleUnpack(NSDictionary *user, NSDictionary *config, NSArray *products) = tuple;
        [self renderWithUser:user config:config products:products];
        return [[RACSignal return:@YES] delay:0.5];
    }];

[pipeline subscribeNext:^(id x) {
    @strongify(self);
    [self reportPageLoadTrack];
}];

组合 F:多级串行 + 中途分叉并行 + 合流继续串行

场景:初始化 SDK → 串行获取配置 → 根据配置分叉并行加载 A/B 模块 → 合流后串行执行最终初始化。

初始化SDK ──→ 获取配置 ──→ ┌─ 模块A初始化 ─┐
                           └─ 模块B初始化 ─┘
                                  ↓ zip
                           最终初始化完成
@weakify(self);
[[[[self initSDK]
    flattenMap:^RACSignal *(id sdkResult) {
        return [self fetchRemoteConfig];
    }]
    flattenMap:^RACSignal *(NSDictionary *config) {
        RACSignal *moduleA = [self initModuleA:config[@"moduleA"]];
        RACSignal *moduleB = [self initModuleB:config[@"moduleB"]];
        return [RACSignal zip:@[moduleA, moduleB]];
    }]
    flattenMap:^RACSignal *(RACTuple *modules) {
        @strongify(self);
        return [self finalSetupWithModules:modules];
    }]
    subscribeCompleted:^{
        @strongify(self);
        NSLog(@"全部初始化完成");
        [self showMainPage];
    }];

组合速查表

组合模式

关键算子

典型场景

串行 + 并行 + 聚合

flattenMapzip

登录后并行拉取多接口

并行 + 各自降级

catch × N → zip

首页多模块独立容错

串行 + 超时 + 重试 + 降级

flattenMaptimeoutretrycatch

核心接口多重保障

并行竞速 + 超时兜底

merge + take:1timeoutcatch

CDN/源站竞速

串行 + 并行 + 线程调度 + 延迟

subscribeOnflattenMapzipdeliverOnMainThreaddelay

完整首页加载链路

串行 + 分叉并行 + 合流串行

flattenMapzipflattenMap

SDK 多模块初始化


第五章:速查索引

按需求查算子

#

需求

用这个

顺序保证

触发时机

场景

并行/串行

1

前一步结果给下一步

flattenMap

✅ 严格串行

前一个信号 completed 后

登录→获取用户信息→加载首页

串行

2

全部完成后合并

zip

❌ 不保证

每个信号都发出值后配对触发

同时请求用户信息+配置,全部到齐后渲染

并行

3

谁先完成用谁

merge + take:1

❌ 不保证

任一信号发出值时立即触发

缓存 vs 网络竞速,取最快的

并行(竞速)

4

多条件同时满足

combineLatest + filter

❌ 不保证

任一源发新值时重新组合

登录态+网络+权限全部就绪才加载

并行(持续监听)

5

失败重试

retry:

✅ 顺序重试

收到 error 后重新订阅

网络抖动自动重试

串行

6

重试仍失败走兜底

retry: + catch:

✅ 顺序

retry 耗尽后触发 catch

请求3次仍失败→读缓存

串行

7

搜索防抖

throttle:

✅ 保留最新

安静期结束后发出最新值

搜索框输入防抖

串行

8

值没变不重复触发

distinctUntilChanged

✅ 保持原序

值与上次不同时才转发

避免相同搜索词重复请求

串行

9

超时处理

timeout:onScheduler:

超过指定时间未收到值

慢接口兜底

串行

10

多事件共享处理

merge

❌ 不保证

任一信号发值即触发

登录/登出/Token过期统一刷新

并行(合流)

11

累积状态

scanWithStart:reduce:

✅ 保持原序

每个值到达时立即计算

购物车累加、消息列表追加

串行

12

避免重复执行副作用

multicast

connect 调用时执行一次

一次请求多处订阅共享

13

页面退出自动停

takeUntil:rac_willDeallocSignal

取消信号发值时结束

轮询/监听绑定页面生命周期

14

子线程处理

subscribeOn:

✅ 保持原序

订阅时立即切换线程

图片处理、JSON 解析放子线程

串行

15

回主线程更新UI

deliverOnMainThread

✅ 保持原序

值到达时切回主线程

子线程处理完后更新 UI

串行

16

延迟执行

delay:

✅ 保持原序

每个值延迟固定时间发出

等动画结束后刷新

串行

17

动态切换数据源

switchToLatest

❌ 只保证最新源

新信号发出时取消旧信号

Tab 切换、搜索结果替换

串行(切换)

18

按顺序追加

concat

✅ 严格顺序

前一个 completed 后才订阅下一个

分页加载

串行

19

条件过滤

filter:

✅ 保持原序

每个值到达时判断条件

输入长度>3才触发

串行

20

忽略指定值

ignore:

✅ 保持原序

每个值到达时判断

忽略特定输入

串行

21

值变换

map:

✅ 保持原序

每个值到达时变换

字符串→长度

串行

22

前一个完成后只要后续

then:

✅ 严格顺序

前一个 completed 后

只关心最终结果

串行

23

只取前N个值

take:

✅ 保持原序

收到 N 个值后 completed

竞速取冠军

串行

24

只取最后N个值

takeLast:

✅ 保持原序

信号 completed 后回放

只要最终结果

串行

25

KVO 属性监听

RACObserve()

✅ 保持原序

属性值变化时

替代 KVO

串行

26

Cell复用时取消

takeUntil:rac_prepareForReuseSignal

Cell 被复用时

Cell 中数据绑定

按分类查算子

分类

算子

一句话说明

映射

map

值 → 值(同步变换)

flattenMap

值 → 信号(异步串行)

过滤

filter

满足条件才放行

ignore

忽略指定值

distinctUntilChanged

值不变不触发

合并

combineLatest

所有源的最新值组合

merge

多源合一,谁发谁触发

zip

配对触发,等齐了再走

连接

concat

排队,前一个完了下一个

then

前一个完了只要后面的

截断

take:

只要前 N 个

takeLast:

只要最后 N 个

takeUntil:

收到信号就停

错误处理

retry:

失败自动重试

catch:

失败走兜底方案

时间

throttle:

防抖(安静期后发)

delay:

延迟发出

timeout:

超时报错

线程

subscribeOn:

上游在哪个线程干活

deliverOn:

下游在哪个线程收货

冷热

multicast

多订阅共享一次执行

开关

switchToLatest

切到新源,旧源自动停

状态

scanWithStart:reduce:

流式 reduce,累积计算

生命周期

rac_willDeallocSignal

对象释放时触发

rac_prepareForReuseSignal

Cell 复用时触发

UI 绑定

rac_textSignal

输入框文字变化信号

rac_signalForControlEvents:

控件事件信号

rac_gestureSignal

手势触发信号

RACObserve()

属性 KVO 信号

更多推荐