⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/RxJava/blocking-observable/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 RxJava 1.2.X 版本

本系列写作目的,为了辅助 Hystrix 的理解,因此会较为零散与琐碎,望见谅见谅。


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

《ReactiveX/RxJava文档中文版 —— 阻塞操作》
BlockingObservable 的方法不是将一个 Observable 变换为另一个,也不是过滤Observables,它们会打断 Observable 的调用链,会阻塞等待直到 Observable 发射了想要的数据,然后返回这个数据(而不是一个 Observable )。

1. toBlocking

调用 Observable#toBlocking()BlockingObservable#from(Observable) 方法,将 Observable 转换成 BlockingObservable 。代码如下:

// BlockingObservable.java
public final class BlockingObservable<T> {

// ... 省略无关代码

private final Observable<? extends T> o;

private BlockingObservable(Observable<? extends T> o) {
this.o = o;
}

public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
return new BlockingObservable<T>(o);
}

}

// Observable.java
public class Observable<T> {

// ... 省略无关代码

public final BlockingObservable<T> toBlocking() {
return BlockingObservable.from(this);
}

}

  • 从代码上我们可以看到,BlockingObservable 并未将 Observable 转换成新的,而是简单的包了一层。

2. toFuture

《ReactiveX/RxJava文档中文版 —— TO》
#toFuture() 操作符也是只能用于 BlockingObservable 。这个操作符将Observable 转换为一个返回单个数据项的 Future 。

  • 如果原始 Observable 发射多个数据项,Future会收到一个IllegalArgumentException;
  • 如果原始 Observable 没有发射任何数据,Future会收到一个NoSuchElementException。

如果你想将发射多个数据项的 Observable 转换为 Future ,可以这样用:myObservable.toList().toBlocking().toFuture()

点击链接 查看 #toFuture() 的代码实现:

  • 通过向传入 Observable 订阅 Subscriber ,打断 Observable 的调用链,会阻塞等待直到 Observable 发射了想要的数据。
    • #onNext() 方法,设置执行的返回值( value )。
    • #onCompleted() 方法,CountDownLatch (finished) 减一。
    • #onError() 方法,设置执行时发生的异常( error ),并 CountDownLatch (finished) 减一。
  • 返回的 Future ,通过 CountDownLatch ( error ) 判断是否执行完成;通过 valueerror 获得执行的结果。

666. 彩蛋

知识星球

文章目录
  1. 1. 1. toBlocking
  2. 2. 2. toFuture
  3. 3. 666. 彩蛋