一、库的引入

GitHub下载地址:https://github.com/ReactiveX/RxJava

目前最新版本如图:


根据版本号,引入Rxjava库到我的项目里,如图:


同时为了更好的兼容Android,我也引入了Rxandroid,github地址如下:

https://github.com/ReactiveX/RxAndroid

这样,Rxjava与Rxandriod库的引入以告大吉,接下来就开始使用了。

二、基本使用方法

RxJava使用的是观察者模式。是由:

观察者:监视着被观察者,当被观察者发生变化时通知观察者,然后观察者执行相应的操作;

被观察者:被监视的对象,当某个状态改变时告诉观察者;

订阅(或注册、关联):将观察者与被观察者建立联系。

它三者的关系就好比一个Button的点击事件:

观察者:OnClickListener;

被观察者:Button;

订阅(或注册):setOnClickListener();

而将其对应到RxJava的对象为:

观察者:Observer;

被观察者:Observable;

订阅(活注册):subscribe();

创建方法,以简单打印字符串为例展开:

方法一:Create 

示例代码如下:

public void create_one(View view){
        //创建观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) { //最先回调,没有执行onNext、onComplete、onError也会回调
//                d.dispose();//移除订阅关系,执行该方法后,下面的onNext、onError、onComplete都不会执行。
                boolean disposed = d.isDisposed();//判断是否取消了订阅关系,为真就是没有订阅,假就是订阅中
                Log.d(TAG, "onSubscribe:" + d.toString()+";disposed值为:"+disposed);
            }

            @Override
            public void onNext(String s) {//被观察者调用onNext时,这里就会回调
                Log.d(TAG, "onNext:" + s);
            }

            @Override
            public void onError(Throwable e) {//发送错误时调用
                Log.d(TAG, "onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {//数据接收完成时调用
                Log.d(TAG, "onComplete:");
            }
        };

        //创建被观察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //e只有三个方法onNext、onError、onComplete
                e.onNext("Hello");//发送数据
//                e.onError(new Exception("error"));//发送出错
                e.onComplete();//发送完成。这个方法与OnError只能执行一个,谁在前就执行谁。手动调用这个方法,Observer的onComplete才会执行,onError同理。
            }
        });

        //订阅,管着观察者与被观察者
        observable.subscribe(observer);
    }
打印日志输出如下:

12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onSubscribe:null;disposed值为:false
12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onNext:Hello
12-26 10:01:53.815 5128-5128/demo.face.comi.io.rxjavademo D/MainActivity: onComplete:


方法二:Create

示例代码如下:

public void create_two(View view){
        //创建被观察者,同方法一
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //e只有三个方法onNext、onError、onComplete
                e.onNext("Hello 2");//发送数据
                e.onComplete();//发送完成。这个方法与OnError只能执行一个,谁在前就执行谁。手动调用这个方法,observable添加的Action才会执行。
                //e.onError(new Exception("error"));//发送出错,手动调用这个方法后,observable添加的Consumer<Throwable>才会执行。
            }
        });
        observable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "accept:s值为:" + s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, "accept:throwable值为:" + throwable.getMessage());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "run工作");
            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.e(TAG,"disposable值为:"+disposable.isDisposed());
//                disposable.dispose();//移除订阅关系,执行该方法后,上面的Consumer<String>、Consumer<Throwable>、Action都不会执行。
            }
        });
    }

打印日志输出如下:

12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: disposable值为:false
12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为:Hello 2
12-26 10:47:33.679 14442-14442/demo.face.comi.io.rxjavademo E/MainActivity: run工作

方法三:just

示例代码如下:

 /**
     * 方法三:just
     */
    public void just(View view){
        //生成被观察者
        Observable<String> observable = Observable.just("just1", "just2");
        //定义观察者,包含订阅
        observable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {//这个accept就等于观察者的onNext
                Log.e(TAG, "accept:s值为" + s);
            }
        });
    }
执行后打印日志如下:
12-26 11:02:00.577 16781-16781/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为just1
12-26 11:02:00.578 16781-16781/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为just2

方法四:fromArray

示例代码如下:

 public void fromArray(View view){
        //生成被观察者
        Observable<String> observable = Observable.fromArray("from1", "from2", "from3");
        //定义观察者,包含订阅
        observable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "accept:s值为" + s);
            }
        });
    }
打印日志如下:

12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from1
12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from2
12-26 11:12:50.160 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为from3


方法五:fromCallable

示例代码如下:

public void fromCallable(View view){
        //生成被观察者
        Observable<String> observable = Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "fromCallable";
            }
        });
        //定义观察者,包含订阅
        observable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "accept:s值为" + s);
            }
        });
    }

打印日志如下:

12-26 11:15:21.169 18638-18638/demo.face.comi.io.rxjavademo E/MainActivity: accept:s值为fromCallable

方法列表如下:


三、调度器Scheduler与线程控制

调度器种类:

常用的是Schedulers.io()进行耗时操作、AndroidSchedulers.mainThread()更新ui.

1、Schedulers.immediate();

直接在当前线程运行,相当于不指定线程,默认的Scheduler.

2、Schedulers.newThread();

启动新现成,在新的线程中执行操作。

3、Schedulers.io();

I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler,行为模式和newThread()差不多,区别在于io()的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下,io()比newThread()更有效率。不要把计算工作放在io(),可以避免创建不必要的线程。

4、Schedulers.computation();

计算所使用的Scheduler。这个计算是指Cpu密集型计算,即不会被I/O等操作限制性的操作,例如图形的计算。这个Sheduler使用的是固定的线程池,大小为Cpu核数。不要把I/O放在computation中,否则I/O操作等待时间会浪费cpu。用于计算任务,如事件循环和回调处理,不要用于IO操作,默认线程数等于处理器的数量。

5、Schedulsers.from(executor)

使用指定的Executor作为调度器。

6. Schedulers.trampoline()
当其它排队的任务完成后,在当前线程排队开始执行

7. AndroidSchedulers.mainThread()
在RxAndroid中,他指定操作将在Android主线程中执行。

指定线程:

1、observerOn(Schedulers)

指定观察者Observer在哪个线程执行

2、subscribeOn(Scheduler)

指定被观察者Observable在哪个线程执行

线程多次随意切换:
observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。 subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

示例代码如下:

public void schedulers(View view){
        Observable.just(1,2,2)//创建被观察者
                .subscribeOn(Schedulers.io())//指定被观察者运行在io线程
                .observeOn(AndroidSchedulers.mainThread())//指定下面的观察者运行在主线程中
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG,"integer的值为:"+integer.intValue());
                    }
                });
    }

代码运行结果如下:

12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:1
12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:2
12-26 13:44:34.728 3377-3377/demo.face.comi.io.rxjavademo E/MainActivity: integer的值为:2

除了将这些调度器传递给RxJava的Observable操作符,你也可以用它们调度你自己的任务。例如:Scheduler.Worker

连接地址:http://wiki.jikexueyuan.com/project/rx-docs/Scheduler.html

四、操作符,比较复杂也非常强大

操作符理解为可以控制流程的方法。

1、操作符的分类


2、变换操作符

变换操作符是用来变换类型的。

种类如下:


map操作符,后面的Function有两个泛型参数,第一个是输入类型,第二个是转换后输出返回的类型,例如下面的示例程序,apply()方法要返回的是第二个参数的类型。

class User{
        private String name;
        private String password;

        public User(String name, String password) {
            this.name = name;
            this.password = password;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getPassword() {
            return password;
        }

        public void setPassword(String password) {
            this.password = password;
        }
    }

    //模拟网络登录
    private boolean login(User user){
        if(user.getName().equals("liming")&&user.getPassword().equals("123")){
            return true;
        }
        return false;
    }
    /**
     * 操作符map
     */
    public void operator_map(View view){
        User user=new User("liming","123");
        Observable.just(user)
            .map(new Function<User, Boolean>() {//操作符map将User转换为需要的结果boolean
                @Override
                public Boolean apply(User user) throws Exception {
                    return login(user);//进行网络登录
                }
            }).subscribeOn(Schedulers.io())//网络登录需要在io中操作
            .observeOn(AndroidSchedulers.mainThread())//更新ui,需要在主线程中操作
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean isLogin) throws Exception {
                    Log.e(TAG,"登录是否成功:"+isLogin);
                }
        });
    }

打印日志输出如下:

12-26 15:17:11.509 14096-14096/demo.face.comi.io.rxjavademo E/MainActivity: 登录是否成功:true

flatMap操作符:

比Map要强大,可以做到map不能操作的事,可以递级处理数据,然后传递数据,例如下面的例子,先用户登录,然后获取用户特征值,示例代码如下:(注意所使用User类与login方法同上)

 /**
     * 模拟根据用户是否登录成功,返回用户的轮廓
     */
    private UserProfile profile(boolean b){
        if(b){
            return new UserProfile("漂亮");
        }else{
            return new UserProfile("好丑");
        }
    }

    /**
     * 操作符-flatmap
     * @param view
     */
    public void operator_flatmap(View view){
        User user=new User("xiaobao","123456");
        Observable.just(user).flatMap(new Function<User, ObservableSource<Boolean>>() {
            @Override
            public ObservableSource<Boolean> apply(User user) throws Exception {
                return Observable.just(login(user));//用户登录
            }
        }).flatMap(new Function<Boolean, ObservableSource<UserProfile>>() {
            @Override
            public ObservableSource<UserProfile> apply(Boolean aBoolean) throws Exception {
                return Observable.just(profile(aBoolean));//获取用户特征
            }
        }).subscribeOn(Schedulers.io())//控制被监控对象在io线程中
                .observeOn(AndroidSchedulers.mainThread())//监控在主线程中
        .subscribe(new Consumer<UserProfile>() {
            @Override
            public void accept(UserProfile userProfile) throws Exception {//返回用户特征
                //相当于onNext
                Log.e(TAG, "userProfile值为:" + userProfile.profile);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                //onError
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                //onComplete
            }
        });
    }
打印日志如下:

12-26 15:57:28.089 19512-19512/demo.face.comi.io.rxjavademo E/MainActivity: userProfile值为:好丑
buffer操作符:

缓存发射的数据为一个list,到观察者的时候参数就是一个list。

示例代码如下:

public void operator_buffer(View view){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                for(int i=0;i<10;i++){
                    e.onNext(i+"");
                }
                e.onComplete();
            }
        }).buffer(500, TimeUnit.MILLISECONDS)//缓冲500毫秒内发射的数据
        .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> strings) throws Exception {
                //这里strings为list,就是缓冲了500毫秒内发射的String
                Log.e(TAG,"accept的数据大小为:"+strings.size()+";数据为:"+strings.toString());
            }
        });
    }
打印日志如下:
12-26 16:21:27.925 22722-22722/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据大小为:10;数据为:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

3、过滤操作符

debounce操作符:

被观察者连续发射的数据的时间间隔 如果在指定时间 就被过滤拦截。

示例如下:

public void operator_debounce(View view){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                if(e.isDisposed()) return;
                try{
                    for(int i=1;i<10;i++){//发生数据间隔时间分别为100、200。。。。。1000毫秒
                        e.onNext(i);//发射数据
                        Thread.sleep(100*i);
                    }
                }catch (Exception error){
                    e.onError(error);
                }
            }
        }).subscribeOn(Schedulers.computation())
                .debounce(500,TimeUnit.MILLISECONDS)//如果发射数据间隔小于500就被过滤拦截掉
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "accept的数据为:" + integer);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG,"出错:"+throwable.toString());
                    }
                });
    }
打印日志如下:
12-26 16:37:23.194 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:6
12-26 16:37:23.795 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:7
12-26 16:37:24.494 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:8
12-26 16:37:25.295 24858-25101/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:9

filter操作符:

过滤数据,返回真即使满足条件,不拦截,否者拦截,观察者接收不到。

示例代码如下:

public void operator_filter(View view){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                if(e.isDisposed()) return;
                try{
                    for(int i=1;i<10;i++){//发生数据间隔时间分别为100、200。。。。。1000毫秒
                        e.onNext(i);//发射数据
                        Thread.sleep(100*i);
                    }
                    e.onComplete();
                }catch (Exception error){
                    e.onError(error);
                }
            }
        }).subscribeOn(Schedulers.computation())
        .debounce(500,TimeUnit.MILLISECONDS)
        .filter(new Predicate<Integer>() {//在debounce的基础上加过滤,必须大于6
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer>6;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept的数据为:" + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG,"出错:"+throwable.toString());
            }
        });
    }
打印日志如下:

12-26 16:45:25.356 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:7
12-26 16:45:26.056 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:8
12-26 16:45:26.856 26249-26403/demo.face.comi.io.rxjavademo E/MainActivity: accept的数据为:9
take操作符

示例代码如下:

public void operator_take(View view){
        Observable.just(1,2,3,4,5,6,7)
                .take(3)//发射前3个数据
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer){
                        Log.e(TAG,"onNext的数据为:"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

打印日志如下:

12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:1
12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:2
12-26 16:58:02.756 28112-28112/demo.face.comi.io.rxjavademo E/MainActivity: onNext的数据为:3

过滤操作符实际使用案例:防止点击按钮连续点击、搜索引擎过滤,避免搜索字段的变化连续请求网络。以下为按钮防连续点击为例,这个功能需要集成RxBinding库。

示例代码如下:

RxView.clicks(btn_operator_filterLivingExample)
                .throttleFirst(2,TimeUnit.SECONDS)//2秒内的点击只拿第一个,他的都过滤调
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        Log.e(TAG,"被点击了。。。");
                    }
                });

4、组合操作符


startWith操作符

举例如下:在abc之前插入一个d,所以d是最先发出去的数据。

 public void operator_startWith(View view){
        Observable.just("a","b","c")
                .startWith("d")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG,"accept:"+s);
                    }
                });
    }

打印日志如下:

12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:d
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:a
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:b
12-26 17:59:10.056 5013-5013/demo.face.comi.io.rxjavademo E/MainActivity: accept:c

merge操作符:

合并观察者

示例代码如下:

   public void operator_merge(View view){
        Observable<String> o1 = Observable.just("a","b","c");
        Observable<String> o2 = Observable.just("d","e","f");
        Observable.merge(o1,o2).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "accept: "+s);
            }
        });
    }
打印日志如下:

12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: a
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: b
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: c
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: d
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: e
12-26 18:07:00.206 6414-6414/demo.face.comi.io.rxjavademo E/MainActivity: accept: f
combineLatest操作符

当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

示例代码如下:

public void operator_combineLatest(View view){
        Observable<String> o1 = Observable.just("a","b","c");
        Observable<String> o2 = Observable.just("d","e","f");

        //第一个String就是o1 最后的数据,第二个String是o2的每个数据源,第三个String是结合之后返回的类型
        Observable.combineLatest(o1, o2, new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                //s就是o1 最后的数据,s2 就是o2的每个数据源
                return s+s2;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "accept: "+s);
            }
        });
    }
打印日志如下:
12-26 18:14:38.961 7590-7590/demo.face.comi.io.rxjavademo E/MainActivity: accept: cd
12-26 18:14:38.961 7590-7590/demo.face.comi.io.rxjavademo E/MainActivity: accept: ce
12-26 18:14:38.961 7590-7590/demo.face.comi.io.rxjavademo E/MainActivity: accept: cf

组合操作符实例:

利用combineLatest实现注册的时候所有输入信息(姓名、邮箱、年龄等)合法后才点亮注册按钮,示例代码如下:

public void operator_combineDemo(View view){
        //skip过滤操作符,这里是过滤掉第一个字符的意思,也就是第一个字符不算数。
        Observable<CharSequence> observableName = RxTextView.textChanges(etName).skip(1);
        Observable<CharSequence> observableEmail = RxTextView.textChanges(etEmail).skip(1);
        Observable<CharSequence> observableAge= RxTextView.textChanges(etAge).skip(1);
        Observable.combineLatest(observableName, observableEmail, observableAge, new Function3<CharSequence, CharSequence, CharSequence, Boolean>() {
            @Override
            public Boolean apply(CharSequence name, CharSequence email, CharSequence age) throws Exception {
                //当输入框name长度大于3,email长度大于5,age长度大于0,这些长度都是在过滤掉第一个字符的前提下。只有这些条件都满足下才返回true
                return name.length()>3&&email.length()>5&&age.length()>0;
            }
        }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                if(aBoolean){//返回true才能让注册按钮可以被点击
                    Log.e(TAG,"验证通过,按钮可以点击了");
                }
            }
        });
    }

5、错误处理操作符


onErrorReturn操作符

让Observable遇到错误时发射一个特殊的项并且正常终止,onErrorRetrun能够捕获在它之前发生的异常,它之后流中的操作发生的异常就它就不会管了。

public void operator_onErrorReturn(View view){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                for(int i=0;i<=4;i++){
                    if(i==2){
                        e.onError(new Throwable("出现错误了"));
                    }else{
                        e.onNext(i+"");
                    }
                    try{
                        Thread.sleep(1000);
                    }catch (Exception ex){
                        ex.printStackTrace();
                    }
                }
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread())
                .onErrorReturn(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) throws Exception {
                Log.e(TAG, "在onErrorReturn处理了: "+throwable.toString() );
                return "10";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "收到消息: " + s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, "结果错误: " + throwable.toString());
            }
        });
    }

打印日志如下:

12-27 13:51:53.634 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 13:51:54.634 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 13:51:55.635 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 在onErrorReturn处理了: java.lang.Throwable: 出现错误了
12-27 13:51:55.635 9079-9119/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 10

onErrorRsumeNext操作符

和onErrorNext不同的是,onErrorResumeNext是返回一个重新定义的Observable,onErrorNext返回的是发射的数据。

注意onErrorResumeNext拦截的错误是Throwable,不能拦截Exception。 不然它会将错误传递给观察者的onError方法。要拦截Exception请用onExceptionResumeNext。

示例代码如下:

public void operator_onErrorResumeNext(View view){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                for(int i = 0; i<= 3 ;i++){
                    if(i == 2){
                        //这里是Throwable
                        e.onError(new Throwable("出现错误了"));
                    }else{
                        e.onNext(i+"");
                    }
                    try{
                        Thread.sleep(1000);
                    }catch (Exception ex){
                        ex.printStackTrace();
                    }
                }

                e.onComplete();
            }
        })
                .subscribeOn(Schedulers.newThread())
                .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {
                    @Override
                    public ObservableSource<? extends String> apply(Throwable throwable) throws Exception {
                        //拦截到错误之后,重新定义了被观察者
                        return Observable.just("重新定义了被观察者");
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "收到消息: " + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "结果错误: " + throwable.toString());
                    }
                });
    }

打印日志如下:

12-27 13:58:32.695 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 13:58:33.695 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 13:58:34.697 9630-9746/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 重新定义了被观察者

onExceptionResumeNext操作符

onExceptionResumeNext 和 onErrorResumeNext基本一样,也是收到错误重新定义了新的被观察者。但是有一点不用: 如果onErrorResumeNext收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,onExceptionResumeNext则会继续拦截。

注意onExceptionResumeNext拦截的错误是Exception,不能拦截Throwable。 不然它会将错误传递给观察者的onError方法。要拦截Throwable请用onErrorResumeNext。

示例代码如下:

public void operator_onExceptionResumeNext(View view){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                for(int i = 0; i<= 3 ;i++){
                    if(i == 2){
                        //注意这里是Exception
                        e.onError(new Exception("出现错误了"));
                    }else{
                        e.onNext(i+"");
                    }
                    try{
                        Thread.sleep(1000);
                    }catch (Exception ex){
                        ex.printStackTrace();
                    }
                }
                e.onComplete();
            }
        })
                .subscribeOn(Schedulers.newThread())
                .onExceptionResumeNext(new Observable<String>() {
                    @Override
                    protected void subscribeActual(Observer<? super String> observer) {
                        observer.onNext("错误替换的消息");
                        observer.onComplete();
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "收到消息: " + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "结果错误: " + throwable.toString());
                    }
                });
    }
输出结果如下:

12-27 14:10:10.213 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:10:11.213 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:10:12.215 10927-10994/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 错误替换的消息

retry操作符

重试的意思,拦截到错误,然后让 被观察者重新发射数据。Throwable和Exception都额可以拦截

它有五种参数方法: 
retry(): 让被观察者重新发射数据,要是一直错误就一直发送了 
retry(BiPredicate): interger是第几次重新发送,Throwable是错误的内容 
retry(long time): 最多让被观察者重新发射数据多少次 
retry(long time,Predicate predicate): 最多让被观察者重新发射数据多少次,在predicate里面进行判断拦截 返回是否继续 
retry(Predicate predicate): 在predicate里面进行判断拦截 返回是否继续

示例代码如下:
public void operator_retry(View view){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                for(int i = 0; i<= 3 ;i++){
                    if(i == 2){
                        e.onError(new Exception("出现错误了"));
                    }else{
                        e.onNext(i+"");
                    }
                    try{
                        Thread.sleep(1000);
                    }catch (Exception ex){
                        ex.printStackTrace();
                    }
                }
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread())
                .retry(new Predicate<Throwable>() {
                    @Override
                    public boolean test(Throwable throwable) throws Exception {
                        Log.e(TAG, "retry错误: "+throwable.toString());

                        //返回假就是不让重新发射数据了,调用观察者的onError就终止了。
                        //返回真就是让被观察者重新发射请求
                        return true;
                    }
                })
//                .retry(new BiPredicate<Integer, Throwable>() {
//                    @Override
//                    public boolean test(Integer integer, Throwable throwable) throws Exception {
//                        Log.e(TAG, "retry错误: "+integer+" "+throwable.toString());
//
//                        //返回假就是不让重新发射数据了,调用观察者的onError就终止了。
//                        //返回真就是让被观察者重新发射请求
//                        return true;
//                    }
//                })
//                .retry(3)//最多让被观察者重新发射数据3次
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "收到消息: " + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "结果错误: " + throwable.toString());
                    }
                });
    }
打印日志如下:

12-27 14:27:06.456 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:07.457 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:08.457 11814-11952/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了
12-27 14:27:08.459 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:09.460 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:10.460 11814-11959/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了
12-27 14:27:10.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:27:11.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:27:12.465 11814-11961/demo.face.comi.io.rxjavademo E/MainActivity: retry错误: java.lang.Exception: 出现错误了

......

......

一直这样循环下去

retryWhen操作符

retryWhen和retry类似,区别是:retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。

这里如果里面的throwableObservable不进行处理,那么onNext也会拦截处理,这里有个坑。

示例代码如下

public void operator_retryWhen(View view){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                for (int i = 0; i <= 3; i++) {
                    if (i == 2) {
                        e.onError(new Exception("出现错误了"));
                    } else {
                        e.onNext(i + "");
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                e.onComplete();
            }
        })
                .subscribeOn(Schedulers.newThread())
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {

                        //这里可以发送新的被观察者 Observable
                        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(Throwable throwable) throws Exception {

                                //如果发射的onError就终止
                                return Observable.error(new Throwable("retryWhen终止啦"));
                            }
                        });

                    }
                })

                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "收到消息: " + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "结果错误: " + throwable.toString());
                    }
                });
    }

日志输出如下:
12-27 14:45:55.551 13167-13358/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 14:45:56.551 13167-13358/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 14:45:57.554 13167-13358/demo.face.comi.io.rxjavademo E/MainActivity: 结果错误: java.lang.Throwable: retryWhen终止啦

6、辅助操作符



delay操作符

整体延迟一段指定的时间再发射来自Observable的发射物。就是延迟。

它有6种方法参数:

  • delay(Function):
  • delay(long delay,TimeUnit unit): 指定延迟多长时间
  • delay(long delay,TimeUnit unit,mScheduler scheduler): 指定延迟多长时间并添加调度器
  • delay(long delay,TimeUnit unit,boolean delayError): 指定延迟多长时间。delayError参数如果为假 就直接抛出onError,为真就如常延迟执行。
  • delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟
  • delay(ObservableSource ,Function):
示例如下:

public void operator_delay(View view){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                for(int i=0; i<=3 ;i++){
                    e.onNext(i+"");
                }
                try{
                    Thread.sleep(1000);
                }catch (Exception ex){
                    ex.printStackTrace();
                }
                e.onComplete();
            }
        })
                .delay(3000, TimeUnit.MILLISECONDS)
                //delayError参数如果为假就直接抛出onError,为真就如常延迟执行
//                .delay(3000,TimeUnit.MILLISECONDS,true)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "收到消息: " + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "收到错误: " + throwable.toString());
                    }
                });
    }

隔了三秒输出如下日志:
12-27 15:28:10.560 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 0
12-27 15:28:10.561 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 15:28:10.561 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 2
12-27 15:28:10.561 17754-17788/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 3

delaySubscription操作符

该操作符跟delay的差别就是:delaySubscription只做一件事,延迟订阅。

do操作符

do操作符有很多个,就相当于生命周期。例如doOnNext在onNext的时候回调。

例如下面的例子:

public void operator_do(View view){
        Observable.just("1","2")
                .doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG,"doOnNext:"+s);
                    }
                })
                .doAfterNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG,"doAfterNext:"+s);
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG,"doOnComplete:");
                    }
                })
                .doOnSubscribe(new Consumer<Disposable>() {//订阅之后回调的方法
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.e(TAG,"doOnSubscribe");
                    }
                })
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG,"doAfterTerminate");
                    }
                })
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG,"doFinally:");
                    }
                })
                //Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted
                .doOnEach(new Consumer<Notification<String>>() {
                    @Override
                    public void accept(Notification<String> stringNotification) throws Exception {
                        Log.e(TAG, "doOnEach: "+(stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError"));
                    }
                })
                //订阅后可以进行取消订阅
                .doOnLifecycle(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.e(TAG, "doOnLifecycle: "+disposable.isDisposed());
                        //disposable.dispose();
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doOnLifecycle run: ");
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "收到消息: " + s);
                    }
                });
    }
执行结果如下:
12-27 15:46:57.702 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnSubscribe
12-27 15:46:57.702 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnLifecycle: false
12-27 15:46:57.702 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnNext:1
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnEach: onNext
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 1
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doAfterNext:1
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnNext:2
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnEach: onNext
12-27 15:46:57.703 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: 收到消息: 2
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doAfterNext:2
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnComplete:
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doOnEach: onComplete
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doFinally:
12-27 15:46:57.704 18820-18820/demo.face.comi.io.rxjavademo E/MainActivity: doAfterTerminate
materialize操作符

materialize把 被观察者Observable转换为Notification通知对象。dematerialize相反了。 注意用了materialize之后,onNext会回调多了一个数据,因为onComplete也回调到这里了。

示例代码如下

public void operator_materialize(View view){
        Observable.just("1","2")
                .materialize()
                .subscribe(new Consumer<Notification<String>>() {
                    @Override
                    public void accept(@NonNull Notification<String> stringNotification) throws Exception {
                        //这时候的数据就是一个Notification对象了
                        Log.e(TAG, (stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError")+": "+stringNotification.getValue());
                    }
                });
    }
日志输出如下:

12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onNext: 1
12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onNext: 2
12-27 16:48:32.658 21842-21842/demo.face.comi.io.rxjavademo E/MainActivity: onComplete: null
TimeInterval操作符

获取数据发送的时间间隔,就是把数据转换为数据发送的间隔Timed。

有4个参数方法:

  • timeInterval(): 转换为时间Timed,默认时间单位为毫秒
  • timeInterval(Scheduler): 转换为时间Timed,可以设置调度器
  • timeInterval(TimeUnit): 转换为时间Timed,可以设置时间单位
  • timeInterval(TimeUnit,Scheduler): 转换为时间Timed,可以设置时间单位和调度器
例子如下:
public void operator_timeinterval(View view){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                for(int i=0; i<3; i++){
                    e.onNext(i+"");
                    Thread.sleep(1000);
                }
                e.onComplete();
            }
        })
                .subscribeOn(Schedulers.newThread())
                .timeInterval()
                .subscribe(new Consumer<Timed<String>>() {
                    @Override
                    public void accept(@NonNull Timed<String> stringTimed) throws Exception {
                        Log.e(TAG, "accept: "+stringTimed.time());
                    }
                });
    }
日志输出如下:
12-27 16:55:17.608 22853-22896/demo.face.comi.io.rxjavademo E/MainActivity: accept: 10
12-27 16:55:18.608 22853-22896/demo.face.comi.io.rxjavademo E/MainActivity: accept: 1001
12-27 16:55:19.608 22853-22896/demo.face.comi.io.rxjavademo E/MainActivity: accept: 1000
timestamp操作符
给发射的每个数据添加时间,转换了为Timed,和timeInterval的参数一致,但是timestamp获取到的time是时间戳,需要自己转换。
示例代码如下:
public void operator_timestamp(View view){
        Observable.just("a","b")
                .timestamp()
                .subscribe(new Consumer<Timed<String>>() {
                    @Override
                    public void accept(@NonNull Timed<String> stringTimed) throws Exception {
                        //转换时间
                        String date = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss", Locale.CHINA)
                                .format(new Date(stringTimed.time()));
                        Log.e(TAG, "accept: "+date);
                    }
                });
    }

日志输出如下:
12-27 16:59:40.259 23241-23241/demo.face.comi.io.rxjavademo E/MainActivity: accept: 2017-12-27 04:59:40
12-27 16:59:40.261 23241-23241/demo.face.comi.io.rxjavademo E/MainActivity: accept: 2017-12-27 04:59:40
7、条件操作符

amb/ambArray/ambWith操作符

给定多个Observable,只让第一个发射数据的Observable发射全部数据。

ambWith和ambArray差不多,Observable.amb(o1,o2)和o1.ambWith(o2)是一样的效果。

有两个参数方法: 
- amb(Iterable); 
- ambArray();

示例代码如下:
public void operator_amb(View view){
        Observable o1 = Observable.just("a","b","c").delay(1000, TimeUnit.MILLISECONDS);
        Observable o2 = Observable.just("d","e","f");

        Observable.ambArray(o1,o2).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String o) throws Exception {
                Log.e(TAG, "accept: "+o);
            }
        });
    }
输出结果如下:
12-27 17:09:38.495 24304-24304/demo.face.comi.io.rxjavademo E/MainActivity: accept: a
12-27 17:09:38.495 24304-24304/demo.face.comi.io.rxjavademo E/MainActivity: accept: b
12-27 17:09:38.495 24304-24304/demo.face.comi.io.rxjavademo E/MainActivity: accept: c
由于o1延迟发送,所以o2先发送,而又由于ambArray只会让第一个Observable发送全部数据,所以结果就是abc。
defaultIfEmpty操作符
被观察者没有onNext发送数据就调用了onComplete,就发射defaultlfEmpty里面的数据
示例代码如下:
public void operator_defaultIfEmpty(View view){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onComplete();
            }
        }).defaultIfEmpty("默认数据")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "accept: "+s);
                    }
                });
    }
输入日志如下:
12-27 17:13:44.928 24752-24752/demo.face.comi.io.rxjavademo E/MainActivity: accept: 默认数据

switchIfEmpty操作符

如果发射源没有发射数据就完成了,就发射switchIfEmpty里面新的Observable发射源
示例代码如下:
public void operator_switchIfEmpty(View view){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onComplete();
            }
        }).switchIfEmpty(Observable.just("a","b","c"))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "accept: "+s);
                    }
                });
    }
执行结果如下:
12-27 17:25:43.048 25581-25581/demo.face.comi.io.rxjavademo E/MainActivity: accept: a
12-27 17:25:43.048 25581-25581/demo.face.comi.io.rxjavademo E/MainActivity: accept: b
12-27 17:25:43.048 25581-25581/demo.face.comi.io.rxjavademo E/MainActivity: accept: c
skipUntil操作符
SkipUntil 订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable
//丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
    public void operator_skipUntil(View view) {
        //skipUntil里面的Observable发射了之后,原始的Observable每隔一秒循环发射的数据才开始被接收到
        Observable.interval(1, TimeUnit.SECONDS)
                .skipUntil(Observable.just("1")).delay(5,TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "accept:" + aLong);
                    }
                });
    }

直到等待5秒后才开始输出如下日志:
12-27 17:42:21.108 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:0
12-27 17:42:22.107 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:1
12-27 17:42:23.108 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:2
12-27 17:42:24.108 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:3
12-27 17:42:25.108 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:4
12-27 17:42:26.107 29184-29228/demo.face.comi.io.rxjavademo E/MainActivity: accept:5

.......
skipWhile操作符
和skipUntil不同的是,skipWhile可以判断,返回假才让数据发出去。
示例代码如下:
public void operator_skipWhile(View view){
        Observable.interval(1, TimeUnit.SECONDS)
                .observeOn(Schedulers.newThread())
                .skipWhile(new Predicate<Long>() {
                    @Override
                    public boolean test(@NonNull Long aLong) throws Exception {
                        return aLong < 5;
                        //返回假,原始的Observable发射的数据才可以接收到
                    }
                })
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: "+aLong);
                    }
                });
    }
输出日志如下:
12-27 17:46:26.341 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 5
12-27 17:46:27.341 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 6
12-27 17:46:28.342 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 7
12-27 17:46:29.341 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 8
12-27 17:46:30.342 29731-29954/demo.face.comi.io.rxjavademo E/MainActivity: accept: 9

........

takeUntil() / takeWhile() / takeWhileWithIndex()操作符

和skilUntil、shikWhil相反,他们是开始先发射原始数据,到takeUntil的第二个Observable发射了一个数据或一个通知就不发射原来的数据了。

8、布尔操作符

9、算术和聚合操作符



代码下载地址:

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐