摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-fetch-all/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 获取全量注册信息的过程

FROM 《深度剖析服务发现组件Netflix Eureka》

Eureka-Client 获取注册信息,分成全量获取增量获取。默认配置下,Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,而后每 30增量获取刷新本地缓存( 非“正常”情况下会是全量获取 )。

本文重点在于全量获取

推荐 Spring Cloud 书籍

2. Eureka-Client 发起全量获取

本小节调用关系如下:

2.1 初始化全量获取

Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,首先代码如下:

// DiscoveryClient.java
/**
* Applications 在本地的缓存
*/
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// ... 省略无关代码
// 【3.2.5】初始化应用集合在本地的缓存
localRegionApps.set(new Applications());
// ... 省略无关代码
// 【3.2.12】从 Eureka-Server 拉取注册信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// ... 省略无关代码
}
  • com.netflix.discovery.shared.Applications,注册的应用集合。较为容易理解,点击 链接 链接查看带中文注释的类,这里就不啰嗦了。Applications 与 InstanceInfo 类关系如下:

  • 配置 eureka.shouldFetchRegistry = true,开启从 Eureka-Server 获取注册信息。默认值:true

  • 调用 #fetchRegistry(false) 方法,从 Eureka-Server 全量获取注册信息,在 「2.4 发起获取注册信息」 详细解析。

2.2 定时获取

Eureka-Client 在初始化过程中,创建获取注册信息线程,固定间隔向 Eureka-Server 发起获取注册信息( fetch ),刷新本地注册信息缓存。实现代码如下:

// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// ... 省略无关代码
// 【3.2.9】初始化线程池
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// ... 省略无关代码
// 【3.2.14】初始化定时任务
initScheduledTasks();
// ... 省略无关代码
}
private void initScheduledTasks() {
// 向 Eureka-Server 心跳(续租)执行器
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// ... 省略无关代码
}

2.3 刷新注册信息缓存

调用 #refreshRegistry(false) 方法,刷新注册信息缓存,实现代码如下:

// DiscoveryClient.java
1: void refreshRegistry() {
2: try {
3: // TODO 芋艿:TODO[0009]:RemoteRegionRegistry
4: boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
5:
6: boolean remoteRegionsModified = false;
7: // This makes sure that a dynamic change to remote regions to fetch is honored.
8: String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
9: if (null != latestRemoteRegions) {
10: String currentRemoteRegions = remoteRegionsToFetch.get();
11: if (!latestRemoteRegions.equals(currentRemoteRegions)) {
12: // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
13: synchronized (instanceRegionChecker.getAzToRegionMapper()) {
14: if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
15: String[] remoteRegions = latestRemoteRegions.split(",");
16: remoteRegionsRef.set(remoteRegions);
17: instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
18: remoteRegionsModified = true;
19: } else {
20: logger.info("Remote regions to fetch modified concurrently," +
21: " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
22: }
23: }
24: } else {
25: // Just refresh mapping to reflect any DNS/Property change
26: instanceRegionChecker.getAzToRegionMapper().refreshMapping();
27: }
28: }
29:
30: boolean success = fetchRegistry(remoteRegionsModified);
31: if (success) {
32: // 设置 注册信息的应用实例数
33: registrySize = localRegionApps.get().size();
34: // 设置 最后获取注册信息时间
35: lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
36: }
37:
38: // 打印日志
39: if (logger.isDebugEnabled()) {
40: StringBuilder allAppsHashCodes = new StringBuilder();
41: allAppsHashCodes.append("Local region apps hashcode: ");
42: allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
43: allAppsHashCodes.append(", is fetching remote regions? ");
44: allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
45: for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
46: allAppsHashCodes.append(", Remote region: ");
47: allAppsHashCodes.append(entry.getKey());
48: allAppsHashCodes.append(" , apps hashcode: ");
49: allAppsHashCodes.append(entry.getValue().getAppsHashCode());
50: }
51: logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
52: allAppsHashCodes.toString());
53: }
54: } catch (Throwable e) {
55: logger.error("Cannot fetch registry from server", e);
56: }
57: }
  • 第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry
  • 第 30 行 :调用 #fetchRegistry(false) 方法,从 Eureka-Server 获取注册信息,在 「2.4 发起获取注册信息」 详细解析。
  • 第 31 至 36 行 :获取注册信息成功,设置注册信息的应用实例数,最后获取注册信息时间。变量代码如下:

    /**
    * 注册信息的应用实例数
    */
    private volatile int registrySize = 0;
    /**
    * 最后成功从 Eureka-Server 拉取注册信息时间戳
    */
    private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
  • 第 38 至 53 行 :打印调试日志。

  • 第 54 至 56 行 :打印异常日志。

2.4 发起获取注册信息

调用 #fetchRegistry(false) 方法,从 Eureka-Server 获取注册信息( 根据条件判断,可能是全量,也可能是增量 ),实现代码如下:

1: private boolean fetchRegistry(boolean forceFullRegistryFetch) {
2: Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
3:
4: try {
5: // 获取 本地缓存的注册的应用实例集合
6: // If the delta is disabled or if it is the first time, get all
7: // applications
8: Applications applications = getApplications();
9:
10: // 全量获取
11: if (clientConfig.shouldDisableDelta() // 禁用增量获取
12: || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
13: || forceFullRegistryFetch
14: || (applications == null) // 空
15: || (applications.getRegisteredApplications().size() == 0) // 空
16: || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
17: {
18: logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
19: logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
20: logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
21: logger.info("Application is null : {}", (applications == null));
22: logger.info("Registered Applications size is zero : {}",
23: (applications.getRegisteredApplications().size() == 0));
24: logger.info("Application version is -1: {}", (applications.getVersion() == -1));
25: // 执行 全量获取
26: getAndStoreFullRegistry();
27: } else {
28: // 执行 增量获取
29: getAndUpdateDelta(applications);
30: }
31: // 设置 应用集合 hashcode
32: applications.setAppsHashCode(applications.getReconcileHashCode());
33: // 打印 本地缓存的注册的应用实例数量
34: logTotalInstances();
35: } catch (Throwable e) {
36: logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
37: return false;
38: } finally {
39: if (tracer != null) {
40: tracer.stop();
41: }
42: }
43:
44: // Notify about cache refresh before updating the instance remote status
45: onCacheRefreshed();
46:
47: // Update remote status based on refreshed data held in the cache
48: updateInstanceRemoteStatus();
49:
50: // registry was fetched successfully, so return true
51: return true;
52: }
  • 第 5 至 8 行 :获取本地缓存的注册的应用实例集合,实现代码如下:

    public Applications getApplications() {
    return localRegionApps.get();
    }
  • 第 10 至 26 行 :全量获取注册信息。

    • 第 11 行 :配置 eureka.disableDelta = true ,禁用增量获取注册信息。默认值:false
    • 第 12 行 :只获得一个 vipAddress 对应的应用实例们的注册信息。
    • 第 13 行 :方法参数 forceFullRegistryFetch 强制全量获取注册信息。
    • 第 14 至 15 行 :本地缓存为空。
    • 第 25 至 26 行 :调用 #getAndStoreFullRegistry() 方法,全量获取注册信息,并设置到本地缓存。下文详细解析。
  • 第 27 至 30 行 :增量获取注册信息,并刷新本地缓存,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
  • 第 31 至 32 行 :计算应用集合 hashcode 。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
  • 第 33 至 34 行 :打印调试日志,输出本地缓存的注册的应用实例数量。实现代码如下:

    private void logTotalInstances() {
    if (logger.isDebugEnabled()) {
    int totInstances = 0;
    for (Application application : getApplications().getRegisteredApplications()) {
    totInstances += application.getInstancesAsIsFromEureka().size();
    }
    logger.debug("The total number of all instances in the client now is {}", totInstances);
    }
    }
  • 第 44 至 45 行 :触发 CacheRefreshedEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。

    • #onCacheRefreshed() 方法,实现代码如下:

      /**
      * Eureka 事件监听器
      */
      private final CopyOnWriteArraySet<EurekaEventListener> eventListeners = new CopyOnWriteArraySet<>();
      protected void onCacheRefreshed() {
      fireEvent(new CacheRefreshedEvent());
      }
      protected void fireEvent(final EurekaEvent event) {
      for (EurekaEventListener listener : eventListeners) {
      listener.onEvent(event);
      }
      }
      • x
    • 笔者的YY :你可以实现自定义的事件监听器监听 CacheRefreshedEvent 事件,以达到持久化最新的注册信息到存储器( 例如,本地文件 ),通过这样的方式,配合实现 BackupRegistry 接口读取存储器。BackupRegistry 接口调用如下:

      // 【3.2.12】从 Eureka-Server 拉取注册信息
      if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
      fetchRegistryFromBackup();
      }
  • 第47 至 48 行 :更新本地缓存的当前应用实例在 Eureka-Server 的状态。

    1: private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
    2:
    3: private synchronized void updateInstanceRemoteStatus() {
    4: // Determine this instance's status for this app and set to UNKNOWN if not found
    5: InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
    6: if (instanceInfo.getAppName() != null) {
    7: Application app = getApplication(instanceInfo.getAppName());
    8: if (app != null) {
    9: InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
    10: if (remoteInstanceInfo != null) {
    11: currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
    12: }
    13: }
    14: }
    15: if (currentRemoteInstanceStatus == null) {
    16: currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
    17: }
    18:
    19: // Notify if status changed
    20: if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
    21: onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
    22: lastRemoteInstanceStatus = currentRemoteInstanceStatus;
    23: }
    24: }
    • 第 4 至 14 行 :从注册信息中获取当前应用在 Eureka-Server 的状态。
    • 第 19 至 23 行 :对比本地缓存最新的的当前应用实例在 Eureka-Server 的状态,若不同,更新本地缓存( 注意,只更新该缓存变量,不更新本地当前应用实例的状态( instanceInfo.status ) ),触发 StatusChangeEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。#onRemoteStatusChanged(...) 实现代码如下:

      protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
      fireEvent(new StatusChangeEvent(oldStatus, newStatus));
      }

2.4.1 全量获取注册信息,并设置到本地缓存

调用 #getAndStoreFullRegistry() 方法,全量获取注册信息,并设置到本地缓存。下实现代码如下:

1: private void getAndStoreFullRegistry() throws Throwable {
2: long currentUpdateGeneration = fetchRegistryGeneration.get();
3:
4: logger.info("Getting all instance registry info from the eureka server");
5:
6: // 全量获取注册信息
7: Applications apps = null;
8: EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
9: ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
10: : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
11: if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
12: apps = httpResponse.getEntity();
13: }
14: logger.info("The response status is {}", httpResponse.getStatusCode());
15:
16: // 设置到本地缓存
17: if (apps == null) {
18: logger.error("The application is null for some reason. Not storing this information");
19: } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
20: localRegionApps.set(this.filterAndShuffle(apps));
21: logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
22: } else {
23: logger.warn("Not updating applications as another thread is updating it already");
24: }
25: }
  • 第 6 至 14 行 :全量获取注册信息,实现代码如下:

    // AbstractJerseyEurekaHttpClient.java
    @Override
    public EurekaHttpResponse<Applications> getApplications(String... regions) {
    return getApplicationsInternal("apps/", regions);
    }
    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
    WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
    if (regions != null && regions.length > 0) {
    regionsParamValue = StringUtil.join(regions);
    webResource = webResource.queryParam("regions", regionsParamValue);
    }
    Builder requestBuilder = webResource.getRequestBuilder();
    addExtraHeaders(requestBuilder);
    response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
    Applications applications = null;
    if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
    applications = response.getEntity(Applications.class);
    }
    return anEurekaHttpResponse(response.getStatus(), Applications.class)
    .headers(headersOf(response))
    .entity(applications)
    .build();
    } finally {
    if (logger.isDebugEnabled()) {
    logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
    serviceUrl, urlPath,
    regionsParamValue == null ? "" : "regions=" + regionsParamValue,
    response == null ? "N/A" : response.getStatus()
    );
    }
    if (response != null) {
    response.close();
    }
    }
    }
    • 调用 AbstractJerseyEurekaHttpClient#getApplications(...) 方法,GET 请求 Eureka-Server 的 apps/ 接口,参数为 regions ,返回格式为 JSON ,实现全量获取注册信息
  • 第 16 至 24 行 :设置到本地注册信息缓存

    • 第 19 行 :TODO[0025] :并发更新的情况???
    • 第 20 行 :调用 #filterAndShuffle(...) 方法,根据配置 eureka.shouldFilterOnlyUpInstances = true ( 默认值 :true ) 过滤只保留状态为开启( UP )的应用实例,并随机打乱应用实例顺序。打乱后,实现调用应用服务的随机性。代码比较易懂,点击链接查看方法实现。

3. Eureka-Server 接收全量获取

3.1 接收全量获取请求

com.netflix.eureka.resources.ApplicationsResource,处理所有应用的请求操作的 Resource ( Controller )。

接收全量获取请求,映射 ApplicationsResource#getContainers() 方法,实现代码如下:

1: @GET
2: public Response getContainers(@PathParam("version") String version,
3: @HeaderParam(HEADER_ACCEPT) String acceptHeader,
4: @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
5: @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
6: @Context UriInfo uriInfo,
7: @Nullable @QueryParam("regions") String regionsStr) {
8: // TODO[0009]:RemoteRegionRegistry
9: boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
10: String[] regions = null;
11: if (!isRemoteRegionRequested) {
12: EurekaMonitors.GET_ALL.increment();
13: } else {
14: regions = regionsStr.toLowerCase().split(",");
15: Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
16: EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
17: }
18:
19: // 判断是否可以访问
20: // Check if the server allows the access to the registry. The server can
21: // restrict access if it is not
22: // ready to serve traffic depending on various reasons.
23: if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
24: return Response.status(Status.FORBIDDEN).build();
25: }
26:
27: // API 版本
28: CurrentRequestVersion.set(Version.toEnum(version));
29:
30: // 返回数据格式
31: KeyType keyType = Key.KeyType.JSON;
32: String returnMediaType = MediaType.APPLICATION_JSON;
33: if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
34: keyType = Key.KeyType.XML;
35: returnMediaType = MediaType.APPLICATION_XML;
36: }
37:
38: // 响应缓存键( KEY )
39: Key cacheKey = new Key(Key.EntityType.Application,
40: ResponseCacheImpl.ALL_APPS,
41: keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
42: );
43:
44: //
45: Response response;
46: if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
47: response = Response.ok(responseCache.getGZIP(cacheKey))
48: .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
49: .header(HEADER_CONTENT_TYPE, returnMediaType)
50: .build();
51: } else {
52: response = Response.ok(responseCache.get(cacheKey))
53: .build();
54: }
55: return response;
56: }
  • 第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry
  • 第 19 至 25 行 :Eureka-Server 启动完成,但是未处于就绪( Ready )状态,不接受请求全量应用注册信息的请求,例如,Eureka-Server 启动时,未能从其他 Eureka-Server 集群的节点获取到应用注册信息。
  • 第 27 至 28 行 :设置 API 版本号。默认最新 API 版本为 V2。实现代码如下:

    public enum Version {
    V1, V2;
    public static Version toEnum(String v) {
    for (Version version : Version.values()) {
    if (version.name().equalsIgnoreCase(v)) {
    return version;
    }
    }
    //Defaults to v2
    return V2;
    }
    }
  • 第 30 至 36 行 :设置返回数据格式,默认 JSON 。

  • 第 38 至 42 行 :创建响应缓存( ResponseCache ) 的键( KEY ),在 「3.2.1 缓存键」详细解析。
  • 第 44 至 55 行 :从响应缓存读取全量注册信息,在 「3.3 缓存读取」详细解析。

3.2 响应缓存 ResponseCache

com.netflix.eureka.registry.ResponseCache,响应缓存接口,接口代码如下:

public interface ResponseCache {
String get(Key key);
byte[] getGZIP(Key key);
void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);
AtomicLong getVersionDelta();
AtomicLong getVersionDeltaWithRegions();
}
  • 其中,#getVersionDelta()#getVersionDeltaWithRegions() 已经废弃。这里保留的原因主要是考虑兼容性。判断依据来自如下代码:

    // Applications.java
    @Deprecated
    public void setVersion(Long version) {
    this.versionDelta = version;
    }
    // AbstractInstanceRegistry.java
    public Applications getApplicationDeltas() {
    // ... 省略其它无关代码
    apps.setVersion(responseCache.getVersionDelta().get()); // 唯一调用到 ResponseCache#getVersionDelta() 方法的地方
    // ... 省略其它无关代码
    }
  • #get() :获得缓存。

  • #getGZIP() :获得缓存,并 GZIP 。
  • #invalidate() :过期缓存。

3.2.1 缓存键

com.netflix.eureka.registry.Key,缓存键。实现代码如下:

public class Key {
public enum KeyType {
JSON, XML
}
/**
* An enum to define the entity that is stored in this cache for this key.
*/
public enum EntityType {
Application, VIP, SVIP
}
/**
* 实体名
*/
private final String entityName;
/**
* TODO[0009]:RemoteRegionRegistry
*/
private final String[] regions;
/**
* 请求参数类型
*/
private final KeyType requestType;
/**
* 请求 API 版本号
*/
private final Version requestVersion;
/**
* hashKey
*/
private final String hashKey;
/**
* 实体类型
*
* {@link EntityType}
*/
private final EntityType entityType;
/**
* {@link EurekaAccept}
*/
private final EurekaAccept eurekaAccept;
public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
this.regions = regions;
this.entityType = entityType;
this.entityName = entityName;
this.requestType = type;
this.requestVersion = v;
this.eurekaAccept = eurekaAccept;
hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
+ requestType.name() + requestVersion.name() + this.eurekaAccept.name();
}
public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
this.regions = regions;
this.entityType = entityType;
this.entityName = entityName;
this.requestType = type;
this.requestVersion = v;
this.eurekaAccept = eurekaAccept;
hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
+ requestType.name() + requestVersion.name() + this.eurekaAccept.name();
}
@Override
public int hashCode() {
String hashKey = getHashKey();
return hashKey.hashCode();
}
@Override
public boolean equals(Object other) {
if (other instanceof Key) {
return getHashKey().equals(((Key) other).getHashKey());
} else {
return false;
}
}
}

3.2.2 响应缓存实现类

com.netflix.eureka.registry.ResponseCacheImpl,响应缓存实现类。

在 ResponseCacheImpl 里,将缓存拆分成两层 :

  • 只读缓存( readOnlyCacheMap )
  • 固定过期 + 固定大小读写缓存( readWriteCacheMap )

默认配置下,缓存读取策略如下:

缓存过期策略如下:

  • 应用实例注册、下线、过期时,只只只过期 readWriteCacheMap
  • readWriteCacheMap 写入一段时间( 可配置 )后自动过期。
  • 定时任务对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。

注意:应用实例注册、下线、过期时,不会很快刷新到 readWriteCacheMap 缓存里。默认配置下,最大延迟在 30 秒。

为什么可以使用缓存?

CAP 的选择上,Eureka 选择了 AP ,不同于 Zookeeper 选择了 CP 。

推荐阅读:

3.3 缓存读取

调用 ResponseCacheImpl#get(...) 方法( #getGzip(...) 类似 ),读取缓存,实现代码如下:

1: private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
2:
3: private final LoadingCache<Key, Value> readWriteCacheMap;
4:
5: public String get(final Key key) {
6: return get(key, shouldUseReadOnlyResponseCache);
7: }
8:
9: String get(final Key key, boolean useReadOnlyCache) {
10: Value payload = getValue(key, useReadOnlyCache);
11: if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
12: return null;
13: } else {
14: return payload.getPayload();
15: }
16: }
17:
18: Value getValue(final Key key, boolean useReadOnlyCache) {
19: Value payload = null;
20: try {
21: if (useReadOnlyCache) {
22: final Value currentPayload = readOnlyCacheMap.get(key);
23: if (currentPayload != null) {
24: payload = currentPayload;
25: } else {
26: payload = readWriteCacheMap.get(key);
27: readOnlyCacheMap.put(key, payload);
28: }
29: } else {
30: payload = readWriteCacheMap.get(key);
31: }
32: } catch (Throwable t) {
33: logger.error("Cannot get value for key :" + key, t);
34: }
35: return payload;
36: }
  • 第 5 至 7 行 :调用 #get(key, useReadOnlyCache) 方法,读取缓存。其中 shouldUseReadOnlyResponseCache 通过配置 eureka.shouldUseReadOnlyResponseCache = true (默认值 :true ) 开启只读缓存。如果你对数据的一致性有相对高的要求,可以关闭这个开关,当然因为少了 readOnlyCacheMap ,性能会有一定的下降。
  • 第 9 至 16 行 :调用 getValue(key, useReadOnlyCache) 方法,读取缓存。从 readOnlyCacheMapreadWriteCacheMap 变量可以看到缓存值的类为 com.netflix.eureka.registry.ResponseCacheImpl.Value ,实现代码如下:

    public class Value {
    /**
    * 原始值
    */
    private final String payload;
    /**
    * GZIP 压缩后的值
    */
    private byte[] gzipped;
    public Value(String payload) {
    this.payload = payload;
    if (!EMPTY_PAYLOAD.equals(payload)) {
    // ... 省略 GZIP 压缩代码
    gzipped = bos.toByteArray();
    } else {
    gzipped = null;
    }
    }
    public String getPayload() {
    return payload;
    }
    public byte[] getGzipped() {
    return gzipped;
    }
    }
  • 第 21 至 31 行 :读取缓存。

    • 第 21 至 28 行 :先读取 readOnlyCacheMap 。读取不到,读取 readWriteCacheMap ,并设置到 readOnlyCacheMap
    • 第 29 至 31 行 :读取 readWriteCacheMap
    • readWriteCacheMap 实现代码如下:

      this.readWriteCacheMap =
      CacheBuilder.newBuilder().initialCapacity(1000)
      .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
      .removalListener(new RemovalListener<Key, Value>() {
      @Override
      public void onRemoval(RemovalNotification<Key, Value> notification) {
      // TODO[0009]:RemoteRegionRegistry
      Key removedKey = notification.getKey();
      if (removedKey.hasRegions()) {
      Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
      regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
      }
      }
      })
      .build(new CacheLoader<Key, Value>() {
      @Override
      public Value load(Key key) throws Exception {
      // // TODO[0009]:RemoteRegionRegistry
      if (key.hasRegions()) {
      Key cloneWithNoRegions = key.cloneWithoutRegions();
      regionSpecificKeys.put(cloneWithNoRegions, key);
      }
      Value value = generatePayload(key);
      return value;
      }
      });
      • readWriteCacheMap 最大缓存数量为 1000 。
      • 调用 #generatePayload(key) 方法,生成缓存值。
  • #generatePayload(key) 方法,实现代码如下:

    1: private Value generatePayload(Key key) {
    2: Stopwatch tracer = null;
    3: try {
    4: String payload;
    5: switch (key.getEntityType()) {
    6: case Application:
    7: boolean isRemoteRegionRequested = key.hasRegions();
    8:
    9: if (ALL_APPS.equals(key.getName())) {
    10: if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry
    11: tracer = serializeAllAppsWithRemoteRegionTimer.start();
    12: payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
    13: } else {
    14: tracer = serializeAllAppsTimer.start();
    15: payload = getPayLoad(key, registry.getApplications());
    16: }
    17: } else if (ALL_APPS_DELTA.equals(key.getName())) {
    18: // ... 省略增量获取相关的代码
    19: } else {
    20: tracer = serializeOneApptimer.start();
    21: payload = getPayLoad(key, registry.getApplication(key.getName()));
    22: }
    23: break;
    24: // ... 省略部分代码
    25: }
    26: return new Value(payload);
    27: } finally {
    28: if (tracer != null) {
    29: tracer.stop();
    30: }
    31: }
    32: }
    • 第 10 至 12 行 :TODO[0009]:RemoteRegionRegistry
    • 第 13 至 16 行 :调用 AbstractInstanceRegistry#getApplications() 方法,获得注册的应用集合。后调用 #getPayLoad() 方法,将注册的应用集合转换成缓存值。🙂 这两个方法代码较多,下面详细解析。
    • 第 17 至 18 行 :获取增量注册信息的缓存值,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

3.3.1 获得注册的应用集合

调用 AbstractInstanceRegistry#getApplications() 方法,获得注册的应用集合,实现代码如下:

1: // AbstractInstanceRegistry.java
2:
3: private static final String[] EMPTY_STR_ARRAY = new String[0];
4:
5: public Applications getApplications() {
6: boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
7: if (disableTransparentFallback) { // TODO[0009]:RemoteRegionRegistry
8: return getApplicationsFromLocalRegionOnly();
9: } else {
10: return getApplicationsFromAllRemoteRegions(); // Behavior of falling back to remote region can be disabled.
11: }
12: }
13:
14: public Applications getApplicationsFromLocalRegionOnly() {
15: return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);
16: }
  • 第 6 至 8 行 :TODO[0009]:RemoteRegionRegistry
  • 第 9 至 16 行 :调用 #getApplicationsFromMultipleRegions(...) 方法,获得注册的应用集合,实现代码如下:

    1: public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
    2: // TODO[0009]:RemoteRegionRegistry
    3: boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
    4: logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
    5: includeRemoteRegion, Arrays.toString(remoteRegions));
    6: if (includeRemoteRegion) {
    7: GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
    8: } else {
    9: GET_ALL_CACHE_MISS.increment();
    10: }
    11: // 获得获得注册的应用集合
    12: Applications apps = new Applications();
    13: apps.setVersion(1L);
    14: for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
    15: Application app = null;
    16:
    17: if (entry.getValue() != null) {
    18: for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
    19: Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
    20: if (app == null) {
    21: app = new Application(lease.getHolder().getAppName());
    22: }
    23: app.addInstance(decorateInstanceInfo(lease));
    24: }
    25: }
    26: if (app != null) {
    27: apps.addApplication(app);
    28: }
    29: }
    30: // TODO[0009]:RemoteRegionRegistry
    31: if (includeRemoteRegion) {
    32: for (String remoteRegion : remoteRegions) {
    33: RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
    34: if (null != remoteRegistry) {
    35: Applications remoteApps = remoteRegistry.getApplications();
    36: for (Application application : remoteApps.getRegisteredApplications()) {
    37: if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
    38: logger.info("Application {} fetched from the remote region {}",
    39: application.getName(), remoteRegion);
    40:
    41: Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
    42: if (appInstanceTillNow == null) {
    43: appInstanceTillNow = new Application(application.getName());
    44: apps.addApplication(appInstanceTillNow);
    45: }
    46: for (InstanceInfo instanceInfo : application.getInstances()) {
    47: appInstanceTillNow.addInstance(instanceInfo);
    48: }
    49: } else {
    50: logger.debug("Application {} not fetched from the remote region {} as there exists a "
    51: + "whitelist and this app is not in the whitelist.",
    52: application.getName(), remoteRegion);
    53: }
    54: }
    55: } else {
    56: logger.warn("No remote registry available for the remote region {}", remoteRegion);
    57: }
    58: }
    59: }
    60: // 设置 应用集合 hashcode
    61: apps.setAppsHashCode(apps.getReconcileHashCode());
    62: return apps;
    63: }
    • 第 2 至 第 10 行 :TODO[0009]:RemoteRegionRegistry
    • 第 11 至 29 行 :获得获得注册的应用集合。
    • 第 30 至 59 行 :TODO[0009]:RemoteRegionRegistry
    • 第 61 行 :计算应用集合 hashcode 。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

3.3.2 转换成缓存值

调用 #getPayLoad() 方法,将注册的应用集合转换成缓存值,实现代码如下:

/**
* Generate pay load with both JSON and XML formats for all applications.
*/
private String getPayLoad(Key key, Applications apps) {
// 获得编码器
EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
String result;
try {
// 编码
result = encoderWrapper.encode(apps);
} catch (Exception e) {
logger.error("Failed to encode the payload for all apps", e);
return "";
}
if(logger.isDebugEnabled()) {
logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
}
return result;
}

3.4 主动过期读写缓存

应用实例注册、下线、过期时,调用 ResponseCacheImpl#invalidate() 方法,主动过期读写缓存( readWriteCacheMap ),实现代码如下:

public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
for (Key.KeyType type : Key.KeyType.values()) {
for (Version v : Version.values()) {
invalidate(
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
);
if (null != vipAddress) {
invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
}
if (null != secureVipAddress) {
invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
}
}
}
}
  • 调用 #invalidate(keys) 方法,逐个过期每个缓存键值,实现代码如下:

    public void invalidate(Key... keys) {
    for (Key key : keys) {
    logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
    // 过期读写缓存
    readWriteCacheMap.invalidate(key);
    // TODO[0009]:RemoteRegionRegistry
    Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
    if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
    for (Key keysWithRegion : keysWithRegions) {
    logger.debug("Invalidating the response cache key : {} {} {} {} {}",
    key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
    readWriteCacheMap.invalidate(keysWithRegion);
    }
    }
    }
    }

3.5 被动过期读写缓存

读写缓存( readWriteCacheMap ) 写入后,一段时间自动过期,实现代码如下:

expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds())
  • 配置 eureka.responseCacheAutoExpirationInSeconds ,设置写入过期时长。默认值 :180 秒。

3.6 定时刷新只读缓存

定时任务对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。实现代码如下:

1: ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
2: // ... 省略无关代码
3:
4: long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
5: // ... 省略无关代码
6:
7: if (shouldUseReadOnlyResponseCache) {
8: timer.schedule(getCacheUpdateTask(),
9: new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
10: + responseCacheUpdateIntervalMs),
11: responseCacheUpdateIntervalMs);
12: }
13:
14: // ... 省略无关代码
15: }
16:
17: private TimerTask getCacheUpdateTask() {
18: return new TimerTask() {
19: @Override
20: public void run() {
21: logger.debug("Updating the client cache from response cache");
22: for (Key key : readOnlyCacheMap.keySet()) { // 循环 readOnlyCacheMap 的缓存键
23: if (logger.isDebugEnabled()) {
24: Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
25: logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
26: }
27: try {
28: CurrentRequestVersion.set(key.getVersion());
29: Value cacheValue = readWriteCacheMap.get(key);
30: Value currentCacheValue = readOnlyCacheMap.get(key);
31: if (cacheValue != currentCacheValue) { // 不一致时,进行替换
32: readOnlyCacheMap.put(key, cacheValue);
33: }
34: } catch (Throwable th) {
35: logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
36: }
37: }
38: }
39: };
40: }
  • 第 7 至 12 行 :初始化定时任务。配置 eureka.responseCacheUpdateIntervalMs,设置任务执行频率,默认值 :30 * 1000 毫秒。
  • 第 17 至 39 行 :创建定时任务。
    • 第 22 行 :循环 readOnlyCacheMap 的缓存键。为什么不循环 readWriteCacheMapreadOnlyCacheMap 的缓存过期依赖 readWriteCacheMap,因此缓存键会更多。
    • 第 28 行 至 33 行 :对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。

666. 彩蛋

比预期,比想想,长老多老多的一篇文章。细思极恐。

估计下一篇增量获取会简洁很多。

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

文章目录
  1. 1. 1. 概述
  2. 2. 2. Eureka-Client 发起全量获取
    1. 2.1. 2.1 初始化全量获取
    2. 2.2. 2.2 定时获取
    3. 2.3. 2.3 刷新注册信息缓存
    4. 2.4. 2.4 发起获取注册信息
      1. 2.4.1. 2.4.1 全量获取注册信息,并设置到本地缓存
  3. 3. 3. Eureka-Server 接收全量获取
    1. 3.1. 3.1 接收全量获取请求
    2. 3.2. 3.2 响应缓存 ResponseCache
      1. 3.2.1. 3.2.1 缓存键
      2. 3.2.2. 3.2.2 响应缓存实现类
    3. 3.3. 3.3 缓存读取
      1. 3.3.1. 3.3.1 获得注册的应用集合
      2. 3.3.2. 3.3.2 转换成缓存值
    4. 3.4. 3.4 主动过期读写缓存
    5. 3.5. 3.5 被动过期读写缓存
    6. 3.6. 3.6 定时刷新只读缓存
  4. 4. 666. 彩蛋