摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-mode/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Hystrix 执行命令方法

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

在官方提供的示例中,我们看到 CommandHelloWorld 通过继承 HystrixCommand 抽象类,有四种调用方式:

方法
#execute()同步调用,返回直接结果
#queue()异步调用,返回 java.util.concurrent.Future
#observe()异步调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果
#toObservable()未调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果


推荐 Spring Cloud 书籍

2. 实现

// AbstractCommand.java
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
// ... 省略无关属性与方法
public Observable<R> toObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// ....
}
}
}
public Observable<R> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<R> subject = ReplaySubject.create();
// eagerly kick off subscription
final Subscription sourceSubscription = toObservable().subscribe(subject);
// return the subject that can be subscribed to later while the execution has already started
return subject.doOnUnsubscribe(new Action0() {
@Override
public void call() {
sourceSubscription.unsubscribe();
}
});
}
}
// HystrixCommand.java
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
// ... 省略无关属性与方法
public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
// ... 包装 delegate
}
// ...
return f;
}
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
protected abstract R run() throws Exception;
}
  • #toObservable() 方法 :做订阅,返回干净的 Observable 。这就是为什么上文说“未调用”
  • #observe() 方法 :调用 #toObservable() 方法的基础上,向 Observable 注册 rx.subjects.ReplaySubject 发起订阅
  • #queue() 方法 :调用 #toObservable() 方法的基础上,调用:
    • Observable#toBlocking() 方法 :将 Observable 转换成阻塞rx.observables.BlockingObservable
    • BlockingObservable#toFuture() 方法 :返回可获得 #run() 抽象方法执行结果的 Future 。
      • #run() 方法 :子类实现该方法,执行正常的业务逻辑
  • #execute() 方法 :调用 #queue() 方法的基础上,调用 Future#get() 方法,同步返回 #run() 的执行结果。
  • 整理四种调用方式如下:

    FROM 《【翻译】Hystrix文档-实现原理》

3. BlockingObservable

本小节为拓展内容,源码解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable 的实现,所以你可以选择:

《RxJava 源码解析 —— BlockingObservable》

666. 彩蛋

第一篇 Hystrix 正式的源码解析。

梳理 Hystrix 的源码还是蛮痛苦的,主要是因为对 RxJava 不够熟悉。

胖友,分享一波朋友圈可好!

文章目录
  1. 1. 1. 概述
  2. 2. 2. 实现
  3. 3. 3. BlockingObservable
  4. 4. 666. 彩蛋