【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列

简介: 前言上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。
接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。

设计

我们来写一个先进先出的分布式无界公平队列。参考我们之前介绍的【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue。我们直接继承AbstractQueue类,并实现Queue接口。
主要重写offer、poll、peek、size方法。
我们使用ZooKeeper的持久化顺序节点来实现分布式队列。
offer是入队,入队时新创建一个持久化顺序节点,节点后缀会根据ZooKeeper的特性自动累加。
poll的出队,获取根节点下的所有节点,根据后缀数字排序,数组最小的是最先入队的,因此要最先出队。
peek,获取到最下入队的数据,和poll的区别是,peek只获取数据,不出队,不删除已经消费的节点。
size获取队列长度,实现方式是,获取根节点下的节点数量即可。这个方法在并发时可能会有问题。慎用。

DistributedQueue

//继承AbstractQueue类并实现Queue接口
public class DistributedQueue<E> extends AbstractQueue<E> implements Queue<E> {
    private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);
    
    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //数据节点名称,顺序节点在插入口会变为 node{00000000xx} 格式
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
   public DistributedQueue (ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        init();
    }


    private void init() {
        //需要先判断根节点是否存在,不存在的话,创建子节点时会出错。
        try {
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[DistributedQueue#init] error : " + e.toString(), e);
        }
    }
}

offer

/**
 * Offer boolean.
 *
 * @param o the o
 * @return the boolean
 */
@Override
public boolean offer(E o) {
    //构建要插入的节点名称
    String fullPath = dir.concat("/").concat(node);
    try {
        //创建子节点成功则返回入队成功
      zooKeeper.create(fullPath, objectToBytes(o), acls, CreateMode.PERSISTENT_SEQUENTIAL);
        return true;
    } catch (Exception e) {
        logger.error("[DistributedQueue#offer] error : " + e.toString(), e);
    }
    return false;
}

poll

/**
 * Poll e.
 *
 * @return the e
 */
@Override
public E poll() {
    try {
        //获取根节点所有子节点信息。
        List<String> children = zooKeeper.getChildren(dir, null);
        //如果队列是空的则返回null
        if (children == null || children.isEmpty()) {
            return null;
        }

        //将子节点名称排序
        Collections.sort(children);
        for (String child : children) {
            //拼接子节点的具体名称
            String fullPath = dir.concat("/").concat(child);
            try {
                //如果获取数据成功,则类型转换后,返回,并删除改队列中该节点
                byte[] bytes = zooKeeper.getData(fullPath, false, null);
                E data = (E) bytesToObject(bytes);
                zooKeeper.delete(fullPath, -1);
                return data;
            } catch (Exception e) {
                logger.warn("[DistributedQueue#poll] warn : " + e.toString(), e);
            }
        }

    } catch (Exception e) {
        logger.error("[DistributedQueue#peek] poll : " + e.toString(), e);
    }

    return null;
}

peek

/**
 * Peek e.
 *
 * @return the e
 */
@Override
public E peek() {
   
    try {
        //获取根节点所有子节点信息。
        List<String> children = zooKeeper.getChildren(dir, null);
        //如果队列是空的则返回null
        if (children == null || children.isEmpty()) {
            return null;
        }

        //将子节点名称排序
        Collections.sort(children);
        
        for (String child : children) {
            //拼接子节点的具体名称
            String fullPath = dir.concat("/").concat(child);
            try {
                //如果获取数据成功,则类型转换后,返回,不会删除改队列中该节点
                byte[] bytes = zooKeeper.getData(fullPath, false, null);
                E data = (E) bytesToObject(bytes);
                return data;
            } catch (Exception e) {
                logger.warn("[DistributedQueue#peek] warn : " + e.toString(), e);
            }
        }

    } catch (Exception e) {
        logger.error("[DistributedQueue#peek] warn : " + e.toString(), e);
    }

    return null;
}

size

/**
 * Size int.
 *
 * @return the int
 */
@Override
public int size() {
    try {
        //获取根节点的子节点名称
        List<String> children = zooKeeper.getChildren(dir, null);
        //返回子结点信息数量
        return children.size();
    } catch (Exception e) {
        logger.error("[DistributedQueue#offer] size : " + e.toString(), e);
    }

    return 0;
}

总结

上面我们一起学习了如何利用持久性顺序节点,创建一个分布式先进先出队列。源代码可见:aloofJr
如果有好的优化建议,欢迎一起讨论。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
9月前
|
人工智能 Kubernetes 数据可视化
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
本文回顾了一次关键词监测任务在容器集群中失效的全过程,分析了中转IP复用、调度节奏和异常处理等隐性风险,并提出通过解耦架构、动态IP分发和行为模拟优化采集策略,最终实现稳定高效的数据抓取与分析。
160 2
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
|
11月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
313 11
|
6月前
|
消息中间件 分布式计算 资源调度
《聊聊分布式》ZooKeeper与ZAB协议:分布式协调的核心引擎
ZooKeeper是一个开源的分布式协调服务,基于ZAB协议实现数据一致性,提供分布式锁、配置管理、领导者选举等核心功能,具有高可用、强一致和简单易用的特点,广泛应用于Kafka、Hadoop等大型分布式系统中。
|
7月前
|
存储 算法 安全
“卧槽,系统又崩了!”——别慌,这也许是你看过最通俗易懂的分布式入门
本文深入解析分布式系统核心机制:数据分片与冗余副本实现扩展与高可用,租约、多数派及Gossip协议保障一致性与容错。探讨节点故障、网络延迟等挑战,揭示CFT/BFT容错原理,剖析规模与性能关系,为构建可靠分布式系统提供理论支撑。
335 2
|
9月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
365 1
分布式新闻数据采集系统的同步效率优化实战
|
10月前
|
数据采集 机器学习/深度学习 数据可视化
让回归模型不再被异常值"带跑偏",MSE和Cauchy损失函数在噪声数据环境下的实战对比
本文探讨了MSE与Cauchy损失函数在线性回归中的表现,特别是在含噪声数据环境下的差异。研究发现,MSE虽具良好数学性质,但对异常值敏感;而Cauchy通过其对数惩罚机制降低异常值影响,展现出更强稳定性。实验结果表明,Cauchy损失函数在处理含噪声数据时参数估计更接近真实值,为实际应用提供了更鲁棒的选择。
371 1
让回归模型不再被异常值"带跑偏",MSE和Cauchy损失函数在噪声数据环境下的实战对比
|
10月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
2696 7
|
11月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
962 4
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2