⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/register/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 应用与应用实例的注册。先来简单了解下注册的整体流程:

  • 应用启动,Agent 向 Collector 注册应用
  • 注册应用成功后,Agent 向 Collector 注册应用实例

下面,我们分成两个小节,分别从 API 的实现调用,分享代码的具体实现。

友情提示:推荐阅读 《探针与Collector间通讯协议》

2. Collector 注册相关 API

Collector 注册相关 API 相关有四个接口:

  • 2.1 应用的注册 API
  • 2.2 应用实例的正常注册 API
  • 2.3 应用实例的恢复注册 API
  • 2.4 应用实例的心跳 API

API 处理的流程大体如下:

  • 绿框部分,【2.3】【2.4】两个 API ,直接 Service 调用 DAO 方法,无需经过 Graph / Stream 相关方法。

2.1 应用的注册 API

我们先来看看 API 的定义,ApplicationRegisterService.proto ,如下图所示:

2.1.1 ApplicationRegisterServiceHandler#register(...)

ApplicationRegisterServiceHandler#register(Application, StreamObserver<ApplicationMapping>), 代码如下:

  • 第 51 行:获得请求的应用编码( applicationCode )数组
  • 第 54 至 64 行:循环应用编码数组,获取创建应用。
    • 第 57 行:调用 IApplicationIDService#getOrCreate(applicationCode) 方法,获取创建应用,最终获得应用编号( applicationId )。
    • 第 60 至 63 行:获得到应用编号( applicationId != 0 ),则添加到响应。为什么会存在获得不到的情况呢?在下文中,我们会看到,实际异步保存应用,所以会存在获取失败的情况。当获取失败,调用方( 例如 Agent )可以重新发起该请求进行注册应用,从而在异步保存应用,获取到应用编号。
    • 第 67 至 68 行:响应。

2.1.2 IApplicationIDService#getOrCreate(...)

org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService ,继承 Service 接口,应用编号服务接口。

org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService ,实现 IApplicationIDService 接口,应用编号服务实现类。

  • 实现了 #getOrCreate(applicationCode) 方法,代码如下:

    • 第 66 行:调用 ApplicationCacheService#get(applicationCode) 方法,从缓存中获取应用编号。在 《SkyWalking 源码分析 —— Collector Cache 缓存组件》 有详细解析。
    • 第 69 至 76 行:当获取不到应用编号时,获取 Application 对应的 Graph<Application> 对象,调用 Graph#start(application) 方法,进行流式处理。在这过程中,会保存应用到存储器。
    • 第 77 行:返回应用编号。

2.1.3 Graph#start(application)

#createApplicationRegisterGraph() 方法中,我们可以看到 Application 对应的 Graph<Application> 对象的创建。


IdAutoIncrement#increment(min, max) 方法,双向均匀自增。可能看起来比较奇怪,以上文 Application 的调用举例子:

min max result applicationCode
0 / 1 User
0 / -1 应用 A
-1 1 2 应用 B
-1 2 -2 应用 C
-2 2 3 应用 D
  • 【User】和【应用 A】是直接获得 result ,不调用 #increment(min, max) 方法。
  • 总的来说,我们可以看到,以 min + max = 0 为中心点( 实际以 0 为中心点), 双向均匀自增。

TODO 【4007】

2.1.4 Application

org.skywalking.apm.collector.storage.table.register.Application ,应用。例如记录在 ES 如下图:

2.2 应用实例的正常注册 API

我们先来看看 API 的定义,InstanceDiscoveryService ,如下图所示:

整体代码和 「2.1 应用的注册 API」 非常相似,所以本小节,更多的是提供代码的链接地址。

2.2.1 InstanceDiscoveryServiceHandler#register(...)

InstanceDiscoveryServiceHandler#register(ApplicationInstance, StreamObserver<ApplicationInstanceMapping>),注册应用实例。

2.2.2 IInstanceIDService#getOrCreate(...)

org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService ,继承 Service 接口,应用实例编号服务接口。

org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService ,实现 IInstanceIDService 接口,应用编号服务实现类。

2.2.3 Graph#start(instance)

#createInstanceRegisterGraph() 方法中,我们可以看到 Instance 对应的 Graph<Instance> 对象的创建。

2.2.4 Instance

org.skywalking.apm.collector.storage.table.register.Instance ,应用实例。例如记录在 ES 如下图:

2.3 应用实例的恢复注册 API

我们先来看看 API 的定义,InstanceDiscoveryService.proto ,如下图所示:

一般情况下,Agent 在注册应用时候成功后,如果因为各种原因原因和 Collector 断开了 gRPC Channel 连接( 例如,网络 ),恢复连接后,需要调用该 API ,进行恢复注册。

2.3.1 InstanceDiscoveryServiceHandler#recover(...)

InstanceDiscoveryServiceHandler#recover(ApplicationInstanceRecover, StreamObserver<Downstream>), 代码如下:

  • 第 71 行:调用 TimeBucketUtils#getSecondTimeBucket(time) 方法,将 registerTime 转成 timeBucket 。
  • 第 73 行:调用 IInstanceIDService#recover(instanceId, applicationId, registerTime, osInfo) 方法,恢复注册应用实例。
  • 第 75 至 76 行:响应。

2.3.2 IInstanceIDService#recover(...)

InstanceIDService#recover(instanceId, applicationId, registerTime, osInfo) 实现方法,恢复注册。代码如下:

  • 第 96 至 103 行:创建 Instance 对象,用于下面更新操作。
    • 第 99 行: TODO 【4008】
  • 第 106 行:调用 InstanceEsRegisterDAO#save(Instance) 方法,更新应用实例。

2.4 应用实例的心跳 API

我们先来看看 API 的定义,InstanceDiscoveryService.proto ,如下图所示:

一般情况下,Agent 在注册应用时候成功后,定时向 Collector 发送心跳,记录应用存活。

2.4.1 InstanceDiscoveryServiceHandler#heartbeat(...)

InstanceDiscoveryServiceHandler#heartbeat(ApplicationInstanceHeartbeat, StreamObserver<org.skywalking.apm.network.proto.Downstream>) ,目前该方法暂未实现。实现后,会首先调用一个 Service 方法,而后调用 InstanceEsRegisterDAO#updateHeartbeatTime(instanceId, heartbeatTime) 方法,记录应用实例的心跳时间。

3. Agent 调用注册 API

org.skywalking.apm.agent.core.remote.AppAndServiceRegisterClient ,实现 BootService 、GRPCChannelListener 、Runnable 、TracingContextListener 接口,注册应用与实例的客户端。该客户端会调用上述所有 API 。

  • PROCESS_UUID 静态属性,Agent UUID ,使用 UUID 算法生成,去除多余 "-"
  • ---------- 分割线 ----------
  • status 属性,gRPC 连接状态。
  • applicationRegisterServiceBlockingStub / instanceDiscoveryServiceBlockingStub / serviceNameDiscoveryServiceBlockingStub 属性,对应 gRPC 提供 API 的阻塞 Stub 。
  • needRegisterRecover 属性,是否需要发起恢复的注册。
  • 如上五个属性,在 #statusChanged(GRPCChannelStatus) 实现方法,根据 gRPC 连接状态的变更,创建或销毁 Stub 。
  • #boot() 实现方法,将自己作为监听器( 因为实现了 GRPCChannelListener 接口 )添加到 GRPCChannelManager 中,从而监听 gRPC Channel 的状态。在 《SkyWalking 源码分析 —— Agent Remote 远程通信服务》 有详细解析。
  • ---------- 分割线 ----------
  • applicationRegisterFuture 属性,注册应用与实例的定时任务。
  • #boot() 实现方法,创建 applicationRegisterFuture 。该定时任务无初始化延迟,每 Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL ( 默认:3 s ) 执行一次 #run() 方法。
  • ---------- 分割线 ----------
  • lastSegmentTime 属性,最后记录 Segment 的时间。
  • #afterFinished() 实现方法,记录 Segment 最后的时间。
  • #afterBoot() 实现方法,将自己作为监听器( 因为实现了 TracingContextListener 接口 )添加到 GRPCChannelManager 中,从而监听 Segment 的记录。在 《SkyWalking 源码分析 —— Agent 收集 Trace 数据》 有详细解析。

#run() 实现方法,执行应用的注册,应用实例的正常注册、恢复注册、心跳的逻辑。

666. 彩蛋

知识星球

😈 距离 Segment 已经不远了。

胖友,分享个朋友圈可好?

文章目录
  1. 1. 1. 概述
  2. 2. 2. Collector 注册相关 API
    1. 2.1. 2.1 应用的注册 API
      1. 2.1.1. 2.1.1 ApplicationRegisterServiceHandler#register(...)
      2. 2.1.2. 2.1.2 IApplicationIDService#getOrCreate(...)
      3. 2.1.3. 2.1.3 Graph#start(application)
      4. 2.1.4. 2.1.4 Application
    2. 2.2. 2.2 应用实例的正常注册 API
      1. 2.2.1. 2.2.1 InstanceDiscoveryServiceHandler#register(...)
      2. 2.2.2. 2.2.2 IInstanceIDService#getOrCreate(...)
      3. 2.2.3. 2.2.3 Graph#start(instance)
      4. 2.2.4. 2.2.4 Instance
    3. 2.3. 2.3 应用实例的恢复注册 API
      1. 2.3.1. 2.3.1 InstanceDiscoveryServiceHandler#recover(...)
      2. 2.3.2. 2.3.2 IInstanceIDService#recover(...)
    4. 2.4. 2.4 应用实例的心跳 API
      1. 2.4.1. 2.4.1 InstanceDiscoveryServiceHandler#heartbeat(...)
  3. 3. 3. Agent 调用注册 API
  4. 4. 666. 彩蛋