扫码关注公众号:芋道源码

发送: 百事可乐
获取永久解锁本站全部文章的链接

《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-renew/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 续租应用实例的过程

FROM 《深度剖析服务发现组件Netflix Eureka》 二次编辑

  • 蓝框部分,为本文重点。
  • 蓝框部分,Eureka-Server 集群间复制注册的应用实例信息,不在本文内容范畴。

推荐 Spring Cloud 书籍

2. Eureka-Client 发起续租

Eureka-Client 向 Eureka-Server 发起注册应用实例成功后获得租约 ( Lease )。
Eureka-Client 固定间隔向 Eureka-Server 发起续租( renew ),避免租约过期。

默认情况下,租约有效期为 90 秒,续租频率为 30 秒。两者比例为 1 : 3 ,保证在网络异常等情况下,有三次重试的机会。

2.1 初始化定时任务

Eureka-Client 在初始化过程中,创建心跳线程,固定间隔向 Eureka-Server 发起续租( renew )。实现代码如下:

// DiscoveryClient.java

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// ... 省略无关代码
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());

heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff

// ... 省略无关代码

// 【3.2.14】初始化定时任务
initScheduledTasks();
}

private void initScheduledTasks() {

// 向 Eureka-Server 心跳(续租)执行器
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 续租频率
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);

// ... 省略无关代码
}
// ... 省略无关代码
}
  • scheduler,定时任务服务,用于定时触发心跳( 续租 )。细心如你,会发现任务提交的方式是 ScheduledExecutorService#schedule(...) 方法,只延迟执行一次心跳,说好的固定频率执行心跳呢!!!答案在 「2.3 TimedSupervisorTask」 揭晓。
  • heartbeatExecutor,心跳任务执行线程池。为什么有 scheduler 的情况下,还有 heartbeatExecutor ???答案也在 「2.3 TimedSupervisorTask」 揭晓。
  • HeartbeatThread,心跳线程,在「2.2 TimedSupervisorTask」 详细解析。

2.2 HeartbeatThread

com.netflix.discovery.DiscoveryClient.HeartbeatThread,心跳线程,实现执行 Eureka-Client 向 Eureka-Server 发起续租( renew )请求。实现代码如下:

// DiscoveryClient.java
/**
* 最后成功向 Eureka-Server 心跳时间戳
*/
private volatile long lastSuccessfulHeartbeatTimestamp = -1;

private class HeartbeatThread implements Runnable {

public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
  • 调用 #renew 方法,执行续租逻辑。实现代码如下:

    // DiscoveryClient.java
    boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
    httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
    logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    if (httpResponse.getStatusCode() == 404) {
    REREGISTER_COUNTER.increment();
    logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
    long timestamp = instanceInfo.setIsDirtyWithTime();
    // 发起注册
    boolean success = register();
    if (success) {
    instanceInfo.unsetIsDirty(timestamp);
    }
    return success;
    }
    return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
    logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
    return false;
    }
    }
    • 调用 AbstractJerseyEurekaHttpClient#sendHeartBeat(...) 方法,发起续租请求,实现代码如下:

      // AbstractJerseyEurekaHttpClient.java
      @Override
      public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
      String urlPath = "apps/" + appName + '/' + id;
      ClientResponse response = null;
      try {
      WebResource webResource = jerseyClient.resource(serviceUrl)
      .path(urlPath)
      .queryParam("status", info.getStatus().toString())
      .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
      if (overriddenStatus != null) {
      webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
      }
      Builder requestBuilder = webResource.getRequestBuilder();
      addExtraHeaders(requestBuilder);
      response = requestBuilder.put(ClientResponse.class);
      EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
      if (response.hasEntity()) {
      eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
      }
      return eurekaResponseBuilder.build();
      } finally {
      if (logger.isDebugEnabled()) {
      logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
      }
      if (response != null) {
      response.close();
      }
      }
      }
      • PUT 请求 Eureka-Server 的 apps/${APP_NAME}/${INSTANCE_INFO_ID} 接口,参数为 statuslastDirtyTimestampoverriddenstatus,实现续租。
    • 调用 AbstractJerseyEurekaHttpClient#register(...) 方法,当 Eureka-Server 不存在租约时,重新发起注册,在《Eureka 源码解析 —— 应用实例注册发现 (一)之注册》有详细解析。

2.3 TimedSupervisorTask

com.netflix.discovery.TimedSupervisorTask,监管定时任务的任务。

A supervisor task that schedules subtasks while enforce a timeout.

创建 TimedSupervisorTask 代码如下:

public class TimedSupervisorTask extends TimerTask {

private final Counter timeoutCounter;
private final Counter rejectedCounter;
private final Counter throwableCounter;
private final LongGauge threadPoolLevelGauge;

/**
* 定时任务服务
*/
private final ScheduledExecutorService scheduler;
/**
* 执行子任务线程池
*/
private final ThreadPoolExecutor executor;
/**
* 子任务执行超时时间
*/
private final long timeoutMillis;
/**
* 子任务
*/
private final Runnable task;
/**
* 当前任子务执行频率
*/
private final AtomicLong delay;
/**
* 最大子任务执行频率
*
* 子任务执行超时情况下使用
*/
private final long maxDelay;

public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.scheduler = scheduler;
this.executor = executor;
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;

// Initialize the counters and register.
timeoutCounter = Monitors.newCounter("timeouts");
rejectedCounter = Monitors.newCounter("rejectedExecutions");
throwableCounter = Monitors.newCounter("throwables");
threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
Monitors.registerObject(name, this);
}

}
  • scheduler ,定时任务服务,用于定时【发起】子任务。
  • executor ,执行子任务线程池,用于【提交】子任务执行。
  • task ,子任务。
  • timeoutMillis ,子任务执行超时时间,单位:毫秒。
  • delay ,当前子任务执行频率,单位:毫秒。值等于 timeout 参数。
  • maxDelay最大子任务执行频率,单位:毫秒。值等于 timeout * expBackOffBound 参数。

  • scheduler 初始化延迟执行 TimedSupervisorTask 。
  • TimedSupervisorTask 执行时,提交 taskexecutor 执行任务。
    • task 执行正常,TimedSupervisorTask 再次提交自己scheduler 延迟 timeoutMillis 执行。
    • task 执行超时,重新计算延迟时间( 不允许超过 maxDelay ),再次提交自己scheduler 延迟执行。

实现代码如下:

// TimedSupervisorTask.java
1: @Override
2: public void run() {
3: Future<?> future = null;
4: try {
5: // 提交 任务
6: future = executor.submit(task);
7: //
8: threadPoolLevelGauge.set((long) executor.getActiveCount());
9: // 等待任务 执行完成 或 超时
10: future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
11: // 设置 下一次任务执行频率
12: delay.set(timeoutMillis);
13: //
14: threadPoolLevelGauge.set((long) executor.getActiveCount());
15: } catch (TimeoutException e) {
16: logger.error("task supervisor timed out", e);
17: timeoutCounter.increment(); //
18:
19: // 设置 下一次任务执行频率
20: long currentDelay = delay.get();
21: long newDelay = Math.min(maxDelay, currentDelay * 2);
22: delay.compareAndSet(currentDelay, newDelay);
23:
24: } catch (RejectedExecutionException e) {
25: if (executor.isShutdown() || scheduler.isShutdown()) {
26: logger.warn("task supervisor shutting down, reject the task", e);
27: } else {
28: logger.error("task supervisor rejected the task", e);
29: }
30:
31: rejectedCounter.increment(); //
32: } catch (Throwable e) {
33: if (executor.isShutdown() || scheduler.isShutdown()) {
34: logger.warn("task supervisor shutting down, can't accept the task");
35: } else {
36: logger.error("task supervisor threw an exception", e);
37: }
38:
39: throwableCounter.increment(); //
40: } finally {
41: // 取消 未完成的任务
42: if (future != null) {
43: future.cancel(true);
44: }
45:
46: // 调度 下次任务
47: if (!scheduler.isShutdown()) {
48: scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
49: }
50: }
51: }
  • 第 5 至 6 行 :提交子任务 task 到执行子任务线程池 executor
  • 第 9 至 10 行 :等待子任务 task 执行完成或执行超时。
  • 第 11 至 12 行 :子任务 task 执行完成,设置下一次执行延迟 delay
  • 第 19 至 22 行 :子任务 task 执行超时,重新计算下一次执行延迟 delay 。计算公式为 Math.min(maxDelay, currentDelay * 2) 。如果多次超时,超时时间不断乘以 2 ,不允许超过最大延迟时间( maxDelay )。
  • 第 41 至 44 行 :强制取消未完成的子任务。
  • 第 46 至 49 行 :调度下一次 TimedSupervisorTask 。

3. Eureka-Server 接收续租

3.1 接收续租请求

com.netflix.eureka.resources.InstanceResource,处理单个应用实例信息的请求操作的 Resource ( Controller )。

续租应用实例信息的请求,映射 InstanceResource#renewLease() 方法,实现代码如下:

 1: @PUT
2: public Response renewLease(
3: @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
4: @QueryParam("overriddenstatus") String overriddenStatus,
5: @QueryParam("status") String status,
6: @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
7: boolean isFromReplicaNode = "true".equals(isReplication);
8: // 续租
9: boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
10:
11: // 续租失败
12: // Not found in the registry, immediately ask for a register
13: if (!isSuccess) {
14: logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
15: return Response.status(Status.NOT_FOUND).build();
16: }
17:
18: // 比较 InstanceInfo 的 lastDirtyTimestamp 属性
19: // Check if we need to sync based on dirty time stamp, the client
20: // instance might have changed some value
21: Response response = null;
22: if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
23: response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
24: // Store the overridden status since the validation found out the node that replicates wins
25: if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
26: && (overriddenStatus != null)
27: && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
28: && isFromReplicaNode) {
29: registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
30: }
31: } else { // 成功
32: response = Response.ok().build();
33: }
34: logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
35: return response;
36: }
  • 第 8 至 9 行 :调用 PeerAwareInstanceRegistryImpl#renew(...) 方法,续租。实现代码如下:

    // PeerAwareInstanceRegistryImpl.java
    public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) { // 续租
    // Eureka-Server 复制
    replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
    return true;
    }
    return false;
    }
    • 调用父类 AbstractInstanceRegistry#renew(...) 方法,注册应用实例信息。
  • 第 11 至 16 行 :续租失败,返回 404 响应。当 Eureka-Client 收到 404 响应后,会重新发起 InstanceInfo 的注册。

  • 第 18 至 30 行 :比较请求的 lastDirtyTimestamp 和 Server 的 InstanceInfo 的 lastDirtyTimestamp 属性差异,需要配置 eureka.syncWhenTimestampDiffers = true ( 默认开启 )。

    • 第 23 行 :调用 #validateDirtyTimestamp(...) 方法,比较 lastDirtyTimestamp 的差异。实现代码如下:

      // InstanceResource.java
      1: private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) {
      2: // 获取 InstanceInfo
      3: InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
      4: if (appInfo != null) {
      5: if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
      6: Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
      7: // 请求 的 较大
      8: if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
      9: logger.debug("Time to sync, since the last dirty timestamp differs -"
      10: + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);
      11: return Response.status(Status.NOT_FOUND).build();
      12: // Server 的 较大
      13: } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
      14: // In the case of replication, send the current instance info in the registry for the
      15: // replicating node to sync itself with this one.
      16: if (isReplication) {
      17: logger.debug(
      18: "Time to sync, since the last dirty timestamp differs -"
      19: + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
      20: args);
      21: return Response.status(Status.CONFLICT).entity(appInfo).build();
      22: } else {
      23: return Response.ok().build();
      24: }
      25: }
      26: }
      27:
      28: }
      29: return Response.ok().build();
      30: }
      • 第 7 至 11 行 :请求的 lastDirtyTimestamp 较大,意味着请求方( 可能是 Eureka-Client ,也可能是 Eureka-Server 集群内的其他 Server )存在 InstanceInfo 和 Eureka-Server 的 InstanceInfo 的数据不一致,返回 404 响应。请求方收到 404 响应后重新发起注册
      • 第 16 至 21 行 :《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。
      • 第 22 至 24 行 :Server 的 lastDirtyTimestamp 较大,并且请求方为 Eureka-Client,续租成功,返回 200 成功响应。
      • 第 29 行 :lastDirtyTimestamp 一致,返回 200 成功响应。
  • 第 31 至 33 行 :续租成功,返回 200 成功响应。

3.2 续租应用实例信息

调用 AbstractInstanceRegistry#renew(...) 方法,续租应用实例信息,实现代码如下:

 1: public boolean renew(String appName, String id, boolean isReplication) {
2: // 增加 续租次数 到 监控
3: RENEW.increment(isReplication);
4: // 获得 租约
5: Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
6: Lease<InstanceInfo> leaseToRenew = null;
7: if (gMap != null) {
8: leaseToRenew = gMap.get(id);
9: }
10: // 租约不存在
11: if (leaseToRenew == null) {
12: RENEW_NOT_FOUND.increment(isReplication);
13: logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
14: return false;
15: } else {
16: InstanceInfo instanceInfo = leaseToRenew.getHolder();
17: if (instanceInfo != null) {
18: // touchASGCache(instanceInfo.getASGName());
19: // override status
20: InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
21: instanceInfo, leaseToRenew, isReplication);
22: if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
23: logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
24: + "; re-register required", instanceInfo.getId());
25: RENEW_NOT_FOUND.increment(isReplication);
26: return false;
27: }
28: if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
29: Object[] args = {
30: instanceInfo.getStatus().name(),
31: instanceInfo.getOverriddenStatus().name(),
32: instanceInfo.getId()
33: };
34: logger.info(
35: "The instance status {} is different from overridden instance status {} for instance {}. "
36: + "Hence setting the status to overridden status", args);
37: instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
38: }
39: }
40: // 新增 续租每分钟次数
41: renewsLastMin.increment();
42: // 设置 租约最后更新时间(续租)
43: leaseToRenew.renew();
44: return true;
45: }
46: }
  • 第 2 至 3 行 :增加续租次数到监控。配合 Netflix Servo 实现监控信息采集。
  • 第 4 至 9 行 :获得租约( Lease )。
  • 第 10 至 14 行 :租约不存在,返回续租失败( false )。
  • 第 19 至 21 行 :获得应用实例的最终状态。在《应用实例注册发现 (八)之覆盖状态》详细解析。
  • 第 22 至 27 行 :应用实例的最终状态UNKNOWN,无法续约,返回 false 。在《应用实例注册发现 (八)之覆盖状态》详细解析。
  • 第 28 至 37 行 :应用实例的状态与最终状态不相等,使用最终状态覆盖应用实例的状态。在《应用实例注册发现 (八)之覆盖状态》详细解析。
  • 第 40 至 41 行 :新增续租每分钟次数( renewsLastMin )。com.netflix.eureka.util.MeasuredRate,速度测量类,实现代码如下:

    // AbstractInstanceRegistry.java
    /**
    * 续租每分钟次数
    */
    private final MeasuredRate renewsLastMin;

    // MeasuredRate.java
    public class MeasuredRate {
    /**
    * 上一个间隔次数
    */
    private final AtomicLong lastBucket = new AtomicLong(0);
    /**
    * 当前间隔次数
    */
    private final AtomicLong currentBucket = new AtomicLong(0);
    /**
    * 间隔
    */
    private final long sampleInterval;
    /**
    * 定时器
    */
    private final Timer timer;

    private volatile boolean isActive;

    public MeasuredRate(long sampleInterval) {
    this.sampleInterval = sampleInterval;
    this.timer = new Timer("Eureka-MeasureRateTimer", true);
    this.isActive = false;
    }

    public synchronized void start() {
    if (!isActive) {
    timer.schedule(new TimerTask() {

    @Override
    public void run() {
    try {
    // Zero out the current bucket.
    lastBucket.set(currentBucket.getAndSet(0));
    } catch (Throwable e) {
    logger.error("Cannot reset the Measured Rate", e);
    }
    }
    }, sampleInterval, sampleInterval);

    isActive = true;
    }
    }

    public synchronized void stop() {
    if (isActive) {
    timer.cancel();
    isActive = false;
    }
    }

    /**
    * Returns the count in the last sample interval.
    */
    public long getCount() {
    return lastBucket.get();
    }

    /**
    * Increments the count in the current sample interval.
    */
    public void increment() {
    currentBucket.incrementAndGet();
    }
    }
    • timer ,定时器,负责每个 sampleInterval 间隔重置当前次数( currentBucket ),并将原当前次数设置到上一个次数( lastBucket )。
    • #increment() 方法,返回当前次数( currentBucket )。
    • #getCount() 方法,返回上一个次数( lastBucket )。
    • renewsLastMin 有如下用途:
  • 第 42 至 43 行 :调用 Lease#renew() 方法,设置租约最后更新时间( 续租 ),实现代码如下:

    public void renew() {
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }
    • x
  • 第 44 行 :返回续租成功( true )。

  • 整个过程修改的租约的过期时间,即使并发请求,也不会对数据的一致性产生不一致的影响,因此像注册操作一样加锁

666. 彩蛋

知识星球

效率比想象的低一些,加油继续更新下一篇。

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

文章目录
  1. 1. 1. 概述
  2. 2. 2. Eureka-Client 发起续租
    1. 2.1. 2.1 初始化定时任务
    2. 2.2. 2.2 HeartbeatThread
    3. 2.3. 2.3 TimedSupervisorTask
  3. 3. 3. Eureka-Server 接收续租
    1. 3.1. 3.1 接收续租请求
    2. 3.2. 3.2 续租应用实例信息
  4. 4. 666. 彩蛋