参考链接:
响应式编程
响应式编程是一种基于异步数据流概念的编程模式
响应式编程关键性概念就是事件,在某种程度上,这并不是什么新东西。事件总线(Event buses)或咱们常见的单击事件就是一个异步事件流,你可以观察这个流,也可以基于这个流做一些自定义操作(原文:side effects,副作用,本文皆翻译为自定义操作)。响应式就是基于这种想法。你能够创建所有事物的数据流,而不仅仅只是单击和悬停事件数据流。 流廉价且无处不在,任何事物都可以当作一个流:变量、用户输入、属性、缓存、数据结构等等。比如,假设你的微博评论就是一个跟单击事件一样的数据流,你能够监听这个流,并做出响应。
最重要的是,有一堆的函数能够 创建
(create)任何流,也能将任何流进行 组合
(combine)和 过滤
(filter)。 这正是 函数式
的魔力所在。一个流能作为另一个流的 输入
(input),甚至多个流也可以作为其它流的输入。你能 合并
(merge)两个流。你还能通过 过滤
(filter)一个流得到那些你感兴趣的事件。你能将一个流中的数据 映射
(map)到一个新的流中。
响应式编程流模型
一个流就是一个将要发生的以时间为序的 事件序列
。它能发射出三种不同的东西:一个 数据值
(data value)(某种类型的),一个 错误
(error)或者一个 完成
(completed)的信号。比如说,当前按钮所在的窗口或视图关闭时,“单击”事件流也就“完成”了。
我们只能异步地捕获这些发出的事件:定义一个针对数据值的函数,在发出一个值时,该函数就会异步地执行;针对发出 错误 时的函数;还有针对发出 完成 时的函数。有时你可以省略这最后两个函数,只专注于针对数据值的函数。监听流的行为叫做订阅。我们定义的这些函数就是观察者。这个流就是被观察的主体 (subject) 或可观察的 (observable) 。这正是观察者设计模式。
RxJava 是什么
Rx
— Reactive Extensions
原来是由微软提出的一个综合了异步和基于事件驱动编程的库包,使用可观察序列和 LINQ-style
查询操作。
RxJava
— Reactive Extension for Java
是最开始根据微软的 RX
为基础,由 Netflix
主导做出的提供在 JVM
上实现响应式编程的一种方式。
RxJava
是一种在 JVM
上实现异步数据处理的库,是基于事件的扩展的观察模式。
RxJava 特点
- jar 包很小 < 1MB
- 轻量级框架
- 支持 Java 8 lambda
- 支持 Java 6+ & Android 2.3+
- 支持同步和异步
- 使用简洁
- 解耦、单一化、不嵌套
扩展的观察者
- onCompleted() 事件
- onError() 事件
- 组合而不是嵌套,避免陷入回调地域
RxAndroid 是什么
是 Rxjava 针对 Android 平台的一个扩展,用于 Android 开发
提供响应式扩展组建快速方便开发 Android 应用程序
Schedulers(调度器)
Schedulers
在 RxAndroid 中解决 Android 主线程问题(针对 Android),解决多线程线程问题。
在不指定特定线程的情况下,RxAndroid 遵循的是线程不变原则,在哪个线程调用 subscribe()
就在哪个线程生产事件,在哪个线程生成事件就在哪个线程消费事件。如果我们要让事件切换线程就要用到 Schedulers.
1
2
3
4
5
6
7
8
9
| Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
|
RxJava 和观察者模式
观察者模式的四大要素:
- Observable 被观察者
- Observer 观察者
- subscribe 订阅
- 事件
观察者模式模型
观察者模式本质上是通过订阅将被观察者产生的事件传递给观察者
RxJava 扩展的观察者模式
RxJava的事件观察模型
RxJava 中相对于观察者模式多了 onCompleted()
和 onError()
两个方法,这两个互斥的方法能唯一确定事件的结束。
RxJava 实践
RxJava 的 Helloword
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| private void testRxJava(){
//创建被观察者
Observable mObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hellowrod!");
subscriber.onNext("i'm");
subscriber.onNext("shuihan");
subscriber.onCompleted();
}
});
//创建观察者
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
};
//订阅事件
mObservable.subscribe(subscriber);
}
|
事件流向示意图
可以看到,这里传入了一个 OnSubscribe
对象作为参数。OnSubscribe
会被存储在返回的 Observable
对象中,它的作用相当于一个计划表,当 Observable
被订阅的时候,OnSubscribe
的 call()
方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者 Subscriber
将会被调用三次 onNext()
和一次 onCompleted()
)。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
RxJava 中的不完整回调
Action1
和 Action0
都是 RxJava 的一个接口
Action0
它只有一个方法 call()
, 这个方法无参数返回值,可以和无参数的 onCompleted()
结合使用。
Action1
也有一个方法 call(T param)
, 这个方法有一个参数返回可以和有一个参数的 onNext(T obj)
和 onError(Throwable error)
结合使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
|
线程控制
1
2
3
4
5
6
7
8
9
| Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
|
RxJava操作符
在 Rxjava 中的操作符是很多的,按照用途可以分为几大类,操作符索引列表请参考:RxJava 操作符索引. 下面对部分操作符进行一些操作示例。
注意:下面案例均已改为 RxJava 2.0 的函数案例,所以在你实践之前,请引入 RxJava 2.0 的库
关于 RxJava 2.0 的基础知识,可以参考我的另一篇博文 《Rxjava2使用总结》
create
create
是最基础的创建事件流的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("水寒的博客");
emitter.onNext("https://dp2px.com");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
LogPlus.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
LogPlus.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
LogPlus.d(TAG, "onError");
}
@Override
public void onComplete() {
LogPlus.d(TAG, "onComplete");
}
});
|
输出结果:
1
2
3
4
| onSubscribe
onNext: 水寒的博客
onNext: https://dp2px.com
onComplete
|
just
just
相当于 create
的简写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| Observable.just("水寒的博客", "https://dp2px.com").subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
LogPlus.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
LogPlus.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
LogPlus.d(TAG, "onError");
}
@Override
public void onComplete() {
LogPlus.d(TAG, "onComplete");
}
});
|
输出结果:
1
2
3
4
| onSubscribe
onNext: 水寒的博客
onNext: https://dp2px.com
onComplete
|
from
from
相当于 just
的集合形式,from 所对应的方法如下:
下面我们先看看 fromArray
的使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| String[] siteInfo = new String[]{"水寒的博客", "https://dp2px.com"};
Observable.fromArray(siteInfo).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
LogPlus.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
LogPlus.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
LogPlus.d(TAG, "onError");
}
@Override
public void onComplete() {
LogPlus.d(TAG, "onComplete");
}
});
|
输出结果:
1
2
3
4
| onSubscribe
onNext: 水寒的博客
onNext: https://dp2px.com
onComplete
|
Java 中的集合比数组更加常用,所以我们再看看 fromIterable
的使用,下面我们以 List<String>
为例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| List<String> siteInfo = new ArrayList<>();
siteInfo.add("水寒的博客");
siteInfo.add("https://dp2px.com");
siteInfo.add("欢迎访问");
siteInfo.add("hugo博客搭建");
Observable.fromIterable(siteInfo).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
LogPlus.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
LogPlus.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
LogPlus.d(TAG, "onError");
}
@Override
public void onComplete() {
LogPlus.d(TAG, "onComplete");
}
});
|
输出结果:
1
2
3
4
5
6
| onSubscribe
onNext: 水寒的博客
onNext: https://dp2px.com
onNext: 欢迎访问
onNext: hugo博客搭建
onComplete
|
同理,我们可以猜想,如果是 Set<String>
集合,onNext
的顺序就不能保证有序了。
map
map
把一个对象转换成另一个对象(类型转换),比如下面将 Integer
转为 String
, 你可能注意到了,这个 map
是一个 转换符
,而不是前面的创建符。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| Observable.just(1, 2).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
if(integer == 1) return "水寒的博客";
else if(integer == 2) return "https://dp2px.com";
return "undefined";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
LogPlus.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
LogPlus.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
LogPlus.d(TAG, "onError");
}
@Override
public void onComplete() {
LogPlus.d(TAG, "onComplete");
}
});
|
输出结果:
1
2
3
4
| onSubscribe
onNext: 水寒的博客
onNext: https://dp2px.com
onComplete
|
flatmap
上面的 map
实现了一个 一对一
的类型转换,除了 map 外我们还留意到有一个 flatmap
,flatmap
是一个 一对多
或者 多对多
的转换,转换后的是一个 ObservableSource
对象,例如下面将 Integer
转换为 String
, 而且特别是这个 一对多
, 也就是说 flatmap
可以将事件拆分。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| Observable.just(1, 2).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final Integer integer) throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
for(int i = 0; i < integer; i++){
emitter.onNext("group-" + integer + ", data-" + i);
}
emitter.onComplete();
}
});
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
LogPlus.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
LogPlus.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
LogPlus.d(TAG, "onError");
}
@Override
public void onComplete() {
LogPlus.d(TAG, "onComplete");
}
});
|
输出结果:
1
2
3
4
5
| onSubscribe
onNext: group-1, data-0
onNext: group-2, data-0
onNext: group-2, data-1
onComplete
|
concatMap
上面的 flatMap
实际上不能保证数据严格有序的,要保证数据严格有序只能使用 concatMap
, 你可能会有疑问了,上面的代码不是有序的吗?之所以有序是因为我们这里的数据转换只是简单的字符串拼接,执行速度非常快,才会产生 flatMap
按顺序发射数据的假象,我们将 flatMap
的转换加上 50 ms
的 delay
,再运行一下就会发现出现问题了。 这里就不做示例了,因为只需要将函数名替换即可。
groupby
groupBy
用于分组元素,它可以被用来根据指定的条件将元素分成若干组。它将得到一个 Observable<GroupedObservable<T, M>>
类型的 Observable
。 通常我们会使用 groupBy
配合 flapMap
实现并发。例如下面我们对 1,2,3,4,5 个任务进行了分组。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
| Observable.just(1, 2, 3, 4, 5).groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer % 2; //分为两组
}
}).subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
@Override
public void onSubscribe(Disposable d) {
LogPlus.d(TAG, "onSubscribe");
}
@Override
public void onNext(GroupedObservable<Integer, Integer> groupedObservable) {
groupedObservable.delay(50, TimeUnit.MILLISECONDS).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogPlus.d(TAG, "Group--onSubscribe");
}
@Override
public void onNext(Integer integer) {
LogPlus.d(TAG, "Group--onNext: " + integer);
}
@Override
public void onError(Throwable e) {
LogPlus.d(TAG, "Gorup--onError");
}
@Override
public void onComplete() {
LogPlus.d(TAG, "Group--onComplete");
}
});
}
@Override
public void onError(Throwable e) {
LogPlus.d(TAG, "onError");
}
@Override
public void onComplete() {
LogPlus.d(TAG, "onComplete");
}
});
|
输出结果:
1
2
3
4
5
6
7
8
9
10
| onSubscribe
Group--onSubscribe
Group--onSubscribe
onComplete
Group--onNext: 1
Group--onNext: 2
Group--onNext: 4
Group--onComplete
Group--onNext: 5
Group--onComplete
|
通过 groupBy
将 数据 分组,再将每组的数据通过 flatMap
调度至一个 线程 来执行。 groupBy
与 flatMap
的组合,可以任意控制并发数。
scan
scan
将数据累加来发送事件给观察者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| Observable.just(1, 2, 3, 4, 5).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer sum, Integer integer) throws Exception {
return sum + integer;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogPlus.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
LogPlus.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
LogPlus.d(TAG, "onError");
}
@Override
public void onComplete() {
LogPlus.d(TAG, "onComplete");
}
});
|
执行结果:
1
2
3
4
5
6
7
| onSubscribe
onNext: 1
onNext: 3
onNext: 6
onNext: 10
onNext: 15
onComplete
|
zip
zip
是通过定义的组合规则将两列数据进行组合后发送给观察者
事件组合流向示意图
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
| Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit B");
emitter.onNext("B");
Log.d(TAG, "emit C");
emitter.onNext("C");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
|
count
count
是统计事件流数量,例如下面的结果是 5
.
1
2
3
4
5
6
| Observable.just(1, 2, 3, 4, 5).count().subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogPlus.d("Accept: " + aLong);
}
});
|
onErrorResumeNext
onErrorResumeNext
当原始 Observable
在遇到错误时,使用备用 Observable
1
2
3
4
| Observable.just(1,"2",3)
.cast(Integer.class)
.onErrorResumeNext(Observable.just(1,2,3))
.subscribe(integer -> Log.d("JG",integer.toString())); //1,2,3
|
retry
retry
当原始 Observable
在遇到错误时进行重试
1
2
3
4
| Observable.just(1,"2",3)
.cast(Integer.class)
.retry(3)
.subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));//1,1,1,1,onError
|
关于 RxJava2.0 相关请参考我的另一篇文章《Rxjava2使用总结》