Skip to content

自定义协议

1. 自定义协议要素

  1. 魔数,用来在第一时间判定是否是无效数据包,比如java的cafebabe
  2. 版本号,可以支持协议的升级
  3. 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
  4. 指令类型,是登录、注册、单聊、群聊... 跟业务相关
  5. 请求序号,为了双工通信,提供异步能力, 也就是比如第1、2、3条消息请求,异步也就是并行处理后回复的时候,需要再次带上请求序号,方便客户端识别请求和响应的对应关系。
  6. 正文长度
  7. 消息正文

其中序列化算法主要应用在消息正文。

2. 设计聊天室协议

2.1 编写编解码器

根据上面的要素,设计一个登录请求消息和登录响应消息,并使用Netty完成收发。

  1. 添加maven依赖
xml
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>5.6.2</version>
</dependency>

kryo是一个java高性能的序列化组件。
2. 编写WXMessageCodec类用于编解码消息:

java
public class WXMessageCodec extends ByteToMessageCodec<WXMessage> {

    Logger log = LoggerFactory.getLogger(WXMessageCodec.class);

    @Override
    protected void encode(ChannelHandlerContext ctx, WXMessage msg, ByteBuf out) throws Exception {
        // 1. 2 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        out.writeByte(0);
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        Kryo kryo = new Kryo();
        kryo.register(LoginMessage.class);
        Output output = new Output(10, -1);
        kryo.writeObject(output, msg);
        byte[] bytes = output.getBuffer();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        Kryo kryo = new Kryo();
        kryo.register(LoginMessage.class);
        LoginMessage wxMessage = kryo.readObject(new Input(bytes), LoginMessage.class);
        log.debug("magicNum:{}, version:{}, serializerType:{}, messageType:{}, sequenceId:{}, length:{}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("wxMessage:{}, name:{}", wxMessage, wxMessage.getUsername());
        out.add(wxMessage);
    }
}

2.2 编写测试代码

java
public static void main(String[] args) throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(
            new LoggingHandler(LogLevel.INFO),
            new WXMessageCodec());
    LoginMessage loginMessage = new LoginMessage();
    loginMessage.setUsername("jack");
    loginMessage.setPassword("jack");
    // 测试encode
    channel.writeOutbound(loginMessage);
    // 测试decode, 需要手动写入buffer
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    new WXMessageCodec().encode(null, loginMessage, buffer);
    channel.writeInbound(buffer);
}

运行结果:
Alt text 但如果数据发来的是不完整的数据,比如:

java
// slice分片
ByteBuf slice1 = buffer.slice(0, 10);
// 将不完整消息发送出去
channel.writeInbound(slice1);

运行结果:
Alt text 因此测试代码需要加上LengthFieldBasedFrameDecoder进行粘包半包处理:

java
public static void main(String[] args) throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(
            new LoggingHandler(LogLevel.INFO),
            new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
            new WXMessageCodec());
    LoginMessage loginMessage = new LoginMessage();
    loginMessage.setUsername("jack");
    loginMessage.setPassword("jack");
    // 测试encode
    channel.writeOutbound(loginMessage);
    // 测试decode, 需要手动写入buffer
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    new WXMessageCodec().encode(null, loginMessage, buffer);

    ByteBuf slice1 = buffer.slice(0, 10);
    ByteBuf slice2 = buffer.slice(10, buffer.readableBytes()-10);
    /**
     * writeInbound内部会释放slice1的引用,而slice1引用的是buffer的地址,
     * 最终会影响slice2的使用,因此需要手动retain,否则会出现异常
     */
    slice1.retain();
    channel.writeInbound(slice1);
    channel.writeInbound(slice2);
}

运行结果:
Alt text

3. 编解码器优化

自定义协议的编解码器可以考虑重用,使用单例:

java
public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    // 将handler提前创建单例
    LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);
    WXMessageCodec wxMessageCodec = new WXMessageCodec();
    try {
        serverBootstrap.group(group);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 重复使用loggingHandler
                ch.pipeline().addLast(loggingHandler);
                // LengthFieldBasedFrameDecoder涉及处理半包粘包,没有@Shareable注解,每次都需要创建
                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
                // 重复使用wxMessageCodec
                ch.pipeline().addLast(wxMessageCodec);
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind("192.168.101.1", 8080);
        channelFuture.channel().closeFuture().sync();
    } finally {
        group.shutdownGracefully();
    }
}

启动发现报错:
Alt text 因为WXMessageCodec的父类是ByteToMessageCodec,而ByteToMessageCodec的构造函数中对要求不能在其子类中使用@Sharable注解。究其原因就是ByteToMessageCodec被设计出来的时候就考虑到会记录状态的,我们可以更换父类为MessageToMessageCodec即可,前提是WXMessageCodec必须和LengthFieldBasedFrameDecoder同时使用。优化后的WXMessageCodec:

java
// 加上Sharable注解表示可以作为单例使用
@ChannelHandler.Sharable
// MessageToMessageCodec类需要指定输入和输出类型的泛型
public class WXMessageCodec extends MessageToMessageCodec<ByteBuf, WXMessage> {

    Logger log = LoggerFactory.getLogger(WXMessageCodec.class);

    @Override
    protected void encode(ChannelHandlerContext ctx, WXMessage msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        out.writeByte(0);
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        Kryo kryo = new Kryo();
        kryo.register(LoginMessage.class);
        Output output = new Output(10, -1);
        kryo.writeObject(output, msg);
        byte[] bytes = output.getBuffer();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        Kryo kryo = new Kryo();
        kryo.register(LoginMessage.class);
        LoginMessage wxMessage = kryo.readObject(new Input(bytes), LoginMessage.class);
        log.debug("magicNum:{}, version:{}, serializerType:{}, messageType:{}, sequenceId:{}, length:{}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("wxMessage:{}, name:{}", wxMessage, wxMessage.getUsername());
        out.add(wxMessage);
    }
}