⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Spring-Boot/Netty/ 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

本文在提供完整代码示例,可见 https://github.com/YunaiV/SpringBoot-Labslab-67 目录。

原创不易,给点个 Star 嘿,一起冲鸭!

1. 概述

《芋道 Spring Boot WebSocket 入门》文章中,我们使用 WebSocket 实现了一个简单的 IM 功能,支持身份认证、私聊消息、群聊消息。

然后就有胖友私信艿艿,希望使用纯 Netty 实现一个类似的功能。良心的艿艿,当然不会给她发红人卡,因此就有了本文。可能有胖友不知道 Netty 是什么,这里简单介绍下:

Netty 是一个 Java 开源框架。

Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。

Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。

下面,我们来新建三个项目,如下图所示:

三个项目

另外,我们也会提供 Netty 常用功能的示例:

  • 心跳机制,实现服务端对客户端的存活检测。
  • 断线重连,实现客户端对服务端的重新连接。

不哔哔,直接开干。

友情提示:可能会胖友担心,没有 Netty 基础是不是无法阅读本文?!

艿艿的想法,看!就硬看,按照代码先自己能搭建一下哈~文末,艿艿会提供一波 Netty 基础入门的文章。

2. 构建 Netty 服务端与客户端

本小节,我们先来使用 Netty 构建服务端与客户端的核心代码,让胖友对项目的代码有个初始的认知。

2.1 构建 Netty 服务端

创建 lab-67-netty-demo-server 项目,搭建 Netty 服务端。如下图所示:

项目结构

下面,我们只会暂时看看 server 包下的代码,避免信息量过大,击穿胖友的秃头。

2.1.1 NettyServer

创建 NettyServer 类,Netty 服务端。代码如下:

@Component
public class NettyServer {

private Logger logger = LoggerFactory.getLogger(getClass());

@Value("${netty.port}")
private Integer port;

@Autowired
private NettyServerHandlerInitializer nettyServerHandlerInitializer;

/**
* boss 线程组,用于服务端接受客户端的连接
*/
private EventLoopGroup bossGroup = new NioEventLoopGroup();
/**
* worker 线程组,用于服务端接受客户端的数据读写
*/
private EventLoopGroup workerGroup = new NioEventLoopGroup();
/**
* Netty Server Channel
*/
private Channel channel;

/**
* 启动 Netty Server
*/
@PostConstruct
public void start() throws InterruptedException {
// <2.1> 创建 ServerBootstrap 对象,用于 Netty Server 启动
ServerBootstrap bootstrap = new ServerBootstrap();
// <2.2> 设置 ServerBootstrap 的各种属性
bootstrap.group(bossGroup, workerGroup) // <2.2.1> 设置两个 EventLoopGroup 对象
.channel(NioServerSocketChannel.class) // <2.2.2> 指定 Channel 为服务端 NioServerSocketChannel
.localAddress(new InetSocketAddress(port)) // <2.2.3> 设置 Netty Server 的端口
.option(ChannelOption.SO_BACKLOG, 1024) // <2.2.4> 服务端 accept 队列的大小
.childOption(ChannelOption.SO_KEEPALIVE, true) // <2.2.5> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能
.childOption(ChannelOption.TCP_NODELAY, true) // <2.2.6> 允许较小的数据包的发送,降低延迟
.childHandler(nettyServerHandlerInitializer);
// <2> 绑定端口,并同步等待成功,即启动服务端
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
channel = future.channel();
logger.info("[start][Netty Server 启动在 {} 端口]", port);
}
}

/**
* 关闭 Netty Server
*/
@PreDestroy
public void shutdown() {
// <3.1> 关闭 Netty Server
if (channel != null) {
channel.close();
}
// <3.2> 优雅关闭两个 EventLoopGroup 对象
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}

🔥 ① 在类上,添加 @Component 注解,把 NettyServer 的创建交给 Spring 管理。

  • port 属性,读取 application.yml 配置文件的 netty.port 配置项。
  • #start() 方法,添加 @PostConstruct 注解,启动 Netty 服务器。
  • #shutdown() 方法,添加 @PreDestroy 注解,关闭 Netty 服务器。

🔥 ② 我们来详细看看 #start() 方法的代码,如何实现 Netty Server 的启动。

<2.1> 处,创建 ServerBootstrap 类,Netty 提供的服务器的启动类,方便我们初始化 Server。

<2.2> 处,设置 ServerBootstrap 的各种属性。

友情提示:这里涉及较多 Netty 组件的知识,艿艿先以简单的语言描述,后续胖友在文末的 Netty 基础入门的文章,补充学噢。

<2.2.1> 处,调用 #group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 方法,设置使用 bossGroupworkerGroup。其中:

  • bossGroup 属性:Boss 线程组,用于服务端接受客户端的连接
  • workerGroup 属性:Worker 线程组,用于服务端接受客户端的数据读写

Netty 采用的是多 Reactor 多线程的模型,服务端可以接受更多客户端的数据读写的能力。原因是:

  • 创建专门用于接受客户端连接bossGroup 线程组,避免因为已连接的客户端的数据读写频繁,影响新的客户端的连接。
  • 创建专门用于接收客户端读写workerGroup 线程组,多个线程进行客户端的数据读写,可以支持更多客户端。

课后习题:感兴趣的胖友,后续可以看看《【NIO 系列】——之 Reactor 模型》文章。

<2.2.2> 处,调用 #channel(Class<? extends C> channelClass) 方法,设置使用 NioServerSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Socket 实现类。

<2.2.3> 处,调用 #localAddress(SocketAddress localAddress) 方法,设置服务端的端口

<2.2.4> 处,调用 option#(ChannelOption<T> option, T value) 方法,设置服务端接受客户端的连接队列大小。因为 TCP 建立连接是三次握手,所以第一次握手完成后,会添加到服务端的连接队列中。

课后习题:更多相关内容,后续可以看看《浅谈 TCP Socket 的 backlog 参数》文章。

<2.2.5> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,TCP Keepalive 机制,实现 TCP 层级的心跳保活功能。

课后习题:更多相关内容,后续可以看看《TCP Keepalive 机制刨根问底》文章。

<2.2.6> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,允许较小的数据包的发送,降低延迟。

课后习题:更多相关内容,后续可以看看《详解 Socket 编程 --- TCP_NODELAY 选项》文章。

<2.2.7> 处,调用 #childHandler(ChannelHandler childHandler) 方法,设置客户端连接上来的 Channel 的处理器为 NettyServerHandlerInitializer。稍后我们在「2.1.2 NettyServerHandlerInitializer」小节来看看。

<2.3> 处,调用 #bind() + #sync() 方法,绑定端口,并同步等待成功,即启动服务端。

🔥 ③ 我们来详细看看 #shutdown() 方法的代码,如何实现 Netty Server 的关闭。

<3.1> 处,调用 Channel 的 #close() 方法,关闭 Netty Server,这样客户端就不再能连接了。

<3.2> 处,调用 EventLoopGroup 的 #shutdownGracefully() 方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池。

2.1.2 NettyServerHandlerInitializer

在看 NettyServerHandlerInitializer 的代码之前,我们需要先了解下 Netty 的 ChannelHandler 组件,用来处理 Channel 的各种事件。这里的事件很广泛,比如可以是连接、数据读写、异常、数据转换等等。

ChannelHandler 有非常多的子类,其中有个非常特殊的 ChannelInitializer,它用于 Channel 创建时,实现自定义的初始化逻辑。这里我们创建的 NettyServerHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,代码如下:

@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

/**
* 心跳超时时间
*/
private static final Integer READ_TIMEOUT_SECONDS = 3 * 60;

@Autowired
private MessageDispatcher messageDispatcher;
@Autowired
private NettyServerHandler nettyServerHandler;

@Override
protected void initChannel(Channel ch) {
// <1> 获得 Channel 对应的 ChannelPipeline
ChannelPipeline channelPipeline = ch.pipeline();
// <2> 添加一堆 NettyServerHandler 到 ChannelPipeline 中
channelPipeline
// 空闲检测
.addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS))
// 编码器
.addLast(new InvocationEncoder())
// 解码器
.addLast(new InvocationDecoder())
// 消息分发器
.addLast(messageDispatcher)
// 服务端处理器
.addLast(nettyServerHandler)
;
}

}

在每一个客户端与服务端建立完成连接时,服务端会创建一个 Channel 与之对应。此时,NettyServerHandlerInitializer 会进行执行 #initChannel(Channel c) 方法,进行自定义的初始化。

友情提示:创建的客户端的 Channel,不要和「2.1.1 NettyServer」小节的 NioServerSocketChannel 混淆,不是同一个哈。

#initChannel(Channel ch) 方法的 ch 参数,就是此时创建的客户端 Channel。

<1> 处,调用 Channel 的 #pipeline() 方法,获得客户端 Channel 对应的 ChannelPipeline。ChannelPipeline 由一系列的 ChannelHandler 组成,又或者说是 ChannelHandler 。这样, Channel 所有上所有的事件都会经过 ChannelPipeline,被其上的 ChannelHandler 所处理。

<2> 处,添加五个 ChannelHandler 到 ChannelPipeline 中,每一个的作用看其上的注释。具体的,我们会在后续的小节详细解释。

2.1.3 NettyServerHandler

创建 NettyServerHandler 类,继承 ChannelInboundHandlerAdapter 类,实现客户端 Channel 建立连接、断开连接、异常时的处理。代码如下:

@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private NettyChannelManager channelManager;

@Override
public void channelActive(ChannelHandlerContext ctx) {
// 从管理器中添加
channelManager.add(ctx.channel());
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
// 从管理器中移除
channelManager.remove(ctx.channel());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);
// 断开连接
ctx.channel().close();
}

}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

channelManager 属性,是我们实现的客户端 Channel 的管理器。

  • #channelActive(ChannelHandlerContext ctx) 方法,在客户端和服务端建立连接完成时,调用 NettyChannelManager 的 #add(Channel channel) 方法,添加到其中
  • #channelUnregistered(ChannelHandlerContext ctx) 方法,在客户端和服务端断开连接时,调用 NettyChannelManager 的 #add(Channel channel) 方法,从其中移除

具体的 NettyChannelManager 的源码,我们在「2.1.4 NettyChannelManager」 小节中来瞅瞅~

#exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close() 方法,断开和客户端的连接。

2.1.4 NettyChannelManager

创建 NettyChannelManager 类,提供两种功能。

🔥 ① 客户端 Channel 的管理。代码如下:

@Component
public class NettyChannelManager {

/**
* {@link Channel#attr(AttributeKey)} 属性中,表示 Channel 对应的用户
*/
private static final AttributeKey<String> CHANNEL_ATTR_KEY_USER = AttributeKey.newInstance("user");

private Logger logger = LoggerFactory.getLogger(getClass());

/**
* Channel 映射
*/
private ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();
/**
* 用户与 Channel 的映射。
*
* 通过它,可以获取用户对应的 Channel。这样,我们可以向指定用户发送消息。
*/
private ConcurrentMap<String, Channel> userChannels = new ConcurrentHashMap<>();

/**
* 添加 Channel 到 {@link #channels} 中
*
* @param channel Channel
*/
public void add(Channel channel) {
channels.put(channel.id(), channel);
logger.info("[add][一个连接({})加入]", channel.id());
}

/**
* 添加指定用户到 {@link #userChannels} 中
*
* @param channel Channel
* @param user 用户
*/
public void addUser(Channel channel, String user) {
Channel existChannel = channels.get(channel.id());
if (existChannel == null) {
logger.error("[addUser][连接({}) 不存在]", channel.id());
return;
}
// 设置属性
channel.attr(CHANNEL_ATTR_KEY_USER).set(user);
// 添加到 userChannels
userChannels.put(user, channel);
}

/**
* 将 Channel 从 {@link #channels} 和 {@link #userChannels} 中移除
*
* @param channel Channel
*/
public void remove(Channel channel) {
// 移除 channels
channels.remove(channel.id());
// 移除 userChannels
if (channel.hasAttr(CHANNEL_ATTR_KEY_USER)) {
userChannels.remove(channel.attr(CHANNEL_ATTR_KEY_USER).get());
}
logger.info("[remove][一个连接({})离开]", channel.id());
}
}

🔥 ② 向客户端 Channel 发送消息。代码如下:

@Component
public class NettyChannelManager {

/**
* 向指定用户发送消息
*
* @param user 用户
* @param invocation 消息体
*/
public void send(String user, Invocation invocation) {
// 获得用户对应的 Channel
Channel channel = userChannels.get(user);
if (channel == null) {
logger.error("[send][连接不存在]");
return;
}
if (!channel.isActive()) {
logger.error("[send][连接({})未激活]", channel.id());
return;
}
// 发送消息
channel.writeAndFlush(invocation);
}

/**
* 向所有用户发送消息
*
* @param invocation 消息体
*/
public void sendAll(Invocation invocation) {
for (Channel channel : channels.values()) {
if (!channel.isActive()) {
logger.error("[send][连接({})未激活]", channel.id());
return;
}
// 发送消息
channel.writeAndFlush(invocation);
}
}

}

2.1.5 引入依赖

创建 pom.xml 文件,引入 Netty 依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lab-67-netty-demo</artifactId>
<groupId>cn.iocoder.springboot.labs</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>lab-67-netty-demo-server</artifactId>

<properties>
<!-- 依赖相关配置 -->
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<!-- 插件相关配置 -->
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- Spring Boot 基础依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<!-- Netty 依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>

<!-- 引入 netty-demo-common 封装 -->
<dependency>
<groupId>cn.iocoder.springboot.labs</groupId>
<artifactId>lab-67-netty-demo-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

</project>

2.1.6 NettyServerApplication

创建 NettyServerApplication 类,Netty Server 启动类。代码如下:

@SpringBootApplication
public class NettyServerApplication {

public static void main(String[] args) {
SpringApplication.run(NettyServerApplication.class, args);
}

}

2.1.7 简单测试

执行 NettyServerApplication 类,启动 Netty Server 服务器。日志如下:

... // 省略其他日志

2020-06-21 00:16:38.801 INFO 41948 --- [ main] c.i.s.l.n.server.NettyServer : [start][Netty Server 启动在 8888 端口]
2020-06-21 00:16:38.893 INFO 41948 --- [ main] c.i.s.l.n.NettyServerApplication : Started NettyServerApplication in 0.96 seconds (JVM running for 1.4)

Netty Server 启动在 8888 端口。

2.2 构建 Netty 客户端

创建 lab-67-netty-demo-client 项目,搭建 Netty 客户端。如下图所示:

项目结构

下面,我们只会暂时看看 client 包下的代码,避免信息量过大,击穿胖友的秃头。

2.2.1 NettyClient

创建 NettyClient 类,Netty 客户端。代码如下:

@Component
public class NettyClient {

/**
* 重连频率,单位:秒
*/
private static final Integer RECONNECT_SECONDS = 20;

private Logger logger = LoggerFactory.getLogger(getClass());

@Value("${netty.server.host}")
private String serverHost;
@Value("${netty.server.port}")
private Integer serverPort;

@Autowired
private NettyClientHandlerInitializer nettyClientHandlerInitializer;

/**
* 线程组,用于客户端对服务端的连接、数据读写
*/
private EventLoopGroup eventGroup = new NioEventLoopGroup();
/**
* Netty Client Channel
*/
private volatile Channel channel;

/**
* 启动 Netty Server
*/
@PostConstruct
public void start() throws InterruptedException {
// <2.1> 创建 Bootstrap 对象,用于 Netty Client 启动
Bootstrap bootstrap = new Bootstrap();
// <2.2>
bootstrap.group(eventGroup) // <2.2.1> 设置一个 EventLoopGroup 对象
.channel(NioSocketChannel.class) // <2.2.2> 指定 Channel 为客户端 NioSocketChannel
.remoteAddress(serverHost, serverPort) // <2.2.3> 指定连接服务器的地址
.option(ChannelOption.SO_KEEPALIVE, true) // <2.2.4> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能
.option(ChannelOption.TCP_NODELAY, true) //<2.2.5> 允许较小的数据包的发送,降低延迟
.handler(nettyClientHandlerInitializer);
// <2.3> 连接服务器,并异步等待成功,即启动客户端
bootstrap.connect().addListener(new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 连接失败
if (!future.isSuccess()) {
logger.error("[start][Netty Client 连接服务器({}:{}) 失败]", serverHost, serverPort);
reconnect();
return;
}
// 连接成功
channel = future.channel();
logger.info("[start][Netty Client 连接服务器({}:{}) 成功]", serverHost, serverPort);
}

});
}

public void reconnect() {
// ... 暂时省略代码。
}

/**
* 关闭 Netty Server
*/
@PreDestroy
public void shutdown() {
// <3.1> 关闭 Netty Client
if (channel != null) {
channel.close();
}
// <3.2> 优雅关闭一个 EventLoopGroup 对象
eventGroup.shutdownGracefully();
}

/**
* 发送消息
*
* @param invocation 消息体
*/
public void send(Invocation invocation) {
if (channel == null) {
logger.error("[send][连接不存在]");
return;
}
if (!channel.isActive()) {
logger.error("[send][连接({})未激活]", channel.id());
return;
}
// 发送消息
channel.writeAndFlush(invocation);
}

}

友情提示:整体代码,是和「2.1.1 NettyServer」对等,且基本是一致的。

🔥 ① 在类上,添加 @Component 注解,把 NettyClient 的创建交给 Spring 管理。

  • serverHostserverPort 属性,读取 application.yml 配置文件的 netty.server.hostnetty.server.port 配置项。
  • #start() 方法,添加 @PostConstruct 注解,启动 Netty 客户端。
  • #shutdown() 方法,添加 @PreDestroy 注解,关闭 Netty 客户端。

🔥 ② 我们来详细看看 #start() 方法的代码,如何实现 Netty Client 的启动,建立和服务器的连接。

<2.1> 处,创建 Bootstrap 类,Netty 提供的客户端的启动类,方便我们初始化 Client。

<2.2> 处,设置 Bootstrap 的各种属性。

<2.2.1> 处,调用 #group(EventLoopGroup group) 方法,设置使用 eventGroup 线程组,实现客户端对服务端的连接、数据读写。

<2.2.2> 处,调用 #channel(Class<? extends C> channelClass) 方法,设置使用 NioSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Client 实现类。

<2.2.3> 处,调用 #remoteAddress(SocketAddress localAddress) 方法,设置连接服务端的地址

<2.2.4> 处,调用 #option(ChannelOption<T> childOption, T value) 方法,TCP Keepalive 机制,实现 TCP 层级的心跳保活功能。

<2.2.5> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,允许较小的数据包的发送,降低延迟。

<2.2.7> 处,调用 #handler(ChannelHandler childHandler) 方法,设置自己 Channel 的处理器为 NettyClientHandlerInitializer。稍后我们在「2.2.2 NettyClientHandlerInitializer」小节来看看。

<2.3> 处,调用 #connect() 方法,连接服务器,并异步等待成功,即启动客户端。同时,添加回调监听器 ChannelFutureListener,在连接服务端失败的时候,调用 #reconnect() 方法,实现定时重连。😈 具体 #reconnect() 方法的代码,我们稍后在瞅瞅哈。

③ 我们来详细看看 #shutdown() 方法的代码,如何实现 Netty Client 的关闭。

<3.1> 处,调用 Channel 的 #close() 方法,关闭 Netty Client,这样客户端就断开和服务端的连接。

<3.2> 处,调用 EventLoopGroup 的 #shutdownGracefully() 方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池。

#send(Invocation invocation) 方法,实现向服务端发送消息。

因为 NettyClient 是客户端,所以无需像 NettyServer 一样使用「2.1.4 NettyChannelManager」维护 Channel 的集合。

2.2.2 NettyClientHandlerInitializer

创建的 NettyClientHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,实现和服务端建立连接后,添加相应的 ChannelHandler 处理器。代码如下:

@Component
public class NettyClientHandlerInitializer extends ChannelInitializer<Channel> {

/**
* 心跳超时时间
*/
private static final Integer READ_TIMEOUT_SECONDS = 60;

@Autowired
private MessageDispatcher messageDispatcher;

@Autowired
private NettyClientHandler nettyClientHandler;

@Override
protected void initChannel(Channel ch) {
ch.pipeline()
// 空闲检测
.addLast(new IdleStateHandler(READ_TIMEOUT_SECONDS, 0, 0))
.addLast(new ReadTimeoutHandler(3 * READ_TIMEOUT_SECONDS))
// 编码器
.addLast(new InvocationEncoder())
// 解码器
.addLast(new InvocationDecoder())
// 消息分发器
.addLast(messageDispatcher)
// 客户端处理器
.addLast(nettyClientHandler)
;
}

}

「2.1.2 NettyServerHandlerInitializer」的代码基本一样,差别在于空闲检测额外增加 IdleStateHandler,客户端处理器换成了 NettyClientHandler

2.2.3 NettyClientHandler

创建 NettyClientHandler 类,实现客户端 Channel 断开连接、异常时的处理。代码如下:

@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private NettyClient nettyClient;

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 发起重连
nettyClient.reconnect();
// 继续触发事件
super.channelInactive(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);
// 断开连接
ctx.channel().close();
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
// 空闲时,向服务端发起一次心跳
if (event instanceof IdleStateEvent) {
logger.info("[userEventTriggered][发起一次心跳]");
HeartbeatRequest heartbeatRequest = new HeartbeatRequest();
ctx.writeAndFlush(new Invocation(HeartbeatRequest.TYPE, heartbeatRequest))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
super.userEventTriggered(ctx, event);
}
}

}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

#channelInactive(ChannelHandlerContext ctx) 方法,实现在和服务端断开连接时,调用 NettyClient 的 #reconnect() 方法,实现客户端定时和服务端重连

#exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close() 方法,断开和客户端的连接。

#userEventTriggered(ChannelHandlerContext ctx, Object event) 方法,在客户端在空闲时,向服务端发送一次心跳,即心跳机制。这块的内容,我们稍后详细讲讲。

2.2.4 引入依赖

创建 pom.xml 文件,引入 Netty 依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lab-67-netty-demo</artifactId>
<groupId>cn.iocoder.springboot.labs</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>lab-67-netty-demo-client</artifactId>

<properties>
<!-- 依赖相关配置 -->
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<!-- 插件相关配置 -->
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- 实现对 Spring MVC 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Netty 依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>

<!-- 引入 netty-demo-common 封装 -->
<dependency>
<groupId>cn.iocoder.springboot.labs</groupId>
<artifactId>lab-67-netty-demo-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

</project>

2.2.5 NettyClientApplication

创建 NettyClientApplication 类,Netty Client 启动类。代码如下:

@SpringBootApplication
public class NettyClientApplication {

public static void main(String[] args) {
SpringApplication.run(NettyClientApplication.class, args);
}

}

2.2.6 简单测试

执行 NettyClientApplication 类,启动 Netty Client 客户端。日志如下:

... // 省略其他日志

2020-06-21 09:06:12.205 INFO 44029 --- [ntLoopGroup-2-1] c.i.s.l.n.client.NettyClient : [start][Netty Client 连接服务器(127.0.0.1:8888) 成功]

同时 Netty Server 服务端发现有一个客户端接入,打印如下日志:

2020-06-21 09:06:12.268  INFO 41948 --- [ntLoopGroup-3-1] c.i.s.l.n.server.NettyChannelManager     : [add][一个连接(db652822)加入]

2.3 小结

至此,我们已经构建 Netty 服务端和客户端完成。因为 Netty 提供的 API 非常便利,所以我们不会像直接使用 NIO 时,需要处理大量底层且细节的代码。

不过,如上的内容仅仅是本文的开胃菜,正片即将开始!美滋滋,继续往下看,奥利给!

3. 通信协议

「2. 构建 Netty 服务端与客户端」小节中,我们实现了客户端和服务端的连接功能。而本小节,我们要让它们两能够说上话,即进行数据的读写

在日常项目的开发中,前端和后端之间采用 HTTP 作为通信协议,使用文本内容进行交互,数据格式一般是 JSON。但是在 TCP 的世界里,我们需要自己基于二进制构建,构建客户端和服务端的通信协议。

我们以客户端向服务端发送消息来举个例子,假设客户端要发送一个登录请求,对应的类如下:

public class AuthRequest {

/** 用户名 **/
private String username;
/** 密码 **/
private String password;

}

  • 显然,我们无法将一个 Java 对象直接丢到 TCP Socket 当中,而是需要将其转换成 byte 字节数组,才能写入到 TCP Socket 中去。即,需要将消息对象通过序列化,转换成 byte 字节数组。
  • 同时,在服务端收到 byte 字节数组时,需要将其又转换成 Java 对象,即反序列化。不然,服务端对着一串 byte 字节处理个毛线?!

友情提示:服务端向客户端发消息,也是一样的过程哈!

序列化的工具非常多,例如说 Google 提供的 Protobuf,性能高效,且序列化出来的二进制数据较小。Netty 对 Protobuf 进行集成,提供了相应的编解码器。如下图所示:

Netty  包

但是考虑到很多胖友对 Protobuf 并不了解,因为它实现序列化又增加胖友的额外学习成本。因此,艿艿仔细一个捉摸,还是采用 JSON 方式进行序列化。可能胖友会疑惑,JSON 不是将对象转换成字符串吗?嘿嘿,我们再把字符串转换成 byte 字节数组就可以啦~

下面,我们新建 lab-67-netty-demo-common 项目,并在 codec 包下,实现我们自定义的通信协议。如下图所示:

项目结构

3.1 Invocation

创建 Invocation 类,通信协议的消息体。代码如下:

/**
* 通信协议的消息体
*/
public class Invocation {

/**
* 类型
*/
private String type;
/**
* 消息,JSON 格式
*/
private String message;

// 空构造方法
public Invocation() {
}

public Invocation(String type, String message) {
this.type = type;
this.message = message;
}

public Invocation(String type, Message message) {
this.type = type;
this.message = JSON.toJSONString(message);
}

// ... 省略 setter、getter、toString 方法
}

type 属性,类型,用于匹配对应的消息处理器。如果类比 HTTP 协议,type 属性相当于请求地址。

message 属性,消息内容,使用 JSON 格式。

另外,Message 是我们定义的消息接口。代码如下:

public interface Message {

// ... 空,作为标记接口

}

3.2 粘包与拆包

在开始看 Invocation 的编解码处理器之前,我们先了解下粘包拆包的概念。

如果的内容,引用《Netty 解决粘包和拆包问题的四种方案》文章的内容,进行二次编辑。

3.2.1 产生原因

产生粘包和拆包问题的主要原因是,操作系统在发送 TCP 数据的时候,底层会有一个缓冲区,例如 1024 个字节大小。

  • 如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题。

    例如说,在《详解 Socket 编程 --- TCP_NODELAY 选项》文章中我们可以看到,在关闭 Nagle 算法时,请求不会等待满足缓冲区大小,而是尽快发出,降低延迟。

  • 如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。

如下图展示了粘包和拆包的一个示意图,演示了粘包和拆包的三种情况:

示例图

  • A 和 B 两个包都刚好满足 TCP 缓冲区的大小,或者说其等待时间已经达到 TCP 等待时长,从而还是使用两个独立的包进行发送。
  • A 和 B 两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端。
  • B 包比较大,因而将其拆分为两个包 B_1 和 B_2 进行发送,而这里由于拆分后的 B_2 比较小,其又与 A 包合并在一起发送。

3.2.2 解决方案

对于粘包和拆包问题,常见的解决方案有三种:

🔥 ① 客户端在发送数据包的时候,每个包都固定长度。比如 1024 个字节大小,如果客户端发送的数据长度不足 1024 个字节,则通过补充空格的方式补全到指定长度。

这种方式,艿艿暂时没有找到采用这种方式的案例。

🔥 ② 客户端在每个包的末尾使用固定的分隔符。例如 \r\n,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的 \r\n,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包。

具体的案例,有 HTTP、WebSocket、Redis。

🔥 ③ 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息。

友情提示:方案 ③ 是 ① 的升级版,动态长度

本文,艿艿将采用这种方式,在每次 Invocation 序列化成字节数组写入 TCP Socket 之前,先将字节数组的长度写到其中。如下图所示:

Invocation 序列化

3.3 InvocationEncoder

创建 InvocationEncoder 类,实现将 Invocation 序列化,并写入到 TCP Socket 中。代码如下:

public class InvocationEncoder extends MessageToByteEncoder<Invocation> {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override
protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {
// <2.1> 将 Invocation 转换成 byte[] 数组
byte[] content = JSON.toJSONBytes(invocation);
// <2.2> 写入 length
out.writeInt(content.length);
// <2.3> 写入内容
out.writeBytes(content);
logger.info("[encode][连接({}) 编码了一条消息({})]", ctx.channel().id(), invocation.toString());
}

}

MessageToByteEncoder 是 Netty 定义的编码 ChannelHandler 抽象类,将泛型 <I> 消息转换成字节数组。

#encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) 方法,进行编码的逻辑。

<2.1> 处,调用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features) 方法,将 Invocation 转换成 字节数组。

<2.2> 处,将字节数组的长度,写入到 TCP Socket 当中。这样,后续「3.4 InvocationDecoder」可以根据该长度,解析到消息,解决粘包和拆包的问题

友情提示:MessageToByteEncoder 会最终将 ByteBuf out 写到 TCP Socket 中。

<2.3> 处,将字节数组,写入到 TCP Socket 当中。

3.4 InvocationDecoder

创建 InvocationDecoder 类,实现从 TCP Socket 读取字节数组,反序列化成 Invocation。代码如下:

public class InvocationDecoder extends ByteToMessageDecoder {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// <2.1> 标记当前读取位置
in.markReaderIndex();
// <2.2> 判断是否能够读取 length 长度
if (in.readableBytes() <= 4) {
return;
}
// <2.3> 读取长度
int length = in.readInt();
if (length < 0) {
throw new CorruptedFrameException("negative length: " + length);
}
// <3.1> 如果 message 不够可读,则退回到原读取位置
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
// <3.2> 读取内容
byte[] content = new byte[length];
in.readBytes(content);
// <3.3> 解析成 Invocation
Invocation invocation = JSON.parseObject(content, Invocation.class);
out.add(invocation);
logger.info("[decode][连接({}) 解析到一条消息({})]", ctx.channel().id(), invocation.toString());
}

}

ByteToMessageDecoder 是 Netty 定义的解码 ChannelHandler 抽象类,在 TCP Socket 读取到新数据时,触发进行解码。

② 在 <2.1><2.2><2.3> 处,从 TCP Socket 中读取长度

③ 在 <3.1><3.2><3.3> 处,从 TCP Socket 中读取字节数组,并反序列化成 Invocation 对象。

最终,添加 List<Object> out 中,交给后续的 ChannelHandler 进行处理。稍后,我们将在「4. 消息分发」小结中,会看到 MessageDispatcher 将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。

3.5 引入依赖

创建 pom.xml 文件,引入 Netty、FastJSON 等等依赖。

  
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lab-67-netty-demo</artifactId>
<groupId>cn.iocoder.springboot.labs</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>lab-67-netty-demo-common</artifactId>

<properties>
<!-- 插件相关配置 -->
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>

<dependencies>
<!-- Netty 依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>

<!-- FastJSON 依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.71</version>
</dependency>

<!-- 引入 Spring 相关依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>5.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.5.RELEASE</version>
</dependency>

<!-- 引入 SLF4J 依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>

</project>

3.6 小结

至此,我们已经完成通信协议的定义、编解码的逻辑,是不是蛮有趣的?!

另外,我们在 NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代码中,将编解码器添加到其中。如下图所示:

编解码器的初始化

4. 消息分发

SpringMVC 中,DispatcherServlet 会根据请求地址、方法等,将请求分发到匹配的 Controller 的 Method 方法上。

lab-67-netty-demo-client 项目的 dispatcher 包中,我们创建了 MessageDispatcher 类,实现和 DispatcherServlet 类似的功能,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。

 包

下面,我们来看看具体的代码实现。

4.1 Message

创建 Message 接口,定义消息的标记接口。代码如下:

public interface Message {
}

下图,是我们涉及到的 Message 实现类。如下图所示:

Message 实现类

4.2 MessageHandler

创建 MessageHandler 接口,消息处理器接口。代码如下:

public interface MessageHandler<T extends Message> {

/**
* 执行处理消息
*
* @param channel 通道
* @param message 消息
*/
void execute(Channel channel, T message);

/**
* @return 消息类型,即每个 Message 实现类上的 TYPE 静态字段
*/
String getType();

}

  • 定义了泛型 <T> ,需要是 Message 的实现类。
  • 定义的两个接口方法,胖友自己看下注释哈。

下图,是我们涉及到的 MessageHandler 实现类。如下图所示:

MessageHandler 实现类

4.3 MessageHandlerContainer

创建 MessageHandlerContainer 类,作为 MessageHandler 的容器。代码如下:

public class MessageHandlerContainer implements InitializingBean {

private Logger logger = LoggerFactory.getLogger(getClass());

/**
* 消息类型与 MessageHandler 的映射
*/
private final Map<String, MessageHandler> handlers = new HashMap<>();

@Autowired
private ApplicationContext applicationContext;

@Override
public void afterPropertiesSet() throws Exception {
// 通过 ApplicationContext 获得所有 MessageHandler Bean
applicationContext.getBeansOfType(MessageHandler.class).values() // 获得所有 MessageHandler Bean
.forEach(messageHandler -> handlers.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中
logger.info("[afterPropertiesSet][消息处理器数量:{}]", handlers.size());
}

/**
* 获得类型对应的 MessageHandler
*
* @param type 类型
* @return MessageHandler
*/
MessageHandler getMessageHandler(String type) {
MessageHandler handler = handlers.get(type);
if (handler == null) {
throw new IllegalArgumentException(String.format("类型(%s) 找不到匹配的 MessageHandler 处理器", type));
}
return handler;
}

/**
* 获得 MessageHandler 处理的消息类
*
* @param handler 处理器
* @return 消息类
*/
static Class<? extends Message> getMessageClass(MessageHandler handler) {
// 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);
// 获得接口的 Type 数组
Type[] interfaces = targetClass.getGenericInterfaces();
Class<?> superclass = targetClass.getSuperclass();
while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准
interfaces = superclass.getGenericInterfaces();
superclass = targetClass.getSuperclass();
}
if (Objects.nonNull(interfaces)) {
// 遍历 interfaces 数组
for (Type type : interfaces) {
// 要求 type 是泛型参数
if (type instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) type;
// 要求是 MessageHandler 接口
if (Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
// 取首个元素
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return (Class<Message>) actualTypeArguments[0];
} else {
throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
}
}
}
}
}
throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
}

}

① 实现 InitializingBean 接口,在 #afterPropertiesSet() 方法中,扫描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。

② 在 #getMessageHandler(String type) 方法中,获得类型对应的 MessageHandler 对象。稍后,我们会在 MessageDispatcher 调用该方法。

③ 在 #getMessageClass(MessageHandler handler) 方法中,通过 MessageHandler 中,通过解析其类上的泛型,获得消息类型对应的 Class 类。这是参考 rocketmq-spring 项目的 DefaultRocketMQListenerContainer#getMessageType() 方法,进行略微修改。

友情提示:如果胖友对 Java 的泛型机制没有做过一点了解,可能略微有点硬核。可以先暂时跳过,知道意图即可。

4.4 MessageDispatcher

创建 MessageDispatcher 类,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。代码如下:

@ChannelHandler.Sharable
public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {

@Autowired
private MessageHandlerContainer messageHandlerContainer;

private final ExecutorService executor = Executors.newFixedThreadPool(200);

@Override
protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {
// <3.1> 获得 type 对应的 MessageHandler 处理器
MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());
// 获得 MessageHandler 处理器的消息类
Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);
// <3.2> 解析消息
Message message = JSON.parseObject(invocation.getMessage(), messageClass);
// <3.3> 执行逻辑
executor.submit(new Runnable() {

@Override
public void run() {
// noinspection unchecked
messageHandler.execute(ctx.channel(), message);
}

});
}

}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

SimpleChannelInboundHandler 是 Netty 定义的消息处理 ChannelHandler 抽象类,处理消息的类型是 <I> 泛型时。

#channelRead0(ChannelHandlerContext ctx, Invocation invocation) 方法,处理消息,进行分发。

消息分发

<3.1> 处,调用 MessageHandlerContainer 的 #getMessageHandler(String type) 方法,获得 Invocation 的 type 对应的 MessageHandler 处理器

然后,调用 MessageHandlerContainer 的 #getMessageClass(messageHandler) 方法,获得 MessageHandler 处理器的消息类

<3.2> 处,调用 JSON 的 # parseObject(String text, Class<T> clazz) 方法,将 Invocation 的 message 解析成 MessageHandler 对应的消息对象

<3.3> 处,丢到线程池中,然后调用 MessageHandler 的 #execute(Channel channel, T message) 方法,执行业务逻辑

注意,为什么要丢到 executor 线程池中呢?我们先来了解下 EventGroup 的线程模型。

友情提示:在我们启动 Netty 服务端或者客户端时,都会设置其 EventGroup。

EventGroup 我们可以先简单理解成一个线程池,并且线程池的大小仅仅是 CPU 数量 * 2。每个 Channel 仅仅会被分配到其中的一个线程上,进行数据的读写。并且,多个 Channel 会共享一个线程,即使用同一个线程进行数据的读写。

那么胖友试着思考下,MessageHandler 的具体逻辑视线中,往往会涉及到 IO 处理,例如说进行数据库的读取。这样,就会导致一个 Channel 在执行 MessageHandler 的过程中,阻塞了共享当前线程的其它 Channel 的数据读取。

因此,我们在这里创建了 executor 线程池,进行 MessageHandler 的逻辑执行,避免阻塞 Channel 的数据读取。

可能会有胖友说,我们是不是能够把 EventGroup 的线程池设置大一点,例如说 200 呢?对于长连接的 Netty 服务端,往往会有 1000 ~ 100000 的 Netty 客户端连接上来,这样无论设置多大的线程池,都会出现阻塞数据读取的情况。

友情提示:executor 线程池,我们一般称之为业务线程池或者逻辑线程池,顾名思义,就是执行业务逻辑的。

这样的设计方式,目前 Dubbo 等等 RPC 框架,都采用这种方式。

后续,胖友可以认真阅读下《【NIO 系列】——之 Reactor 模型》文章,进一步理解。

4.5 NettyServerConfig

创建 NettyServerConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。代码如下:

@Configuration
public class NettyServerConfig {

@Bean
public MessageDispatcher messageDispatcher() {
return new MessageDispatcher();
}

@Bean
public MessageHandlerContainer messageHandlerContainer() {
return new MessageHandlerContainer();
}

}

4.6 NettyClientConfig

友情提示:和「4.5 NettyServerConfig」小结一致。

创建 NettyClientConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。代码如下:

@Configuration
public class NettyClientConfig {

@Bean
public MessageDispatcher messageDispatcher() {
return new MessageDispatcher();
}

@Bean
public MessageHandlerContainer messageHandlerContainer() {
return new MessageHandlerContainer();
}

}

4.7 小结

后续,我们将在如下小节,具体演示消息分发的使用:

5. 断开重连

Netty 客户端需要实现断开重连机制,解决各种情况下的断开情况。例如说:

  • Netty 客户端启动时,Netty 服务端处于挂掉,导致无法连接上。
  • 在运行过程中,Netty 服务端挂掉,导致连接被断开。
  • 任一一端网络抖动,导致连接异常断开。

具体的代码实现比较简单,只需要在两个地方增加重连机制。

  • Netty 客户端启动时,无法连接 Netty 服务端时,发起重连。
  • Netty 客户端运行时,和 Netty 断开连接时,发起重连。

考虑到重连会存在失败的情况,我们采用定时重连的方式,避免占用过多资源。

5.1 具体代码

① 在 NettyClient 中,提供 #reconnect() 方法,实现定时重连的逻辑。代码如下:

// NettyClient.java

public void reconnect() {
eventGroup.schedule(new Runnable() {
@Override
public void run() {
logger.info("[reconnect][开始重连]");
try {
start();
} catch (InterruptedException e) {
logger.error("[reconnect][重连失败]", e);
}
}
}, RECONNECT_SECONDS, TimeUnit.SECONDS);
logger.info("[reconnect][{} 秒后将发起重连]", RECONNECT_SECONDS);
}

通过调用 EventLoop 提供的 #schedule(Runnable command, long delay, TimeUnit unit) 方法,实现定时逻辑。而在内部的具体逻辑,调用 NettyClient 的 #start() 方法,发起连接 Netty 服务端。

又因为 NettyClient 在 #start() 方法在连接 Netty 服务端失败时,又会调用 #reconnect() 方法,从而再次发起定时重连。如此循环反复,知道 Netty 客户端连接上 Netty 服务端。如下图所示:

NettyClient 重连

② 在 NettyClientHandler 中,实现 #channelInactive(ChannelHandlerContext ctx) 方法,在发现和 Netty 服务端断开时,调用 Netty Client 的 #reconnect() 方法,发起重连。代码如下:

// NettyClientHandler.java

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 发起重连
nettyClient.reconnect();
// 继续触发事件
super.channelInactive(ctx);
}

5.2 简单测试

① 启动 Netty Client,不要启动 Netty Server,控制台打印日志如下图:

重连失败

可以看到 Netty Client 在连接失败时,不断发起定时重连。

② 启动 Netty Server,控制台打印如下图:

重连成功

可以看到 Netty Client 成功重连上 Netty Server。

6. 心跳机制与空闲检测

在上文中,艿艿推荐胖友阅读《TCP Keepalive 机制刨根问底》文章,我们可以了解到 TCP 自带的空闲检测机制,默认是 2 小时。这样的检测机制,从系统资源层面上来说是可以接受的。

但是在业务层面,如果 2 小时才发现客户端与服务端的连接实际已经断开,会导致中间非常多的消息丢失,影响客户的使用体验。

因此,我们需要在业务层面,自己实现空闲检测,保证尽快发现客户端与服务端实际已经断开的情况。实现逻辑如下:

  • 服务端发现 180 秒未从客户端读取到消息,主动断开连接。
  • 客户端发现 180 秒未从服务端读取到消息,主动断开连接。

考虑到客户端和服务端之间并不是一直有消息的交互,所以我们需要增加心跳机制

  • 客户端每 60 秒向服务端发起一次心跳消息,保证服务端可以读取到消息。
  • 服务端在收到心跳消息时,回复客户端一条确认消息,保证客户端可以读取到消息。

友情提示:

  • 为什么是 180 秒?可以加大或者减小,看自己希望多快检测到连接异常。过短的时间,会导致心跳过于频繁,占用过多资源。
  • 为什么是 60 秒?三次机会,确认是否心跳超时。

虽然听起来有点复杂,但是实现起来并不复杂哈。

6.1 服务端的空闲检测

NettyServerHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。如下图所示:

ReadTimeoutHandler

通过这样的方式,实现服务端发现 180 秒未从客户端读取到消息,主动断开连接。

6.2 客户端的空闲检测

友情提示:和「6.1 服务端的空闲检测」一致。

NettyClientHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。如下图所示:

ReadTimeoutHandler

通过这样的方式,实现客户端发现 180 秒未从服务端读取到消息,主动断开连接。

6.3 心跳机制

Netty 提供了 IdleStateHandler 处理器,提供空闲检测的功能,在 Channel 的读或者写空闲时间太长时,将会触发一个 IdleStateEvent 事件。

这样,我们只需要在 NettyClientHandler 处理器中,在接收到 IdleStateEvent 事件时,客户端向客户端发送一次心跳消息。如下图所示:

客户端心跳

同时,我们在服务端项目中,创建了一个 HeartbeatRequestHandler 消息处理器,在收到客户端的心跳请求时,回复客户端一条确认消息。代码如下:

@Component
public class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void execute(Channel channel, HeartbeatRequest message) {
logger.info("[execute][收到连接({}) 的心跳请求]", channel.id());
// 响应心跳
HeartbeatResponse response = new HeartbeatResponse();
channel.writeAndFlush(new Invocation(HeartbeatResponse.TYPE, response));
}

@Override
public String getType() {
return HeartbeatRequest.TYPE;
}

}

6.4 简单测试

启动 Netty Server 服务端,再启动 Netty Client 客户端,耐心等待 60 秒后,可以看到心跳日志如下:

// ... 客户端
2020-06-22 08:24:47.275 INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler : [userEventTriggered][发起一次心跳]
2020-06-22 08:24:47.335 INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [encode][连接(44223e18) 编码了一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:24:47.408 INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(44223e18) 解析到一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]
2020-06-22 08:24:47.409 INFO 57005 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatResponseHandler : [execute][收到连接(44223e18) 的心跳响应]

// ... 服务端
2020-06-22 08:24:47.388 INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(34778465) 解析到一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:24:47.390 INFO 56998 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatRequestHandler : [execute][收到连接(34778465) 的心跳请求]
2020-06-22 08:24:47.399 INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationEncoder : [encode][连接(34778465) 编码了一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]

7. 认证逻辑

友情提示:从本小节开始,我们就具体看看业务逻辑的处理示例。

认证的过程,如下图所示:

认证流程

7.1 AuthRequest

创建 AuthRequest 类,定义用户认证请求。代码如下:

public class AuthRequest implements Message {

public static final String TYPE = "AUTH_REQUEST";

/**
* 认证 Token
*/
private String accessToken;

// ... 省略 setter、getter、toString 方法
}

这里我们使用 accessToken 认证令牌进行认证。

因为一般情况下,我们使用 HTTP 进行登录系统,然后使用登录后的身份标识(例如说 accessToken 认证令牌),将客户端和当前用户进行认证绑定。

7.2 AuthResponse

创建 AuthResponse 类,定义用户认证响应。代码如下:

public class AuthResponse implements Message {

public static final String TYPE = "AUTH_RESPONSE";

/**
* 响应状态码
*/
private Integer code;
/**
* 响应提示
*/
private String message;

// ... 省略 setter、getter、toString 方法
}

7.3 AuthRequestHandler

服务端...

创建 AuthRequestHandler 类,为服务端处理客户端的认证请求。代码如下:

@Component
public class AuthRequestHandler implements MessageHandler<AuthRequest> {

@Autowired
private NettyChannelManager nettyChannelManager;

@Override
public void execute(Channel channel, AuthRequest authRequest) {
// <1> 如果未传递 accessToken
if (StringUtils.isEmpty(authRequest.getAccessToken())) {
AuthResponse authResponse = new AuthResponse().setCode(1).setMessage("认证 accessToken 未传入");
channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse));
return;
}

// <2> ... 此处应有一段

// <3> 将用户和 Channel 绑定
// 考虑到代码简化,我们先直接使用 accessToken 作为 User
nettyChannelManager.addUser(channel, authRequest.getAccessToken());

// <4> 响应认证成功
AuthResponse authResponse = new AuthResponse().setCode(0);
channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse));
}

@Override
public String getType() {
return AuthRequest.TYPE;
}

}

代码比较简单,胖友看看 <1><2><3><4> 上的注释。

7.4 AuthResponseHandler

客户端...

创建 AuthResponseHandler 类,为客户端处理服务端的认证响应。代码如下:

@Component
public class AuthResponseHandler implements MessageHandler<AuthResponse> {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void execute(Channel channel, AuthResponse message) {
logger.info("[execute][认证结果:{}]", message);
}

@Override
public String getType() {
return AuthResponse.TYPE;
}

}

打印个认证结果,方便调试。

7.5 TestController

客户端...

创建 TestController 类,提供 /test/mock 接口,模拟客户端向服务端发送请求。代码如下:

@RestController
@RequestMapping("/test")
public class TestController {

@Autowired
private NettyClient nettyClient;

@PostMapping("/mock")
public String mock(String type, String message) {
// 创建 Invocation 对象
Invocation invocation = new Invocation(type, message);
// 发送消息
nettyClient.send(invocation);
return "success";
}

}

7.6 简单测试

启动 Netty Server 服务端,再启动 Netty Client 客户端,然后使用 Postman 模拟一次认证请求。如下图所示:

Postman 模拟认证请求

同时,可以看到认证成功的日志如下:

// 客户端...
2020-06-22 08:41:12.364 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [encode][连接(9e086597) 编码了一条消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})]
2020-06-22 08:41:12.390 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(9e086597) 解析到一条消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})]
2020-06-22 08:41:12.392 INFO 57583 --- [pool-1-thread-1] c.i.s.l.n.m.auth.AuthResponseHandler : [execute][认证结果:AuthResponse{code=0, message='null'}]

// 服务端...
2020-06-22 08:41:12.374 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(791f122b) 解析到一条消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})]
2020-06-22 08:41:12.379 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [encode][连接(791f122b) 编码了一条消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})]

8. 单聊逻辑

私聊的过程,如下图所示:

私聊流程

服务端负责将客户端 A 发送的私聊消息,转发给客户端 B。

8.1 ChatSendToOneRequest

创建 ChatSendToOneRequest 类,发送给指定人的私聊消息的请求。代码如下:

public class ChatSendToOneRequest implements Message {

public static final String TYPE = "CHAT_SEND_TO_ONE_REQUEST";

/**
* 发送给的用户
*/
private String toUser;
/**
* 消息编号
*/
private String msgId;
/**
* 内容
*/
private String content;

// ... 省略 setter、getter、toString 方法
}

8.2 ChatSendResponse

创建 ChatSendResponse 类,聊天发送消息结果的响应。代码如下:

public class ChatSendResponse implements Message {

public static final String TYPE = "CHAT_SEND_RESPONSE";

/**
* 消息编号
*/
private String msgId;
/**
* 响应状态码
*/
private Integer code;
/**
* 响应提示
*/
private String message;

// ... 省略 setter、getter、toString 方法
}

8.3 ChatRedirectToUserRequest

创建 ChatRedirectToUserRequest 类, 转发消息给一个用户的请求。代码如下:

public class ChatRedirectToUserRequest implements Message {

public static final String TYPE = "CHAT_REDIRECT_TO_USER_REQUEST";

/**
* 消息编号
*/
private String msgId;
/**
* 内容
*/
private String content;

// ... 省略 setter、getter、toString 方法
}

友情提示:写完之后,艿艿突然发现少了一个 fromUser 字段,表示来自谁的消息。

8.4 ChatSendToOneHandler

服务端...

创建 ChatSendToOneHandler 类,为服务端处理客户端的私聊请求。代码如下:

@Component
public class ChatSendToOneHandler implements MessageHandler<ChatSendToOneRequest> {

@Autowired
private NettyChannelManager nettyChannelManager;

@Override
public void execute(Channel channel, ChatSendToOneRequest message) {
// <1> 这里,假装直接成功
ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0);
channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse));

// <2> 创建转发的消息,发送给指定用户
ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId())
.setContent(message.getContent());
nettyChannelManager.send(message.getToUser(), new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest));
}

@Override
public String getType() {
return ChatSendToOneRequest.TYPE;
}

}

代码比较简单,胖友看看 <1><2> 上的注释。

8.5 ChatSendResponseHandler

客户端...

创建 ChatSendResponseHandler 类,为客户端处理服务端的聊天响应。代码如下:

@Component
public class ChatSendResponseHandler implements MessageHandler<ChatSendResponse> {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void execute(Channel channel, ChatSendResponse message) {
logger.info("[execute][发送结果:{}]", message);
}

@Override
public String getType() {
return ChatSendResponse.TYPE;
}

}

打印个聊天发送结果,方便调试。

8.6 ChatRedirectToUserRequestHandler

客户端

创建 ChatRedirectToUserRequestHandler 类,为客户端处理服务端的转发消息的请求。代码如下:

@Component
public class ChatRedirectToUserRequestHandler implements MessageHandler<ChatRedirectToUserRequest> {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void execute(Channel channel, ChatRedirectToUserRequest message) {
logger.info("[execute][收到消息:{}]", message);
}

@Override
public String getType() {
return ChatRedirectToUserRequest.TYPE;
}

}

打印个聊天接收消息,方便调试。

8.7 简单测试

① 启动 Netty Server 服务端。

② 启动 Netty Client 客户端 A。然后使用 Postman 模拟一次认证请求(用户为 yunai)。如下图所示:

Postman 模拟认证请求

③ 启动 Netty Client 客户端 B。注意,需要设置 --server.port 端口为 8081,避免冲突。如下图所示:

IDEA 设置

然后使用 Postman 模拟一次认证请求(用户为 tutou)。如下图所示:

Postman 模拟认证请求

④ 最后使用 Postman 模拟一次 yunai 芋艿给 tutou 土豆发送一次私聊消息。如下图所示:

Postman 模拟私聊请求

同时,可以看到客户端 A 向客户端 B 发送私聊消息的日志如下:

// 客户端 A...(芋艿)
2020-06-22 08:48:09.505 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tudou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:09.510 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:09.511 INFO 57583 --- [ool-1-thread-69] c.i.s.l.n.m.c.ChatSendResponseHandler : [execute][发送结果:ChatSendResponse{msgId='1', code=0, message='null'}]
2020-06-22 08:48:35.148 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:35.150 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:35.150 INFO 57583 --- [ool-1-thread-70] c.i.s.l.n.m.c.ChatSendResponseHandler : [execute][发送结果:ChatSendResponse{msgId='1', code=0, message='null'}]

// 服务端 ...
2020-06-22 08:48:35.149 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(791f122b) 解析到一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:35.149 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:35.149 INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder : [decode][连接(79cb3a1e) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})]

// 客户端 B...(秃头)
2020-06-22 08:48:18.277 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler : [userEventTriggered][发起一次心跳]
2020-06-22 08:48:18.278 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [encode][连接(24fbc3e8) 编码了一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:48:18.280 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]
2020-06-22 08:48:18.281 INFO 59613 --- [pool-1-thread-4] c.i.s.l.n.m.h.HeartbeatResponseHandler : [execute][收到连接(24fbc3e8) 的心跳响应]
2020-06-22 08:48:35.150 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})]
2020-06-22 08:48:35.151 INFO 59613 --- [pool-1-thread-5] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='1', content='你猜'}]

9. 群聊逻辑

群聊的过程,如下图所示:

群聊流程

服务端负责将客户端 A 发送的群聊消息,转发给客户端 A、B、C。

友情提示:考虑到逻辑简洁,艿艿提供的本小节的示例,并不是一个一个群,而是所有人在一个大的群聊中哈~

9.1 ChatSendToAllRequest

创建 ChatSendToOneRequest 类,发送给所有人的群聊消息的请求。代码如下:

public class ChatSendToAllRequest implements Message {

public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";

/**
* 消息编号
*/
private String msgId;
/**
* 内容
*/
private String content;

// ... 省略 setter、getter、toString 方法
}

友情提示:如果是正经的群聊,会有一个 groupId 字段,表示群编号。

9.2 ChatSendResponse

「8.2 ChatSendResponse」小节一致。

9.3 ChatRedirectToUserRequest

「8.3 ChatRedirectToUserRequest」小节一致。

9.4 ChatSendToAllHandler

服务端...

创建 ChatSendToAllHandler 类,为服务端处理客户端的群聊请求。代码如下:

@Component
public class ChatSendToAllHandler implements MessageHandler<ChatSendToAllRequest> {

@Autowired
private NettyChannelManager nettyChannelManager;

@Override
public void execute(Channel channel, ChatSendToAllRequest message) {
// <1> 这里,假装直接成功
ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0);
channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse));

// <2> 创建转发的消息,并广播发送
ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId())
.setContent(message.getContent());
nettyChannelManager.sendAll(new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest));
}

@Override
public String getType() {
return ChatSendToAllRequest.TYPE;
}

}

代码比较简单,胖友看看 <1><2> 上的注释。

9.5 ChatSendResponseHandler

「8.5 ChatSendResponseHandler」小节一致。

9.6 ChatRedirectToUserRequestHandler

「8.6 ChatRedirectToUserRequestHandler」小节一致。

9.7 简单测试

① 启动 Netty Server 服务端。

② 启动 Netty Client 客户端 A。然后使用 Postman 模拟一次认证请求(用户为 yunai)。如下图所示:

Postman 模拟认证请求

③ 启动 Netty Client 客户端 B。注意,需要设置 --server.port 端口为 8081,避免冲突。

IDEA 设置

④ 启动 Netty Client 客户端 C。注意,需要设置 --server.port 端口为 8082,避免冲突。

IDEA 设置

⑤ 最后使用 Postman 模拟一次发送群聊消息。如下图所示:

Postman 模拟群聊请求

同时,可以看到客户端 A 群发给所有客户端的日志如下:

// 客户端 A...
2020-06-22 08:55:44.898 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "广播消息"}'})]
2020-06-22 08:55:44.901 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})]
2020-06-22 08:55:44.901 INFO 57583 --- [ol-1-thread-148] c.i.s.l.n.m.c.ChatSendResponseHandler : [execute][发送结果:ChatSendResponse{msgId='2', code=0, message='null'}]
2020-06-22 08:55:44.901 INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.903 INFO 57583 --- [ol-1-thread-149] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]

// 服务端...
2020-06-22 08:55:44.898 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(791f122b) 解析到一条消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "广播消息"}'})]
2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})]
2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder : [decode][连接(79cb3a1e) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.901 INFO 56998 --- [ntLoopGroup-3-4] c.i.s.l.n.codec.InvocationEncoder : [decode][连接(9dc03826) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]

// 客户端 B...
2020-06-22 08:55:44.902 INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.902 INFO 59613 --- [ool-1-thread-83] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]

// 客户端 C...
2020-06-22 08:55:44.901 INFO 61597 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder : [decode][连接(9128c71c) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.903 INFO 61597 --- [ool-1-thread-16] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]

666. 彩蛋

至此,我们已经通过 Netty 实现了一个简单的 IM 功能,是不是收获蛮大的,嘿嘿。

下面,良心的艿艿,再来推荐一波文章,嘿嘿。

等后续,艿艿会在 https://github.com/YunaiV/onemall 开源项目中,实现一个相对完整的客服功能,哈哈哈~

文章目录
  1. 1. 1. 概述
  2. 2. 2. 构建 Netty 服务端与客户端
    1. 2.1. 2.1 构建 Netty 服务端
      1. 2.1.1. 2.1.1 NettyServer
      2. 2.1.2. 2.1.2 NettyServerHandlerInitializer
      3. 2.1.3. 2.1.3 NettyServerHandler
      4. 2.1.4. 2.1.4 NettyChannelManager
      5. 2.1.5. 2.1.5 引入依赖
      6. 2.1.6. 2.1.6 NettyServerApplication
      7. 2.1.7. 2.1.7 简单测试
    2. 2.2. 2.2 构建 Netty 客户端
      1. 2.2.1. 2.2.1 NettyClient
      2. 2.2.2. 2.2.2 NettyClientHandlerInitializer
      3. 2.2.3. 2.2.3 NettyClientHandler
      4. 2.2.4. 2.2.4 引入依赖
      5. 2.2.5. 2.2.5 NettyClientApplication
      6. 2.2.6. 2.2.6 简单测试
    3. 2.3. 2.3 小结
  3. 3. 3. 通信协议
    1. 3.1. 3.1 Invocation
    2. 3.2. 3.2 粘包与拆包
      1. 3.2.1. 3.2.1 产生原因
      2. 3.2.2. 3.2.2 解决方案
    3. 3.3. 3.3 InvocationEncoder
    4. 3.4. 3.4 InvocationDecoder
    5. 3.5. 3.5 引入依赖
    6. 3.6. 3.6 小结
  4. 4. 4. 消息分发
    1. 4.1. 4.1 Message
    2. 4.2. 4.2 MessageHandler
    3. 4.3. 4.3 MessageHandlerContainer
    4. 4.4. 4.4 MessageDispatcher
    5. 4.5. 4.5 NettyServerConfig
    6. 4.6. 4.6 NettyClientConfig
    7. 4.7. 4.7 小结
  5. 5. 5. 断开重连
    1. 5.1. 5.1 具体代码
    2. 5.2. 5.2 简单测试
  6. 6. 6. 心跳机制与空闲检测
    1. 6.1. 6.1 服务端的空闲检测
    2. 6.2. 6.2 客户端的空闲检测
    3. 6.3. 6.3 心跳机制
    4. 6.4. 6.4 简单测试
  7. 7. 7. 认证逻辑
    1. 7.1. 7.1 AuthRequest
    2. 7.2. 7.2 AuthResponse
    3. 7.3. 7.3 AuthRequestHandler
    4. 7.4. 7.4 AuthResponseHandler
    5. 7.5. 7.5 TestController
    6. 7.6. 7.6 简单测试
  8. 8. 8. 单聊逻辑
    1. 8.1. 8.1 ChatSendToOneRequest
    2. 8.2. 8.2 ChatSendResponse
    3. 8.3. 8.3 ChatRedirectToUserRequest
    4. 8.4. 8.4 ChatSendToOneHandler
    5. 8.5. 8.5 ChatSendResponseHandler
    6. 8.6. 8.6 ChatRedirectToUserRequestHandler
    7. 8.7. 8.7 简单测试
  9. 9. 9. 群聊逻辑
    1. 9.1. 9.1 ChatSendToAllRequest
    2. 9.2. 9.2 ChatSendResponse
    3. 9.3. 9.3 ChatRedirectToUserRequest
    4. 9.4. 9.4 ChatSendToAllHandler
    5. 9.5. 9.5 ChatSendResponseHandler
    6. 9.6. 9.6 ChatRedirectToUserRequestHandler
    7. 9.7. 9.7 简单测试
  10. 10. 666. 彩蛋