从响应式编程谈RxJava

参考链接:

《简书上Season_zlc的RxJava2.0系列》(https://www.jianshu.com/c/299d0a51fdd4)
《Github-RxJava》(https://github.com/ReactiveX/RxJava)
《Github-RxAndroid》(https://github.com/ReactiveX/RxAndroid)
《RxJava2.0 Doc》(http://reactivex.io/RxJava/2.x/javadoc/)
《Wiki-diffrent in 2.0》(https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0)

响应式编程

响应式编程是一种基于异步数据流概念的编程模式

响应式编程关键性概念就是事件,在某种程度上,这并不是什么新东西。事件总线(Event buses)或咱们常见的单击事件就是一个异步事件流,你可以观察这个流,也可以基于这个流做一些自定义操作(原文:side effects,副作用,本文皆翻译为自定义操作)。响应式就是基于这种想法。你能够创建所有事物的数据流,而不仅仅只是单击和悬停事件数据流。 流廉价且无处不在,任何事物都可以当作一个流:变量、用户输入、属性、缓存、数据结构等等。比如,假设你的微博评论就是一个跟单击事件一样的数据流,你能够监听这个流,并做出响应。

最重要的是,有一堆的函数能够创建(create)任何流,也能将任何流进行组合(combine)和过滤(filter)。 这正是“函数式”的魔力所在。一个流能作为另一个流的输入(input),甚至多个流也可以作为其它流的输入。你能合并(merge)两个流。你还能通过过滤(filter)一个流得到那些你感兴趣的事件。你能将一个流中的数据映射(map)到一个新的流中。

一个流就是一个将要发生的以时间为序的事件序列。它能发射出三种不同的东西:一个数据值(data value)(某种类型的),一个错误(error)或者一个“完成(completed)”的信号。比如说,当前按钮所在的窗口或视图关闭时,“单击”事件流也就“完成”了。

我们只能异步地捕获这些发出的事件:定义一个针对数据值的函数,在发出一个值时,该函数就会异步地执行;针对发出错误时的函数;还有针对发出‘完成’时的函数。有时你可以省略这最后两个函数,只专注于针对数据值的函数。“监听”流的行为叫做订阅。我们定义的这些函数就是观察者。这个流就是被观察的主体(subject)(或“可观察的(observable)”)。这正是观察者设计模式。

RxJava是什么

Rx — Reactive Extensions 原来是由微软提出的一个综合了异步和基于事件驱动编程的库包,使用可观察序列和LINQ-style查询操作。

RxJava — Reactive Extension for Java。是最开始根据微软的 RX为基础,由Netflix主导做出的提供在JVM上实现响应式编程的一种方式。

RxJava是一种在JVM上实现异步数据处理的库,是基于事件的扩展的观察模式

RxJava特点

jar包很小 < 1MB
轻量级框架
支持Java 8 lambda
支持Java 6+ & Android 2.3+
支持同步和异步
使用简洁
解耦、单一化、不嵌套

扩展的观察者

onCompleted()事件
onError()事件
组合而不是嵌套,避免陷入回调地域

RxAndroid是什么

是Rxjava针对Android平台的一个扩展,用于Android开发
提供响应式扩展组建快速方便开发android应用程序

Schedulers(调度器)

Schedulers在RxAndroid中解决Android主线程问题【针对Android】,解决多线程线程问题。

在不指定特定线程的情况下,RxAndroid遵循的是线程不变原则 ,在哪个线程调用subscribe()就在哪个线程生产事件,在哪个线程生成事件就在哪个线程消费事件。如果我们要让事件切换线程就要用到Schedulers.

1
2
3
4
5
6
7
8
9
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});

RxJava和观察者模式

观察者模式的四大要素:
Observable 被观察者
Observer 观察者
subscribe 订阅
事件

观察者模式本质上是通过订阅将被观察者产生的事件传递给观察者

RxJava扩展的观察者模式

RxJava中相对于观察者模式多了onCompleted()和onError()两个方法,这两个互斥的方法能唯一确定事件的结束。

RxJava实践

RxJava 的 Helloword

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void testRxJava(){
//创建被观察者
Observable mObservable = Observable.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hellowrod!");
subscriber.onCompleted();
}
});

//创建观察者
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
};

//订阅事件
mObservable.subscribe(subscriber);

}

可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

RxJava中的不完整回调

Action1和Action0都是RxJava的一个接口

Action0它只有一个方法call(),这个方法无参数返回值,可以和无参数的onCompleted()结合使用。

Action1也有一个方法call(T param),这个方法有一个参数返回可以和有一个参数的onNext(T obj)和onError(Throwable error)结合使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

线程控制

1
2
3
4
5
6
7
8
9
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});

RxJava操作符

Creating Observables (创建操作符)

create

create是最基础的创建方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("地痞兔劈叉");
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
});

just

just相当于create的简写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.just("地痞兔劈叉").subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
});

from

from相当于just的集合形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.from(new Integer[]{1, 2, 3, 4}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onNext(Integer s) {
System.out.println("onNext: " + s);
}
});

defer

defer在我们调用create的时候不会创建subscribe对象,只有在我们订阅的时候才会创建。

empty/never/throw

empty创建一个空的observable对象,回调onComplete()方法。
never不回调任何方法。
throw回调onError()方法

interval

interval间隔事件发送(相当于定时器)

range

range创建一个范围内的数据的事件发送

repeat

repeat发送一个特定的数据事件,重复发送

start

start回调onStart()方法

timer

timer进行一个延时后发送数据事件

Transforming Observables (转换操作符)

map

map把一个对象转换成另一个对象(类型转换),比如下面将Integer转为String

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.just(123).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer);
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
});

flatmap

flatmap是一个一对多或者多对多的转换,转换后的是一个Observable对象,例如下面将integer转换为string

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable.just(1, 2, 3, 4, 5).flatMap(new Func1<Integer, Observable<? extends String>>() {

@Override
public Observable<? extends String> call(Integer integer) {
return Observable.just(String.valueOf(integer));
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
});

groupby

groupby是通过指定规则对数据类别进行分类,例如下面我们对1,2,3,4,5进行了分组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.just(1, 2, 3, 4, 5).groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer % 2;
}
}).subscribe(new Subscriber<GroupedObservable<Integer, Integer>>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {

}
});

buffer

buffer可以一次订阅多个,onNext的回调是一个集合

scan

scan将数据累加来发送事件给观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.range(1, 5).scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer integer) {
return sum + integer;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer integer) {

}
});

window

window可以通过时间间隔来分割成列表发送到观察者

Filtering Observables (过滤操作符)

debounce

debuounce的意思是在规定时间间隔外才发送数据(间隔内数据忽略)

distinct

distinct是一个去重操作符,可以去掉列表数据中重复的数据项

elementAt

elementAt取出列表数据指定位置的元素

filter

filter通过指定的规则返回boolean值来确定是否过滤掉某些数据项

first

first取出列表数据项的第一个数据

ignoreElements

ignoreElements不回调onNext,只调用onComplete()和onError()

last

last取出列表数据项的最后一个数据

sample

sample(采样发送)每隔一定时间采样数据列表一起发送给观察者

skip

skip跳过列表数据项的前几项

skiplast

skiplast跳过列表数据项的后几项

take

take取列表数据项的前几项

takelast

takelast取列表数据项的后几项

Combining Observables (组合操作符)

zip

zip是通过定义的组合规则将两列数据进行组合后发送给观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {            
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit B");
emitter.onNext("B");
Log.d(TAG, "emit C");
emitter.onNext("C");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
});

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}

@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

merge

merge是将两列数据通过时间合并成一列新数据列发送给观察者

startwith

startwith在数据列表之前插入一个数据发送给观察者

combineLatest

combineLatest将两个列表通过某规则组合成一个数据列表

join

可以设置发送一个有效期的数据(比如有效期5秒)

switchOnNext

swichOnNext将拥有很多小的Observable数据对象组合成一个Obserable数据对象

Error Handling Operators (处理错误)

onErrorResumeNext

onErrorResumeNext 当原始Observable在遇到错误时,使用备用Observable

1
2
3
4
Observable.just(1,"2",3)
.cast(Integer.class)
.onErrorResumeNext(Observable.just(1,2,3))
.subscribe(integer -> Log.d("JG",integer.toString())); //1,2,3

onExceptionResumeNext

当原始Observable在遇到异常时,使用备用的Observable。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误

onErrorReturn

当原始Observable在遇到错误时发射一个特定的数据

retry

retry 当原始Observable在遇到错误时进行重试

1
2
3
4
Observable.just(1,"2",3)
.cast(Integer.class)
.retry(3)
.subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));//1,1,1,1,onError