⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Eureka/transport/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Eureka 的网络通信部分。在不考虑 Eureka 2.x 的兼容的情况下,Eureka 1.x 主要两部分的网络通信:

  • Eureka-Client 请求 Eureka-Server 的网络通信
  • Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信

本文涉及类在 com.netflix.discovery.shared.transport 包下,涉及到主体类的类图如下( 打开大图 ):

  • 粉色部分 —— EurekaJerseyClient ,对基于 Jersey Server 的 Eureka-Server 的 Jersey 客户端封装。
  • 绿色部分 —— EurekaHttpClient ,Eureka-Server HTTP 访问客户端,定义了具体的 Eureka-Server API 调用方法。如果把 DiscoveryClient 类比成 Service ,那么 EurekaHttpClient 可以类比城 Dao 。
  • 综色部分 —— EurekaHttpClient 实现类,真正实现了具体的 Eureka-Server API 调用方法。
  • 红色部分 —— EurekaHttpClient 委托类,提供了会话、重试、重定向、监控指标收集等特性。
  • 黄色部分 —— EurekaHttpClientFactory,用于创建 EurekaHttpClient 。

类图看起来很复杂,整体调用关系如下( 打开大图 ):

OK ,我们逐层解析,嗨起来。

推荐 Spring Cloud 书籍

2. EurekaHttpClient

com.netflix.discovery.shared.transport.jersey.EurekaJerseyClient ,EurekaHttpClient 接口。接口代码如下:

public interface EurekaJerseyClient {

ApacheHttpClient4 getClient();

void destroyResources();
}

  • com.sun.jersey.client.apache4.ApacheHttpClient4 ,基于 Apache HttpClient4 实现的 Jersey Client 。

2.1 EurekaJerseyClientImpl

com.netflix.discovery.shared.transport.jersey.EurekaJerseyClientImpl ,EurekaHttpClient 实现类。实现代码如下:

public class EurekaJerseyClientImpl implements EurekaJerseyClient {

/**
* 基于 Apache HttpClient4 实现的 Jersey Client
*/
private final ApacheHttpClient4 apacheHttpClient;
/**
* Apache HttpClient 空闲连接清理器
*/
private final ApacheHttpClientConnectionCleaner apacheHttpClientConnectionCleaner;

/**
* Jersey Client 配置
*/
ClientConfig jerseyClientConfig;

public EurekaJerseyClientImpl(int connectionTimeout, int readTimeout, final int connectionIdleTimeout,
ClientConfig clientConfig) {
try {
jerseyClientConfig = clientConfig;
// 创建 ApacheHttpClient
apacheHttpClient = ApacheHttpClient4.create(jerseyClientConfig);

// 设置 连接参数
HttpParams params = apacheHttpClient.getClientHandler().getHttpClient().getParams();
HttpConnectionParams.setConnectionTimeout(params, connectionTimeout);
HttpConnectionParams.setSoTimeout(params, readTimeout);

// 创建 ApacheHttpClientConnectionCleaner
this.apacheHttpClientConnectionCleaner = new ApacheHttpClientConnectionCleaner(apacheHttpClient, connectionIdleTimeout);
} catch (Throwable e) {
throw new RuntimeException("Cannot create Jersey client", e);
}
}

@Override
public ApacheHttpClient4 getClient() {
return apacheHttpClient;
}

@Override
public void destroyResources() {
apacheHttpClientConnectionCleaner.shutdown();
apacheHttpClient.destroy();
}
}

  • com.netflix.discovery.shared.transport.jersey.ApacheHttpClientConnectionCleaner ,Apache HttpClient 空闲连接清理器,负责周期性关闭处于 half-close 状态的空闲连接。点击 链接 查看带中文注释的 ApacheHttpClientConnectionCleaner。推荐阅读:《HttpClient容易忽视的细节——连接关闭》

2.2 EurekaJerseyClientBuilder

EurekaJerseyClientBuilder ,EurekaJerseyClientImpl 内部类,用于创建 EurekaJerseyClientImpl 。

调用 #build() 方法,创建 EurekaJerseyClientImpl ,实现代码如下:

// EurekaJerseyClientBuilder.java
public EurekaJerseyClient build() {
MyDefaultApacheHttpClient4Config config = new MyDefaultApacheHttpClient4Config();
try {
return new EurekaJerseyClientImpl(connectionTimeout, readTimeout, connectionIdleTimeout, config);
} catch (Throwable e) {
throw new RuntimeException("Cannot create Jersey client ", e);
}
}

  • MyDefaultApacheHttpClient4Config ,继承自 com.sun.jersey.client.apache4.config.DefaultApacheHttpClient4Config ,实现自定义配置。点击 链接 查看带中文注释的 MyDefaultApacheHttpClient4Config。例如 :
    • 自定义的请求、响应的编解码器 com.netflix.discovery.provider.DiscoveryJerseyProvider
    • 禁用重定向,使用 RedirectingEurekaHttpClient 实现该特性。
    • 自定义 UserAgent 。
    • 自定义 Http Proxy 。
    • SSL 功能的增强。ApacheHttpClient4 使用的是 Apache HttpClient 4.1.1 版本,com.netflix.discovery.shared.transport.jersey.SSLSocketFactoryAdapter 将 Apache HttpClient 4.3.4 对 SSL 功能的增强适配到老版本 API 。点击 链接 查看带中文注释的 SSLSocketFactoryAdapter。

3. EurekaHttpClient

com.netflix.discovery.shared.transport.EurekaHttpClient ,Eureka-Server HTTP 访问客户端,定义了具体的 Eureka-Server API 调用方法 。点击 链接 查看带中文注释的 EurekaHttpClient。

3.1 EurekaHttpResponse

com.netflix.discovery.shared.transport.EurekaHttpResponse ,请求响应对象,实现代码如下:

public class EurekaHttpResponse<T> {

/**
* 返回状态码
*/
private final int statusCode;
/**
* 返回对象( Entity )
*/
private final T entity;
/**
* 返回 header
*/
private final Map<String, String> headers;
/**
* 重定向地址
*/
private final URI location;

// ... 省略 setting / getting 和 Builder
}

3.2 TransportClientFactory

com.netflix.discovery.shared.transport.TransportClientFactory ,创建 EurekaHttpClient 的工厂接口。接口代码如下:

public interface TransportClientFactory {

/**
* 创建 EurekaHttpClient
*
* @param serviceUrl Eureka-Server 地址
* @return EurekaHttpClient
*/
EurekaHttpClient newClient(EurekaEndpoint serviceUrl);

/**
* 关闭工厂
*/
void shutdown();

}

大多数 EurekaHttpClient 实现类都有其对应的工厂实现类

4. AbstractJerseyEurekaHttpClient

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient ,实现 EurekaHttpClient 的抽象类真正实现了具体的 Eureka-Server API 调用方法。实现代码如下:

 1: public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
2:
3: private static final Logger logger = LoggerFactory.getLogger(AbstractJerseyEurekaHttpClient.class);
4:
5: /**
6: * Jersey Client
7: */
8: protected final Client jerseyClient;
9: /**
10: * 请求的 Eureka-Server 地址
11: */
12: protected final String serviceUrl;
13:
14: protected AbstractJerseyEurekaHttpClient(Client jerseyClient, String serviceUrl) {
15: this.jerseyClient = jerseyClient;
16: this.serviceUrl = serviceUrl;
17: logger.debug("Created client for url: {}", serviceUrl);
18: }
19:
20: @Override
21: public EurekaHttpResponse<Void> register(InstanceInfo info) {
22: // 设置 请求地址
23: String urlPath = "apps/" + info.getAppName();
24: ClientResponse response = null;
25: try {
26: Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
27: // 设置 请求头
28: addExtraHeaders(resourceBuilder);
29: // 请求 Eureka-Server
30: response = resourceBuilder
31: .header("Accept-Encoding", "gzip") // GZIP
32: .type(MediaType.APPLICATION_JSON_TYPE) // 请求参数格式 JSON
33: .accept(MediaType.APPLICATION_JSON) // 响应结果格式 JSON
34: .post(ClientResponse.class, info); // 请求参数
35: // 创建 EurekaHttpResponse
36: return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
37: } finally {
38: if (logger.isDebugEnabled()) {
39: logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
40: response == null ? "N/A" : response.getStatus());
41: }
42: if (response != null) {
43: response.close();
44: }
45: }
46: }

  • jerseyClient 属性,Jersey Client ,使用上文的 EurekaHttpClient#getClient(...) 方法,获取 ApacheHttpClient4 。
  • serviceUrl 属性,请求的 Eureka-Server 地址。
  • #register() 方法,实现向 Eureka-Server 注册应用实例。其他方法代码类似
    • 第 22 至 26 行 :设置请求地址。

    • 第 28 行 :调用 #addExtraHeaders(...) 方法,设置请求头( header )。该方法是抽象方法,提供子类实现自定义的请求头。代码如下:

      protected abstract void addExtraHeaders(Builder webResource);

      • x
    • 第 29 至 34 行 :请求 Eureka-Server 。

    • 第 35 至 36 行 :解析响应结果,创建 EurekaHttpResponse 。

4.1 JerseyApplicationClient

com.netflix.discovery.shared.transport.jersey.JerseyApplicationClient ,实现 Eureka-Client 请求 Eureka-Server 的网络通信。点击 链接 查看带中文注释的 JerseyApplicationClient。

4.1.1 JerseyEurekaHttpClientFactory

com.netflix.discovery.shared.transport.jersey.JerseyEurekaHttpClientFactory ,创建 JerseyApplicationClient 的工厂类。实现代码如下:

public class JerseyEurekaHttpClientFactory implements TransportClientFactory {

private final EurekaJerseyClient jerseyClient;
private final ApacheHttpClient4 apacheClient;
private final ApacheHttpClientConnectionCleaner cleaner;
private final Map<String, String> additionalHeaders;

public JerseyEurekaHttpClientFactory(ApacheHttpClient4 apacheClient, long connectionIdleTimeout, Map<String, String> additionalHeaders) {
this(null, apacheClient, connectionIdleTimeout, additionalHeaders);
}

private JerseyEurekaHttpClientFactory(EurekaJerseyClient jerseyClient,
ApacheHttpClient4 apacheClient,
long connectionIdleTimeout,
Map<String, String> additionalHeaders) {
this.jerseyClient = jerseyClient;
this.apacheClient = jerseyClient != null ? jerseyClient.getClient() : apacheClient;
this.additionalHeaders = additionalHeaders;
this.cleaner = new ApacheHttpClientConnectionCleaner(this.apacheClient, connectionIdleTimeout);
}

@Override
public EurekaHttpClient newClient(EurekaEndpoint endpoint) {
return new JerseyApplicationClient(apacheClient, endpoint.getServiceUrl(), additionalHeaders);
}

@Override
public void shutdown() {
cleaner.shutdown();
if (jerseyClient != null) {
jerseyClient.destroyResources();
} else {
apacheClient.destroy();
}
}
}

4.1.2 JerseyEurekaHttpClientFactoryBuilder

JerseyEurekaHttpClientFactoryBuilder ,JerseyEurekaHttpClientFactory 内部类,用于创建 JerseyEurekaHttpClientFactory 。点击 链接 查看带中文注释的 JerseyEurekaHttpClientFactory。

调用 JerseyEurekaHttpClientFactory#create(...) 方法,创建 JerseyEurekaHttpClientFactory ,实现代码如下:

public static JerseyEurekaHttpClientFactory create(EurekaClientConfig clientConfig,
Collection<ClientFilter> additionalFilters,
InstanceInfo myInstanceInfo,
AbstractEurekaIdentity clientIdentity) {
JerseyEurekaHttpClientFactoryBuilder clientBuilder = newBuilder()
.withAdditionalFilters(additionalFilters) // 客户端附加过滤器
.withMyInstanceInfo(myInstanceInfo) // 应用实例
.withUserAgent("Java-EurekaClient") // UA
.withClientConfig(clientConfig)
.withClientIdentity(clientIdentity);

// 设置 Client Name
if ("true".equals(System.getProperty("com.netflix.eureka.shouldSSLConnectionsUseSystemSocketFactory"))) {
clientBuilder.withClientName("DiscoveryClient-HTTPClient-System").withSystemSSLConfiguration();
} else if (clientConfig.getProxyHost() != null && clientConfig.getProxyPort() != null) {
clientBuilder.withClientName("Proxy-DiscoveryClient-HTTPClient")
.withProxy(
clientConfig.getProxyHost(), Integer.parseInt(clientConfig.getProxyPort()),
clientConfig.getProxyUserName(), clientConfig.getProxyPassword()
); // http proxy
} else {
clientBuilder.withClientName("DiscoveryClient-HTTPClient");
}

return clientBuilder.build();
}

public static JerseyEurekaHttpClientFactoryBuilder newBuilder() {
return new JerseyEurekaHttpClientFactoryBuilder().withExperimental(false);
}

4.2 JerseyReplicationClient

com.netflix.eureka.transport.JerseyReplicationClient ,Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信。

4.2.1 没有工厂

JerseyReplicationClient 没有专属的工厂

调用 JerseyReplicationClient#createReplicationClient(...) 静态方法,创建 JerseyReplicationClient 。点击 链接 查看带中文注释的方法代码。

5. EurekaHttpClientDecorator

com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator,EurekaHttpClient 委托者抽象类。实现代码如下:

public abstract class EurekaHttpClientDecorator implements EurekaHttpClient {

/**
* 执行请求
*
* @param requestExecutor 请求执行器
* @param <R> 请求泛型
* @return 响应
*/
protected abstract <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor);

@Override
public EurekaHttpResponse<Void> register(final InstanceInfo info) {
return execute(new RequestExecutor<Void>() {
@Override
public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
return delegate.register(info);
}

@Override
public RequestType getRequestType() {
return RequestType.Register;
}
});
}

}

  • #execute(...) 抽象方法,子类实现该方法,实现自己的特性。

  • #register() 方法,实现向 Eureka-Server 注册应用实例。其他方法代码类似

  • RequestType ,请求类型枚举类。代码如下:

    // EurekaHttpClientDecorator.java
    public enum RequestType {
    Register,
    Cancel,
    SendHeartBeat,
    StatusUpdate,
    DeleteStatusOverride,
    GetApplications,
    GetDelta,
    GetVip,
    GetSecureVip,
    GetApplication,
    GetInstance,
    GetApplicationInstance
    }

  • RequestExecutor ,请求执行器接口。接口代码如下:

    // EurekaHttpClientDecorator.java
    public interface RequestExecutor<R> {

    /**
    * 执行请求
    *
    * @param delegate 委托的 EurekaHttpClient
    * @return 响应
    */
    EurekaHttpResponse<R> execute(EurekaHttpClient delegate);

    /**
    * @return 请求类型
    */
    RequestType getRequestType();
    }


EurekaHttpClientDecorator 的每个实现类实现一个特性,代码非常非常非常清晰。

FROM 《委托模式》
委托模式是软件设计模式中的一项基本技巧。在委托模式中,有两个对象参与处理同一个请求,接受请求的对象将请求委托给另一个对象来处理。委托模式是一项基本技巧,许多其他的模式,如状态模式、策略模式、访问者模式本质上是在更特殊的场合采用了委托模式。委托模式使得我们可以用聚合来替代继承,它还使我们可以模拟mixin。

我们在上图的基础上,增加委托的关系,如下图( 打开大图 ):

  • 请注意,每个委托着实现类,上面可能有类型为 EurekaHttpClientFactory 的属性,用于创建其委托的 EurekaHttpClient 。为什么会有 Factory ?例如,RetryableEurekaHttpClient 重试请求多个 Eureka-Server 地址时,每个 Eureka-Server 地址会创建一个 EurekaHttpClient 。所以,下文涉及到 EurekaHttpClientFactory 和委托的 EurekaHttpClient 的地方,你都需要仔细理解。

5.1 MetricsCollectingEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient ,监控指标收集 EurekaHttpClient ,配合 Netflix Servo 实现监控信息采集。

#execute() 方法,代码如下:

 1: @Override
2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
3: // 获得 请求类型 的 请求指标
4: EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
5: Stopwatch stopwatch = requestMetrics.latencyTimer.start();
6: try {
7: // 执行请求
8: EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
9: // 增加 请求指标
10: requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment();
11: return httpResponse;
12: } catch (Exception e) {
13: requestMetrics.connectionErrors.increment();
14: exceptionsMetric.count(e);
15: throw e;
16: } finally {
17: stopwatch.stop();
18: }
19: }

  • 第 10 行 :调用 RequestExecutor#execute(...) 方法,继续执行请求。
    • delegate 属性,对应 JerseyApplicationClient 。

5.2 RedirectingEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient寻找非 302 重定向的 Eureka-Server 的 EurekaHttpClient 。

#execute() 方法,代码如下:

 1: @Override
2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
3: EurekaHttpClient currentEurekaClient = delegateRef.get();
4: if (currentEurekaClient == null) { // 未找到非 302 的 Eureka-Server
5: AtomicReference<EurekaHttpClient> currentEurekaClientRef = new AtomicReference<>(factory.newClient(serviceEndpoint));
6: try {
7: EurekaHttpResponse<R> response = executeOnNewServer(requestExecutor, currentEurekaClientRef);
8: // 关闭原有的委托 EurekaHttpClient ,并设置当前成功非 302 请求的 EurekaHttpClient
9: TransportUtils.shutdown(delegateRef.getAndSet(currentEurekaClientRef.get()));
10: return response;
11: } catch (Exception e) {
12: logger.error("Request execution error", e);
13: TransportUtils.shutdown(currentEurekaClientRef.get());
14: throw e;
15: }
16: } else { // 已经找到非 302 的 Eureka-Server
17: try {
18: return requestExecutor.execute(currentEurekaClient);
19: } catch (Exception e) {
20: logger.error("Request execution error", e);
21: delegateRef.compareAndSet(currentEurekaClient, null);
22: currentEurekaClient.shutdown();
23: throw e;
24: }
25: }
26: }

  • 注意:和我们理解的常规的 302 状态返回处理不同!!!
  • 整个分成两部分:【第 4 至 15 行】、【第 16 至 24 行】。
    • 前者,意味着未找到非返回 302 状态码的 Eureka-Server ,此时通过在原始传递进来的 serviceUrls 执行请求,寻找非 302 状态码返回的 Eureka-Server。
      • 当返回非 302 状态码时,找到非返回 302 状态码的 Eureka-Server 。
      • 当返回 302 状态码时,向新的重定向的 Eureka-Server 执行请求直到成功找到或超过最大次数。
    • 后者,意味着当前已经找到非返回 302 状态码的 Eureka-Server ,直接执行请求。注意 :此时 Eureka-Server 再返回 302 状态码,不再处理。
    • 目前 Eureka 1.x 的 Eureka-Server 不存在返回 302 状态码,猜测和 Eureka 2.X TODO[0028]:写入集群和读取集群 有关。
  • 【前者】第 5 行 :使用初始的 serviceEndpoint ( 相当于 serviceUrls ) 创建委托 EurekaHttpClient 。
  • 【前者】第 7 行 :调用 #executeOnNewServer(...) 方法,通过执行请求的方式,寻找非 302 状态码返回的 Eureka-Server。实现代码,点击 链接 查看带中文注释的代码实现。
  • 【前者】【前者】第 9 行 :关闭原有的 delegateRef ( 因为此处可能存在并发,多个线程都找到非 302 状态码返回的 Eureka-Server ),并设置当前成功非 302 请求的 EurekaHttpClient 到 delegateRef
  • 【前者】第 13 行 :关闭 currentEurekaClientRef ,当请求发生异常或者超过最大重定向次数。
  • 【后者】第 18 行 :意味着当前已经找到非返回 302 状态码的 Eureka-Server ,直接执行请求。
  • 【后者】第 21 至 22 行 :执行请求发生异常,关闭 currentEurekaClient ,后面要重新非返回 302 状态码的 Eureka-Server 。

5.2.1 工厂

RedirectingEurekaHttpClient 提供 #createFactory(...) 静态方法获得创建其的工厂,点击 链接 查看。

5.3 RetryableEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient ,支持向多个 Eureka-Server 请求重试的 EurekaHttpClient 。

#execute() 方法,代码如下:

 1: @Override
2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
3: List<EurekaEndpoint> candidateHosts = null;
4: int endpointIdx = 0;
5: for (int retry = 0; retry < numberOfRetries; retry++) {
6: EurekaHttpClient currentHttpClient = delegate.get();
7: EurekaEndpoint currentEndpoint = null;
8:
9: // 当前委托的 EurekaHttpClient 不存在
10: if (currentHttpClient == null) {
11: // 获得候选的 Eureka-Server 地址数组
12: if (candidateHosts == null) {
13: candidateHosts = getHostCandidates();
14: if (candidateHosts.isEmpty()) {
15: throw new TransportException("There is no known eureka server; cluster server list is empty");
16: }
17: }
18:
19: // 超过候选的 Eureka-Server 地址数组上限
20: if (endpointIdx >= candidateHosts.size()) {
21: throw new TransportException("Cannot execute request on any known server");
22: }
23:
24: // 创建候选的 EurekaHttpClient
25: currentEndpoint = candidateHosts.get(endpointIdx++);
26: currentHttpClient = clientFactory.newClient(currentEndpoint);
27: }
28:
29: try {
30: // 执行请求
31: EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
32: // 判断是否为可接受的相应,若是,返回。
33: if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
34: delegate.set(currentHttpClient);
35: if (retry > 0) {
36: logger.info("Request execution succeeded on retry #{}", retry);
37: }
38: return response;
39: }
40: logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
41: } catch (Exception e) {
42: logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
43: }
44:
45: // 请求失败,若是 currentHttpClient ,清除 delegate
46: // Connection error or 5xx from the server that must be retried on another server
47: delegate.compareAndSet(currentHttpClient, null);
48:
49: // 请求失败,将 currentEndpoint 添加到隔离集合
50: if (currentEndpoint != null) {
51: quarantineSet.add(currentEndpoint);
52: }
53: }
54: throw new TransportException("Retry limit reached; giving up on completing the request");
55: }

  • 第 10 行 :当前 currentHttpClient 不存在,意味着原有 delegate 不存在向 Eureka-Server 成功请求的 EurekaHttpClient 。

    • 此时需要从配置中的 Eureka-Server 数组重试请求,获得可以请求的 Eureka-Server 。
    • 如果已经存在请求成功的 delegate ,直接使用它进行执行请求。
  • 第 11 至 17 行 :调用 #getHostCandidates() 方法,获得候选的 Eureka-Server serviceUrls 数组。实现代码如下:

     1: private List<EurekaEndpoint> getHostCandidates() {
    2: // 获得候选的 Eureka-Server 地址数组
    3: List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
    4:
    5: // 保留交集(移除 quarantineSet 不在 candidateHosts 的元素)
    6: quarantineSet.retainAll(candidateHosts);
    7:
    8: // 在保证最小可用的候选的 Eureka-Server 地址数组,移除在隔离集合内的元素
    9: // If enough hosts are bad, we have no choice but start over again
    10: int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage()); // 0.66
    11: if (quarantineSet.isEmpty()) {
    12: // no-op
    13: } else if (quarantineSet.size() >= threshold) {
    14: logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
    15: quarantineSet.clear();
    16: } else {
    17: List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
    18: for (EurekaEndpoint endpoint : candidateHosts) {
    19: if (!quarantineSet.contains(endpoint)) {
    20: remainingHosts.add(endpoint);
    21: }
    22: }
    23: candidateHosts = remainingHosts;
    24: }
    25:
    26: return candidateHosts;
    27: }

    • 第 3 行 :调用 ClusterResolver#getClusterEndpoints() 方法,获得候选的 Eureka-Server 地址数组( candidateHosts )。注意:该方法返回的 Eureka-Server 地址数组,使用以本机 IP 为随机种子,达到不同 IP 的应用实例获得的数组顺序不同,而相同 IP 的应用实例获得的数组顺序一致,效果类似基于 IP HASH 的负载均衡算法。实现该功能的代码,在 《Eureka 源码解析 —— EndPoint 与 解析器》搜索关键字【ResolverUtils#randomize(...)】 详细解析。
    • 第 6 行 :调用 Set#retainAll() 方法,移除隔离的故障 Eureka-Server 地址数组( quarantineSet ) 中不在 candidateHosts 的元素。
    • 第 8 至 24 行 :在保证最小可用的 candidateHosts,移除在 quarantineSet 的元素。
      • 第 10 行 :最小可用的阀值,配置 eureka.retryableClientQuarantineRefreshPercentage 来设置百分比,默认值:0.66
      • 最 13 至 15 行 :quarantineSet 数量超过阀值,清空 quarantineSet ,全部 candidateHosts 重试。
      • 第 17 至 24 行 :quarantineSet 数量未超过阀值,移除 candidateHosts 中在 quarantineSet 的元素。
  • 第 19 至 22 行 :超过 candidateHosts 上限,全部 Eureka-Server 请求失败,抛出异常。

  • 第 24 至 26 行 :创建委托的 EurekaHttpClient ,用于下面请求执行。

  • 第 31 行 :执行请求。

  • 第 33 行 :调用 ServerStatusEvaluator#accept() 方法,判断响应状态码和请求类型是否能够接受。实现代码如下:

    // ServerStatusEvaluators.java
    private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() {
    @Override
    public boolean accept(int statusCode, RequestType requestType) {
    if (statusCode >= 200 && statusCode < 300 || statusCode == 302) {
    return true;
    } else if (requestType == RequestType.Register && statusCode == 404) { // 注册,404 可接受
    return true;
    } else if (requestType == RequestType.SendHeartBeat && statusCode == 404) { // 心跳,404 可接受
    return true;
    } else if (requestType == RequestType.Cancel) { // cancel is best effort 下线,接受全部
    return true;
    } else if (requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404)) { // 增量获取注册信息,403 404 可接受
    return true;
    }
    return false;
    }
    };

  • 第 34 行 :请求成功,设置 delegate 。下次请求,优先使用 delegate ,失败才进行候选的 Eureka-Server 地址数组重试。

  • 第 47 行 :请求失败,delegate 若等于 currentHttpClient ,进行清除。

  • 第 50 至 52 行 :请求失败,将请求的 Eureka-Server 地址添加到 quarantineSet

  • 总结来说:

    • 【第一步】若当前有请求成功的 EurekaHttpClient ,继续使用。若请求失败,执行【第二步】。
    • 【第二步】若当前无请求成功的 EurekaHttpClient ,获取候选的 Eureka-Server 地址数组顺序创建新的 EurekaHttpClient,直到成功,或者超过最大重试次数。当请求成功,保存该 EurekaHttpClient ,下次继续使用,直到请求失败。

5.3.1 工厂

RetryableEurekaHttpClient 提供 #createFactory(...) 静态方法获得创建其的工厂,点击 链接 查看。

5.4 SessionedEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient ,支持会话的 EurekaHttpClient 。执行定期的重建会话,防止一个 Eureka-Client 永远只连接一个特定的 Eureka-Server 。反过来,这也保证了 Eureka-Server 集群变更时,Eureka-Client 对 Eureka-Server 连接的负载均衡。

#execute(...) ,代码如下:

 1: @Override
2: protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
3: long now = System.currentTimeMillis();
4: long delay = now - lastReconnectTimeStamp;
5:
6: // 超过 当前会话时间,关闭当前委托的 EurekaHttpClient 。
7: if (delay >= currentSessionDurationMs) {
8: logger.debug("Ending a session and starting anew");
9: lastReconnectTimeStamp = now;
10: currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
11: TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
12: }
13:
14: // 获得委托的 EurekaHttpClient 。若不存在,则创建新的委托的 EurekaHttpClient 。
15: EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
16: if (eurekaHttpClient == null) {
17: eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
18: }
19: return requestExecutor.execute(eurekaHttpClient);
20: }

  • 第 7 至 12 行 :超过当前会话时间,关闭当前委托的 EurekaHttpClient 。

    • 第 10 行 :调用 #randomizeSessionDuration(...) 方法,计算计算下一次会话超时时长,公式为 sessionDurationMs * (0.5, 1.5) ,代码如下:

      protected long randomizeSessionDuration(long sessionDurationMs) {
      long delta = (long) (sessionDurationMs * (random.nextDouble() - 0.5));
      return sessionDurationMs + delta;
      }

      • 增加会话过期的随机性,实现所有 Eureka-Client 的会话过期重连的发生时间更加离散,避免集中时间过期。目前猜测这么做的目的和 TODO[0028]:写入集群和读取集群 有关,即返回 302 。关联 1.x new transport enhancements
  • 第 15 至 18 行 :获得委托的 EurekaHttpClient 。若不存在,创建新的委托的 EurekaHttpClient 。TransportUtils#getOrSetAnotherClient(...) 方法代码如下:

     1: public static EurekaHttpClient getOrSetAnotherClient(AtomicReference<EurekaHttpClient> eurekaHttpClientRef, EurekaHttpClient another) {
    2: EurekaHttpClient existing = eurekaHttpClientRef.get();
    3: // 为空才设置
    4: if (eurekaHttpClientRef.compareAndSet(null, another)) {
    5: return another;
    6: }
    7: // 设置失败,意味着另外一个线程已经设置
    8: another.shutdown();
    9: return existing;
    10: }

    • 该方法实现,获得 eurekaHttpClientRef 里的 EurekaHttpClient 。若获取不到,将 another 设置到 eurekaHttpClientRef 。当有多个线程设置时,有且只有一个线程设置成功,另外的设置失败的线程们,意味着当前 eurekaHttpClientRef 有 EurekaHttpClient ,返回 eurekaHttpClientRef

    • 目前该方法存在 BUG ,失败的线程直接返回 existing 的是 null ,需要修改成 return eurekaHttpClientRef.get() 。模拟重现该 BUG 代码如下 :

  • 第 19 行 :执行请求。

5.4.1 没有工厂

在 SessionedEurekaHttpClient 类里,没有实现创建其的工厂。在 「6. 创建网络通讯客户端」搜索 canonicalClientFactory ,可以看到 EurekaHttpClients#canonicalClientFactory(...) 方法,内部有 SessionedEurekaHttpClient 的创建工厂。

6. 创建网络通讯客户端

对于 Eureka-Server 来说,调用 JerseyReplicationClient#createReplicationClient(...) 静态方法即可创建用于 Eureka-Server 集群内,Eureka-Server 请求 其它的Eureka-Server 的网络通信客户端。

对于 Eureka-Client 来说,分成用于注册应用实例( registrationClient )查询注册信息( newQueryClient )两个不同网络通信客户端。在 DiscoveryClient 初始化时进行创建,代码如下:

// DiscoveryClient.class
1: private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
2: AbstractDiscoveryClientOptionalArgs args) {
3:
4: Collection<?> additionalFilters = args == null
5: ? Collections.emptyList()
6: : args.additionalFilters;
7:
8: EurekaJerseyClient providedJerseyClient = args == null
9: ? null
10: : args.eurekaJerseyClient;
11:
12: TransportClientFactories argsTransportClientFactories = null;
13: if (args != null && args.getTransportClientFactories() != null) {
14: argsTransportClientFactories = args.getTransportClientFactories();
15: }
16:
17: // Ignore the raw types warnings since the client filter interface changed between jersey 1/2
18: @SuppressWarnings("rawtypes")
19: TransportClientFactories transportClientFactories = argsTransportClientFactories == null
20: ? new Jersey1TransportClientFactories()
21: : argsTransportClientFactories;
22:
23: // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
24: // noinspection unchecked
25: eurekaTransport.transportClientFactory = providedJerseyClient == null
26: ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo())
27: : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);
28:
29: // (省略代码)初始化 应用解析器的应用实例数据源 TODO[0028]写入集群和读取集群
30:
31: // (省略代码)创建 EndPoint 解析器
32: eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(...)
33:
34: if (clientConfig.shouldRegisterWithEureka()) {
35: EurekaHttpClientFactory newRegistrationClientFactory = null;
36: EurekaHttpClient newRegistrationClient = null;
37: try {
38: newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
39: eurekaTransport.bootstrapResolver,
40: eurekaTransport.transportClientFactory,
41: transportConfig
42: );
43: newRegistrationClient = newRegistrationClientFactory.newClient();
44: } catch (Exception e) {
45: logger.warn("Transport initialization failure", e);
46: }
47: eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
48: eurekaTransport.registrationClient = newRegistrationClient;
49: }
50:
51: // new method (resolve from primary servers for read)
52: // Configure new transport layer (candidate for injecting in the future)
53: if (clientConfig.shouldFetchRegistry()) {
54: EurekaHttpClientFactory newQueryClientFactory = null;
55: EurekaHttpClient newQueryClient = null;
56: try {
57: newQueryClientFactory = EurekaHttpClients.queryClientFactory(
58: eurekaTransport.bootstrapResolver,
59: eurekaTransport.transportClientFactory,
60: clientConfig,
61: transportConfig,
62: applicationInfoManager.getInfo(),
63: applicationsSource
64: );
65: newQueryClient = newQueryClientFactory.newClient();
66: } catch (Exception e) {
67: logger.warn("Transport initialization failure", e);
68: }
69: eurekaTransport.queryClientFactory = newQueryClientFactory;
70: eurekaTransport.queryClient = newQueryClient;
71: }
72: }

  • 第 18 至 27 行 :调用 Jersey1TransportClientFactories#newTransportClientFactory(...) 方法,创建 registrationClientqueryClient 公用的委托的 EurekaHttpClientFactory ,代码如下:

    // Jersey1TransportClientFactories.java
    public TransportClientFactory newTransportClientFactory(final EurekaClientConfig clientConfig,
    final Collection<ClientFilter> additionalFilters,
    final InstanceInfo myInstanceInfo) {
    // JerseyEurekaHttpClientFactory
    final TransportClientFactory jerseyFactory = JerseyEurekaHttpClientFactory.create(
    clientConfig,
    additionalFilters,
    myInstanceInfo,
    new EurekaClientIdentity(myInstanceInfo.getIPAddr())
    );

    // TransportClientFactory
    final TransportClientFactory metricsFactory = MetricsCollectingEurekaHttpClient.createFactory(jerseyFactory); // 委托 TransportClientFactory

    return new TransportClientFactory() {
    @Override
    public EurekaHttpClient newClient(EurekaEndpoint serviceUrl) {
    return metricsFactory.newClient(serviceUrl);
    }

    @Override
    public void shutdown() {
    metricsFactory.shutdown();
    jerseyFactory.shutdown();
    }
    };
    }

    • 在 TransportClientFactory 里委托 JerseyEurekaHttpClientFactory 。
  • 第 34 至 49 行 :调用 EurekaHttpClients#registrationClientFactory(...) 方法,创建 registrationClient 的 EurekaHttpClientFactory ,代码如下 :

    // EurekaHttpClients.java
    public static EurekaHttpClientFactory registrationClientFactory(ClusterResolver bootstrapResolver,
    TransportClientFactory transportClientFactory,
    EurekaTransportConfig transportConfig) {
    return canonicalClientFactory(EurekaClientNames.REGISTRATION, transportConfig, bootstrapResolver, transportClientFactory);
    }

    static EurekaHttpClientFactory canonicalClientFactory(final String name,
    final EurekaTransportConfig transportConfig,
    final ClusterResolver<EurekaEndpoint> clusterResolver,
    final TransportClientFactory transportClientFactory) {

    return new EurekaHttpClientFactory() { // SessionedEurekaHttpClientFactory
    @Override
    public EurekaHttpClient newClient() {
    return new SessionedEurekaHttpClient(
    name,
    RetryableEurekaHttpClient.createFactory( // RetryableEurekaHttpClient
    name,
    transportConfig,
    clusterResolver,
    RedirectingEurekaHttpClient.createFactory(transportClientFactory), // RedirectingEurekaHttpClient
    ServerStatusEvaluators.legacyEvaluator()),
    transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000
    );
    }

    @Override
    public void shutdown() {
    wrapClosable(clusterResolver).shutdown();
    }
    };
    }

  • 第 51 至 71 行 :调用 EurekaHttpClients#queryClientFactory(...) 方法,创建 queryClient 的 EurekaHttpClientFactory ,代码如下 :

    // EurekaHttpClients.java
    public static EurekaHttpClientFactory queryClientFactory(ClusterResolver bootstrapResolver,
    TransportClientFactory transportClientFactory,
    EurekaClientConfig clientConfig,
    EurekaTransportConfig transportConfig,
    InstanceInfo myInstanceInfo,
    ApplicationsResolver.ApplicationsSource applicationsSource) {

    ClosableResolver queryResolver = transportConfig.useBootstrapResolverForQuery()
    ? wrapClosable(bootstrapResolver)
    : queryClientResolver(bootstrapResolver, transportClientFactory,
    clientConfig, transportConfig, myInstanceInfo, applicationsSource);
    return canonicalClientFactory(EurekaClientNames.QUERY, transportConfig, queryResolver, transportClientFactory); // 该方法上面有
    }

666. 彩蛋

知识星球

这次真的是彩蛋,我们将整体调用关系调整如下如下( 打开大图 ):

胖友,你学会了么?

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

文章目录
  1. 1. 1. 概述
  2. 2. 2. EurekaHttpClient
    1. 2.1. 2.1 EurekaJerseyClientImpl
    2. 2.2. 2.2 EurekaJerseyClientBuilder
  3. 3. 3. EurekaHttpClient
    1. 3.1. 3.1 EurekaHttpResponse
    2. 3.2. 3.2 TransportClientFactory
  4. 4. 4. AbstractJerseyEurekaHttpClient
    1. 4.1. 4.1 JerseyApplicationClient
      1. 4.1.1. 4.1.1 JerseyEurekaHttpClientFactory
      2. 4.1.2. 4.1.2 JerseyEurekaHttpClientFactoryBuilder
    2. 4.2. 4.2 JerseyReplicationClient
      1. 4.2.1. 4.2.1 没有工厂
  5. 5. 5. EurekaHttpClientDecorator
    1. 5.1. 5.1 MetricsCollectingEurekaHttpClient
    2. 5.2. 5.2 RedirectingEurekaHttpClient
      1. 5.2.1. 5.2.1 工厂
    3. 5.3. 5.3 RetryableEurekaHttpClient
      1. 5.3.1. 5.3.1 工厂
    4. 5.4. 5.4 SessionedEurekaHttpClient
      1. 5.4.1. 5.4.1 没有工厂
  6. 6. 6. 创建网络通讯客户端
  7. 7. 666. 彩蛋