⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-store-trace/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

分布式链路追踪系统,链路的追踪大体流程如下:

  1. Agent 收集 Trace 数据。
  2. Agent 发送 Trace 数据给 Collector 。
  3. Collector 接收 Trace 数据。
  4. Collector 存储 Trace 数据到存储器,例如,数据库

本文主要分享【第四部分】 SkyWalking Collector 存储 Trace 数据

友情提示:Collector 接收到 TraceSegment 的数据,对应的类是 Protobuf 生成的。考虑到更加易读易懂,本文使用 TraceSegment 相关的原始类

Collector 在接收到 Trace 数据后,经过流式处理,最终存储到存储器。如下图,红圈部分,为本文分享的内容:

2. SpanListener

《SkyWalking 源码分析 —— Collector 接收 Trace 数据》 一文中,我们看到 SegmentParse#parse(UpstreamSegment, Source) 方法中:

  • #preBuild(List<UniqueId>, SegmentDecorator) 方法中,预构建的过程中,使用 Span 监听器们,从 TraceSegment 解析出不同的数据。
  • 预构建成功后,通知 Span 监听器们,去构建各自的数据,经过流式处理,最终存储到存储器。

org.skywalking.apm.collector.agent.stream.parser.SpanListener ,Span 监听器接口

  • 定义了 #build() 方法,构建数据,执行流式处理,最终存储到存储器。

SpanListener 的子类如下图:

  • 第一层,通用接口层,定义了从 TraceSegment 解析数据的方法。
  • 第二层,业务实现层,每个实现类对应一个数据实体类,一个 Graph 对象。如下图所示:

下面,我们以每个数据实体类为中心,逐个分享。

3. GlobalTrace

org.skywalking.apm.collector.storage.table.global.GlobalTrace ,全局链路追踪,记录一次分布式链路追踪,包括的 TraceSegment 编号。


org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTraceSpanListenerGlobalTrace 的 SpanListener ,实现了 FirstSpanListener 、GlobalTraceIdsListener 接口,代码如下:

  • globalTraceIds 属性,全局链路追踪编号数组
  • segmentId 属性,TraceSegment 链路编号。
  • timeBucket 属性,时间。
  • #parseFirst(SpanDecorator, applicationId, instanceId, segmentId) 方法,从 Span 中解析到 segmentIdtimeBucket
  • #parseGlobalTraceId(UniqueId) 方法,解析全局链路追踪编号,添加到 globalTraceIds 数组。
  • #build() 方法,构建,代码如下:
    • 第 84 行:获取 GlobalTrace 对应的 Graph<GlobalTrace> 对象。
    • 第 86 至 92 行:循环 globalTraceIds 数组,创建 GlobalTrace 对象,逐个调用 Graph#start(application) 方法,进行流式处理。在这过程中,会保存 GlobalTrace 到存储器。

TraceStreamGraph#createGlobalTraceGraph() 方法中,我们可以看到 GlobalTrace 对应的 Graph<GlobalTrace> 对象的创建。

4. InstPerformance

旁白君:InstPerformance 和 GlobalTrace 整体比较相似,分享的会比较简洁一些。

org.skywalking.apm.collector.storage.table.instance.InstPerformance ,应用实例性能,记录应用实例每秒的请求总次数,请求总时长。


org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstPerformanceSpanListenerInstPerformance 的 SpanListener ,实现了 FirstSpanListener 、EntrySpanListener 接口。


TraceStreamGraph#createInstPerformanceGraph() 方法中,我们可以看到 InstPerformance 对应的 Graph<InstPerformance> 对象的创建。

5. SegmentCost

旁白君:SegmentCost 和 GlobalTrace 整体比较相似,分享的会比较简洁一些。

org.skywalking.apm.collector.storage.table.segment.SegmentCost ,TraceSegment 消耗时长,记录 TraceSegment 开始时间,结束时间,花费时长等等。


org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListenerSegmentCost 的 SpanListener ,实现了 FirstSpanListener 、EntrySpanListener 、ExitSpanListener 、LocalSpanListener 接口。


TraceStreamGraph#createSegmentCostGraph() 方法中,我们可以看到 SegmentCost 对应的 Graph<SegmentCost> 对象的创建。

6. NodeComponent

org.skywalking.apm.collector.storage.table.node.NodeComponent ,节点组件。


org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentSpanListenerNodeComponent 的 SpanListener ,实现了 FirstSpanListener 、EntrySpanListener 、ExitSpanListener 接口,代码如下:

  • nodeComponents 属性,节点组件数组,一次 TraceSegment 可以经过个节点组件,例如 SpringMVC => MongoDB 。
  • segmentId 属性,TraceSegment 链路编号。
  • timeBucket 属性,时间( yyyyMMddHHmm )。
  • #parseEntry(SpanDecorator, applicationId, instanceId, segmentId) 方法,从 EntrySpan 中解析到 segmentIdapplicationId ,创建 NodeComponent 对象,添加到 nodeComponents注意,EntrySpan 使用 applicationId 作为 peerId
  • #parseExit(SpanDecorator, applicationId, instanceId, segmentId) 方法,从 ExitSpan 中解析到 segmentIdpeerId ,创建 NodeComponent 对象,添加到 nodeComponents注意,ExitSpan 使用 peerId 作为 peerId
  • #parseFirst(SpanDecorator, applicationId, instanceId, segmentId) 方法,从首个 Span 中解析到 timeBucket
  • #build() 方法,构建,代码如下:
    • 第 84 行:获取 NodeComponent 对应的 Graph<NodeComponent> 对象。
    • 第 86 至 92 行:循环 nodeComponents 数组,逐个调用 Graph#start(nodeComponent) 方法,进行流式处理。在这过程中,会保存 NodeComponent 到存储器。

TraceStreamGraph#createNodeComponentGraph() 方法中,我们可以看到 NodeComponent 对应的 Graph<NodeComponent> 对象的创建。

7. NodeMapping

org.skywalking.apm.collector.storage.table.node.NodeComponent ,节点匹配,用于匹配服务消费者与提供者。


org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingSpanListenerNodeMapping 的 SpanListener ,实现了 FirstSpanListener 、RefsListener 接口,代码如下:

  • nodeMappings 属性,节点匹配数组,一次 TraceSegment 可以经过个节点组件,例如调用多次远程服务,或者数据库。
  • timeBucket 属性,时间( yyyyMMddHHmm )。
  • #parseRef(SpanDecorator, applicationId, instanceId, segmentId) 方法,从 TraceSegmentRef 中解析到 applicationIdpeerId ,创建 NodeMapping 对象,添加到 nodeMappings
  • #parseFirst(SpanDecorator, applicationId, instanceId, segmentId) 方法,从首个 Span 中解析到timeBucket
  • #build() 方法,构建,代码如下:
    • 第 84 行:获取 NodeMapping 对应的 Graph<NodeMapping> 对象。
    • 第 86 至 92 行:循环 nodeMappings 数组,逐个调用 Graph#start(nodeMapping) 方法,进行流式处理。在这过程中,会保存 NodeMapping 到存储器。

TraceStreamGraph#createNodeMappingGraph() 方法中,我们可以看到 NodeMapping 对应的 Graph<NodeMapping> 对象的创建。

8. NodeReference

org.skywalking.apm.collector.storage.table.noderef.NodeReference ,节点调用统计,用于记录服务消费者对服务提供者的调用,基于应用级别的,以分钟为时间最小粒度的聚合统计。


org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceSpanListenerNodeReference 的 SpanListener ,实现了 EntrySpanListener 、ExitSpanListener 、RefsListener 接口,代码如下:

  • references 属性,父 TraceSegment 调用产生的 NodeReference 数组。
  • nodeReferences 属性,NodeReference 数组,最终会包含 references 数组。
  • timeBucket 属性,时间( yyyyMMddHHmm )。
  • #parseRef(SpanDecorator, applicationId, instanceId, segmentId) 方法,代码如下:
    • 第 106 至 109 行:使用父 TraceSegment 的应用编号作为服务消费者编号,自己的应用编号作为服务提供者应用编号,创建 NodeReference 对象。
    • 第 111 行:将 NodeReference 对象,添加到 references注意,是 references ,而不是 nodeReference
  • #parseEntry(SpanDecorator, applicationId, instanceId, segmentId) 方法,代码如下:
    • 作为服务提供者,接受调用。
    • ------- 父 TraceSegment 存在 --------
    • 第 79 至 85 行:references 非空,说明被父 TraceSegment 调用。因此,循环 references 数组,设置 idtimeBucket 属性( 因为 timeBucket 需要从 EntrySpan 中获取,所以 #parseRef(...) 的目的,就是临时存储父 TraceSegment 的应用编号到 references 中 )。
    • 第 87 行:调用 #buildserviceSum(...) 方法,设置调用次数,然后添加到 nodeReferences 中。
    • ------- 父 TraceSegment 不存在 --------
    • 第 91 至 97 行:使用 USER_ID 的应用编号( 特殊,代表 "用户" )作为服务消费者编号,自己的应用编号作为服务提供者应用编号,创建 NodeReference 对象。
    • 第 99 行:调用 #buildserviceSum(...) 方法,设置调用次数,然后添加到 nodeReferences 中。
  • #parseExit(SpanDecorator, applicationId, instanceId, segmentId) 方法,代码如下:
    • 作为服务消费者,发起调用。
    • 第 64 至 71 行:使用自己的应用编号作为服务消费者编号,peerId 作为服务提供者应用编号,创建 NodeReference 对象。
    • 第 73 行:调用 #buildserviceSum(...) 方法,设置调用次数,然后添加到 nodeReferences 中。
  • #build() 方法,构建,代码如下:
    • 第 84 行:获取 NodeReference 对应的 Graph<NodeReference> 对象。
    • 第 86 至 92 行:循环 nodeReferences 数组,逐个调用 Graph#start(nodeReference) 方法,进行流式处理。在这过程中,会保存 NodeReference 到存储器。

TraceStreamGraph#createNodeReferenceGraph() 方法中,我们可以看到 NodeReference 对应的 Graph<NodeReference> 对象的创建。

9. ServiceEntry

org.skywalking.apm.collector.storage.table.service.ServiceEntry ,入口操作。


org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListenerServiceEntry 的 SpanListener ,实现了 EntrySpanListener 、FirstSpanListener 、RefsListener 接口,代码如下:

  • hasReference 属性, 是否有 TraceSegmentRef 。
  • applicationId 属性,应用编号。
  • entryServiceId 属性,入口操作编号。
  • entryServiceName 属性,入口操作名。
  • hasEntry 属性,是否有 EntrySpan 。
  • timeBucket 属性,时间( yyyyMMddHHmm )。
  • #parseRef(SpanDecorator, applicationId, instanceId, segmentId) 方法,是否有 TraceSegmentRef 。
  • #parseFirst(SpanDecorator, applicationId, instanceId, segmentId) 方法,从首个 Span 中解析到 timeBucket
  • #parseEntry(SpanDecorator, applicationId, instanceId, segmentId) 方法,从 EntrySpan 中解析到 applicationIdentryServiceIdentryServiceNamehasEntry
  • #build() 方法,构建,代码如下:
    • 第 96 行:只保存分布式链路的入口操作。
    • 第 98 至 103 行:创建 ServiceEntry 对象。
    • 第 107 行:获取 ServiceEntry 对应的 Graph<ServiceEntry> 对象。
    • 第 108 行:调用 Graph#start(serviceEntry) 方法,进行流式处理。在这过程中,会保存 ServiceEntry 到存储器。

TraceStreamGraph#createServiceEntryGraph() 方法中,我们可以看到 ServiceEntry 对应的 Graph<ServiceEntry> 对象的创建。

10. ServiceReference

org.skywalking.apm.collector.storage.table.serviceref.ServiceReference ,入口操作调用统计,用于记录入口操作的调用,基于入口操作级别的,以分钟为时间最小粒度的聚合统计。

  • 和 NodeReference 类似。
  • 注意,此处的 "入口操作" 不同于 ServiceEntry ,包含每一条 TraceSegment 的入口操作。
  • org.skywalking.apm.collector.storage.table.serviceref.ServiceReferenceTable , ServiceReference 表( service_reference )。字段如下:
    • entry_service_id :入口操作编号。
    • front_service_id :服务消费者操作编号。
    • behind_service_id :服务提供者操作编号。
    • s1_lte :( 0, 1000 ms ] 的调用次数。
    • s3_lte :( 1000, 3000 ms ] 的调用次数。
    • s5_lte :( 3000, 5000ms ] 的调用次数
    • s5_gt :( 5000, +∞ ] 的调用次数。
    • error :发生异常的调用次数。
    • summary :总共的调用次数。
    • cost_summary :总共的花费时间。
    • time_bucket :时间( yyyyMMddHHmm )。
  • org.skywalking.apm.collector.storage.es.dao.ServiceReference ,ServiceReference 的 EsDAO 。
  • 在 ES 存储例子如下图:

org.skywalking.apm.collector.agent.stream.worker.trace.segment.ServiceReferenceSpanListenerServiceReference 的 SpanListener ,实现了 EntrySpanListener 、FirstSpanListener 、RefsListener 接口,代码如下:

  • referenceServices 属性,ReferenceDecorator 数组,记录 TraceSegmentRef 数组。
  • serviceId 属性,入口操作编号。
  • startTime 属性,开始时间。
  • endTime 属性,结束时间。
  • isError 属性,是否有错误。
  • hasEntry 属性,是否有 SpanEntry 。
  • timeBucket 属性,时间( yyyyMMddHHmm )。
  • #parseRef(SpanDecorator, applicationId, instanceId, segmentId) 方法,将 TraceSegmentRef 添加到 referenceServices
  • #parseFirst(SpanDecorator, applicationId, instanceId, segmentId) 方法,从首个 Span 中解析到 timeBucket
  • #parseEntry(SpanDecorator, applicationId, instanceId, segmentId) 方法,从 EntrySpan 中解析 serviceIdstartTimeendTimeisErrorhasEntry
  • #build() 方法,构建,代码如下:
    • 第 114 行:判断 hasEntry = true ,存在 EntrySpan 。
    • --------- 有 TraceSegmentRef ---------
    • 第 117 至 120 行:创建 ServiceReference 对象,其中:
      • entryServiceId :TraceSegmentRef 的入口编号。
      • frontServiceId :TraceSegmentRef 的操作编号。
      • behindServiceId : 自己 EntrySpan 的操作编号。
    • 第 121 行:调用 #calculateCost(...) 方法,设置调用次数。
    • 第 126 行:调用 #sendToAggregationWorker(...) 方法,发送 ServiceReference 给 AggregationWorker ,执行流式处理。
    • --------- 无 TraceSegmentRef ---------
    • 第 117 至 120 行:创建 ServiceReference 对象,其中:
      • entryServiceId :自己 EntrySpan 的操作编号。
      • frontServiceIdConst.NONE_SERVICE_ID 对应的操作编号( 系统内置,代表【空】 )。
      • behindServiceId : 自己 EntrySpan 的操作编号。
    • 第 121 行:调用 #calculateCost(...) 方法,设置调用次数。
    • 第 126 行:调用 #sendToAggregationWorker(...) 方法,发送 ServiceReference 给 AggregationWorker ,执行流式处理。

TraceStreamGraph#createServiceReferenceGraph() 方法中,我们可以看到 ServiceReference 对应的 Graph<ServiceReference> 对象的创建。

11. Segment

不同于上述所有数据实体,Segment 无需解析,直接使用 TraceSegment 构建,参见如下方法:

org.skywalking.apm.collector.storage.table.segment.Segment ,全局链路追踪,记录一次分布式链路追踪,包括的 TraceSegment 编号。


TraceStreamGraph#createSegmentGraph() 方法中,我们可以看到 Segment 对应的 Graph<Segment> 对象的创建。

666. 彩蛋

知识星球

抱歉,本文可能会存在一些错误或者细节没有扣到,烦请见谅。
胖友如果有疑惑,请给我的公众号留言,一起探讨。

大量类似的内容,笔者一天都处于昏昏沉沉的状态,中间有一块不小心替换错误,实在是苦闷而又几分枯燥,不得不佩服 SkyWalking 开发者的耐心。

胖友,分享个朋友圈可好?

文章目录
  1. 1. 1. 概述
  2. 2. 2. SpanListener
  3. 3. 3. GlobalTrace
  4. 4. 4. InstPerformance
  5. 5. 5. SegmentCost
  6. 6. 6. NodeComponent
  7. 7. 7. NodeMapping
  8. 8. 8. NodeReference
  9. 9. 9. ServiceEntry
  10. 10. 10. ServiceReference
  11. 11. 11. Segment
  12. 12. 666. 彩蛋