自定义协议
1. 自定义协议要素
- 魔数,用来在第一时间判定是否是无效数据包,比如java的cafebabe
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊... 跟业务相关
- 请求序号,为了双工通信,提供异步能力, 也就是比如第1、2、3条消息请求,异步也就是并行处理后回复的时候,需要再次带上请求序号,方便客户端识别请求和响应的对应关系。
- 正文长度
- 消息正文
其中序列化算法主要应用在消息正文。
2. 设计聊天室协议
2.1 编写编解码器
根据上面的要素,设计一个登录请求消息和登录响应消息,并使用Netty完成收发。
- 添加maven依赖
xml
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>5.6.2</version>
</dependency>
kryo是一个java高性能的序列化组件。
2. 编写WXMessageCodec类用于编解码消息:
java
public class WXMessageCodec extends ByteToMessageCodec<WXMessage> {
Logger log = LoggerFactory.getLogger(WXMessageCodec.class);
@Override
protected void encode(ChannelHandlerContext ctx, WXMessage msg, ByteBuf out) throws Exception {
// 1. 2 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(0);
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
Kryo kryo = new Kryo();
kryo.register(LoginMessage.class);
Output output = new Output(10, -1);
kryo.writeObject(output, msg);
byte[] bytes = output.getBuffer();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes);
Kryo kryo = new Kryo();
kryo.register(LoginMessage.class);
LoginMessage wxMessage = kryo.readObject(new Input(bytes), LoginMessage.class);
log.debug("magicNum:{}, version:{}, serializerType:{}, messageType:{}, sequenceId:{}, length:{}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("wxMessage:{}, name:{}", wxMessage, wxMessage.getUsername());
out.add(wxMessage);
}
}
2.2 编写测试代码
java
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(LogLevel.INFO),
new WXMessageCodec());
LoginMessage loginMessage = new LoginMessage();
loginMessage.setUsername("jack");
loginMessage.setPassword("jack");
// 测试encode
channel.writeOutbound(loginMessage);
// 测试decode, 需要手动写入buffer
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
new WXMessageCodec().encode(null, loginMessage, buffer);
channel.writeInbound(buffer);
}
运行结果: 但如果数据发来的是不完整的数据,比如:
java
// slice分片
ByteBuf slice1 = buffer.slice(0, 10);
// 将不完整消息发送出去
channel.writeInbound(slice1);
运行结果: 因此测试代码需要加上LengthFieldBasedFrameDecoder进行粘包半包处理:
java
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(LogLevel.INFO),
new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
new WXMessageCodec());
LoginMessage loginMessage = new LoginMessage();
loginMessage.setUsername("jack");
loginMessage.setPassword("jack");
// 测试encode
channel.writeOutbound(loginMessage);
// 测试decode, 需要手动写入buffer
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
new WXMessageCodec().encode(null, loginMessage, buffer);
ByteBuf slice1 = buffer.slice(0, 10);
ByteBuf slice2 = buffer.slice(10, buffer.readableBytes()-10);
/**
* writeInbound内部会释放slice1的引用,而slice1引用的是buffer的地址,
* 最终会影响slice2的使用,因此需要手动retain,否则会出现异常
*/
slice1.retain();
channel.writeInbound(slice1);
channel.writeInbound(slice2);
}
运行结果:
3. 编解码器优化
自定义协议的编解码器可以考虑重用,使用单例:
java
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 将handler提前创建单例
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);
WXMessageCodec wxMessageCodec = new WXMessageCodec();
try {
serverBootstrap.group(group);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 重复使用loggingHandler
ch.pipeline().addLast(loggingHandler);
// LengthFieldBasedFrameDecoder涉及处理半包粘包,没有@Shareable注解,每次都需要创建
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
// 重复使用wxMessageCodec
ch.pipeline().addLast(wxMessageCodec);
}
});
ChannelFuture channelFuture = serverBootstrap.bind("192.168.101.1", 8080);
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
启动发现报错: 因为WXMessageCodec的父类是ByteToMessageCodec,而ByteToMessageCodec的构造函数中对要求不能在其子类中使用@Sharable注解。究其原因就是ByteToMessageCodec被设计出来的时候就考虑到会记录状态的,我们可以更换父类为MessageToMessageCodec即可,前提是WXMessageCodec必须和LengthFieldBasedFrameDecoder同时使用。优化后的WXMessageCodec:
java
// 加上Sharable注解表示可以作为单例使用
@ChannelHandler.Sharable
// MessageToMessageCodec类需要指定输入和输出类型的泛型
public class WXMessageCodec extends MessageToMessageCodec<ByteBuf, WXMessage> {
Logger log = LoggerFactory.getLogger(WXMessageCodec.class);
@Override
protected void encode(ChannelHandlerContext ctx, WXMessage msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(0);
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
Kryo kryo = new Kryo();
kryo.register(LoginMessage.class);
Output output = new Output(10, -1);
kryo.writeObject(output, msg);
byte[] bytes = output.getBuffer();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
outList.add(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes);
Kryo kryo = new Kryo();
kryo.register(LoginMessage.class);
LoginMessage wxMessage = kryo.readObject(new Input(bytes), LoginMessage.class);
log.debug("magicNum:{}, version:{}, serializerType:{}, messageType:{}, sequenceId:{}, length:{}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("wxMessage:{}, name:{}", wxMessage, wxMessage.getUsername());
out.add(wxMessage);
}
}