Rxjava2使用总结

前言

Rxjava2 是 函数式编程响应式编程 两种牛逼的思想结合的产物,也是在 Java 中这两种思想的一种实现。

函数响应式编程 (Functional Reactive Programming:FRP):是一种通过一系列函数的组合调用来发射,转变,监听,响应数据流的编程范式。

在 Rxjava2 中提供了 5种 观察者模式来实现这种函数响应式编程思想。

ObservableSource/Observer

可通过 onNext 方法发送单条数据或者数据序列,通过 onComplete 发送完成通知或通过 onError 发送异常通知,不支持 背压策略

抽象类 Observable 是接口 ObservableSource 下的一个抽象实现,我们可以通过 Observable 创建一个可观察对象发射数据流。

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello World");
        emitter.onComplete();
    }
});

调用 Observable.create 方法,创建一个可观察对象,并通过 onNext 发送一条数据 “Hello World”,然后通过 onComplete 发送完成通知。

创建一个观察者 Observer 来接受并响应可观察对象发射的数据。

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("接受完成");
    }
};

在 onNext 方法中接收到可观察对象发射的数据 “Hello World”, 并做出响应打印到控制台。

Observer 订阅 Observable。

observable.subscribe(observer);

通过 Observable.create 创建可观察对象时,我们可以发现具体执行发射动作的是由接口 ObservableEmitter 的实例化对象完成的,而 ObservableEmitter<T> 继承自 接口 Emitter<T>,查看源码接口Emitter 的具体代码如下:

public interface Emitter<T> {
        //用来发送数据,可多次调用,每调用一次发送一条数据
    void onNext(@NonNull T value);
        //用来发送异常通知,只发送一次,若多次调用只发送第一条
    void onError(@NonNull Throwable error);
        //用来发送完成通知,只发送一次,若多次调用只发送第一条
    void onComplete();
}

接口 Observer 中的三个方法(onNext, onError, onComplete)正好与 Emitter 中的三个方法相对应,对于 Emitter 中对应方法发送的数据或通知进行响应。

使用 just 操作符简化如下:

public void demo3() {
    Observable.just("Hello World")
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    System.out.println(s);
                }
            });
}

Consumer 通过其函数 accept 只接收可观察对象发射的数据,不接收异常信息或完成信息。

事件订阅后都会返回一个 Disposable 对象:

public interface Disposable {
        void dispose();
        boolean isDisposed();
}

其中 isDisposed() 方法用来判断当前订阅是否失效,dispose() 方法用来取消当前订阅。

Publisher/Subscriber

ObservableSource/Observer 基础上进行了改进,可通过 背压策略 处理背压问题,但效率没有第一组高。

FlowablePublisherSubscriber 这一组观察者模式中 Publisher 的典型实现。在使用 Flowable 的时候,可观察对象不再是 Observable, 而是 Flowable, 观察者不再是 Observer,而是Subscriber。Flowable 与 Subscriber 之间依然通过 subscribe() 进行关联。

Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
        System.out.println("发射----> 1");
        e.onNext(1);
        System.out.println("发射----> 2");
        e.onNext(2);
        System.out.println("发射----> 3");
        e.onNext(3);
        System.out.println("发射----> 完成");
        e.onComplete();
    }
}, BackpressureStrategy.BUFFER) //create方法中多了一个BackpressureStrategy类型的参数
.subscribeOn(Schedulers.newThread())//为上下游分别指定各自的线程
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {   //onSubscribe回调的参数不是Disposable而是Subscription
        s.request(Long.MAX_VALUE);            //注意此处,暂时先这么设置
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("接收----> " + integer);
    }

    @Override
    public void onError(Throwable t) {
    }

    @Override
    public void onComplete() {
        System.out.println("接收----> 完成");
    }
});

注意第13行的 BackpressureStrategy.BUFFER 策略设置。

BUFFER 是处理背压的默认策略,在其内部维护了一个缓存池 SpscLinkedArrayQueue,其大小不限,此策略下,如果 Flowable 默认的异步缓存池满了,会通过此缓存池暂存数据,它与 Observable 的异步缓存池一样,可以无限制向里添加数据,不会抛出 MissingBackpressureException 异常,但会导致 OOM。

DROP 在此策略下,如果 Flowable 的异步缓存池满了,会丢掉上游发送的数据。

LATEST 与 Drop 策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST 都会将最后一条数据强行放入缓存池中,来保证观察者在接收到完成通知之前,能够接收到 Flowable 最新发射的一条数据。

ERROR 此策略下,如果放入 Flowable 的异步缓存池中的数据超限了,则会抛出 MissingBackpressureException 异常。

MISSING 此策略不会对对缓存池的状态进行判断,所以在此策略下,通过 Create 方法创建的 Flowable 相当于没有指定背压策略,不会对通过 onNext 发射的数据做缓存或丢弃处理,需要下游通过背压操作符 onBackpressureBuffer(), onBackpressureDrop(), onBackpressureLatest() 指定背压策略。

方法对应策略
onBackpressureBuffer()BackpressureStrategy.BUFFER
onBackpressureDrop()BackpressureStrategy.DROP
onBackpressureLatest()BackpressureStrategy.LATEST

注意:以下三组是新的响应式关系的实现,在 Rxjava1 中没有,可看做是 ObservableSource/Observer 的简化版。

SingleSource/SingleObserver

不能发送数据序列或完成通知,只能通过 onSuccess 方法发送单条数据,或者通过 onError 发送异常通知。

Single.create(new SingleOnSubscribe<Integer>() {
    @Override
    public void subscribe(SingleEmitter<Integer> emitter) throws Exception {
        emitter.onSuccess(0);
    }
}).subscribe(new SingleObserver<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onSuccess(Integer integer) {
        System.out.println(integer);
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
});

CompletableSource/CompletableObserve

不能发送任何形式的数据(单条数据或数据序列),只能通过 onComplete 发送完成通知或者通过 onError 发送异常通知。

 Completable.create(new CompletableOnSubscribe() {
    @Override
    public void subscribe(CompletableEmitter emitter) throws Exception {
        emitter.onComplete();
    }
}).subscribe(new CompletableObserver() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onComplete() {
        System.out.println("执行完成");
    }

    @Override
    public void onError(Throwable e) {

    }
});

MaybeSource/MaybeObserver

可通过 onSuccess 发送单条数据,通过 onComplete 发送完成通知或者通过 onError 发送一条异常通知。

Maybe.create(new MaybeOnSubscribe<Integer>() {
    @Override
    public void subscribe(MaybeEmitter<Integer> emitter) throws Exception {
        emitter.onSuccess(1);
        emitter.onComplete();
    }
}).subscribe(new MaybeObserver<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onSuccess(Integer integer) {
        System.out.println(integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        System.out.println("执行完成");
    }
});

线程调度

Observable<T> subscribeOn(Scheduler scheduler) 

subscribeOn 通过接收一个 Scheduler 参数,来指定对数据的处理运行在特定的线程调度器 Scheduler 上。若多次设定,则只有一次起作用。

Observable<T> observeOn(Scheduler scheduler)

observeOn 同样接收一个 Scheduler 参数,来指定下游操作运行在特定的线程调度器 Scheduler 上。若多次设定,每次均起作用。

Scheduler 种类:

Schedulers.io()

用于 IO 密集型的操作,例如读写 SD 卡文件,查询数据库,访问网络等,具有线程缓存机制,在此调度器接收到任务后,先检查线程缓存池中,是否有空闲的线程,如果有,则复用,如果没有则创建新的线程,并加入到线程池中,如果每次都没有空闲线程使用,可以无上限的创建新线程。

Schedulers.newThread()

在每执行一个任务时创建一个新的线程,不具有线程缓存机制,因为创建一个新的线程比复用一个线程更耗时耗力,虽然使用 Schedulers.io() 的地方,都可以使用 Schedulers.newThread(),但是,Schedulers.newThread() 的效率没有 Schedulers.io() 高。

Schedulers.computation()

用于 CPU 密集型计算任务,即不会被 I/O 等操作限制性能的耗时操作,例如 xml, json 文件的解析,Bitmap 图片的压缩取样等,具有固定的线程池,大小为 CPU 的核数。不可以用于 I/O 操作,因为 I/O 操作的等待时间会浪费 CPU。

Schedulers.trampoline()

在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。

Schedulers.single()

拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。

Scheduler.from(@NonNull Executor executor)

指定一个线程调度器,由此调度器来控制任务的执行策略。

AndroidSchedulers.mainThread()

在 Android UI 线程中执行任务,为 Android 开发定制。

subscribeOn 来指定对数据的处理运行在特定的线程调度器 Scheduler 上,直到遇到 observeOn 改变线程调度器若多次设定,则只有一次起作用。observeOn 指定下游操作运行在特定的线程调度器 Scheduler 上。若多次设定,每次均起作用。