【分布式】Zookeeper使用--开源客户端(一)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 上一篇博客已经介绍了如何使用Zookeeper提供的原生态Java API进行操作,本篇博文主要讲解如何通过开源客户端来进行操作。

一、前言


  上一篇博客已经介绍了如何使用Zookeeper提供的原生态Java API进行操作,本篇博文主要讲解如何通过开源客户端来进行操作。



二、ZkClient

  ZkClient是在Zookeeper原声API接口之上进行了包装,是一个更易用的Zookeeper客户端,其内部还实现了诸如Session超时重连、Watcher反复注册等功能。

  

2.1 添加依赖

  在pom.xml文件中添加如下内容即可。

<dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.2</version>
        </dependency>


2.2 创建会话

  使用ZkClient可以轻松的创建会话,连接到服务端。  

package com.hust.grid.leesf.zkclient.examples;
import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;
public class Create_Session_Sample {
    public static void main(String[] args) throws IOException, InterruptedException {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        System.out.println("ZooKeeper session established.");
    }
}

 运行结果:  

ZooKeeper session established.

结果表明已经成功创建会话。

  

2.3 创建节点 

  ZkClient提供了递归创建节点的接口,即其帮助开发者完成父节点的创建,再创建子节点。

package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;
public class Create_Node_Sample {
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        String path = "/zk-book/c1";
        zkClient.createPersistent(path, true);
        System.out.println("success create znode.");
    }
}

 运行结果: 

success create znode.

 结果表明已经成功创建了节点,值得注意的是,在原生态接口中是无法创建成功的(父节点不存在),但是通过ZkClient可以递归的先创建父节点,再创建子节点。

  1.png

  可以看到确实成功创建了/zk-book和/zk-book/c1两个节点。

  

2.4 删除节点

  ZkClient提供了递归删除节点的接口,即其帮助开发者先删除所有子节点(存在),再删除父节点。  

package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;
public class Del_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        zkClient.createPersistent(path, "");
        zkClient.createPersistent(path+"/c1", "");
        zkClient.deleteRecursive(path);
        System.out.println("success delete znode.");
    }
}

 运行结果:  


success delete znode.


 结果表明ZkClient可直接删除带子节点的父节点,因为其底层先删除其所有子节点,然后再删除父节点。

  

2.5 获取子节点 

package com.hust.grid.leesf.zkclient.examples;
import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
public class Get_Children_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds);
            }
        });
        zkClient.createPersistent(path);
        Thread.sleep(1000);
        zkClient.createPersistent(path + "/c1");
        Thread.sleep(1000);
        zkClient.delete(path + "/c1");
        Thread.sleep(1000);
        zkClient.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

 运行结果:  

/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null


  结果表明:

  客户端可以对一个不存在的节点进行子节点变更的监听。

  一旦客户端对一个节点注册了子节点列表变更监听之后,那么当该节点的子节点列表发生变更时,服务端都会通知客户端,并将最新的子节点列表发送给客户端

  该节点本身的创建或删除也会通知到客户端。

  

2.6 获取数据

package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
public class Get_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        zkClient.createEphemeral(path, "123");
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("Node " + dataPath + " deleted.");
            }
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("Node " + dataPath + " changed, new data: " + data);
            }
        });
        System.out.println(zkClient.readData(path));
        zkClient.writeData(path, "456");
        Thread.sleep(1000);
        zkClient.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

  运行结果: 

123
Node /zk-book changed, new data: 456
Node /zk-book deleted.


结果表明可以成功监听节点数据变化或删除事件。

  

2.7 检测节点是否存在  

package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;
public class Exist_Node_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 2000);
        System.out.println("Node " + path + " exists " + zkClient.exists(path));
    }
}

  运行结果:

Node /zk-book exists false

  结果表明,可以通过ZkClient轻易检测节点是否存在,其相比于原生态的接口更易于理解。


三、Curator客户端


  Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连,反复注册Watcher和NodeExistsException异常等,现已成为Apache的顶级项目。

 

 3.1 添加依赖

  在pom.xml文件中添加如下内容即可。  

  <!-- https://mvnrepository.com/artifact/org.apache.curator/apache-curator -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.4.2</version>
        </dependency>


3.2 创建会话

  Curator除了使用一般方法创建会话外,还可以使用fluent风格进行创建。

package com.hust.grid.leesf.curator.examples;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Create_Session_Sample {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);
        client.start();
        System.out.println("Zookeeper session1 established. ");
        CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build();
        client1.start();
        System.out.println("Zookeeper session2 established. ");        
    }
}
Zookeeper session1 established. 
Zookeeper session2 established.

  值得注意的是session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离。

 

 3.3 创建节点

  通过使用Fluent风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。

package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class Create_Node_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        System.out.println("success create znode: " + path);
    }
}

运行结果: 

success create znode: /zk-book/c1

  其中,也创建了/zk-book/c1的父节点/zk-book节点。

  

3.4 删除节点  

package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class Del_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        Stat stat = new Stat();
        System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
        client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
        System.out.println("success delete znode " + path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

 运行结果: 

init
success delete znode /zk-book/c1

  结果表明成功删除/zk-book/c1节点。

 

 3.5 获取数据 

package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class Get_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        Stat stat = new Stat();
        System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
    }
}


  运行结果:  

init

  结果表明成功获取了节点的数据。

  

3.6 更新数据  

package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class Set_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        System.out.println("Success set node for : " + path + ", new version: "
                + client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());
        try {
            client.setData().withVersion(stat.getVersion()).forPath(path);
        } catch (Exception e) {
            System.out.println("Fail set node due to " + e.getMessage());
        }
    }
}

运行结果:  

Success set node for : /zk-book, new version: 1
Fail set node due to KeeperErrorCode = BadVersion for /zk-book

  结果表明当携带数据版本不一致时,无法完成更新操作。

  

3.7 异步接口

  如同Zookeeper原生API提供了异步接口,Curator也提供了异步接口。在Zookeeper中,所有的异步通知事件处理都是由EventThread这个线程来处理的,EventThread线程用于串行处理所有的事件通知,其可以保证对事件处理的顺序性,但是一旦碰上复杂的处理单元,会消耗过长的处理时间,从而影响其他事件的处理,Curator允许用户传入Executor实例,这样可以将比较复杂的事件处理放到一个专门的线程池中去。 

package com.hust.grid.leesf.curator.examples;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class Create_Node_Background_Sample {
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    static CountDownLatch semaphore = new CountDownLatch(2);
    static ExecutorService tp = Executors.newFixedThreadPool(2);
    public static void main(String[] args) throws Exception {
        client.start();
        System.out.println("Main thread: " + Thread.currentThread().getName());
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
                System.out.println();
                semaphore.countDown();
            }
        }, tp).forPath(path, "init".getBytes());
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
                semaphore.countDown();
            }
        }).forPath(path, "init".getBytes());
        semaphore.await();
        tp.shutdown();
    }
}


 运行结果:

Main thread: main
event[code: -110, type: CREATE], Thread of processResult: main-EventThread
event[code: 0, type: CREATE], Thread of processResult: pool-3-thread-1

  其中,创建节点的事件由线程池自己处理,而非默认线程处理。

  Curator除了提供很便利的API,还提供了一些典型的应用场景,开发人员可以使用参考更好的理解如何使用Zookeeper客户端,所有的都在recipes包中,只需要在pom.xml中添加如下依赖即可

<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>2.4.2</version>
</dependency>


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
1月前
|
存储 API Apache
【zookeeper 第三篇章】客户端 API
本文介绍了Apache ZooKeeper客户端的一些常用命令及其用法。首先,`create`命令用于创建不同类型的节点并为其赋值,如持久化节点、有序节点及临时节点等。通过示例展示了如何创建这些节点,并演示了创建过程中的输出结果。其次,`ls`命令用于列出指定路径下的所有子节点。接着,`set`命令用于更新节点中的数据,可以指定版本号实现乐观锁机制。
24 0
|
2月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
2月前
|
存储 关系型数据库 MySQL
深度评测:PolarDB-X 开源分布式数据库的优势与实践
本文对阿里云开源分布式数据库 PolarDB-X 进行了详细评测。PolarDB-X 以其高性能、强可用性和出色的扩展能力在云原生数据库市场中脱颖而出。文章首先介绍了 PolarDB-X 的核心产品优势,包括金融级高可靠性、海量数据处理能力和高效的混合负载处理能力。随后,分析了其分布式架构设计,包括计算节点、存储节点、元数据服务和日志节点的功能分工。评测还涵盖了在 Windows 平台通过 WSL 环境部署 PolarDB-X 的过程,强调了环境准备和工具安装的关键步骤。使用体验方面,PolarDB-X 在处理分布式事务和实时分析时表现稳定,但在网络问题和性能瓶颈上仍需优化。最后,提出了改进建
6748 2
|
2月前
|
分布式计算 API 对象存储
Ray是一个开源的分布式计算框架,用于构建和扩展分布式应用。它提供了简单的API,使得开发者可以轻松地编写并行和分布式代码,而无需担心底层的复杂性。
Ray是一个开源的分布式计算框架,用于构建和扩展分布式应用。它提供了简单的API,使得开发者可以轻松地编写并行和分布式代码,而无需担心底层的复杂性。
275 11
|
2月前
|
API
【想进大厂还不会阅读源码】ShenYu源码-替换ZooKeeper客户端
ShenYu源码阅读。相信大家碰到源码时经常无从下手,不知道从哪开始阅读😭。我认为有一种办法可以解决大家的困扰!至此,我们发现自己开始从大量堆砌的源码中脱离开来😀。ShenYu是一个异步的,高性能的,跨语言的,响应式的 API 网关。
|
2月前
|
算法 前端开发
|
2月前
|
关系型数据库 分布式数据库 数据库
PolarDB,阿里云的开源分布式数据库,与微服务相结合,提供灵活扩展和高效管理解决方案。
【7月更文挑战第3天】PolarDB,阿里云的开源分布式数据库,与微服务相结合,提供灵活扩展和高效管理解决方案。通过数据分片和水平扩展支持微服务弹性,保证高可用性,且兼容MySQL协议,简化集成。示例展示了如何使用Spring Boot配置PolarDB,实现服务动态扩展。PolarDB缓解了微服务数据库挑战,加速了开发部署,为云原生应用奠定基础。
203 3
|
2月前
|
关系型数据库 分布式数据库 PolarDB
**PolarDB开源指南:构建分布式数据库集群**踏上PolarDB开源之旅,了解如何从零开始搭建分布式集群
【7月更文挑战第3天】**PolarDB开源指南:构建分布式数据库集群**踏上PolarDB开源之旅,了解如何从零开始搭建分布式集群。采用存储计算分离架构,适用于大规模OLTP和OLAP。先准备硬件和软件环境,包括Linux、Docker和Git。然后,克隆源码,构建Docker镜像,部署控制节点和计算节点。使用PDCli验证集群状态,开始探索PolarDB的高性能与高可用性。在实践中深化学习,贡献于数据库技术创新。记得在安全环境下测试。
163 1
|
2月前
|
NoSQL 前端开发 算法
Redis问题之Redis分布式锁与Zookeeper分布式锁有何不同
Redis问题之Redis分布式锁与Zookeeper分布式锁有何不同
|
2月前
|
安全 Java
使用Zookeeper实现分布式锁的最佳实践
使用Zookeeper实现分布式锁的最佳实践

热门文章

最新文章