分布式调用与高并发处理 Zookeeper分布式协调服务(三)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 分布式调用与高并发处理 Zookeeper分布式协调服务(三)

3.5 Watcher监听机制

ZooKeeper 提供了分布式数据的发布/订阅功能。一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。

注意:

在ZooKeeper中,引入了Watcher机制来实现这种分布式的通知功能。ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。

监听节点变化

#纯净客户端监听Java节点的变化
ls -w /java
#在zk-1客户端创建/java/spring节点
[zk: localhost:2181(CONNECTED) 1] create /java/spring   spirngmvc
Created /java/spring
#纯净客户端监听到了Java节点的变化
[zk: localhost:2181(CONNECTED) 11] 
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/java

注意:

命令如果使用 -w ,那么监听的是节点的变化,而不是值的变化。

监听节点的值的变化

#纯净客户端监听节点/java/spring的值
get -w /java/spring
#zk-1客户端修改/java/spring节点的值
[zk: localhost:2181(CONNECTED) 2] set /java/spring  mybatis
#纯净客户端监听到节点值的变化
[zk: localhost:2181(CONNECTED) 2] 
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/java/spring

注意:

watch监听机制只能够使用一次,如果下次想要使用,必须重新监听,就比如ls path watch命令,只能监听节点路径的改变一次,如果还想监听,那么需要再执行一次ls path watch命令。

3.6 权限控制ACL

在ZooKeeper的实际使用中,我们的做法往往是搭建一个共用的ZooKeeper集群,统一为若干个应用提供服务。在这种情况下,不同的应用之间往往是不会存在共享数据的使用场景的,因此需要解决不同应用之间的权限问题。

注意:

  1. ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限
  2. 每个znode支持设置多种权限控制方案和多个权限
  3. 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点

权限模式schema

ZooKeeper内置了一些权限控制方案,可以用以下方案为每个节点设置权限:

方案 描述
world 只有一个用户:anyone,代表所有人(默认)
ip 使用IP地址认证
auth 使用已添加认证的用户认证
digest 使用“用户名:密码”方式认证

授权对象ID

授权对象ID是指,权限赋予的用户或者一个实体,例如:IP 地址或者机器。授权模式 schema 与 授权对象 ID 之间关系:

权限模式 授权对象
IP 通常是一个IP地址或是IP段,例如“192.168.66.101”
Digest 自定义,通常是“username:BASE64(SHA-1(username:password))”
World 只有一个ID:"anyone"
Super 与Digest模式一致

权限permission

权限 ACL简写 描述
CREATE c 可以创建子节点
DELETE d 可以删除子节点(仅下一级节点)
READ r 可以读取节点数据及显示子节点列表
WRITE w 可以设置节点数据
ADMIN a 可以设置节点访问控制列表权限

权限相关命令

命令 使用方式 描述
getAcl getAcl 读取ACL权限
setAcl setAcl 设置ACL权限
addauth addauth 添加认证用户

示例:World方案

[zk: localhost:2181(CONNECTED) 0] create /node1 1
Created /node1
[zk: localhost:2181(CONNECTED) 1] getAcl /node1
'world,'anyone  #默认为world方案
: cdrwa #任何人都拥有所有权限

示例:IP方案

#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node2 1
Created /node2
#设置权限(只有ip地址为192.168.66.100的客户端才能操作该节点)
[zk: localhost:2181(CONNECTED) 1] setAcl /node2 ip:192.168.66.100:cdrwa 
#使用IP非 192.168.66.100 的机器
[zk: localhost:2181(CONNECTED) 0] get /node2
Authentication is not valid : /node2 #没有权限

示例:Auth方案

#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node3 1
Created /node3
#添加认证用户zj,密码123456
[zk: localhost:2181(CONNECTED) 1] addauth digest zj:123456
#为创建的权限用户zj设置权限
[zk: localhost:2181(CONNECTED) 2] setAcl /node3 auth:zj:cdrwa
#获取权限
[zk: localhost:2181(CONNECTED) 3] getAcl /node3
'digest,'zj:UvJWhBril5yzpEiA2eV7bwwhfLs=
: cdrwa

示例:Digest方案

#密码加密
echo -n zj:123456 | openssl dgst -binary -sha1 | openssl base64
UvJWhBril5yzpEiA2eV7bwwhfLs=
#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node4 1
Created /node4
#使用是算好的密文密码添加权限:
[zk: localhost:2181(CONNECTED) 1] setAcl /node4 digest:zj:UvJWhBril5yzpEiA2eV7bwwhfLs=:cdrwa
 #获取节点数据没有权限
[zk: localhost:2181(CONNECTED) 3] get /node4
Authentication is not valid : /node4
#添加认证用户
[zk: localhost:2181(CONNECTED) 4] addauth digest zj:123456
#成功读取数据
[zk: localhost:2181(CONNECTED) 5] get /node4 1

四、原生api操作Zookeeper

利用Zookeeper官方的原生java api进行连接,然后演示一些创建、删除、修改、查询节点的操作。

1.创建maven项目,引入依赖。

<dependencies>
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
  </dependency>
</dependencies>

2.创建连接和节点

注意:在使用Java连接zookeeper服务的时候先要在虚拟机开启zookeeper的服务。(zkSserver.sh start)

public class ZKMain {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /**
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                                                         "192.168.66.101:2181," +
                                                          "192.168.66.102:2181", 4000, null);
        //2.查看连接状态
        System.out.println(zooKeeper.getState());
        //3.创建节点
        /*
         * 第一个参数:节点的名字
         * 第二个参数:节点的数据
         * 第三个参数:节点的权限策略
         *            ZooDefs.Ids.OPEN_ACL_UNSAFE:完全开放任何人都能操作
         *            ZooDefs.Ids.CREATOR_ALL_ACL:创建者才能操作
         *            ZooDefs.Ids.READ_ACL_UNSAFE:只读权限
         * 第四个参数:节点类型(持久、临时、……)
         *           CreateMode.PERSISTENT:持久型节点
         *           CreateMode.EPHEMERAL:临时节点
         *           CreateMode.PERSISTENT_SEQUENTIAL:持久顺序节点
         *           CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点
         */
            zooKeeper.create("/zkTest","1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}

znode 类型有四种

  • PERSISTENT - 持久化目录节点,客户端与zookeeper断开连接后,该节点依旧存在
  • PERSISTENT_SEQUENTIAL - 持久化,并带有序列号
  • EPHEMERAL - 临时目录节点,客户端与zookeeper断开连接后,该节点被删除
  • EPHEMERAL_SEQUENTIAL - 临时,并带有序列号

3.修改节点数据

//5.修改节点
        /**
         * 参数一:节点名称
         * 参数二:修改的数据
         * 参数三:版本,全部修改设为-1
         */
        zooKeeper.setData("/zkTest","2".getBytes(),-1);

4.获取节点数据和节点信息

//6.获取节点数据
        /**
         * 参数一:节点名称
         * 参数二:监听机制
         * 参数三:状态信息
         */
        byte[] data = zooKeeper.getData("/zkTest", null, null);
        System.out.println(new String(data));
        //7.获取节点的子节点信息
        /**
         * 参数一:节点名称
         * 参数二:监听机制
         */
        List<String> children = zooKeeper.getChildren("/zkTest", null);
        for (String child : children) {
            System.out.println(child);
        }

5.监听节点

public class ZKWatcher {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /*
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制,接口类型,使用匿名内部类实现该接口
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                "192.168.66.101:2181," +
                "192.168.66.102:2181", 4000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("正在监听……");
            }
        });
        //2.注册监听机制监听节点
        zooKeeper.getChildren("/zkTest", new Watcher() {
            /*对 /zkTest 的子节点进行操作的时候会回调该监听方法*/
            @Override
            public void process(WatchedEvent watchedEvent) {
                //查看监听路径
                System.out.println("监听路径:"+watchedEvent.getPath());
                //查看监听的事件
                System.out.println("监听的事件:"+watchedEvent.getType());
            }
        });
        //线程休眠,保持持续监听
        Thread.sleep(Long.MAX_VALUE);
    }
}

删除 /zkTest 的子节点  /zkTest1 节点的数据时触发监听:

6.监听节点数据

package com.zj;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ZKWatcher {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /*
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制,接口类型,使用匿名内部类实现该接口
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                "192.168.66.101:2181," +
                "192.168.66.102:2181", 4000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("正在监听……");
            }
        });
        //3.监听节点数据
        zooKeeper.getData("/zkTest", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //查看监听路径
                System.out.println("监听路径:"+watchedEvent.getPath());
                //查看监听的事件
                System.out.println("监听的事件:"+watchedEvent.getType());
            }
        },null);
        //线程休眠,保持持续监听
        Thread.sleep(Long.MAX_VALUE);
    }
}

修改 /zkTest 节点的数据:

注意:

通过zooKeeper.getchildren("/",new watch()){}来注册监听,监听的是整个根节点,但是这个监听只能监听一次。线程休眠是为了让监听等待事件发生,不然会随着程序直接运行完。


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
138 3
|
1月前
|
消息中间件 存储 安全
分布式系统架构3:服务容错
分布式系统因其复杂性,故障几乎是必然的。那么如何让系统在不可避免的故障中依然保持稳定?本文详细介绍了分布式架构中7种核心的服务容错策略,包括故障转移、快速失败、安全失败等,以及它们在实际业务场景中的应用。无论是支付场景的快速失败,还是日志采集的安全失败,每种策略都有自己的适用领域和优缺点。此外,文章还为技术面试提供了解题思路,助你在关键时刻脱颖而出。掌握这些策略,不仅能提升系统健壮性,还能让你的技术栈更上一层楼!快来深入学习,走向架构师之路吧!
65 11
|
3月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
71 1
|
4月前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
118 3
|
3月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
64 0
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
5月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
150 2
基于Redis的高可用分布式锁——RedLock
|
1月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
162 5
|
2月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
87 8
|
2月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
70 16

热门文章

最新文章