⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-register/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 注册应用实例的过程

FROM 《深度剖析服务发现组件Netflix Eureka》 二次编辑

  • 蓝框部分,为本文重点。
  • 蓝框部分,Eureka-Server 集群间复制注册的应用实例信息,不在本文内容范畴。

推荐 Spring Cloud 书籍

2. Eureka-Client 发起注册

Eureka-Client 向 Eureka-Server 发起注册应用实例需要符合如下条件:

  • 配置 eureka.registration.enabled = true,Eureka-Client 向 Eureka-Server 发起注册应用实例的开关
  • InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致。

每次 InstanceInfo 发生属性变化时,标记 isInstanceInfoDirty 属性为 true,表示 InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致,需要注册。另外,InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册。

当符合条件时,InstanceInfo 不会立即向 Eureka-Server 注册,而是后台线程定时注册。

当 InstanceInfo 的状态( status ) 属性发生变化时,并且配置 eureka.shouldOnDemandUpdateStatusChange = true 时,立即向 Eureka-Server 注册。因为状态属性非常重要,一般情况下建议开启,当然默认情况也是开启的

Let's Go。让我们看看代码的实现。

2.1 应用实例信息复制器

// DiscoveryClient.java
public class DiscoveryClient implements EurekaClient {

/**
* 应用实例状态变更监听器
*/
private ApplicationInfoManager.StatusChangeListener statusChangeListener;
/**
* 应用实例信息复制器
*/
private InstanceInfoReplicator instanceInfoReplicator;

private void initScheduledTasks() {
// ... 省略无关代码

if (clientConfig.shouldRegisterWithEureka()) {

// ... 省略无关代码

// 创建 应用实例信息复制器
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize

// 创建 应用实例状态变更监听器
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}

@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};

// 注册 应用实例状态变更监听器
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}

// 开启 应用实例信息复制器
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}

}

}

  • com.netflix.discovery.InstanceInfoReplicator,应用实例信息复制器。

    • 调用 InstanceInfoReplicator#start(...) 方法,开启应用实例信息复制器。实现代码如下:

      // InstanceInfoReplicator.java
      class InstanceInfoReplicator implements Runnable {

      private static final Logger logger = LoggerFactory.getLogger(InstanceInfoReplicator.class);

      private final DiscoveryClient discoveryClient;
      /**
      * 应用实例信息
      */
      private final InstanceInfo instanceInfo;
      /**
      * 定时执行频率,单位:秒
      */
      private final int replicationIntervalSeconds;
      /**
      * 定时执行器
      */
      private final ScheduledExecutorService scheduler;
      /**
      * 定时执行任务的 Future
      */
      private final AtomicReference<Future> scheduledPeriodicRef;
      /**
      * 是否开启调度
      */
      private final AtomicBoolean started;

      private final RateLimiter rateLimiter; // 限流相关,跳过
      private final int burstSize; // 限流相关,跳过
      private final int allowedRatePerMinute; // 限流相关,跳过

      InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
      this.discoveryClient = discoveryClient;
      this.instanceInfo = instanceInfo;
      this.scheduler = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
      .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
      .setDaemon(true)
      .build());

      this.scheduledPeriodicRef = new AtomicReference<Future>();

      this.started = new AtomicBoolean(false);
      this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
      this.replicationIntervalSeconds = replicationIntervalSeconds;
      this.burstSize = burstSize;

      this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
      logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
      }

      public void start(int initialDelayMs) {
      if (started.compareAndSet(false, true)) {
      // 设置 应用实例信息 数据不一致
      instanceInfo.setIsDirty(); // for initial register
      // 提交任务,并设置该任务的 Future
      Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
      scheduledPeriodicRef.set(next);
      }
      }

      // ... 省略无关方法
      }

      // InstanceInfo.java
      private volatile boolean isInstanceInfoDirty = false;
      private volatile Long lastDirtyTimestamp;

      public synchronized void setIsDirty() {
      isInstanceInfoDirty = true;
      lastDirtyTimestamp = System.currentTimeMillis();
      }

      • 执行 instanceInfo.setIsDirty() 代码块,因为 InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册
      • 调用 ScheduledExecutorService#schedule(...) 方法,延迟 initialDelayMs 毫秒执行一次任务。为什么此处设置 scheduledPeriodicRef ?在 InstanceInfoReplicator#onDemandUpdate() 方法会看到具体用途。
    • 定时检查 InstanceInfo 的状态( status ) 属性是否发生变化。若是,发起注册。实现代码如下:

      // InstanceInfoReplicator.java
      @Override
      public void run() {
      try {
      // 刷新 应用实例信息
      discoveryClient.refreshInstanceInfo();
      // 判断 应用实例信息 是否数据不一致
      Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
      if (dirtyTimestamp != null) {
      // 发起注册
      discoveryClient.register();
      // 设置 应用实例信息 数据一致
      instanceInfo.unsetIsDirty(dirtyTimestamp);
      }
      } catch (Throwable t) {
      logger.warn("There was a problem with the instance info replicator", t);
      } finally {
      // 提交任务,并设置该任务的 Future
      Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
      scheduledPeriodicRef.set(next);
      }
      }

      // InstanceInfo.java
      public synchronized long setIsDirtyWithTime() {
      setIsDirty();
      return lastDirtyTimestamp;
      }

      public synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
      if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
      isInstanceInfoDirty = false;
      } else {
      }
      }

      • 调用 DiscoveryClient#refreshInstanceInfo() 方法,刷新应用实例信息。此处可能导致应用实例信息数据不一致,在「2.2」刷新应用实例信息 详细解析。
      • 调用 DiscoveryClient#register() 方法,Eureka-Client 向 Eureka-Server 注册应用实例
      • 调用 ScheduledExecutorService#schedule(...) 方法,再次延迟执行任务,并设置 scheduledPeriodicRef。通过这样的方式,不断循环定时执行任务。
  • com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener 内部类,监听应用实例信息状态的变更。

    • 调用 ApplicationInfoManager#registerStatusChangeListener(...) 方法,注册应用实例状态变更监听器。实现代码如下:

      public class ApplicationInfoManager {

      /**
      * 状态变更监听器
      */
      protected final Map<String, StatusChangeListener> listeners;

      public void registerStatusChangeListener(StatusChangeListener listener) {
      listeners.put(listener.getId(), listener);
      }
      }

    • 业务里,调用 ApplicationInfoManager#setInstanceStatus(...) 方法,设置应用实例信息的状态,从而通知 InstanceInfoReplicator#onDemandUpdate() 方法的调用。实现代码如下:

      // ApplicationInfoManager.java
      public synchronized void setInstanceStatus(InstanceStatus status) {
      InstanceStatus next = instanceStatusMapper.map(status);
      if (next == null) {
      return;
      }
      InstanceStatus prev = instanceInfo.setStatus(next);
      if (prev != null) {
      for (StatusChangeListener listener : listeners.values()) {
      try {
      listener.notify(new StatusChangeEvent(prev, next));
      } catch (Exception e) {
      logger.warn("failed to notify listener: {}", listener.getId(), e);
      }
      }
      }
      }

      // InstanceInfo.java
      public synchronized InstanceStatus setStatus(InstanceStatus status) {
      if (this.status != status) {
      InstanceStatus prev = this.status;
      this.status = status;
      // 设置 应用实例信息 数据一致
      setIsDirty();
      return prev;
      }
      return null;
      }

    • InstanceInfoReplicator#onDemandUpdate(),实现代码如下:

      // InstanceInfoReplicator.java
      public boolean onDemandUpdate() {
      if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流相关,跳过
      scheduler.submit(new Runnable() {
      @Override
      public void run() {
      logger.debug("Executing on-demand update of local InstanceInfo");
      // 取消任务
      Future latestPeriodic = scheduledPeriodicRef.get();
      if (latestPeriodic != null && !latestPeriodic.isDone()) {
      logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
      latestPeriodic.cancel(false);
      }
      // 再次调用
      InstanceInfoReplicator.this.run();
      }
      });
      return true;
      } else {
      logger.warn("Ignoring onDemand update due to rate limiter");
      return false;
      }
      }

      • 调用 Future#cancel(false) 方法,取消定时任务,避免无用的注册
      • 调用 InstanceInfoReplicator#run() 方法,发起注册。

2.2 刷新应用实例信息

调用 DiscoveryClient#refreshInstanceInfo() 方法,刷新应用实例信息。此处可能导致应用实例信息数据不一致,实现代码如下:

void refreshInstanceInfo() {
// 刷新 数据中心信息
applicationInfoManager.refreshDataCenterInfoIfRequired();
// 刷新 租约信息
applicationInfoManager.refreshLeaseInfoIfRequired();
// 健康检查
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}

  • 调用 ApplicationInfoManager#refreshDataCenterInfoIfRequired() 方法,刷新数据中心相关信息,实现代码如下:

    // ApplicationInfoManager.java
    public void refreshDataCenterInfoIfRequired() {
    // hostname
    String existingAddress = instanceInfo.getHostName();
    String newAddress;
    if (config instanceof RefreshableInstanceConfig) {
    // Refresh data center info, and return up to date address
    newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
    } else {
    newAddress = config.getHostName(true);
    }
    // ip
    String newIp = config.getIpAddress();
    if (newAddress != null && !newAddress.equals(existingAddress)) {
    logger.warn("The address changed from : {} => {}", existingAddress, newAddress);
    // :( in the legacy code here the builder is acting as a mutator.
    // This is hard to fix as this same instanceInfo instance is referenced elsewhere.
    // We will most likely re-write the client at sometime so not fixing for now.
    InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
    builder.setHostName(newAddress) // hostname
    .setIPAddr(newIp) // ip
    .setDataCenterInfo(config.getDataCenterInfo()); // dataCenterInfo
    instanceInfo.setIsDirty();
    }
    }

    public abstract class AbstractInstanceConfig implements EurekaInstanceConfig {

    private static final Pair<String, String> hostInfo = getHostInfo();

    @Override
    public String getHostName(boolean refresh) {
    return hostInfo.second();
    }

    @Override
    public String getIpAddress() {
    return hostInfo.first();
    }

    private static Pair<String, String> getHostInfo() {
    Pair<String, String> pair;
    try {
    InetAddress localHost = InetAddress.getLocalHost();
    pair = new Pair<String, String>(localHost.getHostAddress(), localHost.getHostName());
    } catch (UnknownHostException e) {
    logger.error("Cannot get host info", e);
    pair = new Pair<String, String>("", "");
    }
    return pair;
    }

    }

    • 关注应用实例信息的 hostNameipAddrdataCenterInfo 属性的变化。
    • 一般情况下,我们使用的是非 RefreshableInstanceConfig 实现的配置类( 一般是 MyDataCenterInstanceConfig ),因为 AbstractInstanceConfig.hostInfo静态属性即使本机修改了 IP 等信息,Eureka-Client 进程也不会感知到。TODO[0022]:看下springcloud 的实现
  • 调用 ApplicationInfoManager#refreshLeaseInfoIfRequired() 方法,刷新租约相关信息,实现代码如下:

    public void refreshLeaseInfoIfRequired() {
    LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
    if (leaseInfo == null) {
    return;
    }
    int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
    int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
    if (leaseInfo.getDurationInSecs() != currentLeaseDuration // 租约过期时间 改变
    || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) { // 租约续约频率 改变
    LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
    .setRenewalIntervalInSecs(currentLeaseRenewal)
    .setDurationInSecs(currentLeaseDuration)
    .build();
    instanceInfo.setLeaseInfo(newLeaseInfo);
    instanceInfo.setIsDirty();
    }
    }

    • 关注应用实例信息的 renewalIntervalInSecsdurationInSecs 属性的变化。
  • 调用 HealthCheckHandler#getStatus() 方法,健康检查。这里先暂时跳过,我们在TODO[0004]:健康检查 详细解析。

2.3 发起注册应用实例

调用 DiscoveryClient#register() 方法,Eureka-Client 向 Eureka-Server 注册应用实例,实现代码如下:

// DiscoveryClient.java
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}

// AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}

  • 调用 AbstractJerseyEurekaHttpClient#register(...) 方法,POST 请求 Eureka-Server 的 apps/${APP_NAME} 接口,参数为 InstanceInfo ,实现注册实例信息的注册。

3. Eureka-Server 接收注册

3.1 接收注册请求

com.netflix.eureka.resources.ApplicationResource,处理单个应用的请求操作的 Resource ( Controller )。

注册应用实例信息的请求,映射 ApplicationResource#addInstance() 方法,实现代码如下:

@Produces({"application/xml", "application/json"})
public class ApplicationResource {

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
// 校验参数是否合法
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}

// AWS 相关,跳过
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}

// 注册应用实例信息
registry.register(info, "true".equals(isReplication));

// 返回 204 成功
return Response.status(204).build(); // 204 to be backwards compatible
}

}

  • 请求头 isReplication 参数,和 Eureka-Server 集群复制相关,暂时跳过。

  • 调用 PeerAwareInstanceRegistryImpl#register(...) 方法,注册应用实例信息。实现代码如下:

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
    // 租约过期时间
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
    leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // 注册应用实例信息
    super.register(info, leaseDuration, isReplication);
    // Eureka-Server 复制
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

    • 调用父类 AbstractInstanceRegistry#register(...) 方法,注册应用实例信息。

3.2 Lease

在看具体的注册应用实例信息的逻辑之前,我们先来看下 com.netflix.eureka.lease.Lease,租约。实现代码如下:

public class Lease<T> {

/**
* 实体
*/
private T holder;
/**
* 注册时间戳
*/
private long registrationTimestamp;
/**
* 开始服务时间戳
*/
private long serviceUpTimestamp;
/**
* 取消注册时间戳
*/
private long evictionTimestamp;
/**
* 最后更新时间戳
*/
// Make it volatile so that the expiration task would see this quicker
private volatile long lastUpdateTimestamp;
/**
* 租约持续时长,单位:毫秒
*/
private long duration;

public Lease(T r, int durationInSecs) {
holder = r;
registrationTimestamp = System.currentTimeMillis();
lastUpdateTimestamp = registrationTimestamp;
duration = (durationInSecs * 1000);
}

}

  • holder 属性,租约的持有者。在 Eureka-Server 里,暂时只有 InstanceInfo 使用。

  • registrationTimestamp 属性,注册( 创建 )租约时间戳。在构造方法里可以看租约对象的创建时间戳即为注册租约时间戳。

  • serviceUpTimestamp 属性,开始服务时间戳。注册应用实例信息会使用到它如下两个方法,实现代码如下:

    public void serviceUp() {
    if (serviceUpTimestamp == 0) { // 第一次有效
    serviceUpTimestamp = System.currentTimeMillis();
    }
    }

    public void setServiceUpTimestamp(long serviceUpTimestamp) {
    this.serviceUpTimestamp = serviceUpTimestamp;
    }

  • lastUpdatedTimestamp 属性,最后更新租约时间戳。每次续租时,更新该时间戳。注册应用实例信息会使用到它如下方法,实现代码如下:

    public void setLastUpdatedTimestamp() {
    this.lastUpdatedTimestamp = System.currentTimeMillis();
    }

  • duration 属性,租约持续时间,单位:毫秒。当租约过久未续租,即当前时间 - lastUpdatedTimestamp > duration 时,租约过期。

  • evictionTimestamp 属性,租约过期时间戳。

3.3 注册应用实例信息

调用 AbstractInstanceRegistry#register(...) 方法,注册应用实例信息,实现代码如下:

 1: public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
2: try {
3: // 获取读锁
4: read.lock();
5: Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
6: // 增加 注册次数 到 监控
7: REGISTER.increment(isReplication);
8: // 获得 应用实例信息 对应的 租约
9: if (gMap == null) {
10: final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
11: gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // 添加 应用
12: if (gMap == null) { // 添加 应用 成功
13: gMap = gNewMap;
14: }
15: }
16: Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
17: // Retain the last dirty timestamp without overwriting it, if there is already a lease
18: if (existingLease != null && (existingLease.getHolder() != null)) { // 已存在时,使用数据不一致的时间大的应用注册信息为有效的
19: Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // Server 注册的 InstanceInfo
20: Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // Client 请求的 InstanceInfo
21: logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
22:
23: // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
24: // InstanceInfo instead of the server local copy.
25: if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
26: logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
27: " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
28: logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
29: registrant = existingLease.getHolder();
30: }
31: } else {
32: // The lease does not exist and hence it is a new registration
33: // 【自我保护机制】增加 `numberOfRenewsPerMinThreshold` 、`expectedNumberOfRenewsPerMin`
34: synchronized (lock) {
35: if (this.expectedNumberOfRenewsPerMin > 0) {
36: // Since the client wants to cancel it, reduce the threshold
37: // (1
38: // for 30 seconds, 2 for a minute)
39: this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
40: this.numberOfRenewsPerMinThreshold =
41: (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
42: }
43: }
44: logger.debug("No previous lease information found; it is new registration");
45: }
46: // 创建 租约
47: Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
48: if (existingLease != null) { // 若租约已存在,设置 租约的开始服务的时间戳
49: lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
50: }
51: // 添加到 租约映射
52: gMap.put(registrant.getId(), lease);
53: // 添加到 最近注册的调试队列
54: synchronized (recentRegisteredQueue) {
55: recentRegisteredQueue.add(new Pair<Long, String>(
56: System.currentTimeMillis(),
57: registrant.getAppName() + "(" + registrant.getId() + ")"));
58: }
59: // 添加到 应用实例覆盖状态映射(Eureka-Server 初始化使用)
60: // This is where the initial state transfer of overridden status happens
61: if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
62: logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
63: + "overrides", registrant.getOverriddenStatus(), registrant.getId());
64: if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
65: logger.info("Not found overridden id {} and hence adding it", registrant.getId());
66: overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
67: }
68: }
69: InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
70: if (overriddenStatusFromMap != null) {
71: logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
72: registrant.setOverriddenStatus(overriddenStatusFromMap);
73: }
74:
75: // 获得应用实例最终状态,并设置应用实例的状态
76: // Set the status based on the overridden status rules
77: InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
78: registrant.setStatusWithoutDirty(overriddenInstanceStatus);
79:
80: // 设置 租约的开始服务的时间戳(只有第一次有效)
81: // If the lease is registered with UP status, set lease service up timestamp
82: if (InstanceStatus.UP.equals(registrant.getStatus())) {
83: lease.serviceUp();
84: }
85: // 设置 应用实例信息的操作类型 为 添加
86: registrant.setActionType(ActionType.ADDED);
87: // 添加到 最近租约变更记录队列
88: recentlyChangedQueue.add(new RecentlyChangedItem(lease));
89: // 设置 租约的最后更新时间戳
90: registrant.setLastUpdatedTimestamp();
91: // 设置 响应缓存 过期
92: invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
93: logger.info("Registered instance {}/{} with status {} (replication={})",
94: registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
95: } finally {
96: // 释放锁
97: read.unlock();
98: }
99: }

  • 第 3 行 :添加到应用实例覆盖状态映射,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 详细解析。

  • 第 6 至 7 行 :增加注册次数到监控。配合 Netflix Servo 实现监控信息采集。

  • 第 5 至 16 行 :获得应用实例信息对应的租约registry 实现代码如下:

    /**
    * 租约映射
    * key1 :应用名 {@link InstanceInfo#appName}
    * key2 :应用实例信息编号 {@link InstanceInfo#instanceId}
    * value :租约
    */
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

  • 第 17 至 30 行 :当租约已存在,判断 Server 已存在的 InstanceInfo 的 lastDirtyTimestamp 是否大于( 不包括等于 ) Client 请求的 InstanceInfo ,若是,使用 Server 的 InstanceInfo 进行替代

  • 第 31 至 44 行 :增加 numberOfRenewsPerMinThresholdexpectedNumberOfRenewsPerMin,自我保护机制相关,在 《Eureka 源码解析 —— 应用实例注册发现(四)之自我保护机制》 有详细解析。

  • 第 45 至 52 行 :创建租约,并添加到租约映射( registry )。

  • 第 53 至 58 行 :添加到最近注册的调试队列( recentRegisteredQueue ),用于 Eureka-Server 运维界面的显示,无实际业务逻辑使用。实现代码如下:

    /**
    * 最近注册的调试队列
    * key :添加时的时间戳
    * value :字符串 = 应用名(应用实例信息编号)
    */
    private final CircularQueue<Pair<Long, String>> recentRegisteredQueue;

    /**
    * 循环队列
    *
    * @param <E> 泛型
    */
    private class CircularQueue<E> extends ConcurrentLinkedQueue<E> {

    /**
    * 队列大小
    */
    private int size = 0;

    public CircularQueue(int size) {
    this.size = size;
    }

    @Override
    public boolean add(E e) {
    this.makeSpaceIfNotAvailable();
    return super.add(e);

    }

    /**
    * 保证空间足够
    *
    * 当空间不够时,移除首元素
    */
    private void makeSpaceIfNotAvailable() {
    if (this.size() == size) {
    this.remove();
    }
    }

    public boolean offer(E e) {
    this.makeSpaceIfNotAvailable();
    return super.offer(e);
    }
    }

  • 第 59 至 68 行 :添加到应用实例覆盖状态映射,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 详细解析。

  • 第 69 至 73 行 :设置应用实例的覆盖状态( overridestatus ),避免注册应用实例后,丢失覆盖状态。在《应用实例注册发现 (八)之覆盖状态》详细解析。

  • 第 75 至 78 行 : 获得应用实例最终状态,并设置应用实例的状态。在《应用实例注册发现 (八)之覆盖状态》详细解析。

  • 第 80 至 84 行 :设置租约的开始服务的时间戳( 只有第一次有效 )。

  • 第 85 至 88 行 :设置应用实例信息的操作类型为添加,并添加到最近租约变更记录队列( recentlyChangedQueue )。recentlyChangedQueue 用于注册信息的增量获取,在《应用实例注册发现 (七)之增量获取》详细解析。实现代码如下:

    /**
    * 最近租约变更记录队列
    */
    private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();

  • 第 89 至 90 行 :设置租约的最后更新时间戳。

  • 第 91 至 92 行 :设置响应缓存( ResponseCache )过期,在《Eureka 源码解析 —— 应用实例注册发现 (六)之全量获取》详细解析。

  • 第 96 至 97 行 :释放锁。

666. 彩蛋

知识星球

嘿嘿,蛮嗨的,比起前面几篇写配置相关的文章来说。

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

文章目录
  1. 1. 1. 概述
  2. 2. 2. Eureka-Client 发起注册
    1. 2.1. 2.1 应用实例信息复制器
    2. 2.2. 2.2 刷新应用实例信息
    3. 2.3. 2.3 发起注册应用实例
  3. 3. 3. Eureka-Server 接收注册
    1. 3.1. 3.1 接收注册请求
    2. 3.2. 3.2 Lease
    3. 3.3. 3.3 注册应用实例信息
  4. 4. 666. 彩蛋