ByteBuf
对字节数据的封装
1. 创建ByteBuf
使用ByteBufAllocator对象可以创建ByteBuf,默认创建大小为256比特:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
log(buffer);
for (int i = 0; i < 32; i++) {
buffer.writeByte(i);
}
log(buffer);
}
public static void log(ByteBuf buffer){
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder stringBuilder = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(StringUtil.NEWLINE);
ByteBufUtil.appendPrettyHexDump(stringBuilder, buffer);
System.out.println(stringBuilder.toString());
}
运行结果:
1.2 使用直接内存创建ByteBuf
// 创建池化基于直接内存的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
// 默认创建直接内存
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
1.3 使用堆内存创建ByteBuf
// 创建池化基于堆的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
直接内存vs堆内存
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
1.4 池化和非池化
池化的最大意义在于可以重用 ByteBuf,优点有
- 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力
- 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
开启池化功能, 通过下面的jvm参数来设置:
-Dio.netty.allocator.type=pooled
- Netty4.1以后,非Android平台默认启用池化实现,Android平台启用非池化实现
- Netty4.1之前,池化功能还不成熟,默认是非池化实现
2. ByteBuf的组成
ByteBuf由四部分组成: 最开始读写指针都在0位置。相比JDK中的ByteBuffer,提供了两个指针分别用来读和写,不用切换读写模式。此外ByteBuf提供了最大容量和容量的概念,支持动态扩容。
3. 写入
3.1 常用API列表
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
writeByte(int value) | 写入 byte 值 | |
writeShort(int value) | 写入 short 值 | |
writeInt(int value) | 写入 int 值 | Big Endian,即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) | 写入 int 值 | Little Endian,即 0x250,写入后 50 02 00 00 |
writeLong(long value) | 写入 long 值 | |
writeChar(int value) | 写入 char 值 | |
writeFloat(float value) | 写入 float 值 | |
writeDouble(double value) | 写入 double 值 | |
writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 写入 byte[] | |
writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
int writeCharSequence( CharSequence sequence, Charset charset) | 写入字符串 |
3.2 使用API
buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);
结果是:
read index:0 write index:4 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
再写入一个int整数,也是4个字节:
buffer.writeInt(8);
log(buffer);
结果是:
read index:0 write index:8 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 08 |........ |
+--------+-------------------------------------------------+----------------+
4. 动态扩容
buffer.writeInt(6);
log(buffer);
再写入一个int整数时,容量不够了(初始容量是10),这时会引发扩容。结果是:
read index:0 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 08 00 00 00 06 |............ |
+--------+-------------------------------------------------+----------------+
4.1 扩容规则
- 如何写入后数据大小未超过512,则选择下一个16的整数倍,例如写入后大小为12 ,则扩容后capacity是16
- 如果写入后数据大小超过512,则选择下一个2^n,例如写入后大小为513,则扩容后capacity是2^10=1024(2^9=512已经不够了)
- 扩容不能超过max capacity会报错
5. 读取
例如读了4次,每次一个字节:
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);
读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分:
1
2
3
4
read index:4 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 08 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+
如果需要重复读取int整数5,可以在read前先做个标记mark:
// 做个标记mark
buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);
// 重置读指针
buffer.resetReaderIndex();
log(buffer);
运行结果:
read index:4 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+
还有一种办法是采用get开头的一系列方法,这些方法不会改变read index。
6. retain&release
由于Netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收:
- UnpooledHeapByteBuf使用的是JVM内存,只需等GC回收内存即可
- UnpooledDirectByteBuf使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存
可以看出不同ByteBuf类型的内存回收是不一样的,Netty提供统一的API进行内存释放:
`protected abstract void deallocate()`
7. 内存回收机制
7.1 引用计数法
Netty采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口:
- 每个ByteBuf对象的初始计数为1
- 调用
release()
方法计数减1,如果计数为0,ByteBuf内存被回收 - 调用
retain()
方法计数加1,表示调用者没用完之前,其它handler即使调用了release也不会造成回收 - 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用
7.2 释放时机
多个ChannelHandler之间传递ByteBuf,如果其中一个ChannelHandler中执行release()
方法就会导致其他ChannelHandler无法正常使用客户端传入的ByteBuf。因此ByteBuf释放规则如下:
- Head的ChannelHandler: 首次创建ByteBuf放入pipeline。
- 入站ByteBuf处理原则:
- 对原始ByteBuf不做处理,调用ctx.fireChannelRead(msg)向后传递,这时无须release
- 将原始ByteBuf转换为其它类型的Java对象,这时ByteBuf就没用了,必须release
- 如果不调用ctx.fireChannelRead(msg)向后传递,那么也必须release
- 注意各种异常,如果ByteBuf没有成功传递到下一个 ChannelHandler,必须release
- 假设消息一直向后传,那么TailContext会负责释放未处理消息(原始的ByteBuf)
- 出站ByteBuf处理原则: 出站消息最终都会转为ByteBuf输出,一直向前传,由HeadContext flush后release
- 异常处理原则: 有时候不清楚ByteBuf被引用了多少次,但又必须彻底释放,可以循环调用release直到返回true。
- Tail的ChannelHandler: 对ByteBuf内存进行释放。
关于Tail中释放ByteBuf的代码:
// io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
// 调用引用计数器工具类释放内存方法
ReferenceCountUtil.release(msg);
}
}
// io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}
8. slice
8.1 slice介绍
ByteBuf【零拷贝】的体现之一(还有其他的api也能实现零拷贝),对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的read,write指针。 原始ByteBuf进行一些初始操作:
ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
System.out.println(ByteBufUtil.prettyHexDump(origin));
这时调用slice进行切片,无参slice是从原始ByteBuf的read index到write index之间的内容进行切片,切片后的max capacity被固定为这个区间的大小,因此不能追加write。
// 切片过程中,没有发生数据复制
ByteBuf slice = origin.slice(0, 2);
System.out.println(ByteBufUtil.prettyHexDump(slice));
// 修改指定位置的字节内容
slice.setByte(0, 9);
System.out.println(ByteBufUtil.prettyHexDump(origin));
System.out.println(ByteBufUtil.prettyHexDump(slice));
运行结果: 可以看到修改了slice的字节内容,origin的字节内容也发生了变化,说明slice并不是新的ByteBuf, 仍然是origin的部分内容的引用。
8.2 不能更改slice结构
但如果是对slice后的数据不进行更新而是添加或者删除则会报错,因为这样操作会改变ByteBuf结构,比如长度发生变化:
// 添加字节内容
slice.writeByte(7);
运行结果:
8.3 自行管理slice
ByteBuf可以通过release()
方法进行释放,使用slice得到的ByteBuf不能受到父ByteBuf的release限制:
// 比如父ByteBuf被使用它的人释放了
origin.release();
System.out.println(ByteBufUtil.prettyHexDump(slice));
报错的原因就是因为slice得到ByteBuf也被释放了,对于自行创建的ByteBuf需要创建的人进行维护,而不是被其他人管理不利于维护:
// 解决办法就是在创建slice的ByteBuf后,固定格式加上retain, 使得引用计数+1
slice.retain();
// 释放的时候不会释放掉slice
origin.release();
System.out.println(ByteBufUtil.prettyHexDump(slice));
9. duplicate
【零拷贝】的体现之一,就好比截取了原始ByteBuf所有内容,并且没有max capacity的限制,也是与原始ByteBuf使用同一块底层内存,只是读写指针是独立的。
10. copy
会将底层内存数据进行深拷贝,因此无论读写,都与原始ByteBuf无关
11. CompositeByteBuf
【零拷贝】的体现之一,可以将多个ByteBuf合并为一个逻辑上的ByteBuf,避免拷贝。
11.1 普通方式拷贝
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
ByteBuf buf3 = ByteBufAllocator.DEFAULT.buffer().writeBytes(buf1).writeBytes(buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
结果输出:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
11.2 零拷贝方式
public static void main(String[] args) throws ExecutionException, InterruptedException {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 获取compositeBuffer
CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
// true表示更新维护写指针,若为false就无法合并,
buf3.addComponents(true, buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
}
运行结果:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
另外读指针就是0,就是要从0开始读,不用维护。
12. Unpooled
Unpooled是一个工具类,类如其名,提供了非池化的ByteBuf创建、组合、复制等操作。当包装ByteBuf个数超过一个时, 底层使用了CompositeByteBuf,从而实现零拷贝
// 创建空的ByteBuf
ByteBuf buffer = Unpooled.buffer();
buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
// 包装普通字节数组,底层也不会有拷贝操作
ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
读和写的误解
Java Socket是全双工的:在任意时刻,线路上存在A 到 B
和 B 到 A
的双向信号传输。即使是阻塞BIO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读。