自我表扬:《Dubbo 实现原理与源码解析 —— 精品合集》
表扬自己:《D数据库实体设计合集》

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/register/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 应用与应用实例的注册。先来简单了解下注册的整体流程:

  • 应用启动,Agent 向 Collector 注册应用
  • 注册应用成功后,Agent 向 Collector 注册应用实例

下面,我们分成两个小节,分别从 API 的实现调用,分享代码的具体实现。

友情提示:推荐阅读 《探针与Collector间通讯协议》

2. Collector 注册相关 API

Collector 注册相关 API 相关有四个接口:

  • 2.1 应用的注册 API
  • 2.2 应用实例的正常注册 API
  • 2.3 应用实例的恢复注册 API
  • 2.4 应用实例的心跳 API

API 处理的流程大体如下:

  • 绿框部分,【2.3】【2.4】两个 API ,直接 Service 调用 DAO 方法,无需经过 Graph / Stream 相关方法。

2.1 应用的注册 API

我们先来看看 API 的定义,ApplicationRegisterService.proto ,如下图所示:

2.1.1 ApplicationRegisterServiceHandler#register(…)

ApplicationRegisterServiceHandler#register(Application, StreamObserver<ApplicationMapping>), 代码如下:

  • 第 51 行:获得请求的应用编码( applicationCode )数组
  • 第 54 至 64 行:循环应用编码数组,获取创建应用。
    • 第 57 行:调用 IApplicationIDService#getOrCreate(applicationCode) 方法,获取创建应用,最终获得应用编号( applicationId )。
    • 第 60 至 63 行:获得到应用编号( applicationId != 0 ),则添加到响应。为什么会存在获得不到的情况呢?在下文中,我们会看到,实际异步保存应用,所以会存在获取失败的情况。当获取失败,调用方( 例如 Agent )可以重新发起该请求进行注册应用,从而在异步保存应用,获取到应用编号。
    • 第 67 至 68 行:响应。

2.1.2 IApplicationIDService#getOrCreate(…)

org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService ,继承 Service 接口,应用编号服务接口。

org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService ,实现 IApplicationIDService 接口,应用编号服务实现类。

  • 实现了 #getOrCreate(applicationCode) 方法,代码如下:

    • 第 66 行:调用 ApplicationCacheService#get(applicationCode) 方法,从缓存中获取应用编号。在 《SkyWalking 源码分析 —— Collector Cache 缓存组件》 有详细解析。
    • 第 69 至 76 行:当获取不到应用编号时,获取 Application 对应的 Graph<Application> 对象,调用 Graph#start(application) 方法,进行流式处理。在这过程中,会保存应用到存储器。
    • 第 77 行:返回应用编号。

2.1.3 Graph#start(application)

#createApplicationRegisterGraph() 方法中,我们可以看到 Application 对应的 Graph<Application> 对象的创建。


IdAutoIncrement#increment(min, max) 方法,双向均匀自增。可能看起来比较奇怪,以上文 Application 的调用举例子:

min max result applicationCode
0 / 1 User
0 / -1 应用 A
-1 1 2 应用 B
-1 2 -2 应用 C
-2 2 3 应用 D
  • 【User】和【应用 A】是直接获得 result ,不调用 #increment(min, max) 方法。
  • 总的来说,我们可以看到,以 min + max = 0 为中心点( 实际以 0 为中心点), 双向均匀自增。

TODO 【4007】

2.1.4 Application

org.skywalking.apm.collector.storage.table.register.Application ,应用。例如记录在 ES 如下图:

2.2 应用实例的正常注册 API

我们先来看看 API 的定义,InstanceDiscoveryService ,如下图所示:

整体代码和 「2.1 应用的注册 API」 非常相似,所以本小节,更多的是提供代码的链接地址。

2.2.1 InstanceDiscoveryServiceHandler#register(…)

InstanceDiscoveryServiceHandler#register(ApplicationInstance, StreamObserver<ApplicationInstanceMapping>),注册应用实例。

2.2.2 IInstanceIDService#getOrCreate(…)

org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService ,继承 Service 接口,应用实例编号服务接口。

org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService ,实现 IInstanceIDService 接口,应用编号服务实现类。

2.2.3 Graph#start(instance)

#createInstanceRegisterGraph() 方法中,我们可以看到 Instance 对应的 Graph<Instance> 对象的创建。

2.2.4 Instance

org.skywalking.apm.collector.storage.table.register.Instance ,应用实例。例如记录在 ES 如下图:

2.3 应用实例的恢复注册 API

我们先来看看 API 的定义,InstanceDiscoveryService.proto ,如下图所示:

一般情况下,Agent 在注册应用时候成功后,如果因为各种原因原因和 Collector 断开了 gRPC Channel 连接( 例如,网络 ),恢复连接后,需要调用该 API ,进行恢复注册。

2.3.1 InstanceDiscoveryServiceHandler#recover(…)

InstanceDiscoveryServiceHandler#recover(ApplicationInstanceRecover, StreamObserver<Downstream>), 代码如下:

  • 第 71 行:调用 TimeBucketUtils#getSecondTimeBucket(time) 方法,将 registerTime 转成 timeBucket 。
  • 第 73 行:调用 IInstanceIDService#recover(instanceId, applicationId, registerTime, osInfo) 方法,恢复注册应用实例。
  • 第 75 至 76 行:响应。

2.3.2 IInstanceIDService#recover(…)

InstanceIDService#recover(instanceId, applicationId, registerTime, osInfo) 实现方法,恢复注册。代码如下:

  • 第 96 至 103 行:创建 Instance 对象,用于下面更新操作。
    • 第 99 行: TODO 【4008】
  • 第 106 行:调用 InstanceEsRegisterDAO#save(Instance) 方法,更新应用实例。

2.4 应用实例的心跳 API

我们先来看看 API 的定义,InstanceDiscoveryService.proto ,如下图所示:

一般情况下,Agent 在注册应用时候成功后,定时向 Collector 发送心跳,记录应用存活。

2.4.1 InstanceDiscoveryServiceHandler#heartbeat(…)

InstanceDiscoveryServiceHandler#heartbeat(ApplicationInstanceHeartbeat, StreamObserver<org.skywalking.apm.network.proto.Downstream>) ,目前该方法暂未实现。实现后,会首先调用一个 Service 方法,而后调用 InstanceEsRegisterDAO#updateHeartbeatTime(instanceId, heartbeatTime) 方法,记录应用实例的心跳时间。

3. Agent 调用注册 API

org.skywalking.apm.agent.core.remote.AppAndServiceRegisterClient ,实现 BootService 、GRPCChannelListener 、Runnable 、TracingContextListener 接口,注册应用与实例的客户端。该客户端会调用上述所有 API 。

  • PROCESS_UUID 静态属性,Agent UUID ,使用 UUID 算法生成,去除多余 "-"
  • ———- 分割线 ———-
  • status 属性,gRPC 连接状态。
  • applicationRegisterServiceBlockingStub / instanceDiscoveryServiceBlockingStub / serviceNameDiscoveryServiceBlockingStub 属性,对应 gRPC 提供 API 的阻塞 Stub 。
  • needRegisterRecover 属性,是否需要发起恢复的注册。
  • 如上五个属性,在 #statusChanged(GRPCChannelStatus) 实现方法,根据 gRPC 连接状态的变更,创建或销毁 Stub 。
  • #boot() 实现方法,将自己作为监听器( 因为实现了 GRPCChannelListener 接口 )添加到 GRPCChannelManager 中,从而监听 gRPC Channel 的状态。在 《SkyWalking 源码分析 —— Agent Remote 远程通信服务》 有详细解析。
  • ———- 分割线 ———-
  • applicationRegisterFuture 属性,注册应用与实例的定时任务。
  • #boot() 实现方法,创建 applicationRegisterFuture 。该定时任务无初始化延迟,每 Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL ( 默认:3 s ) 执行一次 #run() 方法。
  • ———- 分割线 ———-
  • lastSegmentTime 属性,最后记录 Segment 的时间。
  • #afterFinished() 实现方法,记录 Segment 最后的时间。
  • #afterBoot() 实现方法,将自己作为监听器( 因为实现了 TracingContextListener 接口 )添加到 GRPCChannelManager 中,从而监听 Segment 的记录。在 《SkyWalking 源码分析 —— Agent 收集 Trace 数据》 有详细解析。

#run() 实现方法,执行应用的注册,应用实例的正常注册、恢复注册、心跳的逻辑。

666. 彩蛋

知识星球

😈 距离 Segment 已经不远了。

胖友,分享个朋友圈可好?

文章目录
  1. 1. 1. 概述
  2. 2. 2. Collector 注册相关 API
    1. 2.1. 2.1 应用的注册 API
      1. 2.1.1. 2.1.1 ApplicationRegisterServiceHandler#register(…)
      2. 2.1.2. 2.1.2 IApplicationIDService#getOrCreate(…)
      3. 2.1.3. 2.1.3 Graph#start(application)
      4. 2.1.4. 2.1.4 Application
    2. 2.2. 2.2 应用实例的正常注册 API
      1. 2.2.1. 2.2.1 InstanceDiscoveryServiceHandler#register(…)
      2. 2.2.2. 2.2.2 IInstanceIDService#getOrCreate(…)
      3. 2.2.3. 2.2.3 Graph#start(instance)
      4. 2.2.4. 2.2.4 Instance
    3. 2.3. 2.3 应用实例的恢复注册 API
      1. 2.3.1. 2.3.1 InstanceDiscoveryServiceHandler#recover(…)
      2. 2.3.2. 2.3.2 IInstanceIDService#recover(…)
    4. 2.4. 2.4 应用实例的心跳 API
      1. 2.4.1. 2.4.1 InstanceDiscoveryServiceHandler#heartbeat(…)
  3. 3. 3. Agent 调用注册 API
  4. 4. 666. 彩蛋