Rxjava2使用总结

前言

Rxjava2是函数式编程响应式编程两种牛逼的思想结合的产物,也是在Java中这两种思想的一种实现。

函数响应式编程(Functional Reactive Programming:FRP):是一种通过一系列函数的组合调用来发射,转变,监听,响应数据流的编程范式。

在Rxjava2中提供了5种观察者模式来实现这种函数响应式编程思想。

ObservableSource/Observer

可通过onNext方法发送单条数据或者数据序列,通过onComplete发送完成通知或通过onError发送异常通知,不支持背压策略。

抽象类Observable是接口ObservableSource下的一个抽象实现,我们可以通过Observable创建一个可观察对象发射数据流。

1
2
3
4
5
6
7
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello World");
emitter.onComplete();
}
});

调用Observable.create方法,创建一个可观察对象,并通过onNext发送一条数据“Hello World”,然后通过onComplete发送完成通知。

创建一个观察者Observer来接受并响应可观察对象发射的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {
System.out.println(s);
}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onComplete() {
System.out.println("接受完成");
}
};

在onNext方法中接收到可观察对象发射的数据”Hello World”,并做出响应——打印到控制台。

Observer订阅Observable。

1
observable.subscribe(observer);

通过Observable.create创建可观察对象时,我们可以发现具体执行发射动作的是由接口ObservableEmitter的实例化对象完成的,而ObservableEmitter继承自 接口Emitter,查看源码接口Emitter的具体代码如下:

1
2
3
4
5
6
7
8
public interface Emitter<T> {
//用来发送数据,可多次调用,每调用一次发送一条数据
void onNext(@NonNull T value);
//用来发送异常通知,只发送一次,若多次调用只发送第一条
void onError(@NonNull Throwable error);
//用来发送完成通知,只发送一次,若多次调用只发送第一条
void onComplete();
}

接口Observer中的三个方法(onNext,onError,onComplete)正好与Emitter中的三个方法相对应,对于Emitter中对应方法发送的数据或通知进行响应。

使用 just 操作符简化如下:

1
2
3
4
5
6
7
8
9
public void demo3() {
Observable.just("Hello World")
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
});
}

Consumer通过其函数accept只接收可观察对象发射的数据,不接收异常信息或完成信息。

事件订阅后都会返回一个 Disposable 对象:

1
2
3
4
public interface Disposable {
void dispose();
boolean isDisposed();
}

其中isDisposed()方法用来判断当前订阅是否失效,dispose()方法用来取消当前订阅。

Publisher/Subscriber

在ObservableSource/Observer基础上进行了改进,可通过背压策略处理背压问题,但效率没有第一组高。

Flowable是Publisher与Subscriber这一组观察者模式中Publisher的典型实现。在使用Flowable的时候,可观察对象不再是Observable,而是Flowable;观察者不再是Observer,而是Subscriber。Flowable与Subscriber之间依然通过subscribe()进行关联。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
System.out.println("发射----> 1");
e.onNext(1);
System.out.println("发射----> 2");
e.onNext(2);
System.out.println("发射----> 3");
e.onNext(3);
System.out.println("发射----> 完成");
e.onComplete();
}
}, BackpressureStrategy.BUFFER) //create方法中多了一个BackpressureStrategy类型的参数
.subscribeOn(Schedulers.newThread())//为上下游分别指定各自的线程
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) { //onSubscribe回调的参数不是Disposable而是Subscription
s.request(Long.MAX_VALUE); //注意此处,暂时先这么设置
}

@Override
public void onNext(Integer integer) {
System.out.println("接收----> " + integer);
}

@Override
public void onError(Throwable t) {
}

@Override
public void onComplete() {
System.out.println("接收----> 完成");
}
});

注意第13行的 BackpressureStrategy.BUFFER 策略设置。

BUFFER 是处理背压的默认策略,在其内部维护了一个缓存池SpscLinkedArrayQueue,其大小不限,此策略下,如果Flowable默认的异步缓存池满了,会通过此缓存池暂存数据,它与Observable的异步缓存池一样,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM。

DROP 在此策略下,如果Flowable的异步缓存池满了,会丢掉上游发送的数据。

LATEST 与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察者在接收到完成通知之前,能够接收到Flowable最新发射的一条数据。

ERROR 此策略下,如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。

MISSING 此策略不会对对缓存池的状态进行判断,所以在此策略下,通过Create方法创建的Flowable相当于没有指定背压策略,不会对通过onNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符 onBackpressureBuffer(), onBackpressureDrop(), onBackpressureLatest() 指定背压策略。

方法对应策略
onBackpressureBuffer()BackpressureStrategy.BUFFER
onBackpressureDrop()BackpressureStrategy.DROP
onBackpressureLatest()BackpressureStrategy.LATEST

注意:以下三组是新的响应式关系的实现,在Rxjava1中没有,可看做是 ObservableSource/Observer 的简化版。

SingleSource/SingleObserver

不能发送数据序列或完成通知,只能通过onSuccess方法发送单条数据,或者通过onError发送异常通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Single.create(new SingleOnSubscribe<Integer>() {
@Override
public void subscribe(SingleEmitter<Integer> emitter) throws Exception {
emitter.onSuccess(0);
}
}).subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onSuccess(Integer integer) {
System.out.println(integer);
}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}
});

CompletableSource/CompletableObserve

不能发送任何形式的数据(单条数据或数据序列),只能通过onComplete发送完成通知或者通过onError发送异常通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
emitter.onComplete();
}
}).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onComplete() {
System.out.println("执行完成");
}

@Override
public void onError(Throwable e) {

}
});

MaybeSource/MaybeObserver

可通过onSuccess发送单条数据,通过onComplete发送完成通知或者通过onError发送一条异常通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Maybe.create(new MaybeOnSubscribe<Integer>() {
@Override
public void subscribe(MaybeEmitter<Integer> emitter) throws Exception {
emitter.onSuccess(1);
emitter.onComplete();
}
}).subscribe(new MaybeObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onSuccess(Integer integer) {
System.out.println(integer);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("执行完成");
}
});

线程调度

1
Observable<T> subscribeOn(Scheduler scheduler)

subscribeOn通过接收一个Scheduler参数,来指定对数据的处理运行在特定的线程调度器Scheduler上。若多次设定,则只有一次起作用。

1
Observable<T> observeOn(Scheduler scheduler)

observeOn同样接收一个Scheduler参数,来指定下游操作运行在特定的线程调度器Scheduler上。若多次设定,每次均起作用。

Scheduler种类:

Schedulers.io()

用于IO密集型的操作,例如读写SD卡文件,查询数据库,访问网络等,具有线程缓存机制,在此调度器接收到任务后,先检查线程缓存池中,是否有空闲的线程,如果有,则复用,如果没有则创建新的线程,并加入到线程池中,如果每次都没有空闲线程使用,可以无上限的创建新线程。

Schedulers.newThread()

在每执行一个任务时创建一个新的线程,不具有线程缓存机制,因为创建一个新的线程比复用一个线程更耗时耗力,虽然使用Schedulers.io( )的地方,都可以使用Schedulers.newThread( ),但是,Schedulers.newThread( )的效率没有Schedulers.io( )高。

Schedulers.computation()

用于CPU 密集型计算任务,即不会被 I/O 等操作限制性能的耗时操作,例如xml,json文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU的核数。不可以用于I/O操作,因为I/O操作的等待时间会浪费CPU。

Schedulers.trampoline()

在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。

Schedulers.single()

拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。

Scheduler.from(@NonNull Executor executor)

指定一个线程调度器,由此调度器来控制任务的执行策略。

AndroidSchedulers.mainThread()

在Android UI线程中执行任务,为Android开发定制。

subscribeOn来指定对数据的处理运行在特定的线程调度器Scheduler上,直到遇到observeOn改变线程调度器若多次设定,则只有一次起作用。observeOn指定下游操作运行在特定的线程调度器Scheduler上。若多次设定,每次均起作用。

评论

Ajax Android AndroidStudio Animation Anroid Studio AppBarLayout Babel Banner Buffer Bulma ByteBuffer C++ C11 C89 C99 CDN CMYK COM1 COM2 CSS Camera Raw, 直方图 Chrome ContentProvider CoordinatorLayout C语言 DML DOM Dagger Dagger2 Darktable Demo Document DownloadManage ES2015 ESLint Element Error Exception Extensions File FileProvider Flow Fresco GCC Git GitHub GitLab Gradle Groovy HTML5 Handler HandlerThread Hexo Hybrid I/O IDEA IO ImageMagick IntelliJ Intellij Interpolator JCenter JNI JS Java JavaScript JsBridge Kotlin Lab Lambda Lifecycle Lint Linux Looper MQTT MVC MVP Maven MessageQueue Modbus Momentum MySQL NDK NIO NexT Next Nodejs ObjectAnimator Oracle VM Permission PhotoShop Physics Python RGB RS-232 RTU Remote-SSH Retrofit Runnable RxAndroid RxJava SE0 SSH Spring SpringBoot Statubar Task Theme Thread Tkinter UI UIKit UML VM virtualBox VS Code VUE ValueAnimator ViewPropertyAnimator Vue Web Web前端 Workbench api apk bookmark by关键字 compileOnly css c语言 databases demo hexo hotfix html iOS icarus implementation init jQuery javascript launchModel logo merge mvp offset photos pug query rxjava2 scss servlet shell svg tkinter tomcat transition unicode utf-8 vector virtual box vscode 七牛 下载 中介者模式 串口 临潼石榴 主题 书签 事件 享元模式 仓库 代理模式 位运算 依赖注入 修改,tables 光和色 内存 内核 内部分享 函数 函数式编程 分支 分析 创建 删除 动画 单例模式 压缩图片 发布 可空性 合并 同向性 后期 启动模式 命令 命令模式 响应式 响应式编程 图层 图床 图片压缩 图片处理 图片轮播 地球 域名 基础 增加 备忘录模式 外观模式 多线程 大爆炸 天气APP 太白山 头文件 奇点 字符串 字符集 存储引擎 宇宙 宏定义 实践 属性 属性动画 岐山擀面皮 岐山肉臊子 岐山香醋 工具 工厂模式 年终总结 开发技巧 异常 弱引用 恒星 打包 技巧 指针 插件 摄影 操作系统 攻略 故事 数据库 数据类型 数组 文件 新功能 旅行 旋转木马 时序图 时空 时间简史 曲线 杂谈 权限 枚举 架构 查询 标准库 标签选择器 样式 核心 框架 案例 桥接模式 检测工具 模块化 模板引擎 模板方法模式 油泼辣子 泛型 洛川苹果 浅色状态栏 源码 源码分析 瀑布流 热修复 版本 版本控制 状态栏 状态模式 生活 留言板 相册 相对论 眉县猕猴桃 知识点 码云 磁盘 科学 笔记 策略模式 类图 系统,发行版, GNU 索引 组件 组合模式 结构 结构体 编码 网易云信 网格布局 网站广播 网站通知 网络 美化 联合 膨胀的宇宙 自定义 自定义View 自定义插件 蒙版 虚拟 虚拟机 补码 补齐 表单 表达式 装饰模式 西安 观察者模式 规范 视图 视频 解耦器模式 设计 设计原则 设计模式 访问者模式 语法 责任链模式 贪吃蛇 转换 软件工程 软引用 运算符 迭代子模式 适配器模式 选择器 通信 通道 配置 链表 锐化 错误 键盘 闭包 降噪 陕西地方特产 面向对象 项目优化 项目构建 黑洞
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×