图解 RxJava2-【3】observeOn

上篇文章分析了 RxJava 中 subscribeOn 方法的实现原理,然而只使用 subscribeOn 发现上下游都是执行在子线程中。在日常开发中往往是将上游耗时任务通过 subscribeOn 指定在子线程中,下游通常是更新 UI 等需要在主线程中进行,使用 observeOn(AndroidSchedulers.mainThread()) 就能实现,那么它是怎么做到的呢?

例子

基于上篇文章的代码,修改上下游联系,添加 observeOn(AndroidSchedulers.mainThread())

source.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);
Log.e(TAG, "其他任务执行");

打印如下:

此时主线程中的「其他任务」没有被阻塞,上游的耗时任务在子线程 RxNewThreadScheduler-1 中执行,而下游接收任务在主线程中进行,并且事件传递不保证顺序(多次执行输出可能都不同),这也是多线程执行顺序的不确定性特点,上篇已介绍过。

源码分析

source.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);

有前两篇分析经历,现在就轻车熟路:执行 Observable.create 、 new Observer 、Schedulers.newThread()、

subscribeOn(Scheduler) 后此时主线程应该是下面的样子:

AndroidSchedulers.mainThread()

AndroidSchedulers 是 RxAndroid 中提供的,使用前需要在 Android Studio 中添加依赖。mainThread() 最后会创建 HandlerScheduler:

new HandlerScheduler(new Handler(Looper.getMainLooper()));

HandlerScheduler 也是 Scheduler 的子类,在初始化 HandlerScheduler 的时候创建了一个持有主线程 Looper 的 Handler ,可以猜想后面线程切换很有可能就是 Handler 机制的那一套。此时的主线程

observeOn(Scheduler scheduler)

public final Observable<T> observeOn(Scheduler scheduler) {
    //bufferSize()默认为128
    return observeOn(scheduler, false, bufferSize());
}


/**
 * @param scheduler 调度器
 * @param delayError 发生异常是否马上结束事件流
 * @param bufferSize 缓存队列的默认大小
 * @return Observable
 */
public final Observable<T> observeOn(Scheduler scheduler, 
    boolean delayError, int bufferSize) {
    //省略判空代码
    //这里的this就是之前传下来的ObservableSubscribeOn(黄焖鸡饭店)
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, 
        scheduler, delayError, bufferSize));
}

该方法返回 Observable ,创建了 ObservableObserveOn(已经习惯了,就这几个英文单词排列组合),它也是 Observable 的子类,结合我们举的例子,就给它起名肯德基,肯德基持有黄焖鸡饭店的引用,初始化如下:

class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, 
        Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    
    //省略其他代码
}

此时的主线程:

subscribe(Observer observer)

由上两篇分析可知,这里会先去执行 ObservableObserveOn(肯德基) 的 subscribeActual(observer) 方法,这里的 observer 是顾客小明:

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //步骤①
        Scheduler.Worker w = scheduler.createWorker();
        //步骤② source 即 ObservableSubscribeOn(黄焖鸡饭店)
        source.subscribe(new ObserveOnObserver<T>(observer, w, 
            delayError, bufferSize));
    }
}

步骤① 和上篇一样,这里也会创建 Worker,具体实现在 HandlerScheduler 中

public Worker createWorker() {
    return new HandlerWorker(handler);
}

private static final class HandlerWorker extends Worker {
    private final Handler handler;
        
    HandlerWorker(Handler handler) {
        this.handler = handler;
    }
    //省略其他代码
}

并把之前持有主线程 Looper 的 Handler 传进去。

步骤② 先创建了 ObserveOnObserver(总起这种很操蛋的名字),作为 ObservableObserveOn(肯德基)的内部类,它是 BasicIntQueueDisposable 的子类(保证原子性、拥有操作队列功能、保证一次性操作),实现Observer接口(也是个顾客)。结合例子,就给它起名叫顾客小强,只是这个小强功能比较强大,小强持有小明的引用。

接着执行 ObservableSubscribeOn(黄焖鸡饭店)的 subscribe ,具体实现是 subscribeActual :

public void subscribeActual(final Observer<? super T> s) {
    //小红登场,持有小强的引用
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    //调用小强的onSubscribe,把小红传进去
    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

看下小强(ObserveOnObserver)的 onSubscribe:

public void onSubscribe(Disposable s) {
    //判空操作
    if (DisposableHelper.validate(this.s, s)) {
        this.s = s;
        //小红不是 QueueDisposable 类型
        if (s instanceof QueueDisposable) {
            //省略部分代码
        }
        // 创建一个大小为128的队列
        queue = new SpscLinkedArrayQueue<T>(bufferSize);
        //actual是小明,执行小明的 onSubscribe ,所以看到 log 打印了
        actual.onSubscribe(this);
    }
}

这里创建了一个队列,大小为128。到目前为止所有操作都发生在主线程中。

回到上面,继续执行:

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

上篇文章已经介绍过了,具体流程如下:

在上篇介绍到这的时候说,接下来的操作都是在子线程中进行的,那此时这里会有什么转折呢?

protected void subscribeActual(Observer<? super T> observer) {
    //创建服务员,并和顾客联系,这里的顾客是小红
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //执行顾客小红的的 onSubscribe 
    observer.onSubscribe(parent);

    try {
        //厨师做菜,并和服务员联系
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

后续还有:服务员端菜(CreateEmitter.onNext) —> 顾客小红拿到菜(SubscribeOnObserver.onNext) ,到这里都是执行在子线程中(卧槽,怎么还没切线程啊,这都快走完了),接着小强拿到菜(ObserveOnObserver.onNext),看下代码:

public void onNext(T t) {
    if (done) {
        return;
    }
    //上游数据类型不是异步的
    if (sourceMode != QueueDisposable.ASYNC) {
        //将上游下发的数据放入队列中
        queue.offer(t);
    }
    //调度
    schedule();
}


void schedule() {
    //保证原子性操作
    if (getAndIncrement() == 0) {
        //执行HandlerWorker的schedule,传入的this也就是小强
        //上面说了小强很强大,除了顾客身份,还是个Runnable
        worker.schedule(this);
    }
}

最后执行 HandlerScheduler 的 schedule

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; 
    //这里的handler持有mainLooper,所以传进去的Runnable会在主线程中执行
    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

原来用的是 Handler 机制来完成的,那 Runnable 具体执行的是什么呢?看下小强的 run 方法

public void run() {
    //默认为false
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

void drainNormal() {
    int missed = 1;
    //获取队列引用
    final SimpleQueue<T> q = queue;
    //获取小明引用
    final Observer<? super T> a = actual;

    
    for (;;) {
        //检查是否没有数据要发送
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                //从队列中取数据
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }
            //执行小明的onNext()
            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

就是从队列中取出传过来的数据,交给小明的 onNext 方法执行,所以小明的 onNext 是在主线程中执行,这部分流程如下(Queue 即小强内部维护的队列):

上图的事件调度不保证顺序,只是模拟了其中一种情况。

多次observeOn

source.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.newThread())
        .subscribe(observer);

上面我先把下游接收事件指定在主线程,再指定在一个新的线程,打印如下:

看到此时下游接收事件被成功执行在后指定的新线程,这里是怎么实现的呢?分解下代码

source.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())

执行到这的时候应该是这样的:

这也是上面分析过的, subscribeOn 返回的 Observable 称为黄焖鸡店(ObservableSubscribeOn),observeOn 返回的 Observable 称为肯德基1号店(ObservableObserveOn),肯德基1号店持有黄焖鸡店的引用;接着

.observeOn(Schedulers.newThread())

执行到这的时候应该是这样的

把第二次 observeOn 返回的 Observable 称为肯德基2号店(ObservableObserveOn),肯德基2号店持有1号店的引用;接着

.subscribe(observer);

会先执行肯德基2号店的 subscribeActual 方法,这里的 observer 是小明

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //此时schduler是NewThreadScheduler,创建的worker是NewThreadWorker
        Scheduler.Worker w = scheduler.createWorker();
        //source是肯德基1号店,observer是小明
        source.subscribe(new ObserveOnObserver<T>(observer, w,
            delayError, bufferSize));
    }
}

这里会创建小强(ObserveOnObserver),为了和后面区分开,就叫他2号店小强,2号店小强持有小明的引用,之后执行肯德基1号店的 subscribeActual ,observer 是肯德基2号店小强

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //此时schduler是HandlerScheduler,创建的Worker是HandlerWorker
        Scheduler.Worker w = scheduler.createWorker();
        //source是黄焖鸡店,observer是肯德基2号店小强
        source.subscribe(new ObserveOnObserver<T>(observer, w, 
            delayError, bufferSize));
    }
}

这里又创建了小强,就叫他肯德基1号店小强,1号店小强持有2号店小强的引用,整个过程如下

接着执行黄焖鸡店的 subscribeActual,observer 是肯德基1号店小强

public void subscribeActual(final Observer<? super T> s) {
    //s 是肯德基1号店小强,小红登场,小红持有1号店小强引用
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    //执行1号店小强的onSubscribe方法
    s.onSubscribe(parent);
    //此时schduler是NewThreadScheduler
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

执行到上面 s.onSubscribe(parent) 是应该是这样的

因此 onSubscribe 方法还是执行在主线程中;其实看到这就有点明白了,就是一层层的回调…接着执行后面的流程,直接上图

上图省略了其他事件,并且省略了事件入队的过程,至此分析完毕。

本文转载自 HuYounger 的博客:https://rkhcy.github.io/2017/12/22/RxJava2_3/