从响应式编程谈RxJava

参考链接:

响应式编程

响应式编程是一种基于异步数据流概念的编程模式

响应式编程关键性概念就是事件,在某种程度上,这并不是什么新东西。事件总线(Event buses)或咱们常见的单击事件就是一个异步事件流,你可以观察这个流,也可以基于这个流做一些自定义操作(原文:side effects,副作用,本文皆翻译为自定义操作)。响应式就是基于这种想法。你能够创建所有事物的数据流,而不仅仅只是单击和悬停事件数据流。 流廉价且无处不在,任何事物都可以当作一个流:变量、用户输入、属性、缓存、数据结构等等。比如,假设你的微博评论就是一个跟单击事件一样的数据流,你能够监听这个流,并做出响应。

最重要的是,有一堆的函数能够 创建(create)任何流,也能将任何流进行 组合(combine)和 过滤(filter)。 这正是 函数式 的魔力所在。一个流能作为另一个流的 输入(input),甚至多个流也可以作为其它流的输入。你能 合并(merge)两个流。你还能通过 过滤(filter)一个流得到那些你感兴趣的事件。你能将一个流中的数据 映射(map)到一个新的流中。

响应式编程流模型

一个流就是一个将要发生的以时间为序的 事件序列。它能发射出三种不同的东西:一个 数据值(data value)(某种类型的),一个 错误(error)或者一个 完成(completed)的信号。比如说,当前按钮所在的窗口或视图关闭时,“单击”事件流也就“完成”了。

我们只能异步地捕获这些发出的事件:定义一个针对数据值的函数,在发出一个值时,该函数就会异步地执行;针对发出 错误 时的函数;还有针对发出 完成 时的函数。有时你可以省略这最后两个函数,只专注于针对数据值的函数。监听流的行为叫做订阅。我们定义的这些函数就是观察者。这个流就是被观察的主体 (subject) 或可观察的 (observable) 。这正是观察者设计模式。

RxJava 是什么

RxReactive Extensions 原来是由微软提出的一个综合了异步和基于事件驱动编程的库包,使用可观察序列和 LINQ-style 查询操作。

RxJavaReactive Extension for Java 是最开始根据微软的 RX 为基础,由 Netflix 主导做出的提供在 JVM 上实现响应式编程的一种方式。

RxJava 是一种在 JVM 上实现异步数据处理的库,是基于事件的扩展的观察模式

RxJava 特点

  • jar 包很小 < 1MB
  • 轻量级框架
  • 支持 Java 8 lambda
  • 支持 Java 6+ & Android 2.3+
  • 支持同步和异步
  • 使用简洁
  • 解耦、单一化、不嵌套

扩展的观察者

  • onCompleted() 事件
  • onError() 事件
  • 组合而不是嵌套,避免陷入回调地域

RxAndroid 是什么

是 Rxjava 针对 Android 平台的一个扩展,用于 Android 开发 提供响应式扩展组建快速方便开发 Android 应用程序

Schedulers(调度器)

Schedulers 在 RxAndroid 中解决 Android 主线程问题(针对 Android),解决多线程线程问题。

在不指定特定线程的情况下,RxAndroid 遵循的是线程不变原则,在哪个线程调用 subscribe() 就在哪个线程生产事件,在哪个线程生成事件就在哪个线程消费事件。如果我们要让事件切换线程就要用到 Schedulers.

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

RxJava 和观察者模式

观察者模式的四大要素:

  • Observable 被观察者
  • Observer 观察者
  • subscribe 订阅
  • 事件

观察者模式模型

观察者模式本质上是通过订阅将被观察者产生的事件传递给观察者

RxJava 扩展的观察者模式

RxJava的事件观察模型

RxJava 中相对于观察者模式多了 onCompleted()onError() 两个方法,这两个互斥的方法能唯一确定事件的结束。

RxJava 实践

RxJava 的 Helloword

private void testRxJava(){
    //创建被观察者
    Observable mObservable = Observable.create(new Observable.OnSubscribe<String>() {

        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hellowrod!");
            subscriber.onNext("i'm");
            subscriber.onNext("shuihan");
            subscriber.onCompleted();
        }
    });

    //创建观察者
    Subscriber subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError");
        }

        @Override
        public void onNext(String s) {
            System.out.println("onNext: " + s);
        }
    };

    //订阅事件
    mObservable.subscribe(subscriber);

}

事件流向示意图

可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribecall() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者 Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

RxJava 中的不完整回调

Action1Action0 都是 RxJava 的一个接口

Action0 它只有一个方法 call(), 这个方法无参数返回值,可以和无参数的 onCompleted() 结合使用。

Action1 也有一个方法 call(T param), 这个方法有一个参数返回可以和有一个参数的 onNext(T obj)onError(Throwable error) 结合使用。

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

线程控制

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

RxJava操作符

在 Rxjava 中的操作符是很多的,按照用途可以分为几大类,操作符索引列表请参考:RxJava 操作符索引. 下面对部分操作符进行一些操作示例。

注意:下面案例均已改为 RxJava 2.0 的函数案例,所以在你实践之前,请引入 RxJava 2.0 的库

关于 RxJava 2.0 的基础知识,可以参考我的另一篇博文 《Rxjava2使用总结》

create

create 是最基础的创建事件流的方法

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("水寒的博客");
        emitter.onNext("https://dp2px.com");
        emitter.onComplete();
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        LogPlus.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        LogPlus.d(TAG, "onNext: " + s);
    }

    @Override
    public void onError(Throwable e) {
        LogPlus.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
        LogPlus.d(TAG, "onComplete");
    }
});

输出结果:

onSubscribe
onNext: 水寒的博客
onNext: https://dp2px.com
onComplete

just

just 相当于 create 的简写

Observable.just("水寒的博客", "https://dp2px.com").subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        LogPlus.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        LogPlus.d(TAG, "onNext: " + s);
    }

    @Override
    public void onError(Throwable e) {
        LogPlus.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
        LogPlus.d(TAG, "onComplete");
    }
});

输出结果:

onSubscribe
onNext: 水寒的博客
onNext: https://dp2px.com
onComplete

from

from 相当于 just 的集合形式,from 所对应的方法如下:

下面我们先看看 fromArray 的使用:

String[] siteInfo = new String[]{"水寒的博客", "https://dp2px.com"};

Observable.fromArray(siteInfo).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        LogPlus.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        LogPlus.d(TAG, "onNext: " + s);
    }

    @Override
    public void onError(Throwable e) {
        LogPlus.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
        LogPlus.d(TAG, "onComplete");
    }
});

输出结果:

onSubscribe
onNext: 水寒的博客
onNext: https://dp2px.com
onComplete

Java 中的集合比数组更加常用,所以我们再看看 fromIterable 的使用,下面我们以 List<String> 为例:

List<String> siteInfo = new ArrayList<>();
siteInfo.add("水寒的博客");
siteInfo.add("https://dp2px.com");
siteInfo.add("欢迎访问");
siteInfo.add("hugo博客搭建");

Observable.fromIterable(siteInfo).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        LogPlus.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        LogPlus.d(TAG, "onNext: " + s);
    }

    @Override
    public void onError(Throwable e) {
        LogPlus.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
        LogPlus.d(TAG, "onComplete");
    }
});

输出结果:

onSubscribe
onNext: 水寒的博客
onNext: https://dp2px.com
onNext: 欢迎访问
onNext: hugo博客搭建
onComplete

同理,我们可以猜想,如果是 Set<String> 集合,onNext 的顺序就不能保证有序了。

map

map 把一个对象转换成另一个对象(类型转换),比如下面将 Integer 转为 String, 你可能注意到了,这个 map 是一个 转换符,而不是前面的创建符。

Observable.just(1, 2).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        if(integer == 1) return "水寒的博客";
        else if(integer == 2) return "https://dp2px.com";
        return "undefined";
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        LogPlus.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        LogPlus.d(TAG, "onNext: " + s);
    }

    @Override
    public void onError(Throwable e) {
        LogPlus.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
        LogPlus.d(TAG, "onComplete");
    }
});

输出结果:

onSubscribe
onNext: 水寒的博客
onNext: https://dp2px.com
onComplete

flatmap

上面的 map 实现了一个 一对一 的类型转换,除了 map 外我们还留意到有一个 flatmapflatmap 是一个 一对多 或者 多对多 的转换,转换后的是一个 ObservableSource 对象,例如下面将 Integer 转换为 String, 而且特别是这个 一对多, 也就是说 flatmap 可以将事件拆分。

Observable.just(1, 2).flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(final Integer integer) throws Exception {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                for(int i = 0; i < integer; i++){
                    emitter.onNext("group-" + integer + ", data-" + i);
                }
                emitter.onComplete();
            }
        });
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        LogPlus.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        LogPlus.d(TAG, "onNext: " + s);
    }

    @Override
    public void onError(Throwable e) {
        LogPlus.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
        LogPlus.d(TAG, "onComplete");
    }
});

输出结果:

onSubscribe
onNext: group-1, data-0
onNext: group-2, data-0
onNext: group-2, data-1
onComplete

concatMap

上面的 flatMap 实际上不能保证数据严格有序的,要保证数据严格有序只能使用 concatMap, 你可能会有疑问了,上面的代码不是有序的吗?之所以有序是因为我们这里的数据转换只是简单的字符串拼接,执行速度非常快,才会产生 flatMap 按顺序发射数据的假象,我们将 flatMap 的转换加上 50 msdelay,再运行一下就会发现出现问题了。 这里就不做示例了,因为只需要将函数名替换即可。

groupby

groupBy 用于分组元素,它可以被用来根据指定的条件将元素分成若干组。它将得到一个 Observable<GroupedObservable<T, M>> 类型的 Observable 。 通常我们会使用 groupBy 配合 flapMap 实现并发。例如下面我们对 1,2,3,4,5 个任务进行了分组。

Observable.just(1, 2, 3, 4, 5).groupBy(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer integer) throws Exception {
        return integer % 2;  //分为两组
    }
}).subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
    @Override
    public void onSubscribe(Disposable d) {
        LogPlus.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(GroupedObservable<Integer, Integer> groupedObservable) {
        groupedObservable.delay(50, TimeUnit.MILLISECONDS).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                LogPlus.d(TAG, "Group--onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                LogPlus.d(TAG, "Group--onNext: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                LogPlus.d(TAG, "Gorup--onError");
            }

            @Override
            public void onComplete() {
                LogPlus.d(TAG, "Group--onComplete");
            }
        });
    }

    @Override
    public void onError(Throwable e) {
        LogPlus.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
        LogPlus.d(TAG, "onComplete");
    }
});

输出结果:

onSubscribe
Group--onSubscribe
Group--onSubscribe
onComplete
Group--onNext: 1
Group--onNext: 2
Group--onNext: 4
Group--onComplete
Group--onNext: 5
Group--onComplete

通过 groupBy 将 数据 分组,再将每组的数据通过 flatMap 调度至一个 线程 来执行。 groupByflatMap 的组合,可以任意控制并发数。

scan

scan 将数据累加来发送事件给观察者

 Observable.just(1, 2, 3, 4, 5).scan(new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer sum, Integer integer) throws Exception {
        return sum + integer;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        LogPlus.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        LogPlus.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable e) {
        LogPlus.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
        LogPlus.d(TAG, "onComplete");
    }
});

执行结果:

onSubscribe
onNext: 1
onNext: 3
onNext: 6
onNext: 10
onNext: 15
onComplete

zip

zip 是通过定义的组合规则将两列数据进行组合后发送给观察者

事件组合流向示意图

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {            
    @Override                                                                                         
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                      
        Log.d(TAG, "emit 1");                                                                         
        emitter.onNext(1);                                                                            
        Log.d(TAG, "emit 2");                                                                         
        emitter.onNext(2);                                                                            
        Log.d(TAG, "emit 3");                                                                         
        emitter.onNext(3);                                                                            
        Log.d(TAG, "emit 4");                                                                         
        emitter.onNext(4);                                                                            
        Log.d(TAG, "emit complete1");                                                                 
        emitter.onComplete();                                                                         
    }                                                                                                 
});                                                                   
                                                                                                      
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {              
    @Override                                                                                         
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                       
        Log.d(TAG, "emit A");                                                                         
        emitter.onNext("A");                                                                          
        Log.d(TAG, "emit B");                                                                         
        emitter.onNext("B");                                                                          
        Log.d(TAG, "emit C");                                                                         
        emitter.onNext("C");                                                                          
        Log.d(TAG, "emit complete2");                                                                 
        emitter.onComplete();                                                                         
    }                                                                                                 
});                                                                     
                                                                                                      
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                  
    @Override                                                                                         
    public String apply(Integer integer, String s) throws Exception {                                 
        return integer + s;                                                                           
    }                                                                                                 
}).subscribe(new Observer<String>() {                       
    @Override                                                                                         
    public void onSubscribe(Disposable d) {                                                           
        Log.d(TAG, "onSubscribe");                                                                    
    }                                                                                                 
                                                                                                      
    @Override                                                                                         
    public void onNext(String value) {                                                                
        Log.d(TAG, "onNext: " + value);                                                               
    }                                                                                                 
                                                                                                      
    @Override                                                                                         
    public void onError(Throwable e) {                                                                
        Log.d(TAG, "onError");                                                                        
    }                                                                                                 
                                                                                                      
    @Override                                                                                         
    public void onComplete() {                                                                        
        Log.d(TAG, "onComplete");                                                                     
    }                                                                                                 
});

count

count 是统计事件流数量,例如下面的结果是 5.

Observable.just(1, 2, 3, 4, 5).count().subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        LogPlus.d("Accept: " + aLong);
    }
});

onErrorResumeNext

onErrorResumeNext 当原始 Observable 在遇到错误时,使用备用 Observable

Observable.just(1,"2",3)
.cast(Integer.class)
.onErrorResumeNext(Observable.just(1,2,3))
.subscribe(integer -> Log.d("JG",integer.toString())); //1,2,3

retry

retry 当原始 Observable 在遇到错误时进行重试

Observable.just(1,"2",3)
.cast(Integer.class)
.retry(3)
.subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));//1,1,1,1,onError