摘要: 原创出处 http://www.iocoder.cn/RxJava/scheduler/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 RxJava 1.2.X 版本

本系列写作目的,为了辅助 Hystrix 的理解,因此会较为零散与琐碎,望见谅见谅。


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. Scheduler

rx.Scheduler抽象类,一个可以调度工作单元( rx.Scheduler.Worker )的对象。

FROM 《ReactiveX文档中文翻译 —— Scheduler》
如果你想给 Observable 操作符链添加多线程功能,你可以指定操作符( 或者特定的Observable )在特定的调度器( Scheduler )上执行。

某些 ReactiveX 的 Observable 操作符有一些变体,它们可以接受一个 Scheduler 参数。这个参数指定操作符将它们的部分或全部任务放在一个特定的调度器上执行。

使用 ObserveOn 和 SubscribeOn 操作符,你可以让 Observable 在一个特定的调度器上执行。

  • ObserveOn 指示一个 Observable 在一个特定的调度器上调用观察者的 onNext , onError 和 onCompleted 方法。
  • SubscribeOn 更进一步,它指示 Observable 将全部的处理过程( 包括发射数据和通知 )放在特定的调度器上执行。

为什么是抽象类,而不是接口呢?官方说明如下 :

/*
* Why is this an abstract class instead of an interface?
*
* : Java doesn't support extension methods and there are many overload methods needing default
* implementations.
*
* : Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for
* a long time.
*
* : If only an interface were used Scheduler implementations would then need to extend from an
* AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the
* functionality.
*
* : Without virtual extension methods even additive changes are breaking and thus severely impede library
* maintenance.
*/

  • 【第一、二点】Java 在 8.0 版本之前,接口不支持默认实现方法,而 Scheduler 需要多个方法提供默认实现。RxJava 考虑到兼容性,将长期使用低版本的 Java 。
  • 【第三、四点】如果将 Scheduler 定义为接口,那么需要添加一个 AbstractScheduler 抽象类,实现接口的默认方法实现。

Scheduler 提供方法如下 :

2. Worker

rx.Scheduler.Worker ,工作单元对象抽象类,执行 Scheduler 调度的操作( rx.functions.Action0 )。

Worker 提供方法如下 :

Worker 实现 rx.Subscription 接口,但是并未实现对应的方法,需要子类实现,用于 :

  • #unsubscribe()原意取消订阅,实意取消操作。
  • #isUnsubscribed()原意订阅是否取消,实意操作是否取消。

3. 默认调度器实现

rx.internal.schedulers 包下,提供了多种默认调度器的实现。

rx.schedulers.Schedulers ,默认调度器单例工厂,创建上图调度器工厂并进行管理。

参考 《ReactiveX文档中文翻译 —— Scheduler》「调度器的种类」

单例说明
Schedulers#io()CachedThreadScheduler用于 IO 密集型任务,如异步阻塞 IO 操作,这个调度器的线程池会根据需要增长
Schedulers#computation()EventLoopsScheduler用于普通的计算任务,默认线程数等于处理器的数量
Schedulers#from(Executor)ExecutorScheduler使用指定的 Executor 作为调度器
Schedulers#immediate()ImmediateScheduler在当前线程立即开始执行任务
Schedulers#newThread()NewThreadScheduler为每个任务创建一个新线程
Schedulers#trampoline()TrampolineScheduler当其它排队的任务完成后,在当前线程排队开始执行

在 Hystrix 里,继承 Scheduler 抽象类,实现了自定义的 Scheduler 。

因此,跳过默认调度器的源码解析。

4. 操作符与调度器

点击 《ReactiveX文档中文翻译 —— Scheduler》「默认调度器」 查看。

5. 使用示例

点击 《ReactiveX文档中文翻译 —— Scheduler》「使用调度器」 查看。

可能你会觉得示例有丢丢“奇怪”,在 《RxJava 源码解析 —— Observable#subscribeOn(Scheduler)》 你将获得答案。

666. 彩蛋

知识星球

本文偏介绍性,大量内容引用 《ReactiveX文档中文翻译 —— Scheduler》

后续根据需要,可能解析默认调度器的源码实现。

文章目录
  1. 1. 1. Scheduler
  2. 2. 2. Worker
  3. 3. 3. 默认调度器实现
  4. 4. 4. 操作符与调度器
  5. 5. 5. 使用示例
  6. 6. 666. 彩蛋