⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 勇哥java实战分享 「勇哥」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时、高可靠的消息发布与订阅服务。

这篇文章,笔者整理了 RocketMQ 源码中创建线程的几点技巧,希望大家读完之后,能够有所收获。

1. 创建单线程

首先我们先温习下常用的创建单线程的两种方式:

  • 实现 Runnable 接口
  • 继承 Thread 类

▍一、实现 Runnable 接口

图中,MyRunnable 类实现了 Runnable 接口的 run 方法,run 方法中定义具体的任务代码或处理逻辑,而Runnable 对象是作为线程构造函数的参数。

▍二、 继承 Thread 类

线程实现类直接继承 Thread ,本质上也是实现 Runnable 接口的 run 方法。

2. 单线程抽象类

创建单线程的两种方式都很简单,但每次创建线程代码显得有点冗余,于是 RocketMQ 里实现了一个抽象类 ServiceThread 。

抽象类 ServiceThread

我们可以看到抽象类中包含了如下核心方法:

  1. 定义线程名;
  2. 启动线程;
  3. 关闭线程。

下图展示了 RocketMQ 众多的单线程实现类。

实现类的编程模版类似 :

我们仅仅需要继承抽象类,并实现 getServiceNamerun 方法即可。启动的时候,调用 start 方法 , 关闭的时候调用 shutdown 方法。

3. 线程池原理

线程池是一种基于池化思想管理线程的工具,线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。关注公z号:码猿技术专栏,回复关键词:1111 获取阿里内部性能调优手册

JDK中提供的 ThreadPoolExecutor 类,是我们最常使用的线程池类。

ThreadPoolExecutor构造函数

参数名 作用
corePoolSize 队列没满时,线程最大并发数
maximumPoolSizes 队列满后线程能够达到的最大并发数
keepAliveTime 空闲线程过多久被回收的时间限制
unit keepAliveTime 的时间单位
workQueue 阻塞的队列类型
threadPoolFactory 改变线程的名称、线程组、优先级、守护进程状态
RejectedExecutionHandler 超出 maximumPoolSizes + workQueue 时,任务会交给RejectedExecutionHandler来处理

任务的调度通过执行 execute方法完成,方法的核心流程如下:

  1. 如果 workerCount < corePoolSize,创建并启动一个线程来执行新提交的任务。
  2. 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  3. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  4. 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

4. 线程池封装

在 RocketMQ 里 ,网络请求都会携带命令编码,每种命令映射对应的处理器,而处理器又会注册对应的线程池。

当服务端 Broker 接收到发送消息命令时,都会有单独的线程池 sendMessageExecutor 来处理这种命令请求。

基于 ThreadPoolExecutor 做了一个简单的封装 ,BrokerFixedThreadPoolExecutor 构造函数包含六个核心参数:

  1. 核心线程数和最大线程数相同 ,数量是:cpu核数和4比较后的最小值;
  2. 空闲线程的回收的时间限制,默认1分钟;
  3. 发送消息队列,有界队列,默认10000;
  4. 线程工厂 ThreadFactoryImpl ,定义了线程名前缀:SendMessageThread_ 。

RocketMQ 实现了一个简单的线程工厂:ThreadFactoryImpl,线程工厂可以定义线程名称,以及是否是守护线程 。

线程工厂

开源项目 Cobar ,Xmemcached,Metamorphosis 中都有类似线程工厂的实现 。

5. 线程名很重要

线程名很重要,线程名很重要,线程名很重要 ,重要的事情说三遍。

我们看到 RocketMQ 中,无论是单线程抽象类还是多线程的封装都会配置线程名 ,因为通过线程名,非常容易定位问题,从而大大提升解决问题的效率。

定位的媒介常见有两种:日志文件堆栈记录

▍一、日志文件

经常处理业务问题的同学,一定都经常与日志打交道。

  • 查看 ERROR 日志,追溯到执行线程, 要是线程池隔离做的好,基本可以判断出哪种业务场景出了问题;
  • 通过查看线程打印的日志,推断线程调度是否正常,比如有的定时任务线程打印了开始,没有打印结束,推论当前线程可能已经挂掉或者阻塞。

▍二、堆栈记录

jstack 是 java 虚拟机自带的一种堆栈跟踪工具 ,主要用来查看 Java 线程的调用堆栈,线程快照包含当前 java 虚拟机内每一条线程正在执行的方法堆栈的集合,可以用来分析线程问题。

jstack -l 进程pid

笔者查看线程堆栈,一般关注如下几点:

  1. 当前 jvm 进程中的线程数量和线程分类是否在预期的范围内;
  2. 系统接口超时或者定时任务停止的异常场景下 ,分析堆栈中是否有锁未释放,或者线程一直等待网络通讯响应;
  3. 分析 jvm 进程中哪个线程占用的 CPU 最高。

6. 总结

本文是RocketMQ 系列文章的开篇,和朋友们简单聊聊 RocketMQ 源码里创建线程的技巧。

  1. 单线程抽象类 ServiceThread

    使用者只需要实现业务逻辑以及定义线程名即可 ,不需要写冗余的代码。

  2. 线程池封装

    适当封装,定义线程工厂,并合理配置线程池参数。

  3. 线程名很重要

    文件日志,堆栈记录配合线程名能大大提升解决问题的效率。

RocketMQ 的多线程编程技巧很多,比如线程通讯,并发控制,线程模型等等,后续的文章会一一为大家展现。

文章目录
  1. 1. 1. 创建单线程
  2. 2. 2. 单线程抽象类
  3. 3. 3. 线程池原理
  4. 4. 4. 线程池封装
  5. 5. 5. 线程名很重要
  6. 6. 6. 总结