【Netty】深入理解ByteBuf
之前的关于【Netty】的文章我们已经了解到 Netty 里面数据读写是以 ByteBuf 为单位进行交互的,这一小节,我们就来详细剖析一下 ByteBuf之前文章链接:(一)Netty学习前篇(二)Netty学习前篇(三)Netty学习前篇结构从上面这幅图可以看到,ByteBuf 是一个字节容器,容器里面的的数据分为三个部分:第一个部分是已经丢弃的字节,这部分数据是无效的;第...
之前的关于【Netty】的文章我们已经了解到 Netty 里面数据读写是以 ByteBuf 为单位进行交互的,这一小节,我们就来详细剖析一下 ByteBuf
之前文章链接:
(一)Netty学习前篇
(二)Netty学习前篇
(三)Netty学习前篇
结构
从上面这幅图可以看到,ByteBuf 是一个字节容器,容器里面的的数据分为三个部分:
- 第一个部分是已经丢弃的字节,这部分数据是无效的;
- 第二部分是可读字节,这部分数据是 ByteBuf 的主体数据, 从 ByteBuf 里面读取的数据都来自这一部分;
- 最后一部分的数据是可写字节,所有写到 ByteBuf 的数据都会写到这一段。最后一部分虚线表示的是该 ByteBuf 最多还能扩容多少容量
以上三段内容是被两个指针给划分出来的,从左到右,依次是读指针(readerIndex)、写指针(writerIndex),然后还有一个变量 capacity,表示 ByteBuf 底层内存的总容量,从 ByteBuf 中每读取一个字节,readerIndex 自增1,ByteBuf 里面总共有 writerIndex-readerIndex 个字节可读, 由此可以推论出当 readerIndex 与 writerIndex 相等的时候,ByteBuf 不可读
写数据是从 writerIndex 指向的部分开始写,每写一个字节,writerIndex 自增1,直到增到 capacity,这个时候,表示 ByteBuf 已经不可写了,ByteBuf 里面其实还有一个参数 maxCapacity,当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错
Netty 使用 ByteBuf 这个数据结构可以有效地区分可读数据和可写数据,读写之间相互没有冲突,当然,ByteBuf 只是对二进制数据的抽象
Netty抽象出来的ByteBuf和JDK的ByteBuffer 的区别在于:
-
1.扩容
-
ByteBuffer:ByteBuffer缓冲区的长度固定,分多了会浪费内存,分少了存放大的数据时会索引越界,所以使用ByteBuffer时,为了解决这个问题,我们一般每次put操作时,都会对可用空间进行校检,如果剩余空间不足,需要重新创建一个新的ByteBuffer,然后将旧的ByteBuffer复制到新的ByteBuffer中去。
-
ByteBuf:而ByteBuf则对其进行了改进,它会自动扩展,具体的做法是,写入数据时,会调用ensureWritable方法,传入我们需要写的字节长度,判断是否需要扩容:
源码:
public ByteBuf ensureWritable(int minWritableBytes) { if(minWritableBytes < 0) { throw Exception } else if(minWritableBytes <= this.writableBytes()) { return this; } else if(minWritableBytes > this.maxCapacity - this.writerIndex) { throw Exception } else { //扩容 int newCapacity = this.calculateNewCapacity(this.writerIndex + minWritableBytes); this.capacity(newCapacity); return this; } }
可以看到,具体新容量的计算在calculateNewCapacity方法中:
private int calculateNewCapacity(int minNewCapacity) { int maxCapacity = this.maxCapacity; int threshold = 4194304; //4mb阀值 if(minNewCapacity == 4194304) {//如果新容量为阀值,直接返回 return 4194304; } else { int newCapacity; if(minNewCapacity > 4194304) {//如果传入的新容量大于阀值,进行计算 newCapacity = minNewCapacity / 4194304 * 4194304; if(newCapacity > maxCapacity - 4194304) {//如果大于最大容量,新容量为最大容量 newCapacity = maxCapacity; } else {//否则新容量 + 阀值 4mb,按照阀值扩容 newCapacity += 4194304; } return newCapacity; } else {//如果小于阀值,则以64为计数倍增,知道倍增的结果>=需要的容量值 for(newCapacity = 64; newCapacity < minNewCapacity; newCapacity <<= 1) { ; } return Math.min(newCapacity, maxCapacity); } } }
请注意:
- 当申请的新空间大于阀值时,采用每次步进4MB的方式进行扩张内存,而不是倍增,因为这会造成内存膨胀和浪费
- 而但申请的新空间小于阀值时,则以64为基数进行倍增而不是步进,因为当内存比较小的时候,倍增是可以接受的(64 -> 128 和 10Mb -> 20Mb相比)
-
-
2.位置指针
-
ByteBuffer
ByteBuffer中只有一个位置指针position(ByteBuf有两个),所以需要我们手动得调用flip等方法,例如:
ByteBuffer buffer = ByteBuffer.allocate(88); String value = "我的博客"; buffer.put(value.getBytes()); buffer.flip(); byte[] array = new byte[buffer.remaining()]; buffer.get(array); String decodeValue = new String(array);
ByteBuffer中会有三个下标,初始位置0,当前位置positon,limit位置,初始时,position为0,limit为Buffer数组末尾
调用buffer.put(value.getBytes())后:
不调用flip:
从缓冲区读取的是position — limit位置的数据,明显不是我们要的
调用flip:
会将limit设置为position,position设置为0,,此时读取的数据
-
ByteBuf
ByteBuf中使用两个指针,readerIndex,writerIndex来指示位置,初始时readrIndex = writerIndex = 0,当写入数据后:
writerIndex — capacity:可写容量
readerIndex — writerIndex:可读部分当读取了M个字节后:
调用discardReadBytes,会释放掉discardReadBytes的空间,并把readableBytes复制到从0开始的位置,因此这里会发生内存复制,频繁调用会影响性能
-
ByteBuf分析
ByteBuf
最重要的两个基类,AbstractByteBuf
提供了骨干实现,AbstractReferenceCountedByteBuf
是AbstractByteBuf
的子类,它的子类很常使用
1.分类
1.1 从内存分配角度分类:
- 堆内存字节缓冲区(
HeapByteBuf
):UnPoolHeapByteBuf
、PooledHeapByteBuf
它的特点是内存的分配和回收都在堆,所以速度很快;缺点就是进行Socket
的IO
读写,需要把堆内存对应的缓冲区复制到内核Channel
中,这内存复制会影响性能 - 直接内存缓冲区(
DirectByteBuf
):UnPoolDirectByteBuf
、UnPoolUnsafeDirectByteBuf
、PoolDirectByteBuf
、PoolUnsafeDirectByteBuf
它的特点是由于内存的分配在非堆(方法区),不需要内存复制,所以IO读取的速度较快,但是内存的分配较慢 - 总结:
根据两种内存的特点,我们可以知道,IO
读写时最好使用DirectByteBuf
,而在后端业务消息的解编码最好使用HeapByteBuf
1.2 从内存回收的角度分类
- 基于对象池的
ByteBuf
(PoolByteBuf
):PooledByteBuf
和它的子类PoolDirectByteBuf
、PoolUnsafeDirectByteBuf
、PooledHeapByteBuf
它的特点是可以循环利用创建的ByteBuf,提高了内存的使用效率,PoolByteBuf的实现牵涉的数据结构很多,PoolByteBuf
首先会申请一大块内存区域PoolArena
,PoolArena
由多个Chunk
组成,而每个Chunk
由一个或多个page
组成
具体看《Netty权威指南》; - 普通的
ByteBuf(UnPoolByteBuf)
:UnPoolDirectByteBuf
、UnPoolUnsafeDirectByteBuf
、UnPoolHeapByteBuf
- 总结: 在高负载,大并发的情况下对象池的
ByteBuf
更好,而在一般情况下,可以使用UnPoolByteBuf
2.Netty的零拷贝
Netty的零拷贝主要体现在三个方面:
-
第一种实现:
DirectByteBuf
就如上所说,ByteBuf
可以分为HeapByteBuf
和DirectByteBuf
,当使用DirectByteBuf
可以实现零拷贝 -
第二种实现:
CompositeByteBuf
CompositeByteBuf
将多个ByteBuf封装成一个ByteBuf
,对外提供封装后的ByteBuf接口 -
第三种实现:
DefaultFileRegion
DefaultFileRegion
是Netty的文件传输类,它通过transferTo
方法将文件直接发送到目标Channel
,而不需要循环拷贝的方式,提升了传输性能
3.Netty的内存回收管理
Netty会通过 引用计数法 及时申请释放不再被引用的对象 ,实现上是通过 AbstractReferenceCountedByteBuf
来实现的,我们看上面的结构图,可以看到AbstractReferenceCountedByteBuf
是AbstractByteBuf
的直接子类,所有具体的实现ByteBuf
(堆,非堆等)都是继承自AbstractReferenceCountedByteBuf
,也就是说,Netty的具体的实现ByteBuf,都是具有内存回收管理的功能的
AbstractReferenceCountedByteBuf
有两个重要的成员变量:
-
AtomicIntegerFieldUpdater< AbstractReferenceCountedByteBuf> refCntUpdater
用来更新引用数,使用原子类,达到线程安全 -
volatile int refCnt = 1
用来记录引用数,保证可见性
引用+1
public ByteBuf retain() {
int refCnt;
do {
refCnt = this.refCnt;
if(refCnt == 0) {
throw new IllegalReferenceCountException(0, 1);
}
if(refCnt == 2147483647) {
throw new IllegalReferenceCountException(2147483647, 1);
}
} while(!refCntUpdater.compareAndSet(this, refCnt, refCnt + 1));
return this;
}
引用-1
public final boolean release() {
int refCnt;
do {
refCnt = this.refCnt;
if(refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
} while(!refCntUpdater.compareAndSet(this, refCnt, refCnt - 1));
if(refCnt == 1) {
this.deallocate();
return true;
} else {
return false;
}
}
可以看出,无论是增加引用还是释放引用,都是使用了CAS
refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)
对象是his自身,也就是说,统计的是自身的引用数,例如对于UnPoolHeapByteBuf来说,它具有统计有多少对象引用着它,当引用数refCnt == 1时,表示此事已经没有对象引用它了,此时便调用deallocate来释放内存。
ByteBuf常用API介绍
容量 API
capacity()
表示 ByteBuf 底层占用了多少字节的内存(包括丢弃的字节、可读字节、可写字节)
maxCapacity()
表示 ByteBuf 底层最大能够占用多少字节的内存,当向 ByteBuf 中写数据的时候,如果发现容量不足,则进行扩容,直到扩容到 maxCapacity,超过这个数,就抛异常
readableBytes() 与 isReadable()
readableBytes() 表示 ByteBuf 当前可读的字节数,它的值等于 writerIndex-readerIndex,如果两者相等,则不可读,isReadable() 方法返回 false
writableBytes()、 isWritable() 与 maxWritableBytes()
writableBytes() 表示 ByteBuf 当前可写的字节数,它的值等于 capacity-writerIndex,如果两者相等,则表示不可写,isWritable() 返回 false,但是这个时候,并不代表不能往 ByteBuf 中写数据了, 如果发现往 ByteBuf 中写数据写不进去的话,Netty 会自动扩容 ByteBuf,直到扩容到底层的内存大小为 maxCapacity,而 maxWritableBytes() 就表示可写的最大字节数,它的值等于 maxCapacity-writerIndex
读写指针相关的 API
readerIndex() 与 readerIndex(int)
前者表示返回当前的读指针 readerIndex, 后者表示设置读指针
writeIndex() 与 writeIndex(int)
前者表示返回当前的写指针 writerIndex, 后者表示设置写指针
markReaderIndex() 与 resetReaderIndex()
前者表示把当前的读指针保存起来,后者表示把当前的读指针恢复到之前保存的值,下面两段代码是等价的
// 代码片段1
int readerIndex = buffer.readerIndex();
// .. 其他操作
buffer.readerIndex(readerIndex);
// 代码片段二
buffer.markReaderIndex();
// .. 其他操作
buffer.resetReaderIndex();
根据Netty大佬闪电侠介绍:
希望大家多多使用代码片段二这种方式,不需要自己定义变量,无论 buffer 当作参数传递到哪里,调用 resetReaderIndex() 都可以恢复到之前的状态,在解析自定义协议的数据包的时候非常常见,推荐大家使用这一对 API
markWriterIndex() 与 resetWriterIndex()
作用与上述一对 API 类似,不再 赘述
读写 API
关于 ByteBuf 的读写都可以看作从指针开始的地方开始读写数据
writeBytes(byte[] src) 与 buffer.readBytes(byte[] dst)
writeBytes() 表示把字节数组 src 里面的数据全部写到 ByteBuf,而 readBytes() 指的是把 ByteBuf 里面的数据全部读取到 dst,这里 dst 字节数组的大小通常等于 readableBytes(),而 src 字节数组大小的长度通常小于等于 writableBytes()
writeByte(byte b) 与 buffer.readByte()
writeByte() 表示往 ByteBuf 中写一个字节,而 buffer.readByte() 表示从 ByteBuf 中读取一个字节,类似的 API 还有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 与 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble() 这里就不一一赘述了,相信读者应该很容易理解这些 API
与读写 API 类似的 API 还有 getBytes、getByte() 与 setBytes()、setByte() 系列,唯一的区别就是 get/set 不会改变读写指针,而 read/write 会改变读写指针,这点在解析数据的时候千万要注意
release() 与 retain()
由于 Netty 使用了堆外内存,而堆外内存是不被 jvm 直接管理的,也就是说申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。有点类似于c语言里面,申请到的内存必须手工释放,否则会造成内存泄漏。
Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,需要回收底层内存。默认情况下,当创建完一个 ByteBuf,它的引用为1,然后每次调用 retain() 方法, 它的引用就加一, release() 方法原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收 ByteBuf 底层的内存。
slice()、duplicate()、copy()
-
slice() 方法从原始 ByteBuf 中截取一段,这段数据是从 readerIndex 到 writeIndex,同时,返回的新的 ByteBuf 的最大容量 maxCapacity 为原始 ByteBuf 的 readableBytes()
-
duplicate() 方法把整个 ByteBuf 都截取出来,包括所有的数据,指针信息
-
slice() 方法与 duplicate() 方法的相同点是:底层内存以及引用计数与原始的 ByteBuf 共享,也就是说经过 slice() 或者 duplicate() 返回的 ByteBuf 调用 write 系列方法都会影响到 原始的 ByteBuf,但是它们都维持着与原始 ByteBuf 相同的内存引用计数和不同的读写指针.
-
slice() 方法与 duplicate() 不同点就是:slice() 只截取从 readerIndex 到 writerIndex 之间的数据,它返回的 ByteBuf 的最大容量被限制到 原始 ByteBuf 的 readableBytes(), 而 duplicate() 是把整个 ByteBuf 都与原始的 ByteBuf 共享.
-
lice() 方法与 duplicate() 方法不会拷贝数据,它们只是通过改变读写指针来改变读写的行为,而最后一个方法 copy() 会直接从原始的 ByteBuf 中拷贝所有的信息,包括读写指针以及底层对应的数据,因此,往 copy() 返回的 ByteBuf 中写数据不会影响到原始的 ByteBuf
-
slice() 和 duplicate() 不会改变 ByteBuf 的引用计数,所以原始的 ByteBuf 调用 release() 之后发现引用计数为零,就开始释放内存,调用这两个方法返回的 ByteBuf 也会被释放,这个时候如果再对它们进行读写,就会报错。因此,我们可以通过调用一次 retain() 方法 来增加引用,表示它们对应的底层的内存多了一次引用,引用计数为2,在释放内存的时候,需要调用两次 release() 方法,将引用计数降到零,才会释放内存.
这三个方法均维护着自己的读写指针,与原始的 ByteBuf 的读写指针无关,相互之间不受影响,其次使用到 slice 和 duplicate 方法的时候,千万要理清内存共享,引用计数共享,读写指针不共享,不释放会造成内存泄漏。
在一个函数体里面,只要增加了引用计数(包括 ByteBuf 的创建和手动调用 retain() 方法),就必须调用 release() 方法
Github地址:https://github.com/Bylant/LeetCode
CSDN地址:https://blog.csdn.net/ZBylant
微信公众号
更多推荐
所有评论(0)