开发者社区> 问答> 正文

HBase2.X通过Netty来实现RPC 那内存泄漏是怎么做监控

需要监控哪些指标。

展开
收起
hbase小助手 2018-11-15 14:01:31 3190 0
1 条回答
写回答
取消 提交回答
  • java 数据分析 数据可视化 大数据

    引用计数
    netty中使用引用计数机制来管理资源,当一个实现ReferenceCounted的对象实例化时,引用计数置1.
    客户代码中需要保持一个该对象的引用时需要调用接口的retain方法将计数增1.对象使用完毕时调用release将计数减1.
    当引用计数变为0时,对象将释放所持有的底层资源或将资源返回资源池.

    内存泄露
    按上述规则使用Direct和Pooled的ByteBuf尤其重要.对于DirectBuf,其内存不受VM垃圾回收控制只有在调用release导致计数为0时才会主动释放内存,而PooledByteBuf只有在release后才能被回收到池中以循环利用.
    如果客户代码没有按引用计数规则使用这两种对象,将会导致内存泄露.

    内存使用跟踪
    在netty.io.util包中含有如下两个类
    ResourceLeak 用于跟踪内存泄露
    ResourceLeakDetector 内存泄露检测工具
    在io.netty.buffer.AbstractByteBufAllocator类中有如下代码

    [java] view plaincopy在CODE上查看代码片派生到我的代码片

    //装饰器模式,用SimpleLeakAwareByteBuf或AdvancedLeakAwareByteBuf来包装原始的ByteBuf
    //两个包装类均通过调用ResourceLeak的record方法来记录ByteBuf的方法调用堆栈,区别在于后者比前者记录更多的内容
    protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {

    ResourceLeak leak;  
    //根据设置的Level来选择使用何种包装器  
    switch (ResourceLeakDetector.getLevel()) {  
        case SIMPLE:  
            //创建用于跟踪和表示内容泄露的ResourcLeak对象  
            leak = AbstractByteBuf.leakDetector.open(buf);  
            if (leak != null) {  
                //只在ByteBuf.order方法中调用ResourceLeak.record  
                buf = new SimpleLeakAwareByteBuf(buf, leak);  
            }  
            break;  
        case ADVANCED:  
        case PARANOID:  
            leak = AbstractByteBuf.leakDetector.open(buf);  
            if (leak != null) {  
                //在ByteBuf几乎所有方法中调用ResourceLeak.record    
                buf = new AdvancedLeakAwareByteBuf(buf, leak);  
            }  
            break;  
    }  
    return buf;  

    }

    下图展示了该方法被调用的时机.可见Netty只对PooledByteBuf和DirectByteBuf监控内存泄露.

    内存泄露检测

    下面观察上述代码中的AbstractByteBuf.leakDetector.open(buf);

    实现代码如下
    [java] view plaincopy在CODE上查看代码片派生到我的代码片

    //创建用于跟踪和表示内容泄露的ResourcLeak对象
    public ResourceLeak open(T obj) {

    Level level = ResourceLeakDetector.level;  
    if (level == Level.DISABLED) {//禁用内存跟踪  
        return null;  
    }  
    if (level.ordinal() < Level.PARANOID.ordinal()) {  
        //如果监控级别低于PARANOID,在一定的采样频率下报告内存泄露  
        if (leakCheckCnt ++ % samplingInterval == 0) {  
            reportLeak(level);  
            return new DefaultResourceLeak(obj);  
        } else {  
            return null;  
        }  
    } else {  
        //每次需要分配 ByteBuf 时,报告内存泄露情况  
        reportLeak(level);  
        return new DefaultResourceLeak(obj);  
    }  

    }
    其中reportLeak方法中完成对内存泄露的检测和报告,如下面代码所示.

    [java] view plaincopy在CODE上查看代码片派生到我的代码片

    private void reportLeak(Level level) {

    //......  
    
    // 报告生成了太多的活跃资源  
    int samplingInterval = level == Level.PARANOID? 1 : this.samplingInterval;  
    if (active * samplingInterval > maxActive && loggedTooManyActive.compareAndSet(false, true)) {  
        logger.error("LEAK: You are creating too many " + resourceType + " instances.  " +  
                resourceType + " is a shared resource that must be reused across the JVM," +  
                "so that only a few instances are created.");  
    }  
    
    // 检测并报告之前发生的内存泄露  
    for (;;) {  
        @SuppressWarnings("unchecked")  
        //检查引用队列(为什么通过检查该队列,可以判断是否存在内存泄露)  
        DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();  
        if (ref == null) {//队列为空,没有未报告的内存泄露或者从未发生内存泄露  
            break;  
        }  

    //清理引用

        ref.clear();  
    
        if (!ref.close()) {  
            continue;  
        }  
        //通过错误日志打印资源的方法调用记录,并将其保存在reportedLeaks中  
        String records = ref.toString();  
        if (reportedLeaks.putIfAbsent(records, Boolean.TRUE) == null) {  
            if (records.isEmpty()) {  
                logger.error("LEAK: {}.release() was not called before it's garbage-collected. " +  
                        "Enable advanced leak reporting to find out where the leak occurred. " +  
                        "To enable advanced leak reporting, " +  
                        "specify the JVM option '-D{}={}' or call {}.setLevel()",  
                        resourceType, PROP_LEVEL, Level.ADVANCED.name().toLowerCase(), simpleClassName(this));  
            } else {  
                logger.error(  
                        "LEAK: {}.release() was not called before it's garbage-collected.{}",  
                        resourceType, records);  
            }  
        }  
    }  

    }
    综合上面的三段代码,可以看出, Netty 在分配新 ByteBuf 时进行内存泄露检测和报告.
    DefaultResourceLeak的声明如下

    [java] view plaincopy在CODE上查看代码片派生到我的代码片

    private final class DefaultResourceLeak extends PhantomReference

    //......  
    
    public DefaultResourceLeak(Object referent) {  
        //使用一个静态的引用队列(refQueue)初始化  
        //refQueue是ResourceLeakDecetor的成员变量并由其初始化  
        super(referent, referent != null? refQueue : null);  
        //......  
    }  
    
    //......  

    }

    可见DefaultResourceLeak是个”虚”引用类型,有别于常见的普通的”强”引用,虚引用完全不影响目标对象的垃圾回收,但是会在目标对象被VM垃圾回收时被加入到引用队列中.
    在正常情况下ResourceLeak对象会所监控的资源的引用计数为0时被清理掉(不在被加入引用队列),所以一旦资源的引用计数失常,ResourceLeak对象会被加入到引用队列.例如没有成对调用ByteBuf的retain和relaease方法,导致ByteBuf没有被正常释放(对于DirectByteBuf没有及时释放内存,对于PooledByteBuf没有返回Pool),当引用队列中存在元素时意味着程序中有内存泄露发生.
    ResourceLeakDetector通过检查引用队列来判断是否有内存泄露,并报告跟踪情况.

    2019-07-17 23:14:38
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
大数据时代的存储 ——HBase的实践与探索 立即下载
Hbase在滴滴出行的应用场景和最佳实践 立即下载
阿里云HBase主备双活 立即下载