⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 https://blog.csdn.net/u013256816/article/details/79996138 「朱小厮」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

前文概述

在上一篇文章《集群管理工具KafkaAdminClient——原理与示例》中讲述了KafkaAdminClient的功能以及相应的原理,但是同时也提出了目前的KafkaAdminClient并没有非常的完善,还有许多功能还需要去丰富,这些功能可以自定义实现,在《如何获取Kafka的消费者详情——从Scala到Java的切换》一文中介绍了如何获取Kafka的消费详情,其原理是通过Java调用Kafka的Scala代码实现的,如果要使用纯Java的方式实现就需要用到了KafkaAdminClient,另外Scala版的AdminClient也被标注为:“This client is deprecated, and will be replaced by KafkaAdminClient.”,说明官方也推荐使用KafkaAdminClient。不过现在的版本(目前最新1.1.0)并没有提供类似describeConsumerGroup和listGroupOffsets的方法实现,这一点在前文《集群管理工具KafkaAdminClient——原理与示例》也有提及,所以如果要实现获取类似消费者详情的功能,那么就需要自己动手进行改造。

改造

参考Scala版的AdminClient,要实现获取Kafka的消费者详情的功能首先需要实现describeConsumerGroup和listGroupOffsets的方法,其中describeConsumerGroup方法内部还需要一个findCoordinator的方法用来提供消费者对应的coodinator节点,以便提供详细的消费者详情。describeConsumerGroup、listGroupOffsets和findCoordinator这三个方法都将在KafkaAdminClient类里提供自定义实现。KafkaAdminClient和XXXOptions、XXXResult的类都位于org.apache.kafka.clients.admin包下,笔者也将扩展的类也置于其同一包下,不过也进行了一些区分,如下图所示,新加入的XXXOptions、XXXResult类放入extend下,新加入的JavaBean放入model下,然后与具体应用功能对应的放在app下: img

首先建立对应的XXXOptions、XXXResult类,就那简单的ListGroupOffsets来说,其ListGroupOffsetsOptions只是继承了AbstractOptions的空实现,而ListGroupOffsetsResult也很简单,提供了一个KafkaFuture的调用,代码参考如下:

public class ListGroupOffsetsResult {
private final KafkaFutureImpl<Map<TopicPartition, Long>> future;
public ListGroupOffsetsResult(KafkaFutureImpl<Map<TopicPartition, Long>> future) {
this.future = future;
}
public KafkaFutureImpl<Map<TopicPartition, Long>> values(){
return this.future;
}
}

model目录下的ConsumerGroupSummary是所要实现的describeConsumerGroup方法中所要获取的值类型,封装在DescribeConsumerGroupResult 中;ConsumerSummary在describeConsumerGroup方法内部使用,用来封装消费状态,包括consumerId、clientId、host(消费者主机)以及TopicPartition列表,最终被封装进ConsumerGroupSummary中。PartitionAssignmentState是服务于KafkaConsumerGroupService的,用来最后显示消费者详情列表。

KafkaAdminClient的父类是AdminClient(kafka-client中的抽象类),describeConsumerGroup、listGroupOffsets和findCoordinator这三个方法也需要在AdminClient类中做申明,详细参考如下:

public abstract DescribeConsumerGroupResult describeConsumerGroup(final String group,
final DescribeConsumerGroupOptions options);
public DescribeConsumerGroupResult describeConsumerGroup(final String group) {
return describeConsumerGroup(group, new DescribeConsumerGroupOptions());
}
public abstract FindCoordinatorResult findCoordinator(final String group,
final FindCoordinatorOptions options);
public FindCoordinatorResult findCoordinator(final String group) {
return findCoordinator(group, new FindCoordinatorOptions());
}
public abstract ListGroupOffsetsResult listGroupOffsets(final String group,
final ListGroupOffsetsOptions options);
public ListGroupOffsetsResult listGroupOffsets(final String group){
return listGroupOffsets(group, new ListGroupOffsetsOptions());
}

在前面2篇文章《集群管理工具KafkaAdminClient——原理与示例》和《如何获取Kafka的消费者详情——从Scala到Java的切换》中都详细解释了describeConsumerGroup、listGroupOffsets方法,所以这里不在赘述,具体实现也很简单,可以参考笔者的实现

最后来讲述一下org.apache.kafka.clients.admin.app包下的KafkaConsumerGroupService,具体代码地址在这里,其内部通过上面改造的KafkaAdminClient和KafkaConsumer来实现,其内部逻辑和《如何获取Kafka的消费者详情——从Scala到Java的切换》一文中的KafkaConsumerGroupCustomService一样,这里就不在赘述了。

本篇以及《Kafka的Lag计算误区及正确实现》、《如何获取Kafka的消费者详情——从Scala到Java的切换》这三篇文章都是围绕如何获取消费者详情来做具体的陈述,回到问题的初衷:kafka.admin.ConsumerGroupCommand.PartitionAssignmentState无法被外部访问,那么真的需要这么复杂的转变过程么,详细请参考下一篇《Scala与Java语言的互操作》。

666. 彩蛋

如果你对 Kafka 并发感兴趣,欢迎加入我的知识一起交流。

知识星球

文章目录
  1. 1. 前文概述
  2. 2. 改造
  • 666. 彩蛋