⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-naming-server/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Collector Naming Server 命名服务。主要包含如下部分:

  • Collector Naming Server 提供 Http 两个接口,提供 Agent 分别查询 Collector Agent Jetty Server 、Collector Agent gRPC Server 集群。
  • Collector Agent Jetty Server 、Collector Agent gRPC Server 集群内部的注册与发现。

友情提示,建议胖友已经读过 《SkyWalking 源码分析 —— Collector Server Component 服务器组件》《SkyWalking 源码分析 —— Collector Server Component 服务器组件》

Collector Agent Server ( 包括 Jetty 和 gRPC ),提供上传调用链路,JVM Metric 等等 API 给 Agent 调用。 Agent 通过 Collector Naming Server 调用 Collector Agent Server 的 API ,查询 Collector Agent Server 最新的集群地址。

Naming Server 在 SkyWalking 架构图处于如下位置( 红框 ) :

FROM https://github.com/apache/incubating-skywalking

下面我们来看看整体的项目结构,如下图所示 :

2. Collector Naming Server

Collector Naming Server 通过 apm-collector-naming 项目实现,其中:

  • collector-naming-define 项目:定义了 Naming Server 的接口。
  • collector-naming-jetty-provider 项目:基于 Jetty Server 的 Naming Server 实现。

2.1 NamingModule

org.skywalking.apm.collector.cluster.ClusterModule ,实现 Module 抽象类,集群管理 Module 。

#name() 实现方法,返回模块名为 "naming"

#services() 实现方法,返回 Service 类名:NamingHandlerRegisterService 。

2.2 NamingModuleJettyProvider

org.skywalking.apm.collector.naming.jetty.NamingModuleJettyProvider ,实现 ModuleProvider 抽象类,基于 Jetty 的命名组件服务提供者实现类。

#name() 实现方法,返回组件服务提供者名为 "jetty"

module() 实现方法,返回组件类为 NamingModule 。

#requiredModules() 实现方法,返回依赖组件为 ClusterModule 、JettyManagerModule。


#prepare(Properties) 实现方法,执行准备阶段逻辑。

  • 第 55 行 :创建 ZookeeperModuleListenerService / NamingJettyHandlerRegisterService 对象,并调用 #registerServiceImplementation() 父类方法,注册到 services

#start() 实现方法,执行启动阶段逻辑。

#notifyAfterCompleted() 实现方法,执行启动完成逻辑。目前是个空方法。

2.3 NamingHandlerRegisterService

org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService ,继承 Service 接口,命名处理器注册服务接口

#register(ServerHandler) 接口方法,注册 Server 请求处理器。Collector Agent Server 会调用该方法,将其实现的 用于 Naming 的 ServerHandler 进行注册。如下图所示:

2.3.1 NamingJettyHandlerRegisterService

org.skywalking.apm.collector.naming.jetty.service.service.NamingJettyHandlerRegisterService ,基于 Jetty 的命名处理器注册服务实现类

#register(moduleName, providerName, registration) 实现方法,调用 JettyManagerService#addHandler(path, registration) 方法,注册 Jetty Server 请求处理器。

2.3.2 AgentJettyNamingHandler

org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingHandler ,实现 JettyHandler 抽象类,Collector Agent Jetty Server 实现的命名处理器。

#pathSpec() 实现方法,获得请求路径为 "/agent/jetty"

#doGet() 实现方法,调用 AgentJettyNamingListener#getAddresses() 方法,获得 Collector Agent Jetty Server 集群地址。

2.3.3 AgentGRPCNamingHandler

org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingHandler ,实现 JettyHandler 抽象类,Collector Agent gRPC Server 实现的命名处理器。

#pathSpec() 实现方法,获得请求路径为 "/agent/gRPC"

#doGet() 实现方法,调用 AgentGRPCNamingListener#getAddresses() 方法,获得 Collector Agent gRPC Server 集群地址。

2.4 配置文件

配置文件如下 :

  • 配置 Naming Server 启动在 10800 端口。
  • Naming Server 内嵌在 Collector Server 。通过启动多个 Collector Server 节点,形成 Naming Server 集群。Agent 配置多个 Naming Server 地址。

3. CollectorDiscoveryService

org.skywalking.apm.agent.core.remote.CollectorDiscoveryService , 实现 Agent 的 BootService 接口,Collector Agent Server 地址发现服务。

#boot() 实现方法,调用 ScheduledExecutorService#scheduleAtFixedRate(...) 方法,创建定时任务。该定时任务无初始化延迟,每 Config.Collector.DISCOVERY_CHECK_INTERVAL ( 默认:60 s ) 执行一次。

  • DiscoveryRestServiceClient 实现 java.lang.Runnable 接口,即创建的任务。

3.1 CollectorDiscoveryService

org.skywalking.apm.agent.core.remote.CollectorDiscoveryService ,实现 java.lang.Runnable 接口,Collector 服务发现客户端,基于 Rest 方式通信。

构造方法 ,首先随机选择一个 Collector Naming Server ,用于下面 #findServerList() 方法,首次获取 Collector Agent Server 集群地址。

#run() 实现方法,调用 #findServerList() 方法,获取 Collector Agent Server 集群地址。

#findServerList() 方法,获取 Collector Agent Server 集群地址。

  • 第 85 行 :创建 org.apache.http.impl.client.CloseableHttpClient 对象。目前使用 HttpClient 4.5.3 版本。
  • 第 87 行 :调用 #buildGet() 方法,创建 org.apache.http.client.methods.HttpGet 对象。目前 Agent 查询的是 Collector Agent gRPC Server 集群地址,因为 gRPC 的性能相比 HTTP 更优秀。
  • 第 89 行 :向 Collector Naming Server 发起请求。
  • 第 90 至 93 行 :当响应状态码非 200 时,调用 #findBackupServer() 方法,顺序选择 Collector Naming Server 列表的下一个。注意,此时不会再发起请求,需要等下一次执行。
  • 第 95 至 111 行 :处理响应结果,若 Collector Agent gRPC Server 集群地址发生变化,进行更新到 RemoteDownstreamConfig.Collector.GRPC_SERVERS
  • 第 114 至 117 行 :请求发生异常,调用 #findBackupServer() 方法,顺序选择 Collector Naming Server 列表的下一个。
  • 第 119 行 :调用 CloseableHttpClient#close() 方法,进行关闭。

3.2 配置文件

配置文件如下 :

  • 生产环境使用时,推荐 Agent 配置多个 Naming Server 地址。

666. 彩蛋

知识星球

2017.12.15 归途 从北京回上海。

突然有种感觉,获得像包方便面,快捷而不营养。

有点"丧"。

胖友,分享一波朋友圈可好!

文章目录
  1. 1. 1. 概述
  2. 2. 2. Collector Naming Server
    1. 2.1. 2.1 NamingModule
    2. 2.2. 2.2 NamingModuleJettyProvider
    3. 2.3. 2.3 NamingHandlerRegisterService
      1. 2.3.1. 2.3.1 NamingJettyHandlerRegisterService
      2. 2.3.2. 2.3.2 AgentJettyNamingHandler
      3. 2.3.3. 2.3.3 AgentGRPCNamingHandler
    4. 2.4. 2.4 配置文件
  3. 3. 3. CollectorDiscoveryService
    1. 3.1. 3.1 CollectorDiscoveryService
    2. 3.2. 3.2 配置文件
  4. 4. 666. 彩蛋