⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 blog.csdn.net/tianyaleixiaowu/article/details/107714868 「天涯泪小武」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

单纯netty结合protostuff进行rpc对象传输的demo网上有很多,大部分都是一个模子刻出来的,一开始我也是抄了一个,本地测试畅通无阻,未发生任何异常。部署预发环境,进行压测后,问题巨多,各种报错层出不穷。当然,压测时我用的数据量大、发送请求非常密集,单机是每秒前100ms发送2万个对象,其他900ms歇息,死循环发送,共计40台机器作为客户端,同时往2台netty Server服务器发送对象,那么平均每个server每秒大概要接收40万个对象,由于后面还有业务逻辑,逻辑每秒只能处理35万实测。

对于网上的代码,进行了多次修改,反复测试,最终是达到了不报错无异常,单机秒级接收35万个对象以上,故写篇文章记录一下,文中代码会和线上逻辑保持一致。

Protostuff序列化和反序列化

这个没什么特殊的,网上找个工具类就好了。

引入pom

<protostuff.version>1.7.2</protostuff.version>

<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>${protostuff.version}</version>
</dependency>

<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>${protostuff.version}</version>
</dependency>

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtobufIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
*
* @author wuweifeng10
* @date 2020/7/18
**/
public class ProtostuffUtils {
/**
* 避免每次序列化都重新申请Buffer空间
* 这句话在实际生产上没有意义,耗时减少的极小,但高并发下,如果还用这个buffer,会报异常说buffer还没清空,就又被使用了
*/
// private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
/**
* 缓存Schema
*/
private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();

/**
* 序列化方法,把指定对象序列化成字节数组
*
* @param obj
* @param <T>
* @return
*/
@SuppressWarnings("unchecked")
public static <T> byte[] serialize(T obj) {
Class<T> clazz = (Class<T>) obj.getClass();
Schema<T> schema = getSchema(clazz);
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
byte[] data;
try {
data = ProtobufIOUtil.toByteArray(obj, schema, buffer);
// data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} finally {
buffer.clear();
}

return data;
}

/**
* 反序列化方法,将字节数组反序列化成指定Class类型
*
* @param data
* @param clazz
* @param <T>
* @return
*/
public static <T> T deserialize(byte[] data, Class<T> clazz) {
Schema<T> schema = getSchema(clazz);
T obj = schema.newMessage();
ProtobufIOUtil.mergeFrom(data, obj, schema);
// ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
}

@SuppressWarnings("unchecked")
private static <T> Schema<T> getSchema(Class<T> clazz) {
Schema<T> schema = (Schema<T>) schemaCache.get(clazz);
if (Objects.isNull(schema)) {
//这个schema通过RuntimeSchema进行懒创建并缓存
//所以可以一直调用RuntimeSchema.getSchema(),这个方法是线程安全的
schema = RuntimeSchema.getSchema(clazz);
if (Objects.nonNull(schema)) {
schemaCache.put(clazz, schema);
}
}

return schema;
}
}

此处有坑,就是最上面大部分网上代码都是用了static的buffer。在单线程情况下没有问题。在多线程情况下,非常容易出现buffer一次使用后尚未被clear,就再次被另一个线程使用,会抛异常。而所谓的避免每次都申请buffer空间,实测性能影响极其微小。

另里面两次ProtostuffIOUtil都改成了ProtobufIOUtil,因为也是出过异常,修改后未见有异常。

自定义序列化方式

解码器decoder:

import com.jd.platform.hotkey.common.model.HotKeyMsg;
import com.jd.platform.hotkey.common.tool.ProtostuffUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
* @author wuweifeng
* @version 1.0
* @date 2020-07-29
*/
public class MsgDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) {
try {

byte[] body = new byte[in.readableBytes()]; //传输正常
in.readBytes(body);

list.add(ProtostuffUtils.deserialize(body, HotKeyMsg.class));

// if (in.readableBytes() < 4) {
// return;
// }
// in.markReaderIndex();
// int dataLength = in.readInt();
// if (dataLength < 0) {
// channelHandlerContext.close();
// }
// if (in.readableBytes() < dataLength) {
// in.resetReaderIndex();
// return;
// }
//
// byte[] data = new byte[dataLength];
// in.readBytes(data);
//
// Object obj = ProtostuffUtils.deserialize(data, HotKeyMsg.class);
// list.add(obj);
} catch (Exception e) {
e.printStackTrace();
}
}
}

编码器encoder

import com.jd.platform.hotkey.common.model.HotKeyMsg;
import com.jd.platform.hotkey.common.tool.Constant;
import com.jd.platform.hotkey.common.tool.ProtostuffUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
* @author wuweifeng
* @version 1.0
* @date 2020-07-30
*/
public class MsgEncoder extends MessageToByteEncoder {

@Override
public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {
if (in instanceof HotKeyMsg) {
byte[] bytes = ProtostuffUtils.serialize(in);
byte[] delimiter = Constant.DELIMITER.getBytes();

byte[] total = new byte[bytes.length + delimiter.length];
System.arraycopy(bytes, 0, total, 0, bytes.length);
System.arraycopy(delimiter, 0, total, bytes.length, delimiter.length);

out.writeBytes(total);
}
}
}

先看Decoder解码器,这个是用来netty收到消息后,进行解码,将字节转为对象(自定义的HotKeyMsg)用的。里面有一堆被我注释掉了,注释掉的,应该在网上找到的帖子都是那么写的。这种方式本身在普通场景下是没问题的,解码还算正常,但是当上几十万时非常容易出现粘包问题。所以我是在这个解码器前增加了一个DelimiterBasedFrameDecoder分隔符解码器。

当收到消息时,先过这个分隔符解码器,之后到MsgDecoder那里时,就是已经分隔好的一个对象字节流了,就可以直接用proto工具类进行反序列化的。Constant.DELIMITER是我自定义的一个特殊字符串,用来做分隔符。

再看encoder,编码器,首先将要传输的对象用ProtostuffUtils序列化为byte[],然后在尾巴上挂上我自定义的那个分隔符。这样在对外发送对象时,就会走这个编码器,并被加上分隔符。

对应的server端代码大概是这样:

之后在Handler里就可以直接使用这个传输的对象了。

再看client端

和Server端是一样的,也是这几个编解码器,没有区别。因为netty和server之间通讯,我都是用的同一个对象定义。

同理handler也是一样的。

单机和集群

以上都写完后,其实就可以测试了,我们可以启动一个client,一个server,然后搞个死循环往Server发这个对象了,然后你在server端在收到这个对象后,再直接把这个对象也写回来,原样发送到客户端。会发现运行的很顺畅,每秒发N万个没问题,编解码都正常,client和server端都比较正常,当前前提是ProtoBuf的工具类和我的一样,不要共享那个buffer。网上找的文章基本上到这样也就结束了,随便发几个消息没问题也就算OK。然而实际上,这种代码上线后,会坑的不要不要的。

其实本地测试也很容易,再启动几个客户端,都连同一个Server,然后给他死循环发对象,再看看两端会不会有异常。这种情况下,和第一种的区别其实客户端没什么变化,Server端就有变化了,之前同时只给一个client发消息,现在同时给两个client发消息,这一步如果不谨慎就会出问题了,建议自行尝试。

之后,我们再加点料,我启动两个Server,分别用两个端口,线上其实是两台不同的server服务器,client会同时往两台server死循环发对象,如下图代码。

发消息,我们常用的就是channel.writeAndFlush(),大家可以把那个sync去掉,然后跑一下代码看看。会发现异常抛的一坨一坨的。我们明明是往两个不同的channel发消息,只不过时间是同时,结果就是发生了严重的粘包。server端收到的消息很多都是不规范的,会大量报错。如果在两个channel发送间隔100ms,情况就解决了。当然,最终我们可以使用sync同步发送,这样就不会抛异常了。

以上代码经测试,40台client,2台Server,平均每个server每秒大概接收40万个对象,可以持续稳定运行。特此记录。

文章目录
  1. 1. Protostuff序列化和反序列化
  2. 2. 自定义序列化方式
  3. 3. 单机和集群