⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 https://blog.csdn.net/u013256816/article/details/79955578 「朱小厮」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

前言

消息堆积是消息中间件的一大特色,消息中间件的流量削峰、冗余存储等功能正是得益于消息中间件的消息堆积能力。然而消息堆积其实是一把亦正亦邪的双刃剑,如果应用场合不恰当反而会对上下游的业务造成不必要的麻烦,比如消息堆积势必会影响上下游整个调用链的时效性,有些中间件如RabbitMQ在发生消息堆积时在某些情况下还会影响自身的性能。对于Kafka而言,虽然消息堆积不会对其自身性能带来多大的困扰,但难免不会影响上下游的业务,堆积过多有可能会造成磁盘爆满,或者触发日志清除策略而造成消息丢失的情况。如何利用好消息堆积这把双刃剑,监控是最为关键的一步。

正文

消息堆积是消费滞后(Lag)的一种表现形式,消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量,也称之为消费滞后(Lag)量。对于Kafka而言,消息被发送至Topic中,而Topic又分成了多个分区(Partition),每一个Partition都有一个预写式的日志文件,虽然Partition可以继续细分为若干个段文件(Segment),但是对于上层应用来说可以将Partition看成最小的存储单元(一个由多个Segment文件拼接的“巨型文件”)。每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中。我们来看下图,其就是Partition的一个真实写照: img

上图中有四个概念:

  1. LogStartOffset:表示一个Partition的起始位移,初始为0,虽然消息的增加以及日志清除策略的影响,这个值会阶段性的增大。
  2. ConsumerOffset:消费位移,表示Partition的某个消费者消费到的位移位置。
  3. HighWatermark:简称HW,代表消费端所能“观察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
  4. LogEndOffset:简称LEO, 代表Partition的最高日志位移,其值对消费者不可见。比如在ISR(In-Sync-Replicas)副本数等于3的情况下(如下图所示),消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW必然不会一直与Leader的LEO相等,即LEO>=HW。 img

要计算Kafka中某个消费者的滞后量很简单,首先看看其消费了几个Topic,然后针对每个Topic来计算其中每个Partition的Lag,每个Partition的Lag计算就显得非常的简单了,参考下图:

由图可知消费Lag=HW - ConsumerOffset。对于这里大家有可能有个误区,就是认为Lag应该是LEO与ConsumerOffset之间的差值,笔者在这之前也犯过这样的错误认知,详细可以参考《如何使用JMX监控Kafka》。LEO是对消费者不可见的,既然不可见何来消费滞后一说。

那么这里就引入了一个新的问题,HW和ConsumerOffset的值如何获取呢? img

首先来说说ConsumerOffset,Kafka中有两处可以存储,一个是Zookeeper,而另一个是”__consumer_offsets这个内部topic中,前者是0.8.x版本中的使用方式,但是随着版本的迭代更新,现在越来越趋向于后者。就拿1.0.0版本来说,虽然默认是存储在”__consumer_offsets”中,但是保不齐用于就将其存储在了Zookeeper中了。这个问题倒也不难解决,针对两种方式都去拉取,然后哪个有值的取哪个。不过这里还有一个问题,对于消费位移来说,其一般不会实时的更新,而更多的是定时更新,这样可以提高整体的性能。那么这个定时的时间间隔就是ConsumerOffset的误差区间之一。

再来说说HW,其也是Kafka中Partition的一个状态。有可能你会察觉到在Kafka的JMX中可以看到“kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]”这样一个属性,但是这个值不是LEO而是HW。

那么怎样正确的计算消费的Lag呢?对Kafka熟悉的同学可能会想到Kafka中自带的kafka-consumer_groups.sh脚本中就有Lag的信息,示例如下:

[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic-test1 0 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 1 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 2 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 3 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID

我们深究一下kafka-consumer_groups.sh脚本,发现只有一句代码:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

其含义就是执行kafka.admin.ConsumerGroupCommand而已。进一步深究,在ConsumerGroupCommand内部抓住了2句关键代码:

val consumerGroupService = new KafkaConsumerGroupService(opts)
val (state, assignments) = consumerGroupService.describeGroup()

代码详解:consumerGroupService的类型是ConsumerGroupServicesealed trait类型),而KafkaConsumerGroupService只是ConsumerGroupService的一种实现,还有一种实现是ZkConsumerGroupService,分别对应新版的消费方式(消费位移存储在__consumer_offsets中)和旧版的消费方式(消费位移存储在zk中),详细计算步骤参考下一段落的内容。opt参数是指“ –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID”等参数。第2句代码是调用describeGroup()方法来获取具体的信息,即二元组中的assignments,这个assignments中保存了上面打印信息中的所有内容。

Scala小知识: 在Scala中trait(特征)相当于Java的接口,实际上它比接口更大强大。与Java中的接口不同的是,它还可以定义属性和方法的实现(JDK8起的接口默认方法)。一般情况下Scala中的类只能继承单一父类,但是如果是trait的话就可以继承多个,从结果来看是实现了多重继承。被sealed声明的trait仅能被同一文件的类继承。

ZkConsumerGroupService中计算消费lag的步骤如下:

  1. 通过zk获取一些基本信息,对应上面打印信息中的:TOPIC、PARTITION、CONSUMER-ID等,不过不会有HOST和CLIENT-ID。
  2. 通过OffsetFetchRequest请求获取消费位移(offset),如果获取失败则在通过zk获取。
  3. 通过OffsetReuqest请求获取分区的LogEndOffset(简称为LEO,可见的LEO)。
  4. 计算LogEndOffset与消费位移的差值来获取lag。

KafkaConsumerGroupService中计算消费lag的步骤如下:

  1. 通过DescibeGroupsRequest请求获取一些基本信息,不仅包括TOPIC、PARTITION、CONSUMER-ID,还有HOST和CLIENT-ID。其实还有通过 FindCoordinatorRequest请求来获取coordinator信息,如果不了解coordinator在这里也没影响。
  2. 通过OffsetFetchRequest请求获取消费位移。
  3. 通过OffsetReuqest请求获取分区的LogEndOffset(简称为LEO)。
  4. 计算LogEndOffset与消费位移的差值来获取lag。

可以看到KafkaConsumerGroupService与ZkConsumerGroupService的计算Lag的方式都差不多,但是KafkaConsumerGroupService能获取更多消费详情,并且ZkConsumerGroupService也被标注为@Deprecated的了,后面内容都针对KafkaConsumerGroupService来做说明。既然Kafka已经为我们提供了线程的方法来获取Lag,那么我们有何必再重复造轮子,这里笔者写了一个调用的KafkaConsumerGroupService的示例(KafkaConsumerGroupService是使用Scala语言编写的,在Java的程序里使用类似scala.collection.Seq这样的全名称以防止混淆):

String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
ConsumerGroupCommand.ConsumerGroupCommandOptions opts =
new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
new ConsumerGroupCommand.KafkaConsumerGroupService(opts);
scala.Tuple2<scala.Option<String>, scala.Option<scala.collection.Seq<ConsumerGroupCommand
.PartitionAssignmentState>>> res = kafkaConsumerGroupService.describeGroup();
scala.collection.Seq<ConsumerGroupCommand.PartitionAssignmentState> pasSeq = res._2.get();
scala.collection.Iterator<ConsumerGroupCommand.PartitionAssignmentState> iterable = pasSeq.iterator();
while (iterable.hasNext()) {
ConsumerGroupCommand.PartitionAssignmentState pas = iterable.next();
System.out.println(String.format("\n%-30s %-10s %-15s %-15s %-10s %-50s%-30s %s",
pas.topic().get(), pas.partition().get(), pas.offset().get(),
pas.logEndOffset().get(), pas.lag().get(), pas.consumerId().get(),
pas.host().get(), pas.clientId().get()));
}

在使用时,你可以封装一下这段代码然后返回一个类似List<ConsumerGroupCommand.PartitionAssignmentState>的东西给上层业务代码做进一步的使用。ConsumerGroupCommand.PartitionAssignmentState的代码如下:

case class PartitionAssignmentState(
group: String, coordinator: Option[Node], topic: Option[String],
partition: Option[Int], offset: Option[Long], lag: Option[Long],
consumerId: Option[String], host: Option[String],
clientId: Option[String], logEndOffset: Option[Long])

Scala小知识: 对于case class, 在这里你可以简单的把它看成是一个JavaBean,但是它远比JavaBean强大,比如它会自动生成equals、hashCode、toString、copy、伴生对象、apply、unapply等等东西。在 scala 中,对保护(Protected)成员的访问比 java 更严格一些。因为它只允许保护成员在定义了该成员的的类的子类中被访问。而在java中,用protected关键字修饰的成员,除了定义了该成员的类的子类可以访问,同一个包里的其他类也可以进行访问。Scala中,如果没有指定任何的修饰符,则默认为 public。这样的成员在任何地方都可以被访问。

如果你正在试着运行上面一段程序,你会发现编译失败,报错:cannot access ‘kafka.admin.ConsumerGroupCommand.PartitionAssignmentState’ in ‘kafka.admin.ConsumerGroupCommand‘。这时候需要将所引入的kafka.core包中的kafka.admin.ConsumerGroupCommand中的PartitionAssignmentState类前面的protected修饰符去掉才能编译通过。

现实情况下你可能并不想变更kafka-core的代码然后再重新打包,而是寻求直接能够调用的东西,至于到底怎么实现将会在下一篇文章中介绍,如果你比较猴急,可以先预览一下代码的实现,具体参见:https://github.com/hiddenzzh/kafka/blob/master/src/main/java/com/hidden/custom/kafka/admin/KafkaConsumerGroupCustomService.java。详细的逻辑解析敬请期待….

666. 彩蛋

如果你对 Kafka 并发感兴趣,欢迎加入我的知识一起交流。

知识星球

文章目录
  1. 1. 前言
  2. 2. 正文
  • 666. 彩蛋