摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-fourth-fallback/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Hystrix 命令执行(四)之失败回退逻辑

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

Hystrix 执行命令整体流程如下图:

FROM 《【翻译】Hystrix文档-实现原理》「流程图」

另外,#handleXXXX() 方法,整体代码比较类似,最终都是调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(…)」 详细解析。


推荐 Spring Cloud 书籍

2. handleFallback

《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「4. #executeCommandAndObserve(…)」 中,#executeCommandAndObserve(...)第 82 行 onErrorResumeNext(handleFallback) 代码,通过调用 Observable#onErrorResumeNext(...) 方法,实现【执行命令 Observable】执行异常时,返回【回退逻辑 Observable】,执行失败回退逻辑。

FROM 《ReactiveX文档中文翻译》「onErrorResumeNext」
onErrorResumeNext 方法返回一个镜像原有 Observable 行为的新 Observable ,后者会忽略前者的 onError 调用,不会将错误传递给观察者,作为替代,它会开始镜像另一个,备用的 Observable 。

handleFallback 变量,代码如下 :

1: final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
2: @Override
3: public Observable<R> call(Throwable t) {
4: // 标记尝试成功
5: circuitBreaker.markNonSuccess();
6: // 标记 executionResult 执行异常
7: Exception e = getExceptionFromThrowable(t);
8: executionResult = executionResult.setExecutionException(e);
9: // 返回 【回退逻辑 Observable】
10: if (e instanceof RejectedExecutionException) { // 线程池提交任务拒绝异常
11: return handleThreadPoolRejectionViaFallback(e);
12: } else if (t instanceof HystrixTimeoutException) { // 执行命令超时异常
13: return handleTimeoutViaFallback();
14: } else if (t instanceof HystrixBadRequestException) { // TODO 【2014】【HystrixBadRequestException】
15: return handleBadRequestByEmittingError(e);
16: } else {
17: /*
18: * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
19: */
20: if (e instanceof HystrixBadRequestException) { // TODO 【2014】【HystrixBadRequestException】
21: eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
22: return Observable.error(e);
23: }
24:
25: return handleFailureViaFallback(e);
26: }
27: }
28: };

3. #handleShortCircuitViaFallback()

#handleShortCircuitViaFallback() 方法,short-circuit ,处理链路处于熔断的回退逻辑,在 此处 被调用,代码如下 :

1: private Observable<R> handleShortCircuitViaFallback() {
2: // TODO 【2011】【Hystrix 事件机制】
3: // record that we are returning a short-circuited fallback
4: eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
5: // 标记 executionResult 执行异常
6: // short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
7: Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
8: executionResult = executionResult.setExecutionException(shortCircuitException);
9: try {
10: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
11: return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
12: "short-circuited", shortCircuitException);
13: } catch (Exception e) {
14: return Observable.error(e);
15: }
16: }
  • 第 4 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 7 至 8 行 :标记 executionResult 执行异常
  • 第 11 至 12 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(…)」 详细解析。
  • 第 14 行 :返回【异常 Observable】。

4. #handleSemaphoreRejectionViaFallback()

#handleSemaphoreRejectionViaFallback() 方法,semaphore-rejection ,处理信号量获得失败的回退逻辑,在 此处 被调用,代码如下 :

1: private Observable<R> handleSemaphoreRejectionViaFallback() {
2: // 标记 executionResult 执行异常
3: Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
4: executionResult = executionResult.setExecutionException(semaphoreRejectionException);
5: // TODO 【2011】【Hystrix 事件机制】
6: eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
7: logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
8: // retrieve a fallback or throw an exception if no fallback available
9: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
10: return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
11: "could not acquire a semaphore for execution", semaphoreRejectionException);
12: }
  • 第 3 至 4 行 :标记 executionResult 执行异常
  • 第 6 至 7 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 10 至 11 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(…)」 详细解析。

5. #handleThreadPoolRejectionViaFallback()

#handleThreadPoolRejectionViaFallback() 方法,thread-pool-rejection ,处理线程池提交任务拒绝的回退逻辑,在 此处 被调用,代码如下:

1: private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
2: // TODO 【2011】【Hystrix 事件机制】
3: eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
4: // TODO 【2002】【metrics】
5: threadPool.markThreadRejection();
6: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
7: // use a fallback instead (or throw exception if not implemented)
8: return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION,
9: "could not be queued for execution", underlying);
10: }
  • 第 3 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 5 行 :TODO 【2002】【metrics】
  • 第 8 至 9 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(…)」 详细解析。

6. #handleTimeoutViaFallback()

#handleTimeoutViaFallback() 方法,execution-timeout ,处理命令执行超时的回退逻辑,在 此处 被调用,代码如下:

1: private Observable<R> handleTimeoutViaFallback() {
2: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
3: return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT,
4: "timed-out", new TimeoutException());
5: }

7. #handleFailureViaFallback()

#handleFailureViaFallback() 方法,execution-failure ,处理命令执行异常的回退逻辑,在 此处 被调用,代码如下:

1: private Observable<R> handleFailureViaFallback(Exception underlying) {
2: // TODO 【2011】【Hystrix 事件机制】
3: /**
4: * All other error handling
5: */
6: logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);
7:
8: // report failure
9: eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
10:
11: // 标记 executionResult 异常 TODO 【2007】【executionResult】用途 为啥不是执行异常
12: // record the exception
13: executionResult = executionResult.setException(underlying);
14: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
15: return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
16: }
  • 第 2 至 9 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 13 行 :标记 executionResult 异常
  • 第 15 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(…)」 详细解析。

8. #getFallbackOrThrowException(…)

#getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,代码如下 :

1: private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
2: // 记录 HystrixRequestContext
3: final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
4: // 标记 executionResult 添加( 记录 )事件
5: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
6: // record the executionResult
7: // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
8: executionResult = executionResult.addEvent((int) latency, eventType);
9:
10: if (isUnrecoverable(originalException)) { // 无法恢复的异常
11: logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
12:
13: // TODO 【2003】【HOOK】
14: /* executionHook for all errors */
15: Exception e = wrapWithOnErrorHook(failureType, originalException);
16: // 返回 【异常 Observable】
17: return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
18: } else {
19: if (isRecoverableError(originalException)) { // 可恢复的异常
20: logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
21: }
22:
23: if (properties.fallbackEnabled().get()) {
24: /* fallback behavior is permitted so attempt */
25:
26: // 设置 HystrixRequestContext 的 Action
27: final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
28: @Override
29: public void call(Notification<? super R> rNotification) {
30: setRequestContextIfNeeded(requestContext);
31: }
32: };
33:
34: // TODO 【2007】【executionResult】用途
35: final Action1<R> markFallbackEmit = new Action1<R>() {
36: @Override
37: public void call(R r) {
38: if (shouldOutputOnNextEvents()) {
39: executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
40: eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
41: }
42: }
43: };
44:
45: // TODO 【2007】【executionResult】用途
46: final Action0 markFallbackCompleted = new Action0() {
47: @Override
48: public void call() {
49: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
50: eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
51: executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
52: }
53: };
54:
55: // 处理异常 的 Func
56: final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
57: @Override
58: public Observable<R> call(Throwable t) {
59: // TODO 【2003】【HOOK】
60: /* executionHook for all errors */
61: Exception e = wrapWithOnErrorHook(failureType, originalException);
62: // 获得 Exception
63: Exception fe = getExceptionFromThrowable(t);
64:
65: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
66: Exception toEmit;
67:
68: if (fe instanceof UnsupportedOperationException) {
69: // TODO 【2011】【Hystrix 事件机制】
70: logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
71: eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
72: // 标记 executionResult 添加( 记录 )事件 HystrixEventType.FALLBACK_MISSING
73: executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
74:
75: // 创建 HystrixRuntimeException
76: toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
77: } else {
78: // TODO 【2011】【Hystrix 事件机制】
79: logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
80: eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
81: // 标记 executionResult 添加( 记录 )事件 HystrixEventType.FALLBACK_FAILURE
82: executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
83:
84: // 创建 HystrixRuntimeException
85: toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
86: }
87:
88: // NOTE: we're suppressing fallback exception here
89: if (shouldNotBeWrapped(originalException)) {
90: return Observable.error(e);
91: }
92:
93: return Observable.error(toEmit);
94: }
95: };
96:
97: // 获得 TryableSemaphore
98: final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
99:
100: // 信号量释放Action
101: final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
102: final Action0 singleSemaphoreRelease = new Action0() {
103: @Override
104: public void call() {
105: if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
106: fallbackSemaphore.release();
107: }
108: }
109: };
110:
111: Observable<R> fallbackExecutionChain;
112:
113: // acquire a permit
114: if (fallbackSemaphore.tryAcquire()) {
115: try {
116: if (isFallbackUserDefined()) {
117: executionHook.onFallbackStart(this);
118: fallbackExecutionChain = getFallbackObservable();
119: } else {
120: //same logic as above without the hook invocation
121: fallbackExecutionChain = getFallbackObservable();
122: }
123: } catch (Throwable ex) {
124: //If hook or user-fallback throws, then use that as the result of the fallback lookup
125: fallbackExecutionChain = Observable.error(ex);
126: }
127:
128: // 获得 【回退逻辑 Observable】
129: return fallbackExecutionChain
130: .doOnEach(setRequestContext)
131: .lift(new FallbackHookApplication(_cmd)) // TODO 【2003】【HOOK】
132: .lift(new DeprecatedOnFallbackHookApplication(_cmd))
133: .doOnNext(markFallbackEmit)
134: .doOnCompleted(markFallbackCompleted)
135: .onErrorResumeNext(handleFallbackError) //
136: .doOnTerminate(singleSemaphoreRelease)
137: .doOnUnsubscribe(singleSemaphoreRelease);
138: } else {
139: return handleFallbackRejectionByEmittingError();
140: }
141: } else {
142: return handleFallbackDisabledByEmittingError(originalException, failureType, message);
143: }
144: }
145: }
  • 耐心,这个方法看起来灰常长,也仅限于长,理解成难度很小。
  • 第 3 行 :记录 HystrixRequestContext 。
  • 第 5 至 8 行 :标记 executionResult 添加( 记录 )事件。
  • 第 10 至 17 行 :调用 #isUnrecoverable(Exception) 方法,若异常不可恢复,直接返回【异常 Observable】。点击 链接 查看该方法。
  • 第 19 至 21 行 :调用 #isRecoverableError(Exception) 方法,若异常可恢复,打印 WARN 日志。点击 链接 查看该方法。主要针对 java.lang.Error 情况,打印 #isUnrecoverable(Exception) 排除掉的 Error。
  • 反向】第 141 至 143 行 :当配置 HystrixCommandProperties.fallbackEnabled = false ( 默认值 :true ) ,即失败回退功能关闭,调用 #handleFallbackDisabledByEmittingError() ,返回【异常 Observable】。点击 链接 查看该方法。
  • 反向】第 138 至 140 行 :失败回退信号量( TryableSemaphore )【注意,不是正常执行信号量】使用失败,调用 #handleFallbackRejectionByEmittingError() ,返回【异常 Observable】。点击 链接 查看该方法。
  • 第 23 行 :当配置 HystrixCommandProperties.fallbackEnabled = true ( 默认值 :true ) ,即失败回退功能开启
  • 第 27 至 32 行 :设置 HystrixRequestContext 的 Action ,使用第 3 行记录的 HystrixRequestContext 。
  • 第 35 至 43 行 :TODO 【2007】【executionResult】用途
  • 第 46 至 53 行 :TODO 【2007】【executionResult】用途
  • 第 56 至 95 行 :处理回退逻辑执行发生异常的 Func1 ,返回【异常 Observable】。

    • 第 61 行 :TODO 【2003】【HOOK】
    • 第 63 行 :调用 #getExceptionFromThrowable(Throwable) 方法,获得 Exception 。若 t类型为 Throwable 时,包装成 Exception 。点击 链接 查看该方法代码。
    • 第 68 至 76 行 :当 fe类型为 UnsupportedOperationException 时,使用 e + fe 创建 HystrixRuntimeException 。该异常发生于 HystrixCommand#getFallback() 抽象方法未被覆写。
    • 第 77 至 86 行 :当 fe类型为其他异常时,使用 e + fe 创建 HystrixRuntimeException 。该异常发生于 HystrixCommand#getFallback() 执行发生异常。
    • 第 89 至 91 行 :调用 #shouldNotBeWrapped() 方法,判断 originalException 是 ExceptionNotWrappedByHystrix 的实现时,即要求返回的【异常 Observable】不使用 HystrixRuntimeException 包装。点击 链接 查看该方法代码。
    • 第 93 行 :返回【异常 Observable】,使用 toEmit ( HystrixRuntimeException ) 为异常。
  • 第 98 行 :调用 #getFallbackSemaphore() 方法,获得失败回退信号量( TryableSemaphore )对象,点击 链接 查看该方法代码。TryableSemaphore 在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「3. TryableSemaphore」 有详细解析。

  • 第 100 至 109 行 :信号量释放的 Action。
  • 第 114 至 137 行 :失败回退信号量( TryableSemaphore )使用成功,返回【回退逻辑 Observable】。
    • 【重要】第 116 至 122 行 :调用 #getFallbackObservable() 方法,创建【回退逻辑 Observable】。将子类对 HystrixCommand#getFallback() 抽象方法的执行结果,使用 Observable#just(...) 包装返回。点击 链接 查看该方法的代码。
      • 第 116 行 :调用 #isFallbackUserDefined() 方法,返回命令子类是否实现 HystrixCommand#getFallback() 抽象方法。只有已实现( true ) 的情况下,调用 HOOK TODO 【2003】【HOOK】。
    • 第 129 至 137 行 :获得 【回退逻辑 Observable】。
      • 第 131 行 :// TODO 【2003】【HOOK】
      • 第 135 行 :调用 Observable#onErrorResumeNext(...) 方法,实现【失败回退 Observable】执行异常时,返回【异常 Observable】。

有两个注意点:

  • 当命令执行超时时,失败回退逻辑使用的是 HystrixTimer 的线程池
  • 失败回退逻辑,无超时时间,使用要小心。

666. 彩蛋

比想象中“臭长”的逻辑。

总的来说,逻辑和 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》 是很类似的。

胖友,分享一波朋友圈可好!

文章目录
  1. 1. 1. 概述
  2. 2. 2. handleFallback
  3. 3. 3. #handleShortCircuitViaFallback()
  4. 4. 4. #handleSemaphoreRejectionViaFallback()
  5. 5. 5. #handleThreadPoolRejectionViaFallback()
  6. 6. 6. #handleTimeoutViaFallback()
  7. 7. 7. #handleFailureViaFallback()
  8. 8. 8. #getFallbackOrThrowException(…)
  9. 9. 666. 彩蛋