图解 RxJava2-【1】异步事件

通过生活中的几个角色来学习 RxJava2 :饭店、厨师、服务员、顾客。

模拟一个情景:饭店有一个很火的套餐,顾客来店默认就要这个套餐(不存在服务员咨询顾客要什么的过程),所以情况应该是这样的。

上面的漫画写成 RxJava2 就是很多入门文章中看到的:事件发起者(上游)

Observable<String> observable;
observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        System.out.println("服务员从厨师那取得 扁食");
        e.onNext("扁食");
        System.out.println("服务员从厨师那取得 拌面");
        e.onNext("拌面");
        System.out.println("服务员从厨师那取得 蒸饺");
        e.onNext("蒸饺");
        System.out.println("厨师告知服务员菜上好了");
        e.onComplete();
    }
});

事件接收者(下游)

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println("来个沙县套餐!!!");
    }

    @Override
    public void onNext(String s) {
        System.out.println("服务员端给顾客  " + s);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
        System.out.println("服务员告诉顾客菜上好了");
    }
};

建立联系:

observable.subscribe(observer);

打印如下:

来个沙县套餐!!!
服务员从厨师那取得 拌面
服务员端给顾客  拌面
服务员从厨师那取得 扁食
服务员端给顾客  扁食
服务员从厨师那取得 蒸饺
服务员端给顾客  蒸饺
厨师告知服务员菜上好了
服务员告诉顾客菜上好了

下面把一些类代入角色结合源码分析,演员表:

角色对应类
饭店Observable
厨师ObservableOnSubscribe
服务员Emitter
顾客Observer

源码分析

最初看源码的时候容易因为各个类名字起得很相似而看晕,因此先把涉及到的类之间的关系画出来。

Observable 是个抽象类,其子类是 ObservableCreate ,如果把 Observable 比成饭店,那 ObservableCreate 就是沙县小吃,看下 Observable 的 create 方法:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

Observable 的 create 方法只是将接收的 ObservableOnSubscribe 作为参数传递给子类 ObservableCreate 真正实例化:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }    
    ...
}

上面这些代码就是漫画的第一格:饭店要开张 (Observable.create),开张的前提是要有一个会做菜的厨师 (new ObservableOnSubscribe),接着饭店起名叫沙县小吃 (new ObservableCreate),并把这个厨师和沙县小吃建立联系 (this.source = source)。厨师有了,但是他并没有立即开始做菜 (ObservableOnSubscribe.subscribe()),这个也很好理解,现实生活中厨师也是这样,他做不做菜取决于饭店,毕竟是饭店给他开工资;而饭店是否让厨师做菜很大一个原因取决于有没有顾客上门,看下顾客:

顾客没有什么套路,上菜就吃(onNext),菜上完或菜出问题会有相应的提醒(onComplete/onError),对应上面漫画2。接着看饭店接客 observable.subscribe(observer) 的源码:

public final void subscribe(Observer<? super T> observer) {
    //省略部分代码
    subscribeActual(observer);
    //省略部分代码
}

protected abstract void subscribeActual(Observer<? super T> observer);

Observable (饭店) 的 subscribe 方法又会调用 subscribeActual 方法,该方法是个抽象方法,具体实现在子类,看看子类 ObservableCreate (沙县小吃)

protected void subscribeActual(Observer<? super T> observer) {
    //步骤①
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //步骤②
    observer.onSubscribe(parent);

    try {
        //步骤③
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

先看下涉及到的类以及所属关系:

步骤① Emitter 翻译为发射器,这里名字起得也很形象 CreateEmitter (创建发射器) ,即对应服务员,CreateEmitter 创建的时候接收 Observer,就像一个服务员接待一个顾客一样(对应漫画3服务员说话)

步骤② 执行 onSubscribe 方法并接收 CreateEmitter ,所以看到 log 中最先打印该方法的内容,就像顾客认准之后自己的菜是由这个服务员上的(对应漫画3顾客说话)

步骤③ 调用 ObservableOnSubscribe.subscribe ,并接收 CreateEmitter ,就像厨师和该服务员建立联系,之后厨师做的菜都由该服务员端出去。上什么菜取决于 ObservableOnSubscribe.subscribe 中的实现。

分析到这里发现 CreateEmitter (服务员) 起到枢纽作用,看下代码中 e.onNext / e.onComplete 的实现:

public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("..."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}

onNext 中首先判空,服务员端个空盘子出来要被顾客锤成麻瓜;接着发送之前需要执行 isDisposed() 判断,可以理解成顾客是否还需要菜,默认情况下是需要的(!isDisposed() 为 true ),当执行完 onComplete() 方法后会执行 dispose() ,表明顾客不再需要菜了,后续的菜服务员不会再端上来给顾客了。

Observer<String> observer = new Observer<String>() {
    Disposable disposable;
    @Override
    public void onSubscribe(Disposable d) {
        this.disposable = d;
        System.out.println("来个沙县套餐!!!");

    }

    @Override
    public void onNext(String s) {
        if (s.equals("拌面")) {
            disposable.dispose();
        }
        System.out.println("服务员端给顾客  " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        System.out.println("服务员告诉顾客菜上好了");
    }
};

打印如下:

来个沙县套餐!!!
服务员从厨师那取得 拌面
服务员端给顾客  拌面
服务员从厨师那取得 扁食
服务员从厨师那取得 蒸饺
厨师告知服务员菜上好了

从上面可以看到一旦执行完 Disposable.dispose() 方法,顾客和服务员就没有后续交流了,就像 Disposable 翻译的那样「一次性」,理解成顾客对服务员说「后续的菜都别上了,你也不要再出现在我面前」;但是服务员和厨师的交流还是保持着,默认情况下厨师并不知道顾客不需要菜了,因此他还是继续做菜,然后交给服务员端出去。当然我们也可以在厨师做下一道菜的之前,判断下顾客还要不要:

Observable<String> observable;
observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        if (!e.isDisposed()) {
            System.out.println("服务员从厨师那取得 拌面");
            e.onNext("拌面");
        }
        if (!e.isDisposed()) {
            System.out.println("服务员从厨师那取得 扁食");
            e.onNext("扁食");
        }
        if (!e.isDisposed()) {
            System.out.println("服务员从厨师那取得 蒸饺");
            e.onNext("蒸饺");
        }
        if (!e.isDisposed()) {
            System.out.println("厨师告知服务员菜上好了");
            e.onComplete();
        }

    }
});

打印如下:

来个沙县套餐!!!
服务员从厨师那取得 拌面
服务员端给顾客  拌面

再看下另一种情况:

Observable<String> observable;
observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        System.out.println("服务员从厨师那取得 拌面");
        e.onNext("拌面");
        System.out.println("服务员从厨师那取得 扁食");
        e.onNext("扁食");
        System.out.println("服务员从厨师那取得 蒸饺");
        e.onNext("蒸饺");
        System.out.println("厨师告知服务员菜上好了");
        e.onComplete();
    }
});
observable.subscribe();

打印如下:

服务员从厨师那取得 拌面
服务员从厨师那取得 扁食
服务员从厨师那取得 蒸饺
厨师告知服务员菜上好了

上面分析了要有顾客厨师才会做菜,这都没顾客怎么也做菜呢?看下源码

public final Disposable subscribe() {
    return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING,
            Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

public final Disposable subscribe(Consumer<? super T> onNext, 
        Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Subscription> onSubscribe) {
        
    LambdaSubscriber<T> ls; //默认顾客
    ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;

public final void subscribe(Observer<? super T> observer) {
    subscribeActual(observer);
}

protected abstract void subscribeActual(Observer<? super T> observer);

原来系统会默认创建一个 LambdaObserver(默认顾客) ,服务员从厨师那端的菜会传给这个顾客。所以可以看出厨师做不做菜只取决于饭店(Observable.subscribe),后面的流程和上面分析的一致。另外上面的代码还出现了Consumer、Action类,这些类里也有对事件的处理,可以理解成顾客选择接收服务员的哪些信息,在 functions 包下还有其他实现。

subscribe() 有下面几个重载方法:

//顾客只关心上什么菜
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
    
//顾客关心上什么菜以及菜是不是出问题 
public final Disposable subscribe(Consumer<? super T> onNext, 
    Consumer<? super Throwable> onError) {} 

//顾客关心上什么菜、菜是不是有问题、菜是不是上完了   
public final Disposable subscribe(Consumer<? super T> onNext, 
    Consumer<? super Throwable> onError, Action onComplete) {}

//顾客关心上什么菜、菜是不是有问题、菜是不是上完了、
//onSubscribe()中可以获取Disposable引用,之后选择告诉服务员是否继续上菜
public final Disposable subscribe(Consumer<? super T> onNext, 
    Consumer<? super Throwable> onError,  
    Action onComplete, Consumer<? super Disposable> onSubscribe) {}

//由顾客自己决定关心哪些事件,和上一条效果一样
public final void subscribe(Observer<? super T> observer) {}

如果顾客只关心上什么菜,我们可以这么写:

Consumer<String> consumer = new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println("服务员端给顾客  " + s);
    }
};
observable.subscribe(consumer);

打印如下:

服务员从厨师那取得 拌面
服务员端给顾客  拌面
服务员从厨师那取得 扁食
服务员端给顾客  扁食
服务员从厨师那取得 蒸饺
服务员端给顾客  蒸饺
厨师告知服务员菜上好了

本文转载自 HuYounger 的博客:https://rkhcy.github.io/2017/12/13/RxJava2_1/