⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/data-carrier/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 SkyWalking DataCarrier 异步处理库

基于生产者消费者的模式,大体结构如下图:

  • 实际项目中,没有 Producer 这个类。所以本文提到的 Producer ,更多的是一种角色

下面我们来看看整体的项目结构,如下图所示 :

2. buffer

org.skywalking.apm.commons.datacarrier.buffer 包,主要包含 Channels 、Buffer 两个类。Channels 是 Buffer 数组的封装。

2.1 Buffer

org.skywalking.apm.commons.datacarrier.buffer.Buffer ,缓存区。

Buffer 在保存数据时,把 buffer 作为一个 "",使用 index 记录最后存储的位置,不断向下,循环存储到 buffer 中。通过这样的方式,带来良好的存储性能,避免扩容问题。But ,存储会存在冲突的问题:buffer 写入位置,暂未被消费,已经存在值。此时,根据不同的 BufferStrategy 进行处理。整体流程见 #save(data) 方法。

当 Buffer 被 Consumer 消费时,被调用 #obtain(start, end) 方法,获得数据并清空。为什么会带 startend 方法参数呢?下文揭晓答案。

2.2 Channels

org.skywalking.apm.commons.datacarrier.buffer.Channels ,内嵌多个 Buffer 的通道。

Channels 在保存数据时,相比 Buffer ,从 buffer 变成了多 buffer ,因此需要先选一个 buffer 。通过使用不同的 IDataPartitioner 实现类,进行 Buffer 的选择。当缓冲策略为 BufferStrategy.IF_POSSIBLE 时,根据 IDataPartitioner 定义的重试次数,进行多次保存数据直到成功。整体流程见 #save(data) 方法。

3. partition

org.skywalking.apm.commons.datacarrier.partition.IDataPartitioner ,数据分配者接口。定义了如下方法:

IDataPartitioner 目前有两个子类实现:

4. consumer

org.skywalking.apm.commons.datacarrier.consumer 包,主要包含 ConsumerPool 、ConsumerThread 、IConsumer 三个类。

  • ConsumerThread 使用 IConsumer ,消费数据
  • ConsumerPool 是 ConsumerThread 的线程池封装

4.1 IConsumer

org.skywalking.apm.commons.datacarrier.consumer.IConsumer ,消费者接口。定义了如下方法:

我们在使用时,自定义 Consumer 类,实现 IConsumer 接口。例如:RemoteMessageConsumer

4.2 ConsumerThread

org.skywalking.apm.commons.datacarrier.consumer.ConsumerThread ,继承 java.lang.Thread ,消费线程。

#run() 实现方法,不断批量的消费数据。代码如下:

  • 第 78 至 88 行:不断消费,直到线程关闭( #shutdown() )。
    • 第 80 行:调用 #consume() 方法,批量消费数据。
    • 第 82 至 87 行:当未消费到数据,说明 dataSources 为空,等待 20 ms ,避免 CPU 空跑。
  • 第 93 行:当线程关闭,调用 #consume() 方法,消费完 dataSources 剩余的数据。
  • 第 95 行:调用 IConsumer#onExit() 方法,处理当消费结束。

#consume() 方法,批量消费数据。代码如下:

  • 第 107 至 117 行:从 dataSources 中,获取要消费的数据。
  • 第 120 至 126 行:当有数据可消费时,调用 IConsumer#consume(List<T>) 方法。当消费发生异常时,调用 IConsumer#onError(List<T>, Throwable) 方法。
  • 第 127 行:返回是否有消费数据。

4.3 ConsumerPool

org.skywalking.apm.commons.datacarrier.consumer.ConsumerPool ,消费者池,提供了对 Channels 启动指定数量的 ConsumerThread 进行消费。

  • running 属性,是否运行中。
  • consumerThreads 属性,ConsumerThread 数组,通过构造方法的 num 参数进行指定。
  • channels 属性,数据通道。
  • lock 属性,锁。保证 ConsumerPool 启动或关闭时的线程安全。

#begin() 方法,启动 ConsumerPool ,进行数据消费。代码如下:

  • 第 97 至 99 行:正在运行中,直接返回。
  • 第 101 行:获得锁。
  • 第 104 行:调用 #allocateBuffer2Thread() 方法,将 channels多个 Buffer ,分配给 consumerThreads多个 ConsumerThread。
  • 第 107 至 109 行:启动每个 ConsumerThread ,开始消费。
  • 第 112 行:标记正在运行中。
  • 第 114 行:释放锁。

close() 方法,关闭 ConsumerPool 。代码如下:

  • 第 168 行:获得锁。
  • 第 169 行:标记不在运行中。
  • 第 170 至 172 行:关闭每个 ConsumerThread ,结束消费。
  • 第 174 行:释放锁。

#allocateBuffer2Thread() 方法,将 channels多个 Buffer ,分配给 consumerThreads多个 ConsumerThread。一共会有三种情况:

  • Buffer 数量等于 ConsumerThread 数量,这个十分好分配,一比一。

  • Buffer 数量大于 ConsumerThread 数量,那么按照 Buffer 数量 % ConsumerThread 数量进行分组,分配给 ConsumerThread ,如下图所示:

  • Buffer 数量大于 ConsumerThread 数量,那么按照 ConsumerThread 数量 % Buffer 数量进行分组,分配给 Buffer 。其中,一个 Buffer 会被均分给多个 ConsumerThread ,如下图所示:

4. DataCarrier

org.skywalking.apm.commons.datacarrier.DataCarrier ,DataCarrier 异步处理库的入口程序。通过创建 DataCarrier 对象,使用生产者消费者的模式,执行异步执行逻辑。

构造方法 ,代码如下:

设置消费者和消费线程数量

生产消息

关闭消费

666. 彩蛋

知识星球

本文的图,画的真难看,来自自己的吐槽,哈哈哈。

胖友,分享一波朋友圈可好。

文章目录
  1. 1. 1. 概述
  2. 2. 2. buffer
    1. 2.1. 2.1 Buffer
    2. 2.2. 2.2 Channels
  3. 3. 3. partition
  4. 4. 4. consumer
    1. 4.1. 4.1 IConsumer
    2. 4.2. 4.2 ConsumerThread
    3. 4.3. 4.3 ConsumerPool
  5. 5. 4. DataCarrier
  6. 6. 666. 彩蛋