⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-fourth-fallback/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Hystrix 命令执行(四)之失败回退逻辑

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

Hystrix 执行命令整体流程如下图:

FROM 《【翻译】Hystrix文档-实现原理》「流程图」

另外,#handleXXXX() 方法,整体代码比较类似,最终都是调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。


推荐 Spring Cloud 书籍

2. handleFallback

《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「4. #executeCommandAndObserve(...)」 中,#executeCommandAndObserve(...)第 82 行 onErrorResumeNext(handleFallback) 代码,通过调用 Observable#onErrorResumeNext(...) 方法,实现【执行命令 Observable】执行异常时,返回【回退逻辑 Observable】,执行失败回退逻辑。

FROM 《ReactiveX文档中文翻译》「onErrorResumeNext」
onErrorResumeNext 方法返回一个镜像原有 Observable 行为的新 Observable ,后者会忽略前者的 onError 调用,不会将错误传递给观察者,作为替代,它会开始镜像另一个,备用的 Observable 。

handleFallback 变量,代码如下 :

 1: final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
2: @Override
3: public Observable<R> call(Throwable t) {
4: // 标记尝试成功
5: circuitBreaker.markNonSuccess();
6: // 标记 executionResult 执行异常
7: Exception e = getExceptionFromThrowable(t);
8: executionResult = executionResult.setExecutionException(e);
9: // 返回 【回退逻辑 Observable】
10: if (e instanceof RejectedExecutionException) { // 线程池提交任务拒绝异常
11: return handleThreadPoolRejectionViaFallback(e);
12: } else if (t instanceof HystrixTimeoutException) { // 执行命令超时异常
13: return handleTimeoutViaFallback();
14: } else if (t instanceof HystrixBadRequestException) { // TODO 【2014】【HystrixBadRequestException】
15: return handleBadRequestByEmittingError(e);
16: } else {
17: /*
18: * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
19: */
20: if (e instanceof HystrixBadRequestException) { // TODO 【2014】【HystrixBadRequestException】
21: eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
22: return Observable.error(e);
23: }
24:
25: return handleFailureViaFallback(e);
26: }
27: }
28: };

3. #handleShortCircuitViaFallback()

#handleShortCircuitViaFallback() 方法,short-circuit ,处理链路处于熔断的回退逻辑,在 此处 被调用,代码如下 :

 1: private Observable<R> handleShortCircuitViaFallback() {
2: // TODO 【2011】【Hystrix 事件机制】
3: // record that we are returning a short-circuited fallback
4: eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
5: // 标记 executionResult 执行异常
6: // short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
7: Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
8: executionResult = executionResult.setExecutionException(shortCircuitException);
9: try {
10: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
11: return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
12: "short-circuited", shortCircuitException);
13: } catch (Exception e) {
14: return Observable.error(e);
15: }
16: }

  • 第 4 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 7 至 8 行 :标记 executionResult 执行异常
  • 第 11 至 12 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。
  • 第 14 行 :返回【异常 Observable】。

4. #handleSemaphoreRejectionViaFallback()

#handleSemaphoreRejectionViaFallback() 方法,semaphore-rejection ,处理信号量获得失败的回退逻辑,在 此处 被调用,代码如下 :

 1: private Observable<R> handleSemaphoreRejectionViaFallback() {
2: // 标记 executionResult 执行异常
3: Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
4: executionResult = executionResult.setExecutionException(semaphoreRejectionException);
5: // TODO 【2011】【Hystrix 事件机制】
6: eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
7: logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
8: // retrieve a fallback or throw an exception if no fallback available
9: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
10: return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
11: "could not acquire a semaphore for execution", semaphoreRejectionException);
12: }

  • 第 3 至 4 行 :标记 executionResult 执行异常
  • 第 6 至 7 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 10 至 11 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。

5. #handleThreadPoolRejectionViaFallback()

#handleThreadPoolRejectionViaFallback() 方法,thread-pool-rejection ,处理线程池提交任务拒绝的回退逻辑,在 此处 被调用,代码如下:

 1: private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
2: // TODO 【2011】【Hystrix 事件机制】
3: eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
4: // TODO 【2002】【metrics】
5: threadPool.markThreadRejection();
6: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
7: // use a fallback instead (or throw exception if not implemented)
8: return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION,
9: "could not be queued for execution", underlying);
10: }

  • 第 3 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 5 行 :TODO 【2002】【metrics】
  • 第 8 至 9 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。

6. #handleTimeoutViaFallback()

#handleTimeoutViaFallback() 方法,execution-timeout ,处理命令执行超时的回退逻辑,在 此处 被调用,代码如下:

1: private Observable<R> handleTimeoutViaFallback() {
2: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
3: return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT,
4: "timed-out", new TimeoutException());
5: }

7. #handleFailureViaFallback()

#handleFailureViaFallback() 方法,execution-failure ,处理命令执行异常的回退逻辑,在 此处 被调用,代码如下:

 1: private Observable<R> handleFailureViaFallback(Exception underlying) {
2: // TODO 【2011】【Hystrix 事件机制】
3: /**
4: * All other error handling
5: */
6: logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);
7:
8: // report failure
9: eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
10:
11: // 标记 executionResult 异常 TODO 【2007】【executionResult】用途 为啥不是执行异常
12: // record the exception
13: executionResult = executionResult.setException(underlying);
14: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
15: return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
16: }

  • 第 2 至 9 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 13 行 :标记 executionResult 异常
  • 第 15 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。

8. #getFallbackOrThrowException(...)

#getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,代码如下 :

  1: private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
2: // 记录 HystrixRequestContext
3: final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
4: // 标记 executionResult 添加( 记录 )事件
5: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
6: // record the executionResult
7: // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
8: executionResult = executionResult.addEvent((int) latency, eventType);
9:
10: if (isUnrecoverable(originalException)) { // 无法恢复的异常
11: logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
12:
13: // TODO 【2003】【HOOK】
14: /* executionHook for all errors */
15: Exception e = wrapWithOnErrorHook(failureType, originalException);
16: // 返回 【异常 Observable】
17: return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
18: } else {
19: if (isRecoverableError(originalException)) { // 可恢复的异常
20: logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
21: }
22:
23: if (properties.fallbackEnabled().get()) {
24: /* fallback behavior is permitted so attempt */
25:
26: // 设置 HystrixRequestContext 的 Action
27: final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
28: @Override
29: public void call(Notification<? super R> rNotification) {
30: setRequestContextIfNeeded(requestContext);
31: }
32: };
33:
34: // TODO 【2007】【executionResult】用途
35: final Action1<R> markFallbackEmit = new Action1<R>() {
36: @Override
37: public void call(R r) {
38: if (shouldOutputOnNextEvents()) {
39: executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
40: eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
41: }
42: }
43: };
44:
45: // TODO 【2007】【executionResult】用途
46: final Action0 markFallbackCompleted = new Action0() {
47: @Override
48: public void call() {
49: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
50: eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
51: executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
52: }
53: };
54:
55: // 处理异常 的 Func
56: final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
57: @Override
58: public Observable<R> call(Throwable t) {
59: // TODO 【2003】【HOOK】
60: /* executionHook for all errors */
61: Exception e = wrapWithOnErrorHook(failureType, originalException);
62: // 获得 Exception
63: Exception fe = getExceptionFromThrowable(t);
64:
65: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
66: Exception toEmit;
67:
68: if (fe instanceof UnsupportedOperationException) {
69: // TODO 【2011】【Hystrix 事件机制】
70: logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
71: eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
72: // 标记 executionResult 添加( 记录 )事件 HystrixEventType.FALLBACK_MISSING
73: executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
74:
75: // 创建 HystrixRuntimeException
76: toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
77: } else {
78: // TODO 【2011】【Hystrix 事件机制】
79: logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
80: eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
81: // 标记 executionResult 添加( 记录 )事件 HystrixEventType.FALLBACK_FAILURE
82: executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
83:
84: // 创建 HystrixRuntimeException
85: toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
86: }
87:
88: // NOTE: we're suppressing fallback exception here
89: if (shouldNotBeWrapped(originalException)) {
90: return Observable.error(e);
91: }
92:
93: return Observable.error(toEmit);
94: }
95: };
96:
97: // 获得 TryableSemaphore
98: final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
99:
100: // 信号量释放Action
101: final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
102: final Action0 singleSemaphoreRelease = new Action0() {
103: @Override
104: public void call() {
105: if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
106: fallbackSemaphore.release();
107: }
108: }
109: };
110:
111: Observable<R> fallbackExecutionChain;
112:
113: // acquire a permit
114: if (fallbackSemaphore.tryAcquire()) {
115: try {
116: if (isFallbackUserDefined()) {
117: executionHook.onFallbackStart(this);
118: fallbackExecutionChain = getFallbackObservable();
119: } else {
120: //same logic as above without the hook invocation
121: fallbackExecutionChain = getFallbackObservable();
122: }
123: } catch (Throwable ex) {
124: //If hook or user-fallback throws, then use that as the result of the fallback lookup
125: fallbackExecutionChain = Observable.error(ex);
126: }
127:
128: // 获得 【回退逻辑 Observable】
129: return fallbackExecutionChain
130: .doOnEach(setRequestContext)
131: .lift(new FallbackHookApplication(_cmd)) // TODO 【2003】【HOOK】
132: .lift(new DeprecatedOnFallbackHookApplication(_cmd))
133: .doOnNext(markFallbackEmit)
134: .doOnCompleted(markFallbackCompleted)
135: .onErrorResumeNext(handleFallbackError) //
136: .doOnTerminate(singleSemaphoreRelease)
137: .doOnUnsubscribe(singleSemaphoreRelease);
138: } else {
139: return handleFallbackRejectionByEmittingError();
140: }
141: } else {
142: return handleFallbackDisabledByEmittingError(originalException, failureType, message);
143: }
144: }
145: }

  • 耐心,这个方法看起来灰常长,也仅限于长,理解成难度很小。

  • 第 3 行 :记录 HystrixRequestContext 。

  • 第 5 至 8 行 :标记 executionResult 添加( 记录 )事件。

  • 第 10 至 17 行 :调用 #isUnrecoverable(Exception) 方法,若异常不可恢复,直接返回【异常 Observable】。点击 链接 查看该方法。

  • 第 19 至 21 行 :调用 #isRecoverableError(Exception) 方法,若异常可恢复,打印 WARN 日志。点击 链接 查看该方法。主要针对 java.lang.Error 情况,打印 #isUnrecoverable(Exception) 排除掉的 Error。

  • 反向】第 141 至 143 行 :当配置 HystrixCommandProperties.fallbackEnabled = false ( 默认值 :true ) ,即失败回退功能关闭,调用 #handleFallbackDisabledByEmittingError() ,返回【异常 Observable】。点击 链接 查看该方法。

  • 反向】第 138 至 140 行 :失败回退信号量( TryableSemaphore )【注意,不是正常执行信号量】使用失败,调用 #handleFallbackRejectionByEmittingError() ,返回【异常 Observable】。点击 链接 查看该方法。

  • 第 23 行 :当配置 HystrixCommandProperties.fallbackEnabled = true ( 默认值 :true ) ,即失败回退功能开启

  • 第 27 至 32 行 :设置 HystrixRequestContext 的 Action ,使用第 3 行记录的 HystrixRequestContext 。

  • 第 35 至 43 行 :TODO 【2007】【executionResult】用途

  • 第 46 至 53 行 :TODO 【2007】【executionResult】用途

  • 第 56 至 95 行 :处理回退逻辑执行发生异常的 Func1 ,返回【异常 Observable】。

    • 第 61 行 :TODO 【2003】【HOOK】
    • 第 63 行 :调用 #getExceptionFromThrowable(Throwable) 方法,获得 Exception 。若 t类型为 Throwable 时,包装成 Exception 。点击 链接 查看该方法代码。
    • 第 68 至 76 行 :当 fe类型为 UnsupportedOperationException 时,使用 e + fe 创建 HystrixRuntimeException 。该异常发生于 HystrixCommand#getFallback() 抽象方法未被覆写。
    • 第 77 至 86 行 :当 fe类型为其他异常时,使用 e + fe 创建 HystrixRuntimeException 。该异常发生于 HystrixCommand#getFallback() 执行发生异常。
    • 第 89 至 91 行 :调用 #shouldNotBeWrapped() 方法,判断 originalException 是 ExceptionNotWrappedByHystrix 的实现时,即要求返回的【异常 Observable】不使用 HystrixRuntimeException 包装。点击 链接 查看该方法代码。
    • 第 93 行 :返回【异常 Observable】,使用 toEmit ( HystrixRuntimeException ) 为异常。
  • 第 98 行 :调用 #getFallbackSemaphore() 方法,获得失败回退信号量( TryableSemaphore )对象,点击 链接 查看该方法代码。TryableSemaphore 在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「3. TryableSemaphore」 有详细解析。

  • 第 100 至 109 行 :信号量释放的 Action。

  • 第 114 至 137 行 :失败回退信号量( TryableSemaphore )使用成功,返回【回退逻辑 Observable】。

    • 【重要】第 116 至 122 行 :调用 #getFallbackObservable() 方法,创建【回退逻辑 Observable】。将子类对 HystrixCommand#getFallback() 抽象方法的执行结果,使用 Observable#just(...) 包装返回。点击 链接 查看该方法的代码。
      • 第 116 行 :调用 #isFallbackUserDefined() 方法,返回命令子类是否实现 HystrixCommand#getFallback() 抽象方法。只有已实现( true ) 的情况下,调用 HOOK TODO 【2003】【HOOK】。
    • 第 129 至 137 行 :获得 【回退逻辑 Observable】。
      • 第 131 行 :// TODO 【2003】【HOOK】
      • 第 135 行 :调用 Observable#onErrorResumeNext(...) 方法,实现【失败回退 Observable】执行异常时,返回【异常 Observable】。

有两个注意点:

  • 当命令执行超时时,失败回退逻辑使用的是 HystrixTimer 的线程池
  • 失败回退逻辑,无超时时间,使用要小心。

666. 彩蛋

知识星球

比想象中“臭长”的逻辑。

总的来说,逻辑和 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》 是很类似的。

胖友,分享一波朋友圈可好!

文章目录
  1. 1. 1. 概述
  2. 2. 2. handleFallback
  3. 3. 3. #handleShortCircuitViaFallback()
  4. 4. 4. #handleSemaphoreRejectionViaFallback()
  5. 5. 5. #handleThreadPoolRejectionViaFallback()
  6. 6. 6. #handleTimeoutViaFallback()
  7. 7. 7. #handleFailureViaFallback()
  8. 8. 8. #getFallbackOrThrowException(...)
  9. 9. 666. 彩蛋