《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-streaming-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Collector Streaming 流式处理。主要包含如下部分:

  • apm-collector-core 模块的 graph 包,提供最精简单节点的流式处理的封装。如下图所示:

  • apm-collector-stream 模块,在 graph 包的基础上,提供异步跨节点等等的流式处理的封装。如下图所示:

免打脸大保健:笔者对流式处理非常不了解,本文可能是一本正经的胡说八道。考虑到笔者是靠脸吃饭(颜值我只服我红雷哥),所以读者老爷请爱护下笔者。

Collector Streaming 在 SkyWalking 架构图处于如下位置( 红框 ) :

FROM https://github.com/apache/incubating-skywalking

OK,下面来一本正经的代码走起!

2. apm-collector-core/graph

整体类图如下:

看起来略复杂,不要方,我们先来看一个流式大数据处理框架 Apache Storm 的说明:

FROM 《流式大数据处理的三种框架:Storm,Spark和Samza》
Storm 中,先要设计一个用于实时计算的图状结构,我们称之为拓扑(topology)。这个拓扑将会被提交给集群,由集群中的主控节点(master node)分发代码,将任务分配给工作节点(worker node)执行。

  • Graph :定义了一个数据各个 Node 的处理拓扑图。
  • WayToNode :提交数据给 Node 的方式
  • Node :节点,包含一个 NodeProcessor 和 一个 Next 。
    • NodeProcessor :Node 处理器,处理数据
    • Next :包含 WayToNode 数组,即 Node 提交数据给 Next 的 Node 数组方式

整体交互流程如下:

  • 粉色箭头:当数据进来时,提交给 Grpah 。按照定义的拓扑图,使用 NodeWay 提交给 Node ,NodeProcessor 进行处理。
  • 蓝色箭头:当 NodeProcessor 处理完成后,Next 逐个使用 NodeWay 数组提交给下面的 Node ,继续处理。
    • ps :注意,这块流程,根据不同的 NodeProcessor 的实现类会有不同,蓝色箭头的过程,只是其中的一种,下面会详细解析。

整体顺序图如下:

  • DirectWay 是 WayToNode 接口的一种实现,正如其名,直接提交数据给 Node 。在 「3. apm-collector-stream」 会看到其他实现,例如提交到其他服务器节点的 Node,从而实现跨服务器节点的流式处理。
  • AbstractWorker 在 apm-collector-stream 模块,是 NodeProcessor 接口的一种实现,处理提交给 Node 的数据。在 #onWork(message) 抽象方法里,子类可以实现该方法,根据自身需求,是否调用 #onNext(message) 方法,Next 逐个使用 NodeWay 数组提交给下面的 Node ,继续处理。

下面,我们来详细分别看看如下逻辑的详细代码实现:

  • Graph 创建
  • Graph 启动

2.1 Graph 创建

创建 Graph 的顺序图如下:

  • 第一步,调用 GraphManager#createIfAbsent(graphId, input) 方法( input 参数没用 ),创建一个 Graph 对象。
  • 第二步,调用 Graph#addNode(WayToNode) 方法,创建该 Graph 的首个 Node 对象。
  • 第三步,调用 Node#addNext(WayToNode) 方法,创建该 Node 的下一个 Node 对象。

如下是 collector-agent-stream-provider 模块,TraceStreamGraph#createServiceReferenceGraph() 方法的代码:

public void createServiceReferenceGraph() {
QueueCreatorService<ServiceReference> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

Graph<ServiceReference> graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_REFERENCE_GRAPH_ID, ServiceReference.class);
graph.addNode(new ServiceReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new ServiceReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener))
.addNext(new ServiceReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
}

让我们来看看每个方法的具体代码实现。


第一步

GraphManager#createIfAbsent(graphId, input) 方法,创建一个 Graph 对象,并添加到 Graph 映射。代码如下:

  • INSTANCE 属性,单例。
  • allGraphs 属性,Graph 映射。其中映射的 KEY 为每个 Graph 全局唯一编号。在 JvmMetricStreamGraphRegisterStreamGraphTraceStreamGraph 类中,枚举了实际使用的 Graph 编号们。
  • 第 50 至 58 行:当 Graph 映射里不存在指定 Graph 编号时,创建 Graph 对象,并返回。

第二步

Graph#addNode(WayToNode) 方法,创建该 Graph 的首个 Node 对象。代码如下:

  • id 属性,Graph 编号。
  • entryWay首个提交数据给 Node 的方式。
  • 第 58 行 :将方法参数 entryWay 赋值给 this.entryWay 属性。在下分享的 Graph#start(input) 方法里,我们会看到这是 Graph 启动的入口,首个提交给 Node 的方式。
  • 第 60 至 62 行 :调用 WayToNode#buildDestination(Graph) 方法,创建 Node 对象,并返回该 Node 。在上文中,我们已经说过创建的 Node 对象,为该 Graph 的首个 Node 。

WayToNode#buildDestination() 方法,创建该 WayToNode 的 Node 对象。代码如下:

  • destination 属性,目标 Node 。即该 WayToNode 提交数据到的 Node 。
  • destinationHandler 属性,目标 Node 的处理器。见 #out(INPUT) 方法。
  • 第 42 行:创建 Node 对象。
    • 目前,destinationHandler 属性,除了用于创建 Node 对象,无其他用途。

Node 构造方法 方法,代码如下:

  • nodeProcessor 属性,节点处理器。
  • next 属性,包含 WayToNode 数组,即 Node 提交数据给 Next 的 Node 数组的方式。
  • 第 44 行:调用 Graph#checkForNewNode(Node) 方法,校验 Node 的 NodeProcessor 在其 Graph 里,编号唯一