Netty文档之引用计数对象

摘要:ByteBuf是最值得关注的一种缓存类型,它使用引用计数来提升分配内存和释放内存的性能
自从Netty发布版本4后,对象的生命周期交给它们的引用计数管理,因此当一个对象不再被使用,Netty可以把它返回给对象池或者它的创建者。引用计数机制虽然会带来一些不便,但相对垃圾回收和引用队列机制,对不可达的对象的判断它能够保证高效实时的保证。

ByteBuf是最值得关注的一种缓存类型,它使用引用计数来提升分配内存和释放内存的性能,这篇文章将会说明Netty中ByteBuf如何使用引用计数来工作的。


引用计数的基础

每个引用计数对象的初始引用数值等于1:
ByteBuf buf = ctx.alloc().directBuffer();
assert buf.refCnt() == 1;
当释放引用计数对象时,它的引用数值减1,当该对象的引用数值等于0,那么这个引用计数对象将会被释放或者返回给对象池。
assert buf.refCnt() == 1;
// release() returns true only if the reference count
// becomes 0.boolean destroyed = buf.release();
assert destroyed;
assert buf.refCnt() == 0;

Dangling引用

尝试访问一个引用数值为0的引用计数对象时,会抛出一个IllegalReferenceCountExeception异常:
assert buf.refCnt() == 0;
try {
  buf.writeLong(0xdeadbeef);
  throw new Error("should not reach here");
} catch (IllegalReferenceCountExeception e) {
  // Expected
}

增加引用数值

如果一个引用计数对象没有被释放,那么可以通过调用retain()方法来增加引用数值:
ByteBuf buf = ctx.alloc().directBuffer();
assert buf.refCnt() == 1;
buf.retain();
assert buf.refCnt() == 2;
boolean destroyed = buf.release();
assert !destroyed;
assert buf.refCnt() == 1;


谁来释放?

一般的原则是最后调用引用计数对象的代码块负责释放引用计数对象,具体如下:
  • 如果组件A把引用计数对象传递给组件B,那么一般来说组件A不需要释放对象,而是把决定权交给组件B;
  • 如果组件A消费了引用计数对象并且知道没有其它组件再访问该引用计数对象,那么组件A需要释放该对象;
编者注:组件A消费引用计数对象的情况如把原有的对象做了转化。在Netty中组件可以理解为Handler,假如每一个Handler对象都把消息往下传递,Handler也不知道谁会是最后的那一个,所以Netty在Handler链的最后加了一个TailHandler,当消息传递到TailHandler如果仍然是ReferenceCounted类型就会释放掉。

下面是一个简单的例子:
public ByteBuf a(ByteBuf input) {
    input.writeByte(42);
    return input;
}
public ByteBuf b(ByteBuf input) {
    try {
        output = input.alloc().directBuffer(
               input.readableBytes() + 1);
        output.writeBytes(input);
        output.writeByte(42);
        return output;
    } finally {
        input.release();
    }
}
public void c(ByteBuf input) {
    System.out.println(input);
    input.release();
}
public void main() {
    ...
    ByteBuf buf = ...;
    // This will print buf to System.out and destroy it.
    c(b(a(buf)));
    assert buf.refCnt() == 0;
}
操作 谁来释放? 谁释放了?
1. main() creates buf bufmain()
2. main() calls a() with buf bufa()
3. a() returns buf merely. bufmain()
4. main() calls b() with buf bufb()
5. b() returns the copy of buf bufb()copymain() b() releases buf
6. main() calls c() with copy copyc()
7. c() swallows copy copyc() c() releases copy

派生buffers

ByteBuf.dulicate()、ByteBuf.slice()和ByteBuf.order()方法会创建派生的buffer,派生buffer会共享原buffer的一部分内存。派生buffer没有自己独立的引用计数,而是共享原buffer的引用计数。
ByteBuf parent = ctx.alloc().directBuffer();
ByteBuf derived = parent.duplicate();
// Creating a derived buffer does not increase the
// reference count.assert parent.refCnt() == 1;
assert derived.refCnt() == 1;
注意派生buffer与原buffer共享相同的引用计数,当创建一个派生buffer时不会增加引用计数的值。所以,在应用中你想要传递一个派生buffer到另外一个组件,首先你应该调用retain()方法。
ByteBuf parent = ctx.alloc().directBuffer(512);
parent.writeBytes(...);
try {
    while (parent.isReadable(16)) {
        ByteBuf derived = parent.readSlice(16);
        derived.retain();
        process(derived);
    }
} finally {
    parent.release();
}
...
public void process(ByteBuf buf) {
    ...
    buf.release();
}

ByteBufHolder接口

有时,一个ByteBuf被一个buffer holder对象持有,例如DatagramPacket、HttpContent和WebSocketframe,这些类都实现了同一个接口ByteBufHolder。
一个buffer holder对象共享它所持有的ByteBuf对象的引用计数,如同派生Buffer那样。

ChannelHandler中的引用计数

上行消息

当在一个循环事件读取消息到一个ByteBuf,并触发channelRead事件来处理这个ByteBuf,那么在管道中这个channelHandler应该负责释放这个ByteBuf。因此处理接收消息的handler需要在它的处理方法channelRead()方法中调用release()方法来释放ByteBuf:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf buf = (ByteBuf) msg;
    try {
        ...
    } finally {
        buf.release();
    }
}
如同文档中“谁来释放?”一节中所解释的那样,如果你的handler把buffer传递给下一个handler,那么你就不需要负责释放这个buffer:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf buf = (ByteBuf) msg;
    ...
    ctx.fireChannelRead(buf);
}
注意在Netty体系中不是只有ByteBuf一种类型的引用计数对象,假如你正在处理的由解码器生成的消息,它很有可能就是一种引用计数的对象:
// Assuming your handler is placed next to `HttpRequestDecoder`
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        HttpRequest req = (HttpRequest) msg;
        ...
    }
    if (msg instanceof HttpContent) {
        HttpContent content = (HttpContent) msg;
        try {
            ...
        } finally {
            content.release();
        }
    }
}
如果你不知道什么时候释放或者你想简化释放消息这些工作,你可以调用ReferenceCountUtil.release(msg):
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        ...
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
还有一种选择,你可以考虑继承SimpleChannelHandler,它在所有接收消息的地方都调用了ReferenceCountUtil.release(msg)。


下行消息

与上行消息不同的是,下行消息是由应用程序自身创建的,当这些消息被发送到网络端口后,Netty应该负责释放这些对象。但是,如果有handler拦截了你的请求消息,并创建了一些中间对象,那么这些handler要确保正确释放这些中间对象(如编码器)。
// Simple-pass through
public void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise) {
    System.err.println("Writing: " + message);
    ctx.write(message, promise);
}
// Transformation
public void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise) {
    if (message instanceof HttpContent) {
        // Transform HttpContent to ByteBuf.
        HttpContent content = (HttpContent) message;
        try {
            ByteBuf transformed = ctx.alloc().buffer();
            ....
            ctx.write(transformed, promise);
        } finally {
            content.release();
        }
    } else {
        // Pass non-HttpContent through.
        ctx.write(message, promise);
    }
}


解决buffer泄漏

引用计数的缺点是容易发生内存泄漏,因为JVM不清楚Netty实现的引用计数。一旦一个对象不可达就会被GC自动回收,即使这个对象的引用计数值还不是0。一个对象如果被垃圾回收后就不可用了,因此这些对象不能返回给对象池,因此可能造成内存泄漏。
编者注:所谓内存泄漏,主要是针对池化ByteBuf对象的。ByteBuf在被GC之前没有调用release()把持有的DirectByteBuf或byte[]归还给对象池,那么ByteBuf被GC掉后对象池是无法感知到的,所以它也不能回收借出去的DirectByteBuf或byte[]。虽然这部分的DirectByteBuf或byte[]仍然被对象池引用,但是对象池已经不能再把它们分配出去了,因为它们还被标记着未释放状态。对于非池化的ByteBuf,它可能被GC掉,所以最终会被释放,不会造成内存泄漏的问题。

幸运的是,虽然找到内存泄漏是比较困难的,但是Netty提供了一种方案来帮助发现泄漏,此方案默认在你的程序中提取已分配的buffer的大约1%的内存作为样本,来检查是否存在内存泄漏,如果存在泄漏,你会发现下面的日志信息:
LEAK: ByteBuf.release() was not called before it's garbage-collected.
Enable advanced leak reporting to find out where the leak occurred.
To enable advanced leak reporting, specify the JVM option '-Dio.netty
.leakDetectionLevel=advanced' or call ResourceLeakDetector.setLevel()

使用上面的JVM选项重启你的程序,你可以看到在你的程序中最近访问已泄漏的内存的位置。下面输出展示了来自单元测试的一个泄漏问题(XmlFrameDecoderTest.testDecodeWithXml() ):
Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release()
was not called before it's garbage-collected.
Recent access records: 1
#1:
    io.netty.buffer.AdvancedLeakAwareByteBuf.toString(AdvancedLeakAwareByteBuf.java:697)
    io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:157)
    io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133)
    ...
Created at:
    io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
    io.netty.buffer.UnpooledUnsafeDirectByteBuf.copy(UnpooledUnsafeDirectByteBuf.java:465)
    io.netty.buffer.WrappedByteBuf.copy(WrappedByteBuf.java:697)
    io.netty.buffer.AdvancedLeakAwareByteBuf.copy(AdvancedLeakAwareByteBuf.java:656)
    io.netty.handler.codec.xml.XmlFrameDecoder.extractFrame(XmlFrameDecoder.java:198)
    io.netty.handler.codec.xml.XmlFrameDecoder.decode(XmlFrameDecoder.java:174)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:227)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:140)
    io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:74)
    io.netty.channel.embedded.EmbeddedEventLoop.invokeChannelRead(EmbeddedEventLoop.java:142)
    io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:317)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    io.netty.channel.embedded.EmbeddedChannel.writeInbound(EmbeddedChannel.java:176)
    io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:147)
    io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133)
    ...
如果使用Netty5或者更新版本,还提供了一个额外的信息来帮助我们找到最后操作了泄漏buffer的handler。下面的例子展示了名为EchoServerHandler#0的handler操作了已泄露的缓冲,并且缓冲已被GC了,这意味着EchoServerHandler#0忘记释放了这个buffer:
12:05:24.374 [nioEventLoop-1-1] ERROR io.netty.util.ResourceLeakDetector 
- LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 2
#2:
    Hint: 'EchoServerHandler#0' will handle the message from this point.
    io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:329)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:133)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
    io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
    java.lang.Thread.run(Thread.java:744)
#1:
    io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:589)
    io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:208)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:125)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
    io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
    java.lang.Thread.run(Thread.java:744)
Created at:
    io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
    io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
    io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
    java.lang.Thread.run(Thread.java:744)

泄漏检测级别

当前有4个泄漏检测级别:
  • 禁用(DISABLED)   - 完全禁止泄露检测。不推荐。
  • 简单(SIMPLE)       - 告诉我们取样的1%的缓冲是否发生了泄露。默认。
  • 高级(ADVANCED) - 告诉我们取样的1%的缓冲发生泄露的地方
  • 偏执(PARANOID)  - 跟高级选项类似,但此选项检测所有缓冲,而不仅仅是取样的那1%。此选项在自动测试阶段很有用。如果构建(build)输出包含了LEAK,可认为构建失败。
你可以使用JVM的-Dio.netty.leakDetectionLevel选项来指定泄漏检测级别:
java -Dio.netty.leakDetectionLevel=advanced ...

避免泄漏的最佳实践

  • 在简单级别和偏执级别上运行你的单元测试和集成测试(integration tests)。
  • 在rolling out到整个集群之前,使用简单级别,以一个合理的、足够长的时间测试你的程序,来发现是否存在泄露。
  • 如果存在泄露,再用高级级别来测试以获得一些关于泄露的提示。
  • 不要部署存在泄露的程序到整个集群。


单元测试中修复问题

单元测试中很容易忘记释放buffer。这会产生一个泄露的警告,但并不是说就肯定存在泄露。你可以使用ReferenceCountUtil.releaseLater()工具方法,放弃用try-finally来包裹你的单元测试代码以释放所有的buffer:
import static io.netty.util.ReferenceCountUtil.*;
@Testpublic void testSomething() throws Exception {
    // ReferenceCountUtil.releaseLater() will keep the reference of buf,
    // and then release it when the test thread is terminated.
    ByteBuf buf = releaseLater(Unpooled.directBuffer(512));
    ...
}

版权说明:如无特殊说明,文章均为本站原创,如需转载请注明出处

本文标题:Netty文档之引用计数对象

本文地址:http://www.wolfbe.com/detail/201609/377.html

本文标签: bytebuf netty nio

相关文章

感谢您的支持,朗度云将继续前行

扫码打赏,金额随意

温馨提醒:打赏一旦完成,金额无法退还,请谨慎操作!

扫二维码 我要反馈 回到顶部