分布式调用与高并发处理 Zookeeper分布式协调服务(三)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 分布式调用与高并发处理 Zookeeper分布式协调服务(三)

3.5 Watcher监听机制

ZooKeeper 提供了分布式数据的发布/订阅功能。一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。

注意:

在ZooKeeper中,引入了Watcher机制来实现这种分布式的通知功能。ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。

监听节点变化

#纯净客户端监听Java节点的变化
ls -w /java
#在zk-1客户端创建/java/spring节点
[zk: localhost:2181(CONNECTED) 1] create /java/spring   spirngmvc
Created /java/spring
#纯净客户端监听到了Java节点的变化
[zk: localhost:2181(CONNECTED) 11] 
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/java

注意:

命令如果使用 -w ,那么监听的是节点的变化,而不是值的变化。

监听节点的值的变化

#纯净客户端监听节点/java/spring的值
get -w /java/spring
#zk-1客户端修改/java/spring节点的值
[zk: localhost:2181(CONNECTED) 2] set /java/spring  mybatis
#纯净客户端监听到节点值的变化
[zk: localhost:2181(CONNECTED) 2] 
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/java/spring

注意:

watch监听机制只能够使用一次,如果下次想要使用,必须重新监听,就比如ls path watch命令,只能监听节点路径的改变一次,如果还想监听,那么需要再执行一次ls path watch命令。

3.6 权限控制ACL

在ZooKeeper的实际使用中,我们的做法往往是搭建一个共用的ZooKeeper集群,统一为若干个应用提供服务。在这种情况下,不同的应用之间往往是不会存在共享数据的使用场景的,因此需要解决不同应用之间的权限问题。

注意:

  1. ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限
  2. 每个znode支持设置多种权限控制方案和多个权限
  3. 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点

权限模式schema

ZooKeeper内置了一些权限控制方案,可以用以下方案为每个节点设置权限:

方案 描述
world 只有一个用户:anyone,代表所有人(默认)
ip 使用IP地址认证
auth 使用已添加认证的用户认证
digest 使用“用户名:密码”方式认证

授权对象ID

授权对象ID是指,权限赋予的用户或者一个实体,例如:IP 地址或者机器。授权模式 schema 与 授权对象 ID 之间关系:

权限模式 授权对象
IP 通常是一个IP地址或是IP段,例如“192.168.66.101”
Digest 自定义,通常是“username:BASE64(SHA-1(username:password))”
World 只有一个ID:"anyone"
Super 与Digest模式一致

权限permission

权限 ACL简写 描述
CREATE c 可以创建子节点
DELETE d 可以删除子节点(仅下一级节点)
READ r 可以读取节点数据及显示子节点列表
WRITE w 可以设置节点数据
ADMIN a 可以设置节点访问控制列表权限

权限相关命令

命令 使用方式 描述
getAcl getAcl 读取ACL权限
setAcl setAcl 设置ACL权限
addauth addauth 添加认证用户

示例:World方案

[zk: localhost:2181(CONNECTED) 0] create /node1 1
Created /node1
[zk: localhost:2181(CONNECTED) 1] getAcl /node1
'world,'anyone  #默认为world方案
: cdrwa #任何人都拥有所有权限

示例:IP方案

#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node2 1
Created /node2
#设置权限(只有ip地址为192.168.66.100的客户端才能操作该节点)
[zk: localhost:2181(CONNECTED) 1] setAcl /node2 ip:192.168.66.100:cdrwa 
#使用IP非 192.168.66.100 的机器
[zk: localhost:2181(CONNECTED) 0] get /node2
Authentication is not valid : /node2 #没有权限

示例:Auth方案

#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node3 1
Created /node3
#添加认证用户zj,密码123456
[zk: localhost:2181(CONNECTED) 1] addauth digest zj:123456
#为创建的权限用户zj设置权限
[zk: localhost:2181(CONNECTED) 2] setAcl /node3 auth:zj:cdrwa
#获取权限
[zk: localhost:2181(CONNECTED) 3] getAcl /node3
'digest,'zj:UvJWhBril5yzpEiA2eV7bwwhfLs=
: cdrwa

示例:Digest方案

#密码加密
echo -n zj:123456 | openssl dgst -binary -sha1 | openssl base64
UvJWhBril5yzpEiA2eV7bwwhfLs=
#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node4 1
Created /node4
#使用是算好的密文密码添加权限:
[zk: localhost:2181(CONNECTED) 1] setAcl /node4 digest:zj:UvJWhBril5yzpEiA2eV7bwwhfLs=:cdrwa
 #获取节点数据没有权限
[zk: localhost:2181(CONNECTED) 3] get /node4
Authentication is not valid : /node4
#添加认证用户
[zk: localhost:2181(CONNECTED) 4] addauth digest zj:123456
#成功读取数据
[zk: localhost:2181(CONNECTED) 5] get /node4 1

四、原生api操作Zookeeper

利用Zookeeper官方的原生java api进行连接,然后演示一些创建、删除、修改、查询节点的操作。

1.创建maven项目,引入依赖。

<dependencies>
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
  </dependency>
</dependencies>

2.创建连接和节点

注意:在使用Java连接zookeeper服务的时候先要在虚拟机开启zookeeper的服务。(zkSserver.sh start)

public class ZKMain {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /**
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                                                         "192.168.66.101:2181," +
                                                          "192.168.66.102:2181", 4000, null);
        //2.查看连接状态
        System.out.println(zooKeeper.getState());
        //3.创建节点
        /*
         * 第一个参数:节点的名字
         * 第二个参数:节点的数据
         * 第三个参数:节点的权限策略
         *            ZooDefs.Ids.OPEN_ACL_UNSAFE:完全开放任何人都能操作
         *            ZooDefs.Ids.CREATOR_ALL_ACL:创建者才能操作
         *            ZooDefs.Ids.READ_ACL_UNSAFE:只读权限
         * 第四个参数:节点类型(持久、临时、……)
         *           CreateMode.PERSISTENT:持久型节点
         *           CreateMode.EPHEMERAL:临时节点
         *           CreateMode.PERSISTENT_SEQUENTIAL:持久顺序节点
         *           CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点
         */
            zooKeeper.create("/zkTest","1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}

znode 类型有四种

  • PERSISTENT - 持久化目录节点,客户端与zookeeper断开连接后,该节点依旧存在
  • PERSISTENT_SEQUENTIAL - 持久化,并带有序列号
  • EPHEMERAL - 临时目录节点,客户端与zookeeper断开连接后,该节点被删除
  • EPHEMERAL_SEQUENTIAL - 临时,并带有序列号

3.修改节点数据

//5.修改节点
        /**
         * 参数一:节点名称
         * 参数二:修改的数据
         * 参数三:版本,全部修改设为-1
         */
        zooKeeper.setData("/zkTest","2".getBytes(),-1);

4.获取节点数据和节点信息

//6.获取节点数据
        /**
         * 参数一:节点名称
         * 参数二:监听机制
         * 参数三:状态信息
         */
        byte[] data = zooKeeper.getData("/zkTest", null, null);
        System.out.println(new String(data));
        //7.获取节点的子节点信息
        /**
         * 参数一:节点名称
         * 参数二:监听机制
         */
        List<String> children = zooKeeper.getChildren("/zkTest", null);
        for (String child : children) {
            System.out.println(child);
        }

5.监听节点

public class ZKWatcher {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /*
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制,接口类型,使用匿名内部类实现该接口
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                "192.168.66.101:2181," +
                "192.168.66.102:2181", 4000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("正在监听……");
            }
        });
        //2.注册监听机制监听节点
        zooKeeper.getChildren("/zkTest", new Watcher() {
            /*对 /zkTest 的子节点进行操作的时候会回调该监听方法*/
            @Override
            public void process(WatchedEvent watchedEvent) {
                //查看监听路径
                System.out.println("监听路径:"+watchedEvent.getPath());
                //查看监听的事件
                System.out.println("监听的事件:"+watchedEvent.getType());
            }
        });
        //线程休眠,保持持续监听
        Thread.sleep(Long.MAX_VALUE);
    }
}

删除 /zkTest 的子节点  /zkTest1 节点的数据时触发监听:

6.监听节点数据

package com.zj;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ZKWatcher {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /*
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制,接口类型,使用匿名内部类实现该接口
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                "192.168.66.101:2181," +
                "192.168.66.102:2181", 4000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("正在监听……");
            }
        });
        //3.监听节点数据
        zooKeeper.getData("/zkTest", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //查看监听路径
                System.out.println("监听路径:"+watchedEvent.getPath());
                //查看监听的事件
                System.out.println("监听的事件:"+watchedEvent.getType());
            }
        },null);
        //线程休眠,保持持续监听
        Thread.sleep(Long.MAX_VALUE);
    }
}

修改 /zkTest 节点的数据:

注意:

通过zooKeeper.getchildren("/",new watch()){}来注册监听,监听的是整个根节点,但是这个监听只能监听一次。线程休眠是为了让监听等待事件发生,不然会随着程序直接运行完。


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
14天前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
|
8天前
|
消息中间件 架构师 数据库
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
45岁资深架构师尼恩分享了一篇关于分布式事务的文章,详细解析了如何在10Wqps高并发场景下实现分布式事务。文章从传统单体架构到微服务架构下分布式事务的需求背景出发,介绍了Seata这一开源分布式事务解决方案及其AT和TCC两种模式。随后,文章深入探讨了经典ebay本地消息表方案,以及如何使用RocketMQ消息队列替代数据库表来提高性能和可靠性。尼恩还分享了如何结合延迟消息进行事务数据的定时对账,确保最终一致性。最后,尼恩强调了高端面试中需要准备“高大上”的答案,并提供了多个技术领域的深度学习资料,帮助读者提升技术水平,顺利通过面试。
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
|
2月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
94 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
2月前
|
NoSQL Java Redis
京东双十一高并发场景下的分布式锁性能优化
【10月更文挑战第20天】在电商领域,尤其是像京东双十一这样的大促活动,系统需要处理极高的并发请求。这些请求往往涉及库存的查询和更新,如果处理不当,很容易出现库存超卖、数据不一致等问题。
56 1
|
2月前
|
监控 Dubbo Java
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
这篇文章详细介绍了如何将Spring Boot与Dubbo和Zookeeper整合,并通过Dubbo管理界面监控服务注册情况。
133 0
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
|
2月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
71 4
|
2月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
57 3
|
2月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
48 2
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
50 1
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
51 0