⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Apollo/portal-publish-namespace-branch/ 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

阅读源码最好的方式,是使用 IDEA 进行调试 Apollo 源码,不然会一脸懵逼。

胖友可以点击「芋道源码」扫码关注,回复 git018 关键字
获得艿艿添加了中文注释的 Apollo 源码地址。

阅读源码很孤单,加入源码交流群,一起坚持!

1. 概述

老艿艿:本系列假定胖友已经阅读过 《Apollo 官方 wiki 文档》 ,特别是 《Apollo 官方 wiki 文档 —— 灰度发布使用指南》

灰度发布,实际上是 Namespace ( 分支 Namespace )发布 Release 。所以,调用的接口和 《Apollo 源码解析 —— Portal 发布配置》一样的

差异点,在于 apollo-biz 项目中,ReleaseService#publish(...) 方法中,多了一个处理灰度发布的分支逻辑。

2. ReleaseService

2.1 publishBranchNamespace

#publishBranchNamespace(...) 方法, Namespace 发布 Release 。 Namespace 会自动继承 父 Namespace 已经发布的配置。若有相同的配置项,使用 Namespace 的。配置处理的逻辑上,和关联 Namespace 是一致的。代码如下:

 1: private Release publishBranchNamespace(Namespace parentNamespace, Namespace childNamespace,
2: Map<String, String> childNamespaceItems,
3: String releaseName, String releaseComment,
4: String operator, boolean isEmergencyPublish) {
5: // 获得父 Namespace 的最后有效 Release 对象
6: Release parentLatestRelease = findLatestActiveRelease(parentNamespace);
7: // 获得父 Namespace 的配置项
8: Map<String, String> parentConfigurations = parentLatestRelease != null ? gson.fromJson(parentLatestRelease.getConfigurations(), GsonType.CONFIG) : new HashMap<>();
9: // 获得父 Namespace 的 releaseId 属性
10: long baseReleaseId = parentLatestRelease == null ? 0 : parentLatestRelease.getId();
11: // 合并配置项
12: Map<String, String> childNamespaceToPublishConfigs = mergeConfiguration(parentConfigurations, childNamespaceItems);
13: // 发布子 Namespace 的 Release
14: return branchRelease(parentNamespace, childNamespace, releaseName, releaseComment,
15: childNamespaceToPublishConfigs, baseReleaseId, operator,
16: ReleaseOperation.GRAY_RELEASE, isEmergencyPublish);
17:
18: }

  • 第 5 至 12 行:获得最终的配置 Map 。

    • 第 6 行:调用 #findLatestActiveRelease(parentNamespace) 方法,获得 Namespace 的最后有效 Release 对象。

    • 第 8 行:获得 Namespace 的配置 Map 。

    • 第 10 行:获得 Namespace 的 releaseId 属性。

    • 第 12 行:调用 #mergeConfiguration(parentConfigurations, childNamespaceItems) 方法,合并父子 Namespace 的配置 Map 。代码如下:

      private Map<String, String> mergeConfiguration(Map<String, String> baseConfigurations, Map<String, String> coverConfigurations) {
      Map<String, String> result = new HashMap<>();
      // copy base configuration
      // 父 Namespace 的配置项
      for (Map.Entry<String, String> entry : baseConfigurations.entrySet()) {
      result.put(entry.getKey(), entry.getValue());
      }
      // update and publish
      // 子 Namespace 的配置项
      for (Map.Entry<String, String> entry : coverConfigurations.entrySet()) {
      result.put(entry.getKey(), entry.getValue());
      }
      // 返回合并后的配置项
      return result;
      }

      • x
  • 第 14 行:调用 #branchRelease(...) 方法,发布 Namespace 的 Release 。代码如下:

     1: private Release branchRelease(Namespace parentNamespace, Namespace childNamespace,
    2: String releaseName, String releaseComment,
    3: Map<String, String> configurations, long baseReleaseId,
    4: String operator, int releaseOperation, boolean isEmergencyPublish) {
    5: // 获得父 Namespace 最后有效的 Release 对象
    6: Release previousRelease = findLatestActiveRelease(childNamespace.getAppId(), childNamespace.getClusterName(), childNamespace.getNamespaceName());
    7: // 获得父 Namespace 最后有效的 Release 对象的编号
    8: long previousReleaseId = previousRelease == null ? 0 : previousRelease.getId();
    9:
    10: // 创建 Map ,用于 ReleaseHistory 对象的 `operationContext` 属性。
    11: Map<String, Object> releaseOperationContext = Maps.newHashMap();
    12: releaseOperationContext.put(ReleaseOperationContext.BASE_RELEASE_ID, baseReleaseId);
    13: releaseOperationContext.put(ReleaseOperationContext.IS_EMERGENCY_PUBLISH, isEmergencyPublish);
    14:
    15: // 创建子 Namespace 的 Release 对象,并保存
    16: Release release = createRelease(childNamespace, releaseName, releaseComment, configurations, operator);
    17:
    18: // 更新 GrayReleaseRule 的 releaseId 属性
    19: // update gray release rules
    20: GrayReleaseRule grayReleaseRule = namespaceBranchService.updateRulesReleaseId(childNamespace.getAppId(),
    21: parentNamespace.getClusterName(),
    22: childNamespace.getNamespaceName(),
    23: childNamespace.getClusterName(),
    24: release.getId(), operator);
    25:
    26: // 创建 ReleaseHistory 对象,并保存
    27: if (grayReleaseRule != null) {
    28: releaseOperationContext.put(ReleaseOperationContext.RULES, GrayReleaseRuleItemTransformer.batchTransformFromJSON(grayReleaseRule.getRules()));
    29: }
    30: releaseHistoryService.createReleaseHistory(parentNamespace.getAppId(), parentNamespace.getClusterName(),
    31: parentNamespace.getNamespaceName(), childNamespace.getClusterName(),
    32: release.getId(),
    33: previousReleaseId, releaseOperation, releaseOperationContext, operator);
    34: return release;
    35: }

    • 第 6 行:获得 Namespace 最后有效的 Release 对象。

    • 第 8 行:获得 Namespace 的 releaseId 属性。

    • 第 10 至 13 行:创建 Map ,用于 ReleaseHistory 对象的 operationContext 属性。

    • 第 16 行:调用 #createRelease(...) 方法,创建 Namespace 的 Release 对象,并保存到数据库中。

    • 第 18 至 24 行:更新 GrayReleaseRule 的 releaseId 属性到数据库中。代码如下:

      @Transactional
      public GrayReleaseRule updateRulesReleaseId(String appId, String clusterName, String namespaceName, String branchName, long latestReleaseId, String operator) {
      // 获得老的 GrayReleaseRule 对象
      GrayReleaseRule oldRules = grayReleaseRuleRepository.findTopByAppIdAndClusterNameAndNamespaceNameAndBranchNameOrderByIdDesc(appId, clusterName, namespaceName, branchName);
      if (oldRules == null) {
      return null;
      }

      // 创建新的 GrayReleaseRule 对象
      GrayReleaseRule newRules = new GrayReleaseRule();
      newRules.setBranchStatus(NamespaceBranchStatus.ACTIVE);
      newRules.setReleaseId(latestReleaseId); // update
      newRules.setRules(oldRules.getRules());
      newRules.setAppId(oldRules.getAppId());
      newRules.setClusterName(oldRules.getClusterName());
      newRules.setNamespaceName(oldRules.getNamespaceName());
      newRules.setBranchName(oldRules.getBranchName());
      newRules.setDataChangeCreatedBy(operator); // update
      newRules.setDataChangeLastModifiedBy(operator); // update

      // 保存新的 GrayReleaseRule 对象
      grayReleaseRuleRepository.save(newRules);
      // 删除老的 GrayReleaseRule 对象
      grayReleaseRuleRepository.delete(oldRules);
      return newRules;
      }

      • 删除老的 GrayReleaseRule 对象。
      • 保存新的 GrayReleaseRule 对象。
    • 第 26 至 33 行:调用 ReleaseHistoryService#createReleaseHistory(...) 方法,创建 ReleaseHistory 对象,并保存到数据库中。

2.2 mergeFromMasterAndPublishBranch

本小节不属于本文,考虑到和灰度发布相关,所以放在此处。

Namespace 发布 Release 后,会调用 #mergeFromMasterAndPublishBranch(...) 方法,自动将 Namespace (主干) 合并到 Namespace (分支),并进行一次子 Namespace 的发布。代码如下:

 1: private void mergeFromMasterAndPublishBranch(Namespace parentNamespace, Namespace childNamespace,
2: Map<String, String> parentNamespaceItems,
3: String releaseName, String releaseComment,
4: String operator, Release masterPreviousRelease,
5: Release parentRelease, boolean isEmergencyPublish) {
6: // 获得子 Namespace 的配置 Map
7: // create release for child namespace
8: Map<String, String> childReleaseConfiguration = getNamespaceReleaseConfiguration(childNamespace);
9: // 获得父 Namespace 的配置 Map
10: Map<String, String> parentNamespaceOldConfiguration = masterPreviousRelease == null ? null : gson.fromJson(masterPreviousRelease.getConfigurations(), GsonType.CONFIG);
11:
12: // 计算合并最新父 Namespace 的配置 Map 后的子 Namespace 的配置 Map
13: Map<String, String> childNamespaceToPublishConfigs = calculateChildNamespaceToPublishConfiguration(parentNamespaceOldConfiguration,
14: parentNamespaceItems, childNamespace);
15:
16: // compare
17: // 若发生了变化,则进行一次子 Namespace 的发布
18: if (!childNamespaceToPublishConfigs.equals(childReleaseConfiguration)) {
19: branchRelease(parentNamespace, childNamespace, releaseName, releaseComment,
20: childNamespaceToPublishConfigs, parentRelease.getId(), operator,
21: ReleaseOperation.MASTER_NORMAL_RELEASE_MERGE_TO_GRAY, isEmergencyPublish);
22: }
23: }

  • 第 8 行:调用 #getNamespaceReleaseConfiguration(childNamespace) 方法,获得 Namespace 的最新且有效的 Release 的配置 Map 。代码如下:

    private Map<String, String> getNamespaceReleaseConfiguration(Namespace namespace) {
    // 获得最后有效的 Release 对象
    Release release = findLatestActiveRelease(namespace);
    Map<String, String> configuration = new HashMap<>();
    // 获得配置 Map
    if (release != null) {
    configuration = new Gson().fromJson(release.getConfigurations(), GsonType.CONFIG);
    }
    return configuration;
    }

  • 第 10 行:获得 Namespace 的配置 Map 。

  • 第 12 至 14 行:计算合并最新父 Namespace 的配置 Map 后,子 Namespace 的配置 Map 。代码如下:

    // 计算合并最新父 Namespace 的配置 Map 后的子 Namespace 的配置 Map
    private Map<String, String> calculateChildNamespaceToPublishConfiguration(
    Map<String, String> parentNamespaceOldConfiguration, Map<String, String> parentNamespaceNewConfiguration,
    Namespace childNamespace) {
    // 获得子 Namespace 的最后有效的 Release 对象
    // first. calculate child namespace modified configs
    Release childNamespaceLatestActiveRelease = findLatestActiveRelease(childNamespace);
    // 获得子 Namespace 的配置 Map
    Map<String, String> childNamespaceLatestActiveConfiguration = childNamespaceLatestActiveRelease == null ? null :
    gson.fromJson(childNamespaceLatestActiveRelease.getConfigurations(), GsonType.CONFIG);

    // 以子 Namespace 的配置 Map 为基础,计算出差异的 Map
    Map<String, String> childNamespaceModifiedConfiguration = calculateBranchModifiedItemsAccordingToRelease(parentNamespaceOldConfiguration,
    childNamespaceLatestActiveConfiguration);

    // second. append child namespace modified configs to parent namespace new latest configuration
    return mergeConfiguration(parentNamespaceNewConfiguration, childNamespaceModifiedConfiguration);
    }

    // 以子 Namespace 的配置 Map 为基础,计算出差异的 Map
    private Map<String, String> calculateBranchModifiedItemsAccordingToRelease(
    Map<String, String> masterReleaseConfigs,
    Map<String, String> branchReleaseConfigs) {
    // 差异 Map
    Map<String, String> modifiedConfigs = new HashMap<>();
    // 若子 Namespace 的配置 Map 为空,直接返回空 Map
    if (CollectionUtils.isEmpty(branchReleaseConfigs)) {
    return modifiedConfigs;
    }
    // 若父 Namespace 的配置 Map 为空,直接返回子 Namespace 的配置 Map
    if (CollectionUtils.isEmpty(masterReleaseConfigs)) {
    return branchReleaseConfigs;
    }

    // 以子 Namespace 的配置 Map 为基础,计算出差异的 Map
    for (Map.Entry<String, String> entry : branchReleaseConfigs.entrySet()) {
    if (!Objects.equals(entry.getValue(), masterReleaseConfigs.get(entry.getKey()))) { // 对比
    modifiedConfigs.put(entry.getKey(), entry.getValue());
    }
    }
    return modifiedConfigs;
    }

    • 【第一步】逻辑看起来比较冗长和“绕”。简单的说, Namespace 的配置 Map 是包含 Namespace 的配置 Map ,所以需要剔除。但是呢,剔除的过程中,又需要保留 Namespace 的自定义的配置项。这就是第二个方法,#calculateBranchModifiedItemsAccordingToRelease(...) 的逻辑。
    • 【第二步】做完上面的步骤后,就可以调用 #mergeConfiguration(...) 方法,合并 Namespace 的配置 Map 。
    • 胖友好好理解下。
  • 第 17 至 22 行:若发生了变化,则调用 #branchRelease(...) 方法,进行一次 Namespace 的发布。这块就和 「2.1 publishBranchNamespace」 一致了。

    • 什么情况下会未发生变化呢?例如, Namespace 修改配置项 timeout: 2000=> 3000 ,而恰好 Namespace 修改配置项 timeout: 2000=> 3000 并且已经灰度发布。

3. 加载灰度配置

《Apollo 源码解析 —— Config Service 配置读取接口》 中,我们看到 AbstractConfigService#findRelease(...) 方法中,会读取根据客户端的情况,匹配是否有灰度 Release ,代码如下:

 1: /**
2: * Find release
3: *
4: * 获得 Release 对象
5: *
6: * @param clientAppId the client's app id
7: * @param clientIp the client ip
8: * @param configAppId the requested config's app id
9: * @param configClusterName the requested config's cluster name
10: * @param configNamespace the requested config's namespace name
11: * @param clientMessages the messages received in client side
12: * @return the release
13: */
14: private Release findRelease(String clientAppId, String clientIp, String configAppId, String configClusterName,
15: String configNamespace, ApolloNotificationMessages clientMessages) {
16: // 读取灰度发布编号
17: Long grayReleaseId = grayReleaseRulesHolder.findReleaseIdFromGrayReleaseRule(clientAppId, clientIp, configAppId, configClusterName, configNamespace);
18: // 读取灰度 Release 对象
19: Release release = null;
20: if (grayReleaseId != null) {
21: release = findActiveOne(grayReleaseId, clientMessages);
22: }
23: // 非灰度,获得最新的,并且有效的 Release 对象
24: if (release == null) {
25: release = findLatestActiveRelease(configAppId, configClusterName, configNamespace, clientMessages);
26: }
27: return release;
28: }

  • 第 17 行:调用 GrayReleaseRulesHolder#findReleaseIdFromGrayReleaseRule(...) 方法,读取灰度发布编号,即 GrayReleaseRule.releaseId 属性。详细解析,在 「3.1 GrayReleaseRulesHolder」 中。
  • 第 18 至 22 行:调用 #findActiveOne(grayReleaseId, clientMessages) 方法,读取灰度 Release 对象。

3.1 GrayReleaseRulesHolder

com.ctrip.framework.apollo.biz.grayReleaseRule.GrayReleaseRulesHolder ,实现 InitializingBean 和 ReleaseMessageListener 接口,GrayReleaseRule 缓存 Holder ,用于提高对 GrayReleaseRule 的读取速度。

3.1.1 构造方法

private static final Logger logger = LoggerFactory.getLogger(GrayReleaseRulesHolder.class);

private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
private static final Splitter STRING_SPLITTER = Splitter.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).omitEmptyStrings();

@Autowired
private GrayReleaseRuleRepository grayReleaseRuleRepository;
@Autowired
private BizConfig bizConfig;

/**
* 数据库扫描频率,单位:秒
*/
private int databaseScanInterval;
/**
* ExecutorService 对象
*/
private ScheduledExecutorService executorService;
/**
* GrayReleaseRuleCache 缓存
*
* KEY:configAppId+configCluster+configNamespace ,通过 {@link #assembleGrayReleaseRuleKey(String, String, String)} 生成
* 注意,KEY 中不包含 BranchName
* VALUE:GrayReleaseRuleCache 数组
*/
//store configAppId+configCluster+configNamespace -> GrayReleaseRuleCache map
private Multimap<String, GrayReleaseRuleCache> grayReleaseRuleCache;
/**
* GrayReleaseRuleCache 缓存2
*
* KEY:clientAppId+clientNamespace+ip ,通过 {@link #assembleReversedGrayReleaseRuleKey(String, String, String)} 生成
* 注意,KEY 中不包含 ClusterName
* VALUE:{@link GrayReleaseRule#id} 数组
*/
//store clientAppId+clientNamespace+ip -> ruleId map
private Multimap<String, Long> reversedGrayReleaseRuleCache;
/**
* 加载版本号
*/
// an auto increment version to indicate the age of rules
private AtomicLong loadVersion;

public GrayReleaseRulesHolder() {
loadVersion = new AtomicLong();
grayReleaseRuleCache = Multimaps.synchronizedSetMultimap(HashMultimap.create());
reversedGrayReleaseRuleCache = Multimaps.synchronizedSetMultimap(HashMultimap.create());
executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("GrayReleaseRulesHolder", true));
}

  • 缓存相关

    • GrayReleaseRuleCache ,胖友先去 「3.2 GrayReleaseRuleCache」 ,在回过来。
    • grayReleaseRuleCache 属性, GrayReleaseRuleCache 缓存。
      • KEY: configAppId + configCluster + configNamespace 拼接成,不包含 branchName 。因为我们在匹配灰度规则时,不关注 branchName 属性。
      • VALUE:GrayReleaseRuleCache 数组。因为 branchName 不包含在 KEY 中,而同一个 Namespace 可以创建多次灰度( 创建下一个需要将前一个灰度放弃 )版本,所以就会形成数组。
    • reversedGrayReleaseRuleCache 属性,反转的 GrayReleaseRuleCache 缓存。
      • KEY:clientAppId + clientNamespace + ip注意,不包含 clusterName 属性。具体原因,我们下面的 #hasGrayReleaseRule(clientAppId, clientIp, namespaceName) 方法中,详细分享。
      • VALUE:GrayReleaseRule 的编号数组。
      • 为什么叫做反转呢?因为使用 GrayReleaseRule 的具体属性作为键,而使用 GrayReleaseRule 的编号作为值。
    • 通过定时扫描 + ReleaseMessage 近实时通知,更新缓存。
  • 定时任务相关

    • executorService 属性,ExecutorService 对象。
    • databaseScanInterval 属性,数据库扫描频率,单位:秒。
    • loadVersion 属性,加载版本。

3.1.2 初始化

#afterPropertiesSet() 方法,通过 Spring 调用,初始化 Scan 任务。代码如下:

@Override
1: public void afterPropertiesSet() throws Exception {
2: // 从 ServerConfig 中,读取任务的周期配置
3: populateDataBaseInterval();
4: // 初始拉取 GrayReleaseRuleCache 到缓存
5: // force sync load for the first time
6: periodicScanRules();
7: // 定时拉取 GrayReleaseRuleCache 到缓存
8: executorService.scheduleWithFixedDelay(this::periodicScanRules,
9: getDatabaseScanIntervalSecond(), getDatabaseScanIntervalSecond(), getDatabaseScanTimeUnit()
10: );
11: }

  • 第 3 行:调用 #populateDataBaseInterval() 方法,从 ServerConfig 中,读取定时任务的周期配置。代码如下:

    private void populateDataBaseInterval() {
    databaseScanInterval = bizConfig.grayReleaseRuleScanInterval(); // "apollo.gray-release-rule-scan.interval" ,默认为 60 。
    }

  • 第 6 行:调用 #periodicScanRules() 方法,初始拉取 GrayReleaseRuleCache 到缓存。代码如下:

    private void periodicScanRules() {
    // 【TODO 6001】Tracer 日志
    Transaction transaction = Tracer.newTransaction("Apollo.GrayReleaseRulesScanner", "scanGrayReleaseRules");
    try {
    // 递增加载版本号
    loadVersion.incrementAndGet();
    // 从数据卷库中,扫描所有 GrayReleaseRules ,并合并到缓存中
    scanGrayReleaseRules();
    // 【TODO 6001】Tracer 日志
    transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
    // 【TODO 6001】Tracer 日志
    transaction.setStatus(ex);
    logger.error("Scan gray release rule failed", ex);
    } finally {
    // 【TODO 6001】Tracer 日志
    transaction.complete();
    }
    }

    private void scanGrayReleaseRules() {
    long maxIdScanned = 0;
    boolean hasMore = true;
    // 循环顺序分批加载 GrayReleaseRule ,直到结束或者线程打断
    while (hasMore && !Thread.currentThread().isInterrupted()) {
    // 顺序分批加载 GrayReleaseRule 500 条
    List<GrayReleaseRule> grayReleaseRules = grayReleaseRuleRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
    if (CollectionUtils.isEmpty(grayReleaseRules)) {
    break;
    }
    // 合并到 GrayReleaseRule 缓存
    mergeGrayReleaseRules(grayReleaseRules);
    // 获得新的 maxIdScanned ,取最后一条记录
    int rulesScanned = grayReleaseRules.size();
    maxIdScanned = grayReleaseRules.get(rulesScanned - 1).getId();
    // batch is 500
    // 若拉取不足 500 条,说明无 GrayReleaseRule 了
    hasMore = rulesScanned == 500;
    }
    }

    • 循环顺序分批加载 GrayReleaseRule ,直到全部加载完或者线程打断。
    • loadVersion 属性,递增加载版本号。
    • 调用 #mergeGrayReleaseRules(List<GrayReleaseRule>) 方法,合并 GrayReleaseRule 数组,到缓存中。详细解析,见 「3.1.4 mergeGrayReleaseRules」
    • 🙂 其他代码比较简单,胖友自己看代码注释。
  • 第 7 至 10 行:创建定时任务,定时调用 #scanGrayReleaseRules() 方法,重新全量拉取 GrayReleaseRuleCache 到缓存。

3.1.3 handleMessage

#handleMessage(ReleaseMessage, channel) 实现方法,基于 ReleaseMessage 近实时通知,更新缓存。代码如下:

 1: @Override
2: public void handleMessage(ReleaseMessage message, String channel) {
3: logger.info("message received - channel: {}, message: {}", channel, message);
4: String releaseMessage = message.getMessage();
5: // 只处理 APOLLO_RELEASE_TOPIC 的消息
6: if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(releaseMessage)) {
7: return;
8: }
9: // 获得 appId cluster namespace 参数
10: List<String> keys = STRING_SPLITTER.splitToList(releaseMessage);
11: //message should be appId+cluster+namespace
12: if (keys.size() != 3) {
13: logger.error("message format invalid - {}", releaseMessage);
14: return;
15: }
16: String appId = keys.get(0);
17: String cluster = keys.get(1);
18: String namespace = keys.get(2);
19:
20: // 获得对应的 GrayReleaseRule 数组
21: List<GrayReleaseRule> rules = grayReleaseRuleRepository.findByAppIdAndClusterNameAndNamespaceName(appId, cluster, namespace);
22: // 合并到 GrayReleaseRule 缓存中
23: mergeGrayReleaseRules(rules);
24: }

  • 第 5 至 8 行:只处理 APOLLO_RELEASE_TOPIC 的消息。
  • 第 9 至 18 行:获得 appId cluster namespace 参数。
  • 第 21 行:调用 grayReleaseRuleRepository#findByAppIdAndClusterNameAndNamespaceName(appId, cluster, namespace) 方法,获得对应的 GrayReleaseRule 数组。
  • 第 23 行:调用 #mergeGrayReleaseRules(List<GrayReleaseRule>) 方法,合并到 GrayReleaseRule 缓存中。详细解析,见 「3.1.4 mergeGrayReleaseRules」

3.1.4 mergeGrayReleaseRules

#mergeGrayReleaseRules(List<GrayReleaseRule>) 方法,合并 GrayReleaseRule 到缓存中。代码如下:

 1: private void mergeGrayReleaseRules(List<GrayReleaseRule> grayReleaseRules) {
2: if (CollectionUtils.isEmpty(grayReleaseRules)) {
3: return;
4: }
5: // !!! 注意,下面我们说的“老”,指的是已经在缓存中,但是实际不一定“老”。
6: for (GrayReleaseRule grayReleaseRule : grayReleaseRules) {
7: // 无对应的 Release 编号,记未灰度发布,则无视
8: if (grayReleaseRule.getReleaseId() == null || grayReleaseRule.getReleaseId() == 0) {
9: // filter rules with no release id, i.e. never released
10: continue;
11: }
12: // 创建 `grayReleaseRuleCache` 的 KEY
13: String key = assembleGrayReleaseRuleKey(grayReleaseRule.getAppId(), grayReleaseRule.getClusterName(), grayReleaseRule.getNamespaceName());
14: // 从缓存 `grayReleaseRuleCache` 读取,并创建数组,避免并发
15: // create a new list to avoid ConcurrentModificationException
16: List<GrayReleaseRuleCache> rules = Lists.newArrayList(grayReleaseRuleCache.get(key));
17: // 获得子 Namespace 对应的老的 GrayReleaseRuleCache 对象
18: GrayReleaseRuleCache oldRule = null;
19: for (GrayReleaseRuleCache ruleCache : rules) {
20: if (ruleCache.getBranchName().equals(grayReleaseRule.getBranchName())) {
21: oldRule = ruleCache;
22: break;
23: }
24: }
25:
26: // 忽略,若不存在老的 GrayReleaseRuleCache ,并且当前 GrayReleaseRule 对应的分支不处于激活( 有效 )状态
27: // if old rule is null and new rule's branch status is not active, ignore
28: if (oldRule == null && grayReleaseRule.getBranchStatus() != NamespaceBranchStatus.ACTIVE) {
29: continue;
30: }
31:
32: // 若新的 GrayReleaseRule 为新增或更新,进行缓存更新
33: // use id comparison to avoid synchronization
34: if (oldRule == null || grayReleaseRule.getId() > oldRule.getRuleId()) {
35: // 添加新的 GrayReleaseRuleCache 到缓存中
36: addCache(key, transformRuleToRuleCache(grayReleaseRule));
37: // 移除老的 GrayReleaseRuleCache 出缓存中
38: if (oldRule != null) {
39: removeCache(key, oldRule);
40: }
41: } else {
42: // 老的 GrayReleaseRuleCache 对应的分支处于激活( 有效 )状态,更新加载版本号。
43: // 例如,定时轮询,有可能,早于 `#handleMessage(...)` 拿到对应的新的 GrayReleaseRule 记录,那么此时规则编号是相等的,不符合上面的条件,但是符合这个条件。
44: // 再例如,两次定时轮询,第二次和第一次的规则编号是相等的,不符合上面的条件,但是符合这个条件。
45: if (oldRule.getBranchStatus() == NamespaceBranchStatus.ACTIVE) {
46: // update load version
47: oldRule.setLoadVersion(loadVersion.get());
48: // 保留两轮,
49: // 适用于,`GrayReleaseRule.branchStatus` 为 DELETED 或 MERGED 的情况。
50: } else if ((loadVersion.get() - oldRule.getLoadVersion()) > 1) {
51: // remove outdated inactive branch rule after 2 update cycles
52: removeCache(key, oldRule);
53: }
54: }
55: }
56: }

  • 第 5 行:!!! 注意,下面我们说的“老”,指的是已经在缓存中,但是实际不一定“老”。
  • 第 6 行:循环 GrayReleaseRule 数组,合并到缓存中。被缓存到的 GrayReleaseRule 对象,我们成为“”的。
  • 第 7 至 11 行:无视,若 GrayReleaseRule 无对应的 Release 编号,说明该 Namespace 还未灰度发布。
  • 第 12 至 24 行:获得子 Namespace 对应的的 GrayReleaseRuleCache 对象。此处的“老”,指的是缓存中的。
  • 第 26 至 30 行:无视,若不存在老的 GrayReleaseRuleCache ,并且当前 GrayReleaseRule 对应的分支不处于激活( ACTIVE 有效 )状态。
  • 第 32 至 40 行:若的 GrayReleaseRule 为新增或更新( 编号更大 ),进行缓存更新,并移除的 GrayReleaseRule 出缓存。
    • 第 36 行:调用 transformRuleToRuleCache(GrayReleaseRule) 方法,将 GrayReleaseRule 转换成 GrayReleaseRuleCache 对象。详细解析,见 「3.1.4.1 transformRuleToRuleCache」
    • 第 36 行:调用 #addCache(key, GrayReleaseRuleCache) 方法,添加的 GrayReleaseRuleCache 到缓存中。详细解析,见 「3.1.4.2 addCache」
    • 第 37 至 40 行:调用 #remove(key, oldRule) 方法,移除 的 GrayReleaseRuleCache 出缓存。详细解析,见 「3.1.4.3 removeCache」
  • 第 42 至 47 行:的 GrayReleaseRuleCache 对应的分支处于激活( 有效 )状态,更新加载版本号。
    • 例如,定时轮询,有可能,早于 #handleMessage(...) 拿到对应的新的 GrayReleaseRule 记录,那么此时规则编号是相等的,不符合上面的条件,但是符合这个条件。
    • 再例如,两次定时轮询,第二次和第一次的规则编号是相等的,不符合上面的条件,但是符合这个条件。
    • 总结,刷新有效的 GrayReleaseRuleCache 对象的 loadVersion
  • 第 50 至 53 行:若 GrayReleaseRule.branchStatus 为 DELETED 或 MERGED 的情况,保留两轮定时扫描,后调用 #remove(key, oldRule) 方法,移除出缓存。
    • 例如,灰度全量发布时,会添加 GrayReleaseRule.branchStatusMERGED 到缓存中。保留两轮,进行移除出缓存。

    • 为什么是两轮?笔者请教了宋老师( Apollo 的作者之一 ),解答如下:

      这个是把已经inactive的rule删除,至于为啥保留两轮,这个可能只是个选择问题 * T T 笔者表示还是不太明白,继续思考ing 。如果有知道的胖友,烦请告知。

3.1.4.1 transformRuleToRuleCache

#transformRuleToRuleCache(GrayReleaseRule) 方法,将 GrayReleaseRule 转换成 GrayReleaseRuleCache 对象。代码如下:

private GrayReleaseRuleCache transformRuleToRuleCache(GrayReleaseRule grayReleaseRule) {
// 转换出 GrayReleaseRuleItemDTO 数组
Set<GrayReleaseRuleItemDTO> ruleItems;
try {
ruleItems = GrayReleaseRuleItemTransformer.batchTransformFromJSON(grayReleaseRule.getRules());
} catch (Throwable ex) {
ruleItems = Sets.newHashSet();
Tracer.logError(ex);
logger.error("parse rule for gray release rule {} failed", grayReleaseRule.getId(), ex);
}
// 创建 GrayReleaseRuleCache 对象,并返回
return new GrayReleaseRuleCache(grayReleaseRule.getId(),
grayReleaseRule.getBranchName(), grayReleaseRule.getNamespaceName(), grayReleaseRule
.getReleaseId(), grayReleaseRule.getBranchStatus(), loadVersion.get(), ruleItems);
}

3.1.4.2 addCache

#addCache(key, GrayReleaseRuleCache) 方法,添加的 GrayReleaseRuleCache 到缓存中。代码如下:

 1: private void addCache(String key, GrayReleaseRuleCache ruleCache) {
2: // 添加到 reversedGrayReleaseRuleCache 中
3: // 为什么这里判断状态?因为删除灰度,或者灰度全量发布的情况下,是无效的,所以不添加到 reversedGrayReleaseRuleCache 中
4: if (ruleCache.getBranchStatus() == NamespaceBranchStatus.ACTIVE) {
5: for (GrayReleaseRuleItemDTO ruleItemDTO : ruleCache.getRuleItems()) {
6: for (String clientIp : ruleItemDTO.getClientIpList()) {
7: reversedGrayReleaseRuleCache.put(assembleReversedGrayReleaseRuleKey(ruleItemDTO.getClientAppId(), ruleCache.getNamespaceName(), clientIp),
8: ruleCache.getRuleId());
9: }
10: }
11: }
12: // 添加到 grayReleaseRuleCache
13: // 这里为什么可以添加?因为添加到 grayReleaseRuleCache 中是个对象,可以判断状态
14: grayReleaseRuleCache.put(key, ruleCache);
15: }

  • 第 2 至 11 行:添加到 reversedGrayReleaseRuleCache 中。
    • 为什么这里判断状态?因为删除灰度,或者灰度全量发布的情况下,是无效的,所以不添加到 reversedGrayReleaseRuleCache 中。
  • 第 14 行:添加到 grayReleaseRuleCache 中。
    • 为什么这里可以添加?因为添加到 grayReleaseRuleCache 中是个对象,可以判断状态。

3.1.4.3 removeCache

#remove(key, oldRule) 方法,移除 的 GrayReleaseRuleCache 出缓存。代码如下:

private void removeCache(String key, GrayReleaseRuleCache ruleCache) {
// 移除出 grayReleaseRuleCache
grayReleaseRuleCache.remove(key, ruleCache);
// 移除出 reversedGrayReleaseRuleCache
for (GrayReleaseRuleItemDTO ruleItemDTO : ruleCache.getRuleItems()) {
for (String clientIp : ruleItemDTO.getClientIpList()) {
reversedGrayReleaseRuleCache.remove(assembleReversedGrayReleaseRuleKey(ruleItemDTO.getClientAppId(), ruleCache.getNamespaceName(), clientIp),
ruleCache.getRuleId());
}
}
}

3.1.5 findReleaseIdFromGrayReleaseRule

#findReleaseIdFromGrayReleaseRule(clientAppId, clientIp, configAppId, configCluster, configNamespaceName) 方法,若匹配上灰度规则,返回对应的 Release 编号。代码如下:

public Long findReleaseIdFromGrayReleaseRule(String clientAppId, String clientIp, String
configAppId, String configCluster, String configNamespaceName) {
// 判断 grayReleaseRuleCache 中是否存在
String key = assembleGrayReleaseRuleKey(configAppId, configCluster, configNamespaceName);
if (!grayReleaseRuleCache.containsKey(key)) {
return null;
}
// 循环 GrayReleaseRuleCache 数组,获得匹配的 Release 编号
// create a new list to avoid ConcurrentModificationException
List<GrayReleaseRuleCache> rules = Lists.newArrayList(grayReleaseRuleCache.get(key));
for (GrayReleaseRuleCache rule : rules) {
// 校验 GrayReleaseRuleCache 对应的子 Namespace 的状态是否为有效
//check branch status
if (rule.getBranchStatus() != NamespaceBranchStatus.ACTIVE) {
continue;
}
// 是否匹配灰度规则。若是,则返回。
if (rule.matches(clientAppId, clientIp)) {
return rule.getReleaseId();
}
}
return null;
}

  • 🙂 代码比较易懂,胖友自己看代码注释哈。

3.1.6 hasGrayReleaseRule

/**
* Check whether there are gray release rules for the clientAppId, clientIp, namespace
* combination. Please note that even there are gray release rules, it doesn't mean it will always
* load gray releases. Because gray release rules actually apply to one more dimension - cluster.
*/
public boolean hasGrayReleaseRule(String clientAppId, String clientIp, String namespaceName) {
return reversedGrayReleaseRuleCache.containsKey(assembleReversedGrayReleaseRuleKey(clientAppId, namespaceName, clientIp))
|| reversedGrayReleaseRuleCache.containsKey(assembleReversedGrayReleaseRuleKey(clientAppId, namespaceName, GrayReleaseRuleItemDTO.ALL_IP));
}

  • 我们来翻一下英文注释哈,非直译哈。

  • 【一】Check whether there are gray release rules for the clientAppId, clientIp, namespace combination. 针对 clientAppId + clientIp + namespaceName ,校验是否有灰度规则。

  • 【二】Please note that even there are gray release rules, it doesn't mean it will always load gray releases. 请注意,即使返回 true ,也不意味着调用方能加载到灰度发布的配置。

  • 【三】 Because gray release rules actually apply to one more dimension - cluster. 因为,reversedGrayReleaseRuleCache 的 KEY 不包含 branchName ,所以 reversedGrayReleaseRuleCache 的 VALUE 为多个 branchName 的 Release 编号的集合

  • 那么为什么不包含 branchName 呢?在 《Apollo 源码解析 —— Config Service 配置读取接口》 一文中,我们看到 AbstractConfigService 中,#loadConfig(...) 方法中,是按照集群优先级加载,代码如下:

    @Override
    public Release loadConfig(String clientAppId, String clientIp, String configAppId, String configClusterName,
    String configNamespace, String dataCenter, ApolloNotificationMessages clientMessages) {
    // 优先,获得指定 Cluster 的 Release 。若存在,直接返回。
    // load from specified cluster fist
    if (!Objects.equals(ConfigConsts.CLUSTER_NAME_DEFAULT, configClusterName)) {
    Release clusterRelease = findRelease(clientAppId, clientIp, configAppId, configClusterName, configNamespace,
    clientMessages);
    if (!Objects.isNull(clusterRelease)) {
    return clusterRelease;
    }
    }

    // 其次,获得所属 IDC 的 Cluster 的 Release 。若存在,直接返回
    // try to load via data center
    if (!Strings.isNullOrEmpty(dataCenter) && !Objects.equals(dataCenter, configClusterName)) {
    Release dataCenterRelease = findRelease(clientAppId, clientIp, configAppId, dataCenter, configNamespace, clientMessages);
    if (!Objects.isNull(dataCenterRelease)) {
    return dataCenterRelease;
    }
    }

    // 最后,获得默认 Cluster 的 Release 。
    // fallback to default release
    return findRelease(clientAppId, clientIp, configAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, configNamespace, clientMessages);
    }

    • 但是,笔者又想了想,应该也不是这个方法的原因,因为这个方法里,每个调用的方法,clusterName 是明确的,那么把 clusterName 融入到缓存 KEY 也是可以的。所以应该不是这个原因
  • 目前 #hasGrayReleaseRule(clientAppId, clientIp, namespaceName) 方法,仅仅被 ConfigFileController 调用。而 ConfigFileController 在调用时,确实是不知道自己使用哪个 clusterName 。恩恩,应该是这个原因。

3.2 GrayReleaseRuleCache

com.ctrip.framework.apollo.biz.grayReleaseRule.GrayReleaseRuleCache ,GrayReleaseRule 的缓存类。代码如下:

private long ruleId;

// 缺少 appId
// 缺少 clusterName

private String namespaceName;
private String branchName;
private Set<GrayReleaseRuleItemDTO> ruleItems;
private long releaseId;
private int branchStatus;
/**
* 加载版本
*/
private long loadVersion;

// 匹配 clientAppId + clientIp
public boolean matches(String clientAppId, String clientIp) {
for (GrayReleaseRuleItemDTO ruleItem : ruleItems) {
if (ruleItem.matches(clientAppId, clientIp)) {
return true;
}
}
return false;
}

相比 GrayReleaseRule 来说:

  • appId + clusterName 字段,因为在 GrayReleaseRulesHolder 中,缓存 KEY 会根据需要包含这两个字段。
  • loadVersion 字段,用于记录 GrayReleaseRuleCache 的加载版本,用于自动过期逻辑。

666. 彩蛋

T T ,GrayReleaseRulesHolder 看的还是有点懵逼,后面自己有机会写配置中心的灰度发布功能的时候,在捉摸捉摸。如果有些写的不对的地方,欢迎指正。

知识星球

文章目录
  1. 1. 1. 概述
  2. 2. 2. ReleaseService
    1. 2.1. 2.1 publishBranchNamespace
    2. 2.2. 2.2 mergeFromMasterAndPublishBranch
  3. 3. 3. 加载灰度配置
    1. 3.1. 3.1 GrayReleaseRulesHolder
      1. 3.1.1. 3.1.1 构造方法
      2. 3.1.2. 3.1.2 初始化
      3. 3.1.3. 3.1.3 handleMessage
      4. 3.1.4. 3.1.4 mergeGrayReleaseRules
        1. 3.1.4.1. 3.1.4.1 transformRuleToRuleCache
        2. 3.1.4.2. 3.1.4.2 addCache
        3. 3.1.4.3. 3.1.4.3 removeCache
      5. 3.1.5. 3.1.5 findReleaseIdFromGrayReleaseRule
      6. 3.1.6. 3.1.6 hasGrayReleaseRule
    2. 3.2. 3.2 GrayReleaseRuleCache
  4. 4. 666. 彩蛋