从响应式编程谈RxJava

参考链接:

《简书上Season_zlc的RxJava2.0系列》(https://www.jianshu.com/c/299d0a51fdd4)
《Github-RxJava》(https://github.com/ReactiveX/RxJava)
《Github-RxAndroid》(https://github.com/ReactiveX/RxAndroid)
《RxJava2.0 Doc》(http://reactivex.io/RxJava/2.x/javadoc/)
《Wiki-diffrent in 2.0》(https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0)

响应式编程

响应式编程是一种基于异步数据流概念的编程模式

响应式编程关键性概念就是事件,在某种程度上,这并不是什么新东西。事件总线(Event buses)或咱们常见的单击事件就是一个异步事件流,你可以观察这个流,也可以基于这个流做一些自定义操作(原文:side effects,副作用,本文皆翻译为自定义操作)。响应式就是基于这种想法。你能够创建所有事物的数据流,而不仅仅只是单击和悬停事件数据流。 流廉价且无处不在,任何事物都可以当作一个流:变量、用户输入、属性、缓存、数据结构等等。比如,假设你的微博评论就是一个跟单击事件一样的数据流,你能够监听这个流,并做出响应。

最重要的是,有一堆的函数能够创建(create)任何流,也能将任何流进行组合(combine)和过滤(filter)。 这正是“函数式”的魔力所在。一个流能作为另一个流的输入(input),甚至多个流也可以作为其它流的输入。你能合并(merge)两个流。你还能通过过滤(filter)一个流得到那些你感兴趣的事件。你能将一个流中的数据映射(map)到一个新的流中。

响应式编程流模型

一个流就是一个将要发生的以时间为序的事件序列。它能发射出三种不同的东西:一个数据值(data value)(某种类型的),一个错误(error)或者一个“完成(completed)”的信号。比如说,当前按钮所在的窗口或视图关闭时,“单击”事件流也就“完成”了。

我们只能异步地捕获这些发出的事件:定义一个针对数据值的函数,在发出一个值时,该函数就会异步地执行;针对发出错误时的函数;还有针对发出‘完成’时的函数。有时你可以省略这最后两个函数,只专注于针对数据值的函数。“监听”流的行为叫做订阅。我们定义的这些函数就是观察者。这个流就是被观察的主体(subject)(或“可观察的(observable)”)。这正是观察者设计模式。

RxJava是什么

Rx — Reactive Extensions 原来是由微软提出的一个综合了异步和基于事件驱动编程的库包,使用可观察序列和LINQ-style查询操作。

RxJava — Reactive Extension for Java。是最开始根据微软的 RX为基础,由Netflix主导做出的提供在JVM上实现响应式编程的一种方式。

RxJava是一种在JVM上实现异步数据处理的库,是基于事件的扩展的观察模式

RxJava特点

jar包很小 < 1MB
轻量级框架
支持Java 8 lambda
支持Java 6+ & Android 2.3+
支持同步和异步
使用简洁
解耦、单一化、不嵌套

扩展的观察者

onCompleted()事件
onError()事件
组合而不是嵌套,避免陷入回调地域

RxAndroid是什么

是Rxjava针对Android平台的一个扩展,用于Android开发
提供响应式扩展组建快速方便开发android应用程序

Schedulers(调度器)

Schedulers在RxAndroid中解决Android主线程问题(针对Android),解决多线程线程问题。

在不指定特定线程的情况下,RxAndroid遵循的是线程不变原则 ,在哪个线程调用subscribe()就在哪个线程生产事件,在哪个线程生成事件就在哪个线程消费事件。如果我们要让事件切换线程就要用到Schedulers.

1
2
3
4
5
6
7
8
9
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});

RxJava和观察者模式

观察者模式的四大要素:

  • Observable 被观察者
  • Observer 观察者
  • subscribe 订阅
  • 事件

观察者模式模型

观察者模式本质上是通过订阅将被观察者产生的事件传递给观察者

RxJava扩展的观察者模式

RxJava的事件观察模型

RxJava中相对于观察者模式多了onCompleted()onError()两个方法,这两个互斥的方法能唯一确定事件的结束。

RxJava实践

RxJava 的 Helloword

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void testRxJava(){
//创建被观察者
Observable mObservable = Observable.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hellowrod!");
subscriber.onCompleted();
}
});

//创建观察者
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
};

//订阅事件
mObservable.subscribe(subscriber);

}

事件流向示意图

可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

RxJava中的不完整回调

Action1和Action0都是RxJava的一个接口

Action0它只有一个方法call(),这个方法无参数返回值,可以和无参数的onCompleted()结合使用。

Action1也有一个方法call(T param),这个方法有一个参数返回可以和有一个参数的onNext(T obj)和onError(Throwable error)结合使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

线程控制

1
2
3
4
5
6
7
8
9
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});

RxJava操作符

Creating Observables (创建操作符)

create

create是最基础的创建方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("地痞兔劈叉");
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
});

just

just相当于create的简写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.just("地痞兔劈叉").subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
});

from

from相当于just的集合形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.from(new Integer[]{1, 2, 3, 4}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onNext(Integer s) {
System.out.println("onNext: " + s);
}
});

defer

defer在我们调用create的时候不会创建subscribe对象,只有在我们订阅的时候才会创建。

empty/never/throw

empty创建一个空的observable对象,回调onComplete()方法。
never不回调任何方法。
throw回调onError()方法

interval

interval间隔事件发送(相当于定时器)

range

range创建一个范围内的数据的事件发送

repeat

repeat发送一个特定的数据事件,重复发送

start

start回调onStart()方法

timer

timer进行一个延时后发送数据事件

Transforming Observables (转换操作符)

map

map把一个对象转换成另一个对象(类型转换),比如下面将Integer转为String

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.just(123).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer);
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
});

flatmap

flatmap是一个一对多或者多对多的转换,转换后的是一个Observable对象,例如下面将integer转换为string

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable.just(1, 2, 3, 4, 5).flatMap(new Func1<Integer, Observable<? extends String>>() {

@Override
public Observable<? extends String> call(Integer integer) {
return Observable.just(String.valueOf(integer));
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
});

groupby

groupby是通过指定规则对数据类别进行分类,例如下面我们对1,2,3,4,5进行了分组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.just(1, 2, 3, 4, 5).groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer % 2;
}
}).subscribe(new Subscriber<GroupedObservable<Integer, Integer>>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {

}
});

buffer

buffer可以一次订阅多个,onNext的回调是一个集合

scan

scan将数据累加来发送事件给观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.range(1, 5).scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer integer) {
return sum + integer;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer integer) {

}
});

window

window可以通过时间间隔来分割成列表发送到观察者

Filtering Observables (过滤操作符)

debounce

debuounce的意思是在规定时间间隔外才发送数据(间隔内数据忽略)

distinct

distinct是一个去重操作符,可以去掉列表数据中重复的数据项

elementAt

elementAt取出列表数据指定位置的元素

filter

filter通过指定的规则返回boolean值来确定是否过滤掉某些数据项

first

first取出列表数据项的第一个数据

ignoreElements

ignoreElements不回调onNext,只调用onComplete()和onError()

last

last取出列表数据项的最后一个数据

sample

sample(采样发送)每隔一定时间采样数据列表一起发送给观察者

skip

skip跳过列表数据项的前几项

skiplast

skiplast跳过列表数据项的后几项

take

take取列表数据项的前几项

takelast

takelast取列表数据项的后几项

Combining Observables (组合操作符)

zip

zip是通过定义的组合规则将两列数据进行组合后发送给观察者

事件组合流向示意图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {            
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit B");
emitter.onNext("B");
Log.d(TAG, "emit C");
emitter.onNext("C");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
});

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}

@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

merge

merge是将两列数据通过时间合并成一列新数据列发送给观察者

startwith

startwith在数据列表之前插入一个数据发送给观察者

combineLatest

combineLatest将两个列表通过某规则组合成一个数据列表

join

可以设置发送一个有效期的数据(比如有效期5秒)

switchOnNext

swichOnNext将拥有很多小的Observable数据对象组合成一个Obserable数据对象

Error Handling Operators (处理错误)

onErrorResumeNext

onErrorResumeNext 当原始Observable在遇到错误时,使用备用Observable

1
2
3
4
Observable.just(1,"2",3)
.cast(Integer.class)
.onErrorResumeNext(Observable.just(1,2,3))
.subscribe(integer -> Log.d("JG",integer.toString())); //1,2,3

onExceptionResumeNext

当原始Observable在遇到异常时,使用备用的Observable。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误

onErrorReturn

当原始Observable在遇到错误时发射一个特定的数据

retry

retry 当原始Observable在遇到错误时进行重试

1
2
3
4
Observable.just(1,"2",3)
.cast(Integer.class)
.retry(3)
.subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));//1,1,1,1,onError

评论

Ajax Android AndroidStudio Animation Anroid Studio AppBarLayout Babel Banner Buffer Bulma ByteBuffer C++ C11 C89 C99 CDN CMYK COM1 COM2 CSS Camera Raw, 直方图 Chrome Class ContentProvider CoordinatorLayout C语言 DML DOM Dagger Dagger2 Darktable Demo Document DownloadManage ES2015 ESLint Element Error Exception Extensions File FileProvider Flow Fresco GCC Git GitHub GitLab Gradle Groovy HTML5 Handler HandlerThread Hexo Hybrid I/O IDEA IO ImageMagick IntelliJ Intellij Interpolator JCenter JNI JS Java JavaScript JsBridge Kotlin Lab Lambda Lifecycle Lint Linux Looper MQTT MVC MVP Maven MessageQueue Modbus Momentum MySQL NDK NIO NexT Next Nodejs ObjectAnimator Oracle VM Permission PhotoShop Physics Python RGB RS-232 RTU Remote-SSH Retrofit Runnable RxAndroid RxJava SE0 SSH Spring SpringBoot Statubar Style Task Theme Thread Tkinter UI UIKit UML VM virtualBox VS Code VUE ValueAnimator ViewPropertyAnimator Vue Vue.js Web Web前端 Workbench api apk bookmark by关键字 cli compileOnly computed css c语言 databases demo hexo hotfix html iOS icarus implementation init jQuery javascript launchModel logo merge methods mvp offset photos pug query rxjava2 scss servlet shell svg tkinter tomcat transition unicode utf-8 vector virtual box vscode watch webpack 七牛 下载 中介者模式 串口 临潼石榴 主题 书签 事件 享元模式 仓库 代理模式 位运算 依赖注入 修改,tables 光和色 内存 内核 内部分享 函数 函数式编程 分支 分析 创建 删除 动画 单例模式 压缩图片 发布 可空性 合并 同向性 后期 启动模式 命令 命令模式 响应式 响应式编程 图层 图床 图片压缩 图片处理 图片轮播 地球 域名 基础 增加 备忘录模式 外观模式 多线程 大爆炸 天气APP 太白山 头文件 奇点 字符串 字符集 存储引擎 宇宙 宏定义 实践 属性 属性动画 岐山擀面皮 岐山肉臊子 岐山香醋 工具 工厂模式 年终总结 开发技巧 异常 弱引用 恒星 打包 技巧 指令 指针 插件 插值 摄影 操作系统 攻略 故事 数据库 数据类型 数组 文件 新功能 旅行 旋转木马 时序图 时空 时间简史 曲线 杂谈 权限 枚举 架构 查询 标准库 标签选择器 样式 核心 框架 案例 桥接模式 检测工具 模块化 模板 模板引擎 模板方法模式 油泼辣子 泛型 洛川苹果 浅色状态栏 渲染 源码 源码分析 瀑布流 热修复 版本 版本控制 状态栏 状态模式 生活 留言板 相册 相对论 眉县猕猴桃 知识点 码云 磁盘 科学 笔记 策略模式 类图 系统,发行版, GNU 索引 组件 组合模式 绑定 结构 结构体 编码 网易云信 网格布局 网站广播 网站通知 网络 美化 联合 脚手架 膨胀的宇宙 自定义 自定义View 自定义插件 蒙版 虚拟 虚拟机 补码 补齐 表单 表达式 装饰模式 西安 观察者模式 规范 视图 视频 解耦器模式 设计 设计原则 设计模式 访问者模式 语法 责任链模式 贪吃蛇 转换 软件工程 软引用 运算符 迭代子模式 适配器模式 选择器 通信 通道 配置 链表 锐化 错误 键盘 闭包 降噪 陕西地方特产 面向对象 项目优化 项目构建 黑洞
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×