扫码关注公众号:芋道源码

发送: 百事可乐
获取永久解锁本站全部文章的链接

《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-third-timeout/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Hystrix 命令执行(三)之执行超时

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

开启执行超时功能,需要配置 :

  • HystrixCommandProperties.executionTimeoutEnabled :执行命令超时功能开关。
    • 值 :Boolean
    • 默认值 :true
  • HystrixCommandProperties.executionTimeoutInMilliseconds :执行命令超时时长。
    • 值 :Integer
    • 单位 :毫秒
    • 默认值 :1000 毫秒

《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「4. #executeCommandAndObserve(…)」 中,#executeCommandAndObserve(...) 方法的第 75 行 lift(new HystrixObservableTimeoutOperator<R>(_cmd)) ,实现了对执行命令超时的监控。


推荐 Spring Cloud 书籍

2. HystrixObservableTimeoutOperator

HystrixObservableTimeoutOperator 类,代码如下 :

  1: private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
2:
3: final AbstractCommand<R> originalCommand;
4:
5: public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
6: this.originalCommand = originalCommand;
7: }
8:
9: @Override
10: public Subscriber<? super R> call(final Subscriber<? super R> child) {
11: // 创建 订阅
12: final CompositeSubscription s = new CompositeSubscription();
13: // 添加 订阅
14: // if the child unsubscribes we unsubscribe our parent as well
15: child.add(s);
16:
17: //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
18: final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
19:
20: TimerListener listener = new TimerListener() {
21:
22: @Override
23: public void tick() {
24: // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
25: // otherwise it means we lost a race and the run() execution completed or did not start
26: if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
27: // report timeout failure
28: originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
29:
30: // shut down the original request
31: s.unsubscribe();
32:
33: final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
34:
35: @Override
36: public void run() {
37: child.onError(new HystrixTimeoutException());
38: }
39: });
40:
41: timeoutRunnable.run();
42: //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
43: }
44: }
45:
46: @Override
47: public int getIntervalTimeInMilliseconds() {
48: return originalCommand.properties.executionTimeoutInMilliseconds().get();
49: }
50: };
51:
52: final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
53:
54: // set externally so execute/queue can see this
55: originalCommand.timeoutTimer.set(tl);
56:
57: /**
58: * If this subscriber receives values it means the parent succeeded/completed
59: */
60: Subscriber<R> parent = new Subscriber<R>() {
61:
62: @Override
63: public void onCompleted() {
64: if (isNotTimedOut()) {
65: // stop timer and pass notification through
66: tl.clear();
67: // 完成
68: child.onCompleted();
69: } else {
70: System.out.println("timeout: " + "onCompleted"); // 笔者调试用
71: }
72: }
73:
74: @Override
75: public void onError(Throwable e) {
76: if (isNotTimedOut()) {
77: // stop timer and pass notification through
78: tl.clear();
79: // 异常
80: child.onError(e);
81: } else {
82: System.out.println("timeout: " + "onError"); // 笔者调试用
83: }
84: }
85:
86: @Override
87: public void onNext(R v) {
88: if (isNotTimedOut()) {
89: // 继续执行
90: child.onNext(v);
91: } else {
92: System.out.println("timeout: " + "onNext"); // 笔者调试用
93: }
94: }
95:
96: /**
97: * 通过 CAS 判断是否超时
98: *
99: * @return 是否超时
100: */
101: private boolean isNotTimedOut() {
102: // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
103: return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
104: originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
105: }
106:
107: };
108:
109: // 添加 订阅
110: // if s is unsubscribed we want to unsubscribe the parent
111: s.add(parent);
112:
113: return parent;
114: }
115:
116: }
  • 第 12 行 :创建订阅 s
  • 第 15 行 :添加订阅 schild 的订阅。
  • 第 18 行 :获得 HystrixRequestContext 。因为下面 listener 的执行不在当前线程,HystrixRequestContext 基于 ThreadLocal 实现。
  • 第 20 至 50 行 :创建执行命令超时监听器 listener ( TimerListener ) 。当超过执行命令的时长( TimerListener#getIntervalTimeInMilliseconds() )时,TimerListener#tick() 方法触发调用。
    • 第 26 行 :通过 AbstractCommand.isCommandTimedOut 变量 CAS 操作,保证和下面第 60 行parent 有且只有一方操作成功。TimedOutStatus 状态变迁如下图 :
    • 第 28 行 :TODO 【2011】【Hystrix 事件机制】
    • 第 31 行 :取消订阅 s注意 :不同执行隔离策略此处的表现不同
    • 第 34 至 41 行 :执行 child#onError(e) 【Subscriber#onError(Throwable)】 方法,处理 HystrixTimeoutException 异常。该异常会被 handleFallback 处理,点击 链接 查看,在 《Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑》 详细解析。
      • HystrixContextRunnable ,设置第 18 行获得的 HystrixRequestContext 到 Callable#run() 所在线程的 HystrixRequestContext ,并继续执行。点击 链接 查看。另外,HystrixContextRunnable 只有此处使用,独立成类的原因是测试用例使用到。
  • 第 52 行 :使用 TimerListener 到定时器,监听命令的超时执行。
  • 第 55 行 :设置 TimerListener 到 AbstractCommand.timeoutTimer 属性。用于执行超时等等场景下的 TimerListener 的清理( tl#clear() )。如下方法有通过该属性对 TimerListener 的清理 :

  • 第 60 至 107 行 :创建的 Subscriber ( parent )。在传参的 child 的基础上,增加了对是否执行超时的判断( #isNotTimedOut() )和TimerListener的清理。

  • 第 111 行 :添加添加订阅 parents 的订阅。整体订阅关系如下 :
    • 这里看起来 s 有些“多余” ?因为 parentlistener 存在互相引用的情况,通过 s 解决。
  • 第 113 行 :返回 parent注意。如果不能理解,建议阅读下 《RxJava 源码解析 —— Observable#lift(Operator)》

3. HystrixTimer

com.netflix.hystrix.util.HystrixTimer ,Hystrix 定时器。

目前有如下场景使用 :

HystrixTimer 构造方法,代码如下 :

public class HystrixTimer {

/**
* 单例
*/
private static HystrixTimer INSTANCE = new HystrixTimer();

/* package */ AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>();

private HystrixTimer() {
// private to prevent public instantiation
}

public static HystrixTimer getInstance() {
return INSTANCE;
}

}
  • INSTANCE 静态属性,单例。
  • executor 属性,定时任务执行器( ScheduledExecutor )。

调用 HystrixTimer#addTimerListener(TimerListener) 方法,提交定时监听器,生成定时任务,代码如下 :

 1: public Reference<TimerListener> addTimerListener(final TimerListener listener) {
2: startThreadIfNeeded();
3: // add the listener
4:
5: Runnable r = new Runnable() {
6:
7: @Override
8: public void run() {
9: try {
10: listener.tick();
11: } catch (Exception e) {
12: logger.error("Failed while ticking TimerListener", e);
13: }
14: }
15: };
16:
17: ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
18: return new TimerReference(listener, f);
19: }
  • 第 2 行 :调用 #startThreadIfNeeded() 方法,保证 executor 延迟初始化已完成。
  • 第 5 至 15 行 :创建定时任务 Runnable 。在 Runnable#run() 方法里,调用 TimerListener#tick() 方法。在 「3.2 TimerListener」 详细解析。
  • 第 17 行 :提交定时监听器,生成定时任务 f ( ScheduledFuture )。
  • 第 18 行 :使用 listener + f 创建 TimerReference 返回。在 「3.3 TimerReference」 详细解析。

3.1 ScheduledExecutor

com.netflix.hystrix.util.HystrixTimer.ScheduledExecutor ,Hystrix 定时任务执行器。代码如下 :

/* package */ static class ScheduledExecutor {
/**
* 定时任务线程池执行器
*/
/* package */ volatile ScheduledThreadPoolExecutor executor;
/**
* 是否初始化
*/
private volatile boolean initialized;

/**
* We want this only done once when created in compareAndSet so use an initialize method
*/
public void initialize() {
// coreSize
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get();

// 创建 ThreadFactory
ThreadFactory threadFactory = null;
if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
threadFactory = new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());
thread.setDaemon(true);
return thread;
}

};
} else {
threadFactory = PlatformSpecific.getAppEngineThreadFactory();
}

// 创建 ScheduledThreadPoolExecutor
executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);

// 已初始化
initialized = true;
}

public ScheduledThreadPoolExecutor getThreadPool() {
return executor;
}

public boolean isInitialized() {
return initialized;
}
}
  • 线程池大小( coreSize ),通过 HystrixTimerThreadPoolProperties.corePoolSize 配置。

3.2 TimerListener

com.netflix.hystrix.util.HystrixTimer.TimerListener ,Hystrix 定时任务监听器**接口**。代码如下 :

public static interface TimerListener {

/**
* The 'tick' is called each time the interval occurs.
* <p>
* This method should NOT block or do any work but instead fire its work asynchronously to perform on another thread otherwise it will prevent the Timer from functioning.
* <p>
* This contract is used to keep this implementation single-threaded and simplistic.
* <p>
* If you need a ThreadLocal set, you can store the state in the TimerListener, then when tick() is called, set the ThreadLocal to your desired value.
*/
void tick();

/**
* How often this TimerListener should 'tick' defined in milliseconds.
*/
int getIntervalTimeInMilliseconds();
}
  • #tick() 方法 :时间到达( 超时 )执行的逻辑。
  • #getIntervalTimeInMilliseconds() 方法 :返回到达( 超时 )时间时长。

3.3 TimerReference

com.netflix.hystrix.util.HystrixTimer.TimerReference ,Hystrix 定时任务引用。代码如下 :

private static class TimerReference extends SoftReference<TimerListener> {

private final ScheduledFuture<?> f;

TimerReference(TimerListener referent, ScheduledFuture<?> f) {
super(referent);
this.f = f;
}

@Override
public void clear() {
super.clear();
// stop this ScheduledFuture from any further executions
f.cancel(false); // 非强制
}

}
  • 通过 #clear() 方法,可以取消定时任务的执行。

666. 彩蛋

知识星球

顺畅~刚开始看 Hystrix 执行命令超时逻辑,一直想不通。现在整理干净了。

喵了个咪~

胖友,分享一波朋友圈可好!

文章目录
  1. 1. 1. 概述
  2. 2. 2. HystrixObservableTimeoutOperator
  3. 3. 3. HystrixTimer
    1. 3.1. 3.1 ScheduledExecutor
    2. 3.2. 3.2 TimerListener
    3. 3.3. 3.3 TimerReference
  4. 4. 666. 彩蛋