Skip to content

ByteBuf

对字节数据的封装

1. 创建ByteBuf

使用ByteBufAllocator对象可以创建ByteBuf,默认创建大小为256比特:

java
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    log(buffer);
    for (int i = 0; i < 32; i++) {
        buffer.writeByte(i);
    }
    log(buffer);
}

public static void log(ByteBuf buffer){
    int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder stringBuilder = new StringBuilder(rows * 80 * 2)
            .append("read index:").append(buffer.readerIndex())
            .append(" write index:").append(buffer.writerIndex())
            .append(" capacity:").append(buffer.capacity())
            .append(StringUtil.NEWLINE);
    ByteBufUtil.appendPrettyHexDump(stringBuilder, buffer);
    System.out.println(stringBuilder.toString());
}

运行结果:
Alt text

1.2 使用直接内存创建ByteBuf

java
// 创建池化基于直接内存的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
// 默认创建直接内存
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);

1.3 使用堆内存创建ByteBuf

java
// 创建池化基于堆的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

直接内存vs堆内存

  1. 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
  2. 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

1.4 池化和非池化

池化的最大意义在于可以重用 ByteBuf,优点有

  • 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力
  • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

开启池化功能, 通过下面的jvm参数来设置:

sh
-Dio.netty.allocator.type=pooled
  • Netty4.1以后,非Android平台默认启用池化实现,Android平台启用非池化实现
  • Netty4.1之前,池化功能还不成熟,默认是非池化实现

2. ByteBuf的组成

ByteBuf由四部分组成:
Alt text 最开始读写指针都在0位置。相比JDK中的ByteBuffer,提供了两个指针分别用来读和写,不用切换读写模式。此外ByteBuf提供了最大容量和容量的概念,支持动态扩容。

3. 写入

3.1 常用API列表

方法签名含义备注
writeBoolean(boolean value)写入 boolean 值用一字节 01|00 代表 true|false
writeByte(int value)写入 byte 值
writeShort(int value)写入 short 值
writeInt(int value)写入 int 值Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value)写入 int 值Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value)写入 long 值
writeChar(int value)写入 char 值
writeFloat(float value)写入 float 值
writeDouble(double value)写入 double 值
writeBytes(ByteBuf src)写入 netty 的 ByteBuf
writeBytes(byte[] src)写入 byte[]
writeBytes(ByteBuffer src)写入 nio 的 ByteBuffer
int writeCharSequence(
CharSequence sequence,
Charset charset)
写入字符串

3.2 使用API

java
buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);

结果是:

sh
read index:0 write index:4 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+

再写入一个int整数,也是4个字节:

java
buffer.writeInt(8);
log(buffer);

结果是:

sh
read index:0 write index:8 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 08                         |........        |
+--------+-------------------------------------------------+----------------+

4. 动态扩容

java
buffer.writeInt(6);
log(buffer);

再写入一个int整数时,容量不够了(初始容量是10),这时会引发扩容。结果是:

sh
read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 08 00 00 00 06             |............    |
+--------+-------------------------------------------------+----------------+

4.1 扩容规则

  1. 如何写入后数据大小未超过512,则选择下一个16的整数倍,例如写入后大小为12 ,则扩容后capacity是16
  2. 如果写入后数据大小超过512,则选择下一个2^n,例如写入后大小为513,则扩容后capacity是2^10=1024(2^9=512已经不够了)
  3. 扩容不能超过max capacity会报错

5. 读取

例如读了4次,每次一个字节:

java
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);

读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分:

sh
1
2
3
4
read index:4 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 08 00 00 00 06                         |........        |
+--------+-------------------------------------------------+----------------+

如果需要重复读取int整数5,可以在read前先做个标记mark:

java
// 做个标记mark
buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);
// 重置读指针
buffer.resetReaderIndex();
log(buffer);

运行结果:

sh
read index:4 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06                         |........        |
+--------+-------------------------------------------------+----------------+

还有一种办法是采用get开头的一系列方法,这些方法不会改变read index。

6. retain&release

由于Netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收:

  • UnpooledHeapByteBuf使用的是JVM内存,只需等GC回收内存即可
  • UnpooledDirectByteBuf使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存

可以看出不同ByteBuf类型的内存回收是不一样的,Netty提供统一的API进行内存释放:

java
`protected abstract void deallocate()`

7. 内存回收机制

7.1 引用计数法

Netty采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口:

  • 每个ByteBuf对象的初始计数为1
  • 调用release()方法计数减1,如果计数为0,ByteBuf内存被回收
  • 调用retain()方法计数加1,表示调用者没用完之前,其它handler即使调用了release也不会造成回收
  • 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用

7.2 释放时机

多个ChannelHandler之间传递ByteBuf,如果其中一个ChannelHandler中执行release()方法就会导致其他ChannelHandler无法正常使用客户端传入的ByteBuf。因此ByteBuf释放规则如下:

  1. Head的ChannelHandler: 首次创建ByteBuf放入pipeline。
  2. 入站ByteBuf处理原则:
    • 对原始ByteBuf不做处理,调用ctx.fireChannelRead(msg)向后传递,这时无须release
    • 将原始ByteBuf转换为其它类型的Java对象,这时ByteBuf就没用了,必须release
    • 如果不调用ctx.fireChannelRead(msg)向后传递,那么也必须release
    • 注意各种异常,如果ByteBuf没有成功传递到下一个 ChannelHandler,必须release
    • 假设消息一直向后传,那么TailContext会负责释放未处理消息(原始的ByteBuf)
  3. 出站ByteBuf处理原则: 出站消息最终都会转为ByteBuf输出,一直向前传,由HeadContext flush后release
  4. 异常处理原则: 有时候不清楚ByteBuf被引用了多少次,但又必须彻底释放,可以循环调用release直到返回true。
  5. Tail的ChannelHandler: 对ByteBuf内存进行释放。

关于Tail中释放ByteBuf的代码:

java
// io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
            "Discarded inbound message {} that reached at the tail of the pipeline. " +
            "Please check your pipeline configuration.", msg);
    } finally {
        // 调用引用计数器工具类释放内存方法
        ReferenceCountUtil.release(msg);
    }
}

// io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
    if (msg instanceof ReferenceCounted) {
        return ((ReferenceCounted) msg).release();
    }
    return false;
}

8. slice

8.1 slice介绍

ByteBuf【零拷贝】的体现之一(还有其他的api也能实现零拷贝),对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的read,write指针。
Alt text 原始ByteBuf进行一些初始操作:

java
ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
System.out.println(ByteBufUtil.prettyHexDump(origin));

这时调用slice进行切片,无参slice是从原始ByteBuf的read index到write index之间的内容进行切片,切片后的max capacity被固定为这个区间的大小,因此不能追加write。

java
// 切片过程中,没有发生数据复制
ByteBuf slice = origin.slice(0, 2);
System.out.println(ByteBufUtil.prettyHexDump(slice));
// 修改指定位置的字节内容
slice.setByte(0, 9);
System.out.println(ByteBufUtil.prettyHexDump(origin));
System.out.println(ByteBufUtil.prettyHexDump(slice));

运行结果:
Alt text 可以看到修改了slice的字节内容,origin的字节内容也发生了变化,说明slice并不是新的ByteBuf, 仍然是origin的部分内容的引用。

8.2 不能更改slice结构

但如果是对slice后的数据不进行更新而是添加或者删除则会报错,因为这样操作会改变ByteBuf结构,比如长度发生变化:

java
// 添加字节内容
slice.writeByte(7);

运行结果:
Alt text

8.3 自行管理slice

ByteBuf可以通过release()方法进行释放,使用slice得到的ByteBuf不能受到父ByteBuf的release限制:

java
// 比如父ByteBuf被使用它的人释放了
origin.release();
System.out.println(ByteBufUtil.prettyHexDump(slice));

Alt text 报错的原因就是因为slice得到ByteBuf也被释放了,对于自行创建的ByteBuf需要创建的人进行维护,而不是被其他人管理不利于维护:

java
// 解决办法就是在创建slice的ByteBuf后,固定格式加上retain, 使得引用计数+1
slice.retain();
// 释放的时候不会释放掉slice
origin.release();
System.out.println(ByteBufUtil.prettyHexDump(slice));

9. duplicate

【零拷贝】的体现之一,就好比截取了原始ByteBuf所有内容,并且没有max capacity的限制,也是与原始ByteBuf使用同一块底层内存,只是读写指针是独立的。
Alt text

10. copy

会将底层内存数据进行深拷贝,因此无论读写,都与原始ByteBuf无关

11. CompositeByteBuf

【零拷贝】的体现之一,可以将多个ByteBuf合并为一个逻辑上的ByteBuf,避免拷贝。

11.1 普通方式拷贝

java
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
ByteBuf buf3 = ByteBufAllocator.DEFAULT.buffer().writeBytes(buf1).writeBytes(buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

结果输出:

sh
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+

11.2 零拷贝方式

java
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
    buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
    ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
    buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
    // 获取compositeBuffer
    CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
    // true表示更新维护写指针,若为false就无法合并,
    buf3.addComponents(true, buf1, buf2);
    System.out.println(ByteBufUtil.prettyHexDump(buf3));
}

运行结果:

sh
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+

另外读指针就是0,就是要从0开始读,不用维护。

12. Unpooled

Unpooled是一个工具类,类如其名,提供了非池化的ByteBuf创建、组合、复制等操作。当包装ByteBuf个数超过一个时, 底层使用了CompositeByteBuf,从而实现零拷贝

java
// 创建空的ByteBuf
ByteBuf buffer = Unpooled.buffer();
buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
// 包装普通字节数组,底层也不会有拷贝操作
ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});

读和写的误解

Java Socket是全双工的:在任意时刻,线路上存在A 到 BB 到 A 的双向信号传输。即使是阻塞BIO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读。