《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/RabbitMQ/install/ 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

推荐阅读如下 RabbitMQ 文章:

1. 概述

RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。

FROM 《维基百科 —— RabbitMQ》

Rabbit 科技有限公司开发了 RabbitMQ ,并提供对其的支持。起初,Rabbit 科技是 LSHIFT 和 CohesiveFT 在 2007 年成立的合资企业,2010 年 4 月 被 VMware 的旗下的 SpringSource 收购。RabbitMQ 在 2013 年 5 月成为 GoPivotal 的一部分。

  • 这么一看,Spring Cloud 在消息队列主推 RabbitMQ ,可能还是有原因的,嘿嘿。

因为朋友已经写了一篇很不错的 RabbitMQ 入门文章,所以艿艿就可以光明正大的偷懒了。对 RabbitMQ 不了解的胖友,可以先阅读 《消息队列之 RabbitMQ》 文章的如下小节:

2. 单机部署

因为艿艿使用的是 macOS 操作系统,可以使用 brew 快速安装,所以本小节,我们就采用这样的方式。

其它操作系统,可以参考如下的文章,进行安装。

2.1 安装 brew

参考 brew 官网,直接在控制台输入如下命令,即可安装完成。

# 安装 brew
$ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

# 更新 brew 源
brew update

如果 brew update 比较慢,可以参考如下两篇文章来解决:

2.2 安装 RabbitMQ

在命令行中,使用 brew 安装 RabbitMQ 服务。命令行操作如下:

$ brew install rabbitmq

耐心等待一会。同时,RabbitMQ 依赖的 Erlang 也会被自动安装完成。

友情提示:如果后续胖友想升级 RabbitMQ 的版本,直接执行命令 brew upgrade rabbitmq 即可。

2.3 启动 RabbitMQ

启动一个 RabbitMQ Broker 服务。命令行操作如下:

# 启动 RabbitMQ Broker 服务。通过 -detached 参数,后台运行
$ rabbitmq-server -detached
Warning: PID file not written; -detached was passed. # 可以忽略

启动完成后,查看日志。

# 查看 RabbitMQ Broker 日志。
$ tail -f /usr/local/var/log/rabbitmq/rabbit@localhost.log

2019-12-08 22:11:27.947 [info] <0.676.0> started MQTT TCP listener on [::]:1883
2019-12-08 22:11:27.964 [notice] <0.96.0> Changed loghwm of /usr/local/var/log/rabbitmq/rabbit@localhost.log to 50
2019-12-08 22:11:28.053 [info] <0.8.0> Server startup complete; 6 plugins started.
* rabbitmq_mqtt
* rabbitmq_amqp1_0
* rabbitmq_stomp
* rabbitmq_management # 默认情况下,启动了 RabbitMQ Management
* rabbitmq_management_agent
* rabbitmq_web_dispatch
2019-12-08 22:11:28.393 [notice] <0.679.0> mqtt_node: candidate -> leader in term: 3

  • 默认情况下,RabbitMQ Broker 日志文件所在地址为 /usr/local/var/log/rabbitmq/ 目录下。如果想要自定义,可以通过 config/log4j.properties 配置文件来进行修改。

😈 至此,我们已经完成了 RabbitMQ 单机部署。因为 RabbitMQ 暂未提供消息的发送和消费的脚本,所以我们只能在「5. 简单示例」中,使用 Java 代码来实现消息的发送和消费的测试。

2.4 查看 RabbitMQ 状态

RabbitMQ 提供了 rabbitmqctl 命令,可以实现 RabbitMQ 各种运维与管理的操作。这里我们使用该命令来查看 RabbitMQ 的状态。命令行操作如下:

# 查看 RabbitMQ 状态 
$ rabbitmqctl status

Runtime

OS PID: 9369
OS: macOS
Uptime (seconds): 497
RabbitMQ version: 3.8.0
Node name: rabbit@localhost
Erlang configuration: Erlang/OTP 22 [erts-10.5.1] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:192] [hipe] [dtrace]
Erlang processes: 472 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60

Plugins

Enabled plugin file: /usr/local/etc/rabbitmq/enabled_plugins
Enabled plugins:

* rabbitmq_mqtt
* rabbitmq_amqp1_0
* rabbitmq_stomp
* rabbitmq_management
* rabbitmq_management_agent
* rabbitmq_web_dispatch
* cowboy
* amqp_client
* cowlib
* amqp10_common

Data directory

Node data directory: /usr/local/var/lib/rabbitmq/mnesia/rabbit@localhost

Config files


Log file(s)

* /usr/local/var/log/rabbitmq/rabbit@localhost.log
* /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

Alarms

(none)

Memory

Calculation strategy: rss
Memory high watermark setting: 0.4 of available memory, computed to: 13.7439 gb
other_proc: 0.0338 gb (30.76 %)
code: 0.0261 gb (23.8 %)
other_system: 0.0225 gb (20.52 %)
allocated_unused: 0.0176 gb (16.07 %)
plugins: 0.0036 gb (3.25 %)
other_ets: 0.0033 gb (3.02 %)
atom: 0.0015 gb (1.38 %)
queue_procs: 0.0006 gb (0.52 %)
metrics: 0.0002 gb (0.2 %)
mgmt_db: 0.0002 gb (0.16 %)
binary: 0.0002 gb (0.16 %)
mnesia: 0.0001 gb (0.08 %)
quorum_ets: 0.0 gb (0.04 %)
msg_index: 0.0 gb (0.03 %)
quorum_queue_procs: 0.0 gb (0.01 %)
connection_other: 0.0 gb (0.0 %)
connection_channels: 0.0 gb (0.0 %)
connection_readers: 0.0 gb (0.0 %)
connection_writers: 0.0 gb (0.0 %)
queue_slave_procs: 0.0 gb (0.0 %)
reserved_unallocated: 0.0 gb (0.0 %)

File Descriptors

Total: 4, limit: 4764
Sockets: 0, limit: 4285

Free Disk Space

Low free disk space watermark: 0.05 gb
Free disk space: 186.746 gb

Totals

Connection count: 0
Queue count: 2
Virtual host count: 1

Listeners

Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: 127.0.0.1, port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Interface: [::], port: 15672, protocol: http, purpose: HTTP API
Interface: [::], port: 61613, protocol: stomp, purpose: STOMP
Interface: [::], port: 1883, protocol: mqtt, purpose: MQTT

  • 有点长哈,胖友简单喵一眼即可。

3. 集群部署

咳咳咳,偷懒下。胖友可以先看如下文章:

在生产环境下,必须搭建 RabbitMQ 高可用集群,不然简直是找死。

4. RabbitMQ Management

RabbitMQ 自带 Management Plugin 插件,提供 RabbitMQ 的管理界面。

The RabbitMQ management plugin provides an HTTP-based API for management and monitoring of RabbitMQ nodes and clusters, along with a browser-based UI and a command line tool, rabbitmqadmin.

RabbitMQ management plugin 提供了管理和监控 RabbitMQ 节点和集群的功能,可以有 HTTP API、浏览器 UI 界面、命令行工具 rabbitmqadmin 三种形式。

4.1 启用 Management Plugin 插件

默认情况下,Management Plugin 插件已经被启用。如果没有的话,可以通过执行 rabbitmq-plugins enable rabbitmq_management 命令来启用。

😈 如果胖友不想使用 Management Plugin 插件 插件,可以通过 rabbitmq-plugins disable rabbitmq_management 命令来禁用。

4.2 简单使用

使用浏览器,访问 http://127.0.0.1:15672/ 地址,我们就可以看到 RabbitMQ Management 的登录界面。默认情况下,我们可以使用用户名为 guest ,密码为 guest 进行登录。

登录完成后,进入如下界面:RabbitMQ Management

从图中,我们可以看到 RabbitMQ 中的一些基本概念,例如每个菜单分别是:

  • Override:整体一览。
  • Connections:网络连接,比如一个 TCP 连接。
  • Channels:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Exchanges:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Queues:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

另外,胖友可以在「Admin」菜单中,管理用户以及其权限。注意,一定要记得修改「guest/guset」这个账号哟。

更多具体的功能,胖友可以自己多多体验,艿艿就不啰嗦赘述了。

5. 简单示例

示例代码对应仓库:lab-04-rabbitmq-native

本小节,我们来看看在 Java 中,如何使用生产者 Producer 发送消息,和消费者 Consumer 消费消息。

5.1 引入依赖

pom.xml 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lab-04</artifactId>
<groupId>cn.iocoder.springboot.labs</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>lab-04-rabbitmq-native</artifactId>

<dependencies>
<!-- 引入 RabbitMQ 客户端依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
</dependencies>

</project>

具体每个依赖的作用,胖友自己认真看下艿艿添加的所有注释噢。

5.2 RabbitMQProducer

创建 RabbitMQProducer 类,使用 Channel 发送消息。代码如下:

// RabbitMQProducer.java

public class RabbitMQProducer {

private static final String IP_ADDRESS = "127.0.0.1";
private static final Integer PORT = 5672;
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";

private static final String EXCHANGE_NAME = "exchange_demo";
private static final String ROUTING_KEY = "routingkey_demo";
public static final String QUEUE_NAME = "queue_demo"; // 只有 QUEUE_NAME 需要共享给 RabbitMQConsumer

public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
Connection connection = getConnection();

// 创建信道
Channel channel = connection.createChannel();

// 初始化测试用的 Exchange 和 Queue
initExchangeAndQueue(channel);

// 发送 3 条消息
for (int i = 0; i < 3; i++) {
String message = "Hello World" + i;
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
}

// 关闭
channel.close();
connection.close();
}

public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
return factory.newConnection();
}

// 创建 RabbitMQ Exchange 和 Queue ,然后使用 ROUTING_KEY 路由键将两者绑定。
// 该步骤,其实可以在 RabbitMQ Management 上操作,并不一定需要在代码中
private static void initExchangeAndQueue(Channel channel) throws IOException {
// 创建交换器:direct、持久化、不自动删除
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

// 创建队列:持久化、非排他、非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 将交换器与队列通过路由键绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
}

}

  • 代码比较简单,胖友根据艿艿添加的注释,理解下哈。

执行 #main(args) 方法,发送 3 条消息到 RabbitMQ 。如果没有报错,说明执行正常成功。

5.3 RabbitMQConsumer

创建 RabbitMQConsumer 类,使用 Consumer 消费消息。代码如下:

// RabbitMQConsumer.java

public class RabbitMQConsumer {

public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
Connection connection = RabbitMQProducer.getConnection();

// 创建信道
final Channel channel = connection.createChannel();
channel.basicQos(64); // 设置客户端最多接收未被 ack 的消息数量为 64 。

// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 打印日志
System.out.println(String.format("[线程:%s][路由键:%s][消息内容:%s]",
Thread.currentThread(), envelope.getRoutingKey(), new String(body)));
// ack 消息已经消费
channel.basicAck(envelope.getDeliveryTag(), false);
}

};
// 订阅消费 QUEUE_NAME 队列
channel.basicConsume(RabbitMQProducer.QUEUE_NAME, consumer);

// 关闭
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException ignore) {
}
channel.close();
connection.close();
}

}

  • 代码比较简单,胖友根据艿艿添加的注释,理解下哈。

执行 #main(args) 方法,从 RabbitMQ 消费消息。执行结果如下:

[线程:Thread[pool-1-thread-4,5,main]][路由键:routingkey_demo][消息内容:Hello World0]
[线程:Thread[pool-1-thread-4,5,main]][路由键:routingkey_demo][消息内容:Hello World1]
[线程:Thread[pool-1-thread-5,5,main]][路由键:routingkey_demo][消息内容:Hello World2]

  • 消费成功,符合预期。

6. Spring Boot 使用示例

《芋道 Spring Boot 分布式消息队列 RabbitMQ 入门》 中,我们来详细学习如何在 Spring Boot 中,整合并使用 RabbitMQ 。😈 会方便很多。

7. Spring Cloud 使用示例

在如下的文章中,我们来详细学习如何在 Spring Cloud 中,整合并使用 RabbitMQ 。😈 更加方便。

666. 彩蛋

暂无彩蛋,美滋滋。

更多文章,可见 《芋道 RabbitMQ 实现原理与源码解析系统 —— 精品合集》

文章目录
  1. 1. 1. 概述
  2. 2. 2. 单机部署
    1. 2.1. 2.1 安装 brew
    2. 2.2. 2.2 安装 RabbitMQ
    3. 2.3. 2.3 启动 RabbitMQ
    4. 2.4. 2.4 查看 RabbitMQ 状态
  3. 3. 3. 集群部署
  4. 4. 4. RabbitMQ Management
    1. 4.1. 4.1 启用 Management Plugin 插件
    2. 4.2. 4.2 简单使用
  5. 5. 5. 简单示例
    1. 5.1. 5.1 引入依赖
    2. 5.2. 5.2 RabbitMQProducer
    3. 5.3. 5.3 RabbitMQConsumer
  6. 6. 6. Spring Boot 使用示例
  7. 7. 7. Spring Cloud 使用示例
  8. 8. 666. 彩蛋