堆外内存及其在 RxCache 中的使用

简介: 堆外内存及其在 RxCache 中的使用

RxCache



RxCache 是一款支持 Java 和 Android 的 Local Cache 。目前,支持堆内存、堆外内存(off-heap memory)、磁盘缓存。


github地址:https://github.com/fengzhizi715/RxCache


堆外内存(off-heap memory)



对象可以存储在 堆内存、堆外内存、磁盘缓存甚至是分布式缓存。


在 Java 中,与堆外内存相对的是堆内存。堆内存遵守 JVM 的内存管理机制,而堆外内存不受到此限制,它由操作系统进行管理。


image.png

JVM的内存管理以及堆外内存.jpg


堆外内存和堆内存有明显的区别,或者说有相反的应用场景。


堆外内存更适合:


  • 存储生命周期长的对象
  • 可以在进程间可以共享,减少 JVM 间的对象复制,使得 JVM 的分割部署更容易实现。
  • 本地缓存,减少磁盘缓存或者分布式缓存的响应时间。


RxCache 中使用的堆外内存



首先,创建一个 DirectBufferConverter ,用于将对象和 ByteBuffer 相互转换,以及对象和byte数组相互转换。其中,ByteBuffer.allocteDirect(capability) 用于分配堆外内存。Cleaner 是自己定义的一个类,用于释放 DirectByteBuffer。具体代码可以查看:https://github.com/fengzhizi715/RxCache/blob/master/offheap/src/main/java/com/safframework/rxcache/offheap/converter/Cleaner.java

public abstract class DirectBufferConverter<V> {
    public void dispose(ByteBuffer direct) {
        Cleaner.clean(direct);
    }
    public ByteBuffer to(V from) {
        if(from == null) return null;
        byte[] bytes = toBytes(from);
        ByteBuffer.wrap(bytes);
        ByteBuffer bf = ByteBuffer.allocateDirect(bytes.length);
        bf.put(bytes);
        bf.flip();
        return bf;
    }
    abstract public byte[] toBytes(V value);
    abstract public V toObject(byte[] value);
    public V from(ByteBuffer to) {
        if(to == null) return null;
        byte[] bs = new byte[to.capacity()];
        to.get(bs);
        to.flip();
        return toObject(bs);
    }
}


接下来,定义一个 ConcurrentDirectHashMap<K, V> 实现Map接口。它是一个范性,支持将 V 转换成 ByteBuffer 类型,存储到 ConcurrentDirectHashMap 的 map 中。

public abstract class ConcurrentDirectHashMap<K, V> implements Map<K, V> {
    final private Map<K, ByteBuffer> map;
    private final DirectBufferConverter<V> converter = new DirectBufferConverter<V>() {
        @Override
        public byte[] toBytes(V value) {
            return convertObjectToBytes(value);
        }
        @Override
        public V toObject(byte[] value) {
            return convertBytesToObject(value);
        }
    };
    ConcurrentDirectHashMap() {
        map = new ConcurrentHashMap<>();
    }
    ConcurrentDirectHashMap(Map<K, V> m) {
        map = new ConcurrentHashMap<>();
        for (Entry<K, V> entry : m.entrySet()) {
            K key = entry.getKey();
            ByteBuffer val = converter.to(entry.getValue());
            map.put(key, val);
        }
    }
    protected abstract byte[] convertObjectToBytes(V value);
    protected abstract V convertBytesToObject(byte[] value);
    @Override
    public int size() {
        return map.size();
    }
    @Override
    public boolean isEmpty() {
        return map.isEmpty();
    }
    @Override
    public boolean containsKey(Object key) {
        return map.containsKey(key);
    }
    @Override
    public V get(Object key) {
        final ByteBuffer byteBuffer = map.get(key);
        return converter.from(byteBuffer);
    }
    @Override
    public V put(K key, V value) {
        final ByteBuffer byteBuffer = map.put(key, converter.to(value));
        converter.dispose(byteBuffer);
        return converter.from(byteBuffer);
    }
    @Override
    public V remove(Object key) {
        final ByteBuffer byteBuffer = map.remove(key);
        final V value = converter.from(byteBuffer);
        converter.dispose(byteBuffer);
        return value;
    }
    @Override
    public void putAll(Map<? extends K, ? extends V> m) {
        for (Entry<? extends K, ? extends V> entry : m.entrySet()) {
            ByteBuffer byteBuffer = converter.to(entry.getValue());
            map.put(entry.getKey(), byteBuffer);
        }
    }
    @Override
    public void clear() {
        final Set<K> keys = map.keySet();
        for (K key : keys) {
            map.remove(key);
        }
    }
    @Override
    public Set<K> keySet() {
        return map.keySet();
    }
    @Override
    public Collection<V> values() {
        Collection<V> values = new ArrayList<>();
        for (ByteBuffer byteBuffer : map.values())
        {
            V value = converter.from(byteBuffer);
            values.add(value);
        }
        return values;
    }
    @Override
    public Set<Entry<K, V>> entrySet() {
        Set<Entry<K, V>> entries = new HashSet<>();
        for (Entry<K, ByteBuffer> entry : map.entrySet()) {
            K key = entry.getKey();
            V value = converter.from(entry.getValue());
            entries.add(new Entry<K, V>() {
                @Override
                public K getKey() {
                    return key;
                }
                @Override
                public V getValue() {
                    return value;
                }
                @Override
                public V setValue(V v) {
                    return null;
                }
            });
        }
        return entries;
    }
    @Override
    public boolean containsValue(Object value) {
        for (ByteBuffer v : map.values()) {
            if (v.equals(value)) {
                return true;
            }
        }
        return false;
    }
}


创建 ConcurrentStringObjectDirectHashMap,它的 K 是 String 类型,V 是任意的 Object 对象。其中,序列化和反序列化采用《Java 字节的常用封装》提到的 bytekit

public class ConcurrentStringObjectDirectHashMap extends ConcurrentDirectHashMap<String,Object> {
    @Override
    protected byte[] convertObjectToBytes(Object value) {
        return Bytes.serialize(value);
    }
    @Override
    protected Object convertBytesToObject(byte[] value) {
        return Bytes.deserialize(value);
    }
}


基于 FIFO 以及堆外内存来实现 Memory 级别的缓存。

public class DirectBufferMemoryImpl extends AbstractMemoryImpl {
    private ConcurrentStringObjectDirectHashMap cache;
    private List<String> keys;
    public DirectBufferMemoryImpl(long maxSize) {
        super(maxSize);
        cache = new ConcurrentStringObjectDirectHashMap();
        this.keys = new LinkedList<>();
    }
    @Override
    public <T> Record<T> getIfPresent(String key) {
        T result = null;
        if(expireTimeMap.get(key)!=null) {
            if (expireTimeMap.get(key)<0) { // 缓存的数据从不过期
                result = (T) cache.get(key);
            } else {
                if (timestampMap.get(key) + expireTimeMap.get(key) > System.currentTimeMillis()) {  // 缓存的数据还没有过期
                    result = (T) cache.get(key);
                } else {                     // 缓存的数据已经过期
                    evict(key);
                }
            }
        }
        return result != null ? new Record<>(Source.MEMORY,key, result, timestampMap.get(key),expireTimeMap.get(key)) : null;
    }
    @Override
    public <T> void put(String key, T value) {
        put(key,value, Constant.NEVER_EXPIRE);
    }
    @Override
    public <T> void put(String key, T value, long expireTime) {
        if (keySet().size()<maxSize) { // 缓存还有空间
            saveValue(key,value,expireTime);
        } else {                       // 缓存空间不足,需要删除一个
            if (containsKey(key)) {
                keys.remove(key);
                saveValue(key,value,expireTime);
            } else {
                String oldKey = keys.get(0); // 最早缓存的key
                evict(oldKey);               // 删除最早缓存的数据 FIFO算法
                saveValue(key,value,expireTime);
            }
        }
    }
    private <T> void saveValue(String key, T value, long expireTime) {
        cache.put(key,value);
        timestampMap.put(key,System.currentTimeMillis());
        expireTimeMap.put(key,expireTime);
        keys.add(key);
    }
    @Override
    public Set<String> keySet() {
        return cache.keySet();
    }
    @Override
    public boolean containsKey(String key) {
        return cache.containsKey(key);
    }
    @Override
    public void evict(String key) {
        cache.remove(key);
        timestampMap.remove(key);
        expireTimeMap.remove(key);
        keys.remove(key);
    }
    @Override
    public void evictAll() {
        cache.clear();
        timestampMap.clear();
        expireTimeMap.clear();
        keys.clear();
    }
}


到了这里,已经完成了堆外内存在 RxCache 中的封装。其实,已经有很多缓存框架都支持堆外内存,例如 Ehcache、MapDB 等。RxCache 目前已经有了 MapDB 的模块。

相关文章
|
JSON atlas 图形学
unity之spine骨骼动画使用
unity实现spine骨骼动画使用
unity之spine骨骼动画使用
|
编解码 算法 关系型数据库
物理层系统设计架构及关键技术 | 带你读《5G 无线系统设计与国际标准》之六
物理层的设计是整个 5G 系统设计中最核心的部分。相对于 4G,ITU 及 3GPP 对 5G提出了更高而且更全面的关键性能指标要求。其中最具有挑战的峰值速率、频谱效率、用户体验速率、时延等关键指标均需要通过物理层的设计来达成。为迎接这些挑战,5G的新空口设计在充分借鉴 LTE 设计的基础上,也引入了一些全新的设计。
物理层系统设计架构及关键技术 | 带你读《5G 无线系统设计与国际标准》之六
|
SQL API 数据安全/隐私保护
打造现代化后端服务:从零到一实现RESTful API
【10月更文挑战第20天】在数字化时代的浪潮中,构建一个高效、可靠的后端服务是每个软件工程师的必备技能。本文将引导你理解RESTful API的设计哲学,并通过实际的代码示例,展示如何从无到有地实现一个现代化的后端服务。无论你是初学者还是有经验的开发者,这篇文章都将为你提供宝贵的知识和启发。
|
存储 Ubuntu 安全
如何在 Ubuntu 14.04 上安装和配置 Syncthing 来同步目录
如何在 Ubuntu 14.04 上安装和配置 Syncthing 来同步目录
361 0
|
机器学习/深度学习 人工智能 算法
从零构建现代深度学习框架(TinyDL-0.01)
本文主要以一个Java工程师视角,阐述如何从零(无任何二三方依赖)构建一个极简(麻雀虽小五脏俱全)现代深度学习框架(类比AI的操作系统)。
|
Java 程序员 Spring
如何理解AOP中的连接点(Joinpoint)、切点(Pointcut)、增强(Advice)、引介(Introduction)、织入(Weaving)、切面(Aspect)这些概念?
a. 连接点(Joinpoint):程序执行的某个特定位置(如:某个方法调用前、调用后,方法抛出异常后)。一个类或一段程序代码拥有一些具有边界性质的特定点,这些代码中的特定点就是连接点。
2675 0
二面头条、三面拼多多、五面蚂蚁分享面经总结,助你拿大厂offer
蚂蚁金服、头条、拼多多的面试总结 文章有点长,请耐心看完,绝对有收获!不想听我BB直接进入面试分享: 准备过程 蚂蚁金服面试分享 拼多多面试分享 字节跳动面试分享 总结
|
存储 弹性计算 网络协议
Open-TDP 多云资源管理系统介绍
Open-TDP 多云资源管理系统介绍
699 1
Open-TDP 多云资源管理系统介绍
|
Java 数据库 Spring
清晰、明了的@Transcation事务嵌套使用
清晰、明了的@Transcation事务嵌套使用
|
弹性计算 网络安全
阿里云服务器更换ip地址操作流程
阿里云服务器更换ip地址操作流程
436 1