android RxJava2版本使用简介
一、库的引入GitHub下载地址:https://github.com/ReactiveX/RxJava目前最新版本如图:根据版本号,引入Rxjava库到我的项目里,如图:同时为了更好的兼容Android,我也引入了Rxandroid,github地址如下:https://github.com/ReactiveX/RxAndroid这样,Rxjava与R
一、库的引入
GitHub下载地址:https://github.com/ReactiveX/RxJava
目前最新版本如图:
根据版本号,引入Rxjava库到我的项目里,如图:
同时为了更好的兼容Android,我也引入了Rxandroid,github地址如下:
https://github.com/ReactiveX/RxAndroid
这样,Rxjava与Rxandriod库的引入以告大吉,接下来就开始使用了。
二、基本使用方法
RxJava使用的是观察者模式。是由:
观察者:监视着被观察者,当被观察者发生变化时通知观察者,然后观察者执行相应的操作;
被观察者:被监视的对象,当某个状态改变时告诉观察者;
订阅(或注册、关联):将观察者与被观察者建立联系。
它三者的关系就好比一个Button的点击事件:
观察者:OnClickListener;
被观察者:Button;
订阅(或注册):setOnClickListener();
而将其对应到RxJava的对象为:
观察者:Observer;
被观察者:Observable;
订阅(活注册):subscribe();
创建方法,以简单打印字符串为例展开:
方法一:Create
示例代码如下:
public void create_one(View view){
//创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) { //最先回调,没有执行onNext、onComplete、onError也会回调
// d.dispose();//移除订阅关系,执行该方法后,下面的onNext、onError、onComplete都不会执行。
boolean disposed = d.isDisposed();//判断是否取消了订阅关系,为真就是没有订阅,假就是订阅中
Log.d(TAG, "onSubscribe:" + d.toString()+";disposed值为:"+disposed);
}
@Override
public void onNext(String s) {//被观察者调用onNext时,这里就会回调
Log.d(TAG, "onNext:" + s);
}
@Override
public void onError(Throwable e) {//发送错误时调用
Log.d(TAG, "onError:" + e.getMessage());
}
@Override
public void onComplete() {//数据接收完成时调用
Log.d(TAG, "onComplete:");
}
};
//创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//e只有三个方法onNext、onError、onComplete
e.onNext("Hello");//发送数据
// e.onError(new Exception("error"));//发送出错
e.onComplete();//发送完成。这个方法与OnError只能执行一个,谁在前就执行谁。手动调用这个方法,Observer的onComplete才会执行,onError同理。
}
});
//订阅,管着观察者与被观察者
observable.subscribe(observer);
}
打印日志输出如下:
12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onSubscribe:null;disposed值为:false
12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onNext:Hello
12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onComplete:
方法二:Create
示例代码如下:
public void create_two(View view){
//创建被观察者,同方法一
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//e只有三个方法onNext、onError、onComplete
e.onNext("Hello 2");//发送数据
e.onComplete();//发送完成。这个方法与OnError只能执行一个,谁在前就执行谁。手动调用这个方法,observable添加的Action才会执行。
//e.onError(new Exception("error"));//发送出错,手动调用这个方法后,observable添加的Consumer<Throwable>才会执行。
}
});
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept:s值为:" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "accept:throwable值为:" + throwable.getMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "run工作");
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG,"disposable值为:"+disposable.isDisposed());
// disposable.dispose();//移除订阅关系,执行该方法后,上面的Consumer<String>、Consumer<Throwable>、Action都不会执行。
}
});
}
打印日志输出如下:
12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: disposable值为:false
12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为:Hello 2
12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: run工作
方法三:just
示例代码如下:
/**
* 方法三:just
*/
public void just(View view){
//生成被观察者
Observable<String> observable = Observable.just("just1", "just2");
//定义观察者,包含订阅
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {//这个accept就等于观察者的onNext
Log.e(TAG, "accept:s值为" + s);
}
});
}
执行后打印日志如下:
12-26 11:02:00.577 16781-16781/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为just1
12-26 11:02:00.578 16781-16781/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为just2
方法四:fromArray
示例代码如下:
public void fromArray(View view){
//生成被观察者
Observable<String> observable = Observable.fromArray("from1", "from2", "from3");
//定义观察者,包含订阅
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept:s值为" + s);
}
});
}
打印日志如下:
12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from1
12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from2
12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from3
方法五:fromCallable
示例代码如下:
public void fromCallable(View view){
//生成被观察者
Observable<String> observable = Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "fromCallable";
}
});
//定义观察者,包含订阅
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept:s值为" + s);
}
});
}
打印日志如下:
12-26 11:15:21.169 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为fromCallable
方法列表如下:
三、调度器Scheduler与线程控制
调度器种类:
常用的是Schedulers.io()进行耗时操作、AndroidSchedulers.mainThread()更新ui.
1、Schedulers.immediate();
直接在当前线程运行,相当于不指定线程,默认的Scheduler.
2、Schedulers.newThread();
启动新现成,在新的线程中执行操作。
3、Schedulers.io();
I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler,行为模式和newThread()差不多,区别在于io()的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下,io()比newThread()更有效率。不要把计算工作放在io(),可以避免创建不必要的线程。
4、Schedulers.computation();
计算所使用的Scheduler。这个计算是指Cpu密集型计算,即不会被I/O等操作限制性的操作,例如图形的计算。这个Sheduler使用的是固定的线程池,大小为Cpu核数。不要把I/O放在computation中,否则I/O操作等待时间会浪费cpu。用于计算任务,如事件循环和回调处理,不要用于IO操作,默认线程数等于处理器的数量。
5、Schedulsers.from(executor)
使用指定的Executor作为调度器。
6. Schedulers.trampoline()
当其它排队的任务完成后,在当前线程排队开始执行
7. AndroidSchedulers.mainThread()
在RxAndroid中,他指定操作将在Android主线程中执行。
指定线程:
1、observerOn(Schedulers)
指定观察者Observer在哪个线程执行
2、subscribeOn(Scheduler)
指定被观察者Observable在哪个线程执行
线程多次随意切换:
observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。 subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。
示例代码如下:
public void schedulers(View view){
Observable.just(1,2,2)//创建被观察者
.subscribeOn(Schedulers.io())//指定被观察者运行在io线程
.observeOn(AndroidSchedulers.mainThread())//指定下面的观察者运行在主线程中
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"integer的值为:"+integer.intValue());
}
});
}
代码运行结果如下:
12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:1
12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:2
12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:2
除了将这些调度器传递给RxJava的Observable操作符,你也可以用它们调度你自己的任务。例如:Scheduler.Worker
连接地址:http://wiki.jikexueyuan.com/project/rx-docs/Scheduler.html
四、操作符,比较复杂也非常强大
操作符理解为可以控制流程的方法。
1、操作符的分类
2、变换操作符
变换操作符是用来变换类型的。
种类如下:
map操作符,后面的Function有两个泛型参数,第一个是输入类型,第二个是转换后输出返回的类型,例如下面的示例程序,apply()方法要返回的是第二个参数的类型。
class User{
private String name;
private String password;
public User(String name, String password) {
this.name = name;
this.password = password;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
//模拟网络登录
private boolean login(User user){
if(user.getName().equals("liming")&&user.getPassword().equals("123")){
return true;
}
return false;
}
/**
* 操作符map
*/
public void operator_map(View view){
User user=new User("liming","123");
Observable.just(user)
.map(new Function<User, Boolean>() {//操作符map将User转换为需要的结果boolean
@Override
public Boolean apply(User user) throws Exception {
return login(user);//进行网络登录
}
}).subscribeOn(Schedulers.io())//网络登录需要在io中操作
.observeOn(AndroidSchedulers.mainThread())//更新ui,需要在主线程中操作
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean isLogin) throws Exception {
Log.e(TAG,"登录是否成功:"+isLogin);
}
});
}
打印日志输出如下:
12-26 15:17:11.509 14096-14096/demo.face.comi.io.rxjavademo E/MainActivity: 登录是否成功:true
比Map要强大,可以做到map不能操作的事,可以递级处理数据,然后传递数据,例如下面的例子,先用户登录,然后获取用户特征值,示例代码如下:(注意所使用User类与login方法同上)
/**
* 模拟根据用户是否登录成功,返回用户的轮廓
*/
private UserProfile profile(boolean b){
if(b){
return new UserProfile("漂亮");
}else{
return new UserProfile("好丑");
}
}
/**
* 操作符-flatmap
* @param view
*/
public void operator_flatmap(View view){
User user=new User("xiaobao","123456");
Observable.just(user).flatMap(new Function<User, ObservableSource<Boolean>>() {
@Override
public ObservableSource<Boolean> apply(User user) throws Exception {
return Observable.just(login(user));//用户登录
}
}).flatMap(new Function<Boolean, ObservableSource<UserProfile>>() {
@Override
public ObservableSource<UserProfile> apply(Boolean aBoolean) throws Exception {
return Observable.just(profile(aBoolean));//获取用户特征
}
}).subscribeOn(Schedulers.io())//控制被监控对象在io线程中
.observeOn(AndroidSchedulers.mainThread())//监控在主线程中
.subscribe(new Consumer<UserProfile>() {
@Override
public void accept(UserProfile userProfile) throws Exception {//返回用户特征
//相当于onNext
Log.e(TAG, "userProfile值为:" + userProfile.profile);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//onError
}
}, new Action() {
@Override
public void run() throws Exception {
//onComplete
}
});
}
打印日志如下:
12-26 15:57:28.089 19512-19512/demo.face.comi.io.rxjavademo E/MainActivity: userProfile值为:好丑
buffer操作符:
缓存发射的数据为一个list,到观察者的时候参数就是一个list。
示例代码如下:
public void operator_buffer(View view){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i=0;i<10;i++){
e.onNext(i+"");
}
e.onComplete();
}
}).buffer(500, TimeUnit.MILLISECONDS)//缓冲500毫秒内发射的数据
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
//这里strings为list,就是缓冲了500毫秒内发射的String
Log.e(TAG,"accept的数据大小为:"+strings.size()+";数据为:"+strings.toString());
}
});
}
打印日志如下:
12-26 16:21:27.925 22722-22722/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据大小为:10;数据为:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
3、过滤操作符
debounce操作符:
被观察者连续发射的数据的时间间隔 如果在指定时间 就被过滤拦截。
示例如下:
public void operator_debounce(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
if(e.isDisposed()) return;
try{
for(int i=1;i<10;i++){//发生数据间隔时间分别为100、200。。。。。1000毫秒
e.onNext(i);//发射数据
Thread.sleep(100*i);
}
}catch (Exception error){
e.onError(error);
}
}
}).subscribeOn(Schedulers.computation())
.debounce(500,TimeUnit.MILLISECONDS)//如果发射数据间隔小于500就被过滤拦截掉
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept的数据为:" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG,"出错:"+throwable.toString());
}
});
}
打印日志如下:
12-26 16:37:23.194 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:6
12-26 16:37:23.795 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:7
12-26 16:37:24.494 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:8
12-26 16:37:25.295 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:9
filter操作符:
过滤数据,返回真即使满足条件,不拦截,否者拦截,观察者接收不到。
示例代码如下:
public void operator_filter(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
if(e.isDisposed()) return;
try{
for(int i=1;i<10;i++){//发生数据间隔时间分别为100、200。。。。。1000毫秒
e.onNext(i);//发射数据
Thread.sleep(100*i);
}
e.onComplete();
}catch (Exception error){
e.onError(error);
}
}
}).subscribeOn(Schedulers.computation())
.debounce(500,TimeUnit.MILLISECONDS)
.filter(new Predicate<Integer>() {//在debounce的基础上加过滤,必须大于6
@Override
public boolean test(Integer integer) throws Exception {
return integer>6;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept的数据为:" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG,"出错:"+throwable.toString());
}
});
}
打印日志如下:
12-26 16:45:25.356 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:7
12-26 16:45:26.056 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:8
12-26 16:45:26.856 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:9
take操作符
示例代码如下:
public void operator_take(View view){
Observable.just(1,2,3,4,5,6,7)
.take(3)//发射前3个数据
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer){
Log.e(TAG,"onNext的数据为:"+integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
打印日志如下:
12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:1
12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:2
12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:3
过滤操作符实际使用案例:防止点击按钮连续点击、搜索引擎过滤,避免搜索字段的变化连续请求网络。以下为按钮防连续点击为例,这个功能需要集成RxBinding库。
示例代码如下:
RxView.clicks(btn_operator_filterLivingExample)
.throttleFirst(2,TimeUnit.SECONDS)//2秒内的点击只拿第一个,他的都过滤调
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.e(TAG,"被点击了。。。");
}
});
4、组合操作符
startWith操作符
举例如下:在abc之前插入一个d,所以d是最先发出去的数据。
public void operator_startWith(View view){
Observable.just("a","b","c")
.startWith("d")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG,"accept:"+s);
}
});
}
打印日志如下:
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:d
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:a
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:b
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:c
merge操作符:
合并观察者
示例代码如下:
public void operator_merge(View view){
Observable<String> o1 = Observable.just("a","b","c");
Observable<String> o2 = Observable.just("d","e","f");
Observable.merge(o1,o2).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: "+s);
}
});
}
打印日志如下:
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: a
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: b
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: c
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: d
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: e
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: f
combineLatest操作符
当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
示例代码如下:
public void operator_combineLatest(View view){
Observable<String> o1 = Observable.just("a","b","c");
Observable<String> o2 = Observable.just("d","e","f");
//第一个String就是o1 最后的数据,第二个String是o2的每个数据源,第三个String是结合之后返回的类型
Observable.combineLatest(o1, o2, new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
//s就是o1 最后的数据,s2 就是o2的每个数据源
return s+s2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: "+s);
}
});
}
打印日志如下:
12-26 18:14:38.961 7590-7590/demo.face.comi.io.rxjavademo E/MainActivity: accept: cd
12-26 18:14:38.961 7590-7590/demo.face.comi.io.rxjavademo E/MainActivity: accept: ce
12-26 18:14:38.961 7590-7590/demo.face.comi.io.rxjavademo E/MainActivity: accept: cf
组合操作符实例:
利用combineLatest实现注册的时候所有输入信息(姓名、邮箱、年龄等)合法后才点亮注册按钮,示例代码如下:
public void operator_combineDemo(View view){
//skip过滤操作符,这里是过滤掉第一个字符的意思,也就是第一个字符不算数。
Observable<CharSequence> observableName = RxTextView.textChanges(etName).skip(1);
Observable<CharSequence> observableEmail = RxTextView.textChanges(etEmail).skip(1);
Observable<CharSequence> observableAge= RxTextView.textChanges(etAge).skip(1);
Observable.combineLatest(observableName, observableEmail, observableAge, new Function3<CharSequence, CharSequence, CharSequence, Boolean>() {
@Override
public Boolean apply(CharSequence name, CharSequence email, CharSequence age) throws Exception {
//当输入框name长度大于3,email长度大于5,age长度大于0,这些长度都是在过滤掉第一个字符的前提下。只有这些条件都满足下才返回true
return name.length()>3&&email.length()>5&&age.length()>0;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
if(aBoolean){//返回true才能让注册按钮可以被点击
Log.e(TAG,"验证通过,按钮可以点击了");
}
}
});
}
5、错误处理操作符
onErrorReturn操作符
让Observable遇到错误时发射一个特殊的项并且正常终止,onErrorRetrun能够捕获在它之前发生的异常,它之后流中的操作发生的异常就它就不会管了。
public void operator_onErrorReturn(View view){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i=0;i<=4;i++){
if(i==2){
e.onError(new Throwable("出现错误了"));
}else{
e.onNext(i+"");
}
try{
Thread.sleep(1000);
}catch (Exception ex){
ex.printStackTrace();
}
}
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.onErrorReturn(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) throws Exception {
Log.e(TAG, "在onErrorReturn处理了: "+throwable.toString() );
return "10";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "结果错误: " + throwable.toString());
}
});
}
打印日志如下:
12-27 13:51:53.634 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 13:51:54.634 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 13:51:55.635 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 在onErrorReturn处理了: java.lang.Throwable: 出现错误了
12-27 13:51:55.635 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 10
onErrorRsumeNext操作符
和onErrorNext不同的是,onErrorResumeNext是返回一个重新定义的Observable,onErrorNext返回的是发射的数据。
注意onErrorResumeNext拦截的错误是Throwable,不能拦截Exception。 不然它会将错误传递给观察者的onError方法。要拦截Exception请用onExceptionResumeNext。
示例代码如下:
public void operator_onErrorResumeNext(View view){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i = 0; i<= 3 ;i++){
if(i == 2){
//这里是Throwable
e.onError(new Throwable("出现错误了"));
}else{
e.onNext(i+"");
}
try{
Thread.sleep(1000);
}catch (Exception ex){
ex.printStackTrace();
}
}
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> apply(Throwable throwable) throws Exception {
//拦截到错误之后,重新定义了被观察者
return Observable.just("重新定义了被观察者");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "结果错误: " + throwable.toString());
}
});
}
打印日志如下:
12-27 13:58:32.695 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 13:58:33.695 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 13:58:34.697 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 重新定义了被观察者
onExceptionResumeNext操作符
onExceptionResumeNext 和 onErrorResumeNext基本一样,也是收到错误重新定义了新的被观察者。但是有一点不用: 如果onErrorResumeNext收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,onExceptionResumeNext则会继续拦截。
注意onExceptionResumeNext拦截的错误是Exception,不能拦截Throwable。 不然它会将错误传递给观察者的onError方法。要拦截Throwable请用onErrorResumeNext。
示例代码如下:
public void operator_onExceptionResumeNext(View view){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i = 0; i<= 3 ;i++){
if(i == 2){
//注意这里是Exception
e.onError(new Exception("出现错误了"));
}else{
e.onNext(i+"");
}
try{
Thread.sleep(1000);
}catch (Exception ex){
ex.printStackTrace();
}
}
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.onExceptionResumeNext(new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("错误替换的消息");
observer.onComplete();
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "结果错误: " + throwable.toString());
}
});
}
输出结果如下:
12-27 14:10:10.213 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:10:11.213 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:10:12.215 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 错误替换的消息
retry操作符
重试的意思,拦截到错误,然后让 被观察者重新发射数据。Throwable和Exception都额可以拦截
它有五种参数方法:
- retry(): 让被观察者重新发射数据,要是一直错误就一直发送了
- retry(BiPredicate): interger是第几次重新发送,Throwable是错误的内容
- retry(long time): 最多让被观察者重新发射数据多少次
- retry(long time,Predicate predicate): 最多让被观察者重新发射数据多少次,在predicate里面进行判断拦截 返回是否继续
- retry(Predicate predicate): 在predicate里面进行判断拦截 返回是否继续
public void operator_retry(View view){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i = 0; i<= 3 ;i++){
if(i == 2){
e.onError(new Exception("出现错误了"));
}else{
e.onNext(i+"");
}
try{
Thread.sleep(1000);
}catch (Exception ex){
ex.printStackTrace();
}
}
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.retry(new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
Log.e(TAG, "retry错误: "+throwable.toString());
//返回假就是不让重新发射数据了,调用观察者的onError就终止了。
//返回真就是让被观察者重新发射请求
return true;
}
})
// .retry(new BiPredicate<Integer, Throwable>() {
// @Override
// public boolean test(Integer integer, Throwable throwable) throws Exception {
// Log.e(TAG, "retry错误: "+integer+" "+throwable.toString());
//
// //返回假就是不让重新发射数据了,调用观察者的onError就终止了。
// //返回真就是让被观察者重新发射请求
// return true;
// }
// })
// .retry(3)//最多让被观察者重新发射数据3次
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "结果错误: " + throwable.toString());
}
});
}
打印日志如下:
12-27 14:27:06.456 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:07.457 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:08.457 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了
12-27 14:27:08.459 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:09.460 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:10.460 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了
12-27 14:27:10.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:11.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:12.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了
......
......
一直这样循环下去
retryWhen操作符
retryWhen和retry类似,区别是:retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。
这里如果里面的throwableObservable不进行处理,那么onNext也会拦截处理,这里有个坑。
示例代码如下
public void operator_retryWhen(View view){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i = 0; i <= 3; i++) {
if (i == 2) {
e.onError(new Exception("出现错误了"));
} else {
e.onNext(i + "");
}
try {
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
}
}
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
//这里可以发送新的被观察者 Observable
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
//如果发射的onError就终止
return Observable.error(new Throwable("retryWhen终止啦"));
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "结果错误: " + throwable.toString());
}
});
}
日志输出如下:
12-27 14:45:55.551 13167-13358/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:45:56.551 13167-13358/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:45:57.554 13167-13358/demo.face.comi.io.rxjavademo E/MainActivity: 结果错误: java.lang.Throwable: retryWhen终止啦
6、辅助操作符
delay操作符
整体延迟一段指定的时间再发射来自Observable的发射物。就是延迟。
它有6种方法参数:
- delay(Function):
- delay(long delay,TimeUnit unit): 指定延迟多长时间
- delay(long delay,TimeUnit unit,mScheduler scheduler): 指定延迟多长时间并添加调度器
- delay(long delay,TimeUnit unit,boolean delayError): 指定延迟多长时间。delayError参数如果为假 就直接抛出onError,为真就如常延迟执行。
- delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟
- delay(ObservableSource ,Function):
public void operator_delay(View view){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i=0; i<=3 ;i++){
e.onNext(i+"");
}
try{
Thread.sleep(1000);
}catch (Exception ex){
ex.printStackTrace();
}
e.onComplete();
}
})
.delay(3000, TimeUnit.MILLISECONDS)
//delayError参数如果为假就直接抛出onError,为真就如常延迟执行
// .delay(3000,TimeUnit.MILLISECONDS,true)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "收到错误: " + throwable.toString());
}
});
}
隔了三秒输出如下日志:
12-27 15:28:10.560 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 15:28:10.561 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 15:28:10.561 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 2
12-27 15:28:10.561 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 3
delaySubscription操作符
该操作符跟delay的差别就是:delaySubscription只做一件事,延迟订阅。
do操作符
do操作符有很多个,就相当于生命周期。例如doOnNext在onNext的时候回调。
例如下面的例子:
public void operator_do(View view){
Observable.just("1","2")
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG,"doOnNext:"+s);
}
})
.doAfterNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG,"doAfterNext:"+s);
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG,"doOnComplete:");
}
})
.doOnSubscribe(new Consumer<Disposable>() {//订阅之后回调的方法
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG,"doOnSubscribe");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG,"doAfterTerminate");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG,"doFinally:");
}
})
//Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted
.doOnEach(new Consumer<Notification<String>>() {
@Override
public void accept(Notification<String> stringNotification) throws Exception {
Log.e(TAG, "doOnEach: "+(stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError"));
}
})
//订阅后可以进行取消订阅
.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnLifecycle: "+disposable.isDisposed());
//disposable.dispose();
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnLifecycle run: ");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
});
}
执行结果如下:
12-27 15:46:57.702 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnSubscribe
12-27 15:46:57.702 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnLifecycle: false
12-27 15:46:57.702 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnNext:1
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnEach: onNext
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doAfterNext:1
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnNext:2
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnEach: onNext
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 2
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doAfterNext:2
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnComplete:
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnEach: onComplete
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doFinally:
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doAfterTerminate
materialize操作符
materialize把 被观察者Observable转换为Notification通知对象。dematerialize相反了。 注意用了materialize之后,onNext会回调多了一个数据,因为onComplete也回调到这里了。
示例代码如下
public void operator_materialize(View view){
Observable.just("1","2")
.materialize()
.subscribe(new Consumer<Notification<String>>() {
@Override
public void accept(@NonNull Notification<String> stringNotification) throws Exception {
//这时候的数据就是一个Notification对象了
Log.e(TAG, (stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError")+": "+stringNotification.getValue());
}
});
}
日志输出如下:
12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onNext: 1
12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onNext: 2
12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onComplete: null
TimeInterval操作符
获取数据发送的时间间隔,就是把数据转换为数据发送的间隔Timed。
有4个参数方法:
- timeInterval(): 转换为时间Timed,默认时间单位为毫秒
- timeInterval(Scheduler): 转换为时间Timed,可以设置调度器
- timeInterval(TimeUnit): 转换为时间Timed,可以设置时间单位
- timeInterval(TimeUnit,Scheduler): 转换为时间Timed,可以设置时间单位和调度器
public void operator_timeinterval(View view){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i=0; i<3; i++){
e.onNext(i+"");
Thread.sleep(1000);
}
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.timeInterval()
.subscribe(new Consumer<Timed<String>>() {
@Override
public void accept(@NonNull Timed<String> stringTimed) throws Exception {
Log.e(TAG, "accept: "+stringTimed.time());
}
});
}
日志输出如下:
12-27 16:55:18.608 22853-22896/demo.face.comi.io.rxjavademo E/MainActivity: accept: 1001
12-27 16:55:19.608 22853-22896/demo.face.comi.io.rxjavademo E/MainActivity: accept: 1000
timestamp操作符
public void operator_timestamp(View view){
Observable.just("a","b")
.timestamp()
.subscribe(new Consumer<Timed<String>>() {
@Override
public void accept(@NonNull Timed<String> stringTimed) throws Exception {
//转换时间
String date = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss", Locale.CHINA)
.format(new Date(stringTimed.time()));
Log.e(TAG, "accept: "+date);
}
});
}
日志输出如下:
12-27 16:59:40.261 23241-23241/demo.face.comi.io.rxjavademo E/MainActivity: accept: 2017-12-27 04:59:40
给定多个Observable,只让第一个发射数据的Observable发射全部数据。
ambWith和ambArray差不多,Observable.amb(o1,o2)和o1.ambWith(o2)是一样的效果。
有两个参数方法:
- amb(Iterable);
- ambArray();
public void operator_amb(View view){
Observable o1 = Observable.just("a","b","c").delay(1000, TimeUnit.MILLISECONDS);
Observable o2 = Observable.just("d","e","f");
Observable.ambArray(o1,o2).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String o) throws Exception {
Log.e(TAG, "accept: "+o);
}
});
}
输出结果如下:
12-27 17:09:38.495 24304-24304/demo.face.comi.io.rxjavademo E/MainActivity: accept: b
12-27 17:09:38.495 24304-24304/demo.face.comi.io.rxjavademo E/MainActivity: accept: c
public void operator_defaultIfEmpty(View view){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onComplete();
}
}).defaultIfEmpty("默认数据")
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: "+s);
}
});
}
输入日志如下:
12-27 17:13:44.928 24752-24752/demo.face.comi.io.rxjavademo E/MainActivity: accept: 默认数据
switchIfEmpty操作符
public void operator_switchIfEmpty(View view){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onComplete();
}
}).switchIfEmpty(Observable.just("a","b","c"))
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: "+s);
}
});
}
执行结果如下:
12-27 17:25:43.048 25581-25581/demo.face.comi.io.rxjavademo E/MainActivity: accept: b
12-27 17:25:43.048 25581-25581/demo.face.comi.io.rxjavademo E/MainActivity: accept: c
skipUntil操作符
//丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
public void operator_skipUntil(View view) {
//skipUntil里面的Observable发射了之后,原始的Observable每隔一秒循环发射的数据才开始被接收到
Observable.interval(1, TimeUnit.SECONDS)
.skipUntil(Observable.just("1")).delay(5,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept:" + aLong);
}
});
}
直到等待5秒后才开始输出如下日志:
12-27 17:42:22.107 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:1
12-27 17:42:23.108 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:2
12-27 17:42:24.108 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:3
12-27 17:42:25.108 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:4
12-27 17:42:26.107 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:5
public void operator_skipWhile(View view){
Observable.interval(1, TimeUnit.SECONDS)
.observeOn(Schedulers.newThread())
.skipWhile(new Predicate<Long>() {
@Override
public boolean test(@NonNull Long aLong) throws Exception {
return aLong < 5;
//返回假,原始的Observable发射的数据才可以接收到
}
})
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong);
}
});
}
输出日志如下:
12-27 17:46:26.341 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 5
12-27 17:46:27.341 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 6
12-27 17:46:28.342 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 7
12-27 17:46:29.341 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 8
12-27 17:46:30.342 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 9
takeUntil() / takeWhile() / takeWhileWithIndex()操作符
8、布尔操作符
更多推荐
所有评论(0)