SpringBoot整合Zookeeper,实现分布式集群部署

简介: Zookeeper是一个分布式的、开源的分布式应用程序协调服务,是Google的Chubby一个开源的实现。它提供了一组简单的原语,使得分布式应用能够基于这些原语实现更高层次的服务。

大家好,我是小悟。

一、Zookeeper简介

1.1 什么是Zookeeper

Zookeeper是一个分布式的、开源的分布式应用程序协调服务,是Google的Chubby一个开源的实现。它提供了一组简单的原语,使得分布式应用能够基于这些原语实现更高层次的服务,如:分布式锁、配置管理、命名服务、集群管理等。

1.2 Zookeeper的核心特性

  • 顺序一致性:客户端发起的更新请求,会按照其发出的顺序被应用到Zookeeper
  • 原子性:更新操作要么全部成功,要么全部失败
  • 单一系统映像:无论客户端连接到哪个服务器,都会看到相同的服务视图
  • 可靠性:一旦更新被应用,就会一直保持,直到被覆盖
  • 及时性:保证客户端在特定时间范围内获得最新的数据

1.3 Zookeeper的数据结构

Zookeeper的数据模型类似于Unix文件系统,采用层次化的树形结构,每个节点称为znode,可以存储数据和子节点。

二、详细实现步骤

2.1 环境准备

2.1.1 安装Zookeeper

# 下载Zookeeper(以3.7.0为例)
wget https://downloads.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
# 解压
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
cd apache-zookeeper-3.7.0-bin
# 创建数据目录和日志目录
mkdir data
mkdir logs
# 复制配置文件
cp conf/zoo_sample.cfg conf/zoo.cfg
# 修改配置
vi conf/zoo.cfg

配置内容:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/path/to/your/zookeeper/data
dataLogDir=/path/to/your/zookeeper/logs
clientPort=2181
# 如果是集群,添加如下配置
# server.1=node1:2888:3888
# server.2=node2:2888:3888
# server.3=node3:2888:3888

2.1.2 启动Zookeeper

# 启动Zookeeper
bin/zkServer.sh start
# 查看状态
bin/zkServer.sh status
# 客户端连接
bin/zkCli.sh -server 127.0.0.1:2181

2.2 SpringBoot项目搭建

2.2.1 创建SpringBoot项目

使用Spring Initializr创建项目,添加以下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/>
    </parent>
    
    <groupId>com.example</groupId>
    <artifactId>springboot-zookeeper-demo</artifactId>
    <version>1.0.0</version>
    
    <properties>
        <java.version>11</java.version>
    </properties>
    
    <dependencies>
        <!-- Spring Boot Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <!-- Zookeeper Curator -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        
        <!-- Zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.7.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        
        <!-- 配置处理器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        
        <!-- 测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2.3 Zookeeper配置类

2.3.1 配置文件

# application.yml
server:
  port: 8080
spring:
  application:
    name: zookeeper-demo
zookeeper:
  connect-string: localhost:2181
  session-timeout: 5000
  connection-timeout: 5000
  base-sleep-time: 1000
  max-retries: 3
  max-sleep-time: 5000
  namespace: springboot

2.3.2 配置属性类

package com.example.zookeeper.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "zookeeper")
public class ZookeeperProperties {
    /**
     * 连接字符串
     */
    private String connectString = "localhost:2181";
    
    /**
     * 会话超时时间(毫秒)
     */
    private int sessionTimeout = 5000;
    
    /**
     * 连接超时时间(毫秒)
     */
    private int connectionTimeout = 5000;
    
    /**
     * 重试基础睡眠时间(毫秒)
     */
    private int baseSleepTime = 1000;
    
    /**
     * 最大重试次数
     */
    private int maxRetries = 3;
    
    /**
     * 最大睡眠时间(毫秒)
     */
    private int maxSleepTime = 5000;
    
    /**
     * 命名空间
     */
    private String namespace = "springboot";
}

2.3.3 Zookeeper配置类

package com.example.zookeeper.config;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Configuration
public class ZookeeperConfig {
    
    @Autowired
    private ZookeeperProperties zookeeperProperties;
    
    private CuratorFramework client;
    
    @Bean
    public CuratorFramework curatorFramework() {
        // 重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(
            zookeeperProperties.getBaseSleepTime(),
            zookeeperProperties.getMaxRetries(),
            zookeeperProperties.getMaxSleepTime()
        );
        
        // 创建CuratorFramework
        client = CuratorFrameworkFactory.builder()
                .connectString(zookeeperProperties.getConnectString())
                .sessionTimeoutMs(zookeeperProperties.getSessionTimeout())
                .connectionTimeoutMs(zookeeperProperties.getConnectionTimeout())
                .retryPolicy(retryPolicy)
                .namespace(zookeeperProperties.getNamespace())
                .build();
        
        return client;
    }
    
    @PostConstruct
    public void init() {
        // 启动客户端
        client.start();
        System.out.println("Zookeeper客户端已启动,连接地址: " + zookeeperProperties.getConnectString());
    }
    
    @PreDestroy
    public void destroy() {
        if (client != null) {
            client.close();
            System.out.println("Zookeeper客户端已关闭");
        }
    }
}

2.4 实现分布式锁

2.4.1 分布式锁工具类

package com.example.zookeeper.lock;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class DistributedLock {
    
    @Autowired
    private CuratorFramework curatorFramework;
    
    /**
     * 获取分布式锁
     * @param lockPath 锁路径
     * @param waitTime 等待时间
     * @param timeUnit 时间单位
     * @return 锁对象
     */
    public InterProcessMutex acquireLock(String lockPath, long waitTime, TimeUnit timeUnit) {
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
        try {
            if (lock.acquire(waitTime, timeUnit)) {
                return lock;
            }
        } catch (Exception e) {
            throw new RuntimeException("获取分布式锁失败", e);
        }
        return null;
    }
    
    /**
     * 释放分布式锁
     * @param lock 锁对象
     */
    public void releaseLock(InterProcessMutex lock) {
        if (lock != null && lock.isAcquiredInThisProcess()) {
            try {
                lock.release();
            } catch (Exception e) {
                throw new RuntimeException("释放分布式锁失败", e);
            }
        }
    }
}

2.4.2 分布式锁注解

package com.example.zookeeper.lock;
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ZookeeperLock {
    
    /**
     * 锁的路径
     */
    String lockPath();
    
    /**
     * 等待时间,默认5秒
     */
    long waitTime() default 5;
    
    /**
     * 时间单位,默认秒
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}

2.4.3 分布式锁切面

package com.example.zookeeper.lock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class ZookeeperLockAspect {
    
    @Autowired
    private DistributedLock distributedLock;
    
    @Pointcut("@annotation(com.example.zookeeper.lock.ZookeeperLock)")
    public void lockPointcut() {}
    
    @Around("lockPointcut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        ZookeeperLock lockAnnotation = signature.getMethod().getAnnotation(ZookeeperLock.class);
        
        String lockPath = lockAnnotation.lockPath();
        long waitTime = lockAnnotation.waitTime();
        java.util.concurrent.TimeUnit timeUnit = lockAnnotation.timeUnit();
        
        InterProcessMutex lock = null;
        try {
            // 获取锁
            lock = distributedLock.acquireLock(lockPath, waitTime, timeUnit);
            if (lock == null) {
                throw new RuntimeException("获取分布式锁超时");
            }
            
            // 执行原方法
            return joinPoint.proceed();
        } finally {
            // 释放锁
            if (lock != null) {
                distributedLock.releaseLock(lock);
            }
        }
    }
}

2.5 实现配置中心

2.5.1 配置监听器

package com.example.zookeeper.configcenter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class ConfigWatcher {
    
    @Autowired
    private CuratorFramework curatorFramework;
    
    private static final String CONFIG_PATH = "/config";
    
    @PostConstruct
    public void init() throws Exception {
        // 确保配置节点存在
        if (curatorFramework.checkExists().forPath(CONFIG_PATH) == null) {
            curatorFramework.create().creatingParentsIfNeeded().forPath(CONFIG_PATH, "default config".getBytes());
        }
        
        // 创建节点缓存
        NodeCache nodeCache = new NodeCache(curatorFramework, CONFIG_PATH);
        nodeCache.start(true);
        
        // 添加监听器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                byte[] data = nodeCache.getCurrentData().getData();
                String config = new String(data);
                System.out.println("配置已更新: " + config);
                // 这里可以触发配置刷新逻辑
            }
        });
    }
    
    /**
     * 获取配置
     */
    public String getConfig() throws Exception {
        byte[] data = curatorFramework.getData().forPath(CONFIG_PATH);
        return new String(data);
    }
    
    /**
     * 更新配置
     */
    public void updateConfig(String config) throws Exception {
        curatorFramework.setData().forPath(CONFIG_PATH, config.getBytes());
    }
}

2.6 实现服务注册与发现

2.6.1 服务注册

package com.example.zookeeper.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.UriSpec;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.UUID;
@Component
public class ServiceRegistry {
    
    @Autowired
    private CuratorFramework curatorFramework;
    
    @Value("${server.port}")
    private int port;
    
    @Value("${spring.application.name}")
    private String serviceName;
    
    private ServiceDiscovery<ServiceInfo> serviceDiscovery;
    private ServiceInstance<ServiceInfo> serviceInstance;
    
    @PostConstruct
    public void init() throws Exception {
        // 创建服务实例
        serviceInstance = ServiceInstance.<ServiceInfo>builder()
                .name(serviceName)
                .port(port)
                .address(getLocalIp())
                .payload(new ServiceInfo(serviceName, "1.0.0"))
                .uriSpec(new UriSpec("{scheme}://{address}:{port}"))
                .id(UUID.randomUUID().toString())
                .build();
        
        // 创建服务发现
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
                .client(curatorFramework)
                .basePath("/services")
                .serializer(new JsonInstanceSerializer<>(ServiceInfo.class))
                .thisInstance(serviceInstance)
                .build();
        
        // 注册服务
        serviceDiscovery.start();
        System.out.println("服务注册成功: " + serviceInstance);
    }
    
    @PreDestroy
    public void destroy() throws Exception {
        if (serviceDiscovery != null) {
            serviceDiscovery.close();
            System.out.println("服务注销成功");
        }
    }
    
    private String getLocalIp() {
        try {
            return java.net.InetAddress.getLocalHost().getHostAddress();
        } catch (Exception e) {
            return "127.0.0.1";
        }
    }
    
    // 服务信息类
    public static class ServiceInfo {
        private String serviceName;
        private String version;
        
        public ServiceInfo() {}
        
        public ServiceInfo(String serviceName, String version) {
            this.serviceName = serviceName;
            this.version = version;
        }
        
        // getters and setters
        public String getServiceName() { return serviceName; }
        public void setServiceName(String serviceName) { this.serviceName = serviceName; }
        public String getVersion() { return version; }
        public void setVersion(String version) { this.version = version; }
    }
}

2.6.2 服务发现

package com.example.zookeeper.registry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
@Component
public class ServiceDiscoveryClient {
    
    @Autowired
    private CuratorFramework curatorFramework;
    
    private ServiceDiscovery<ServiceRegistry.ServiceInfo> serviceDiscovery;
    
    @PostConstruct
    public void init() throws Exception {
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceRegistry.ServiceInfo.class)
                .client(curatorFramework)
                .basePath("/services")
                .serializer(new JsonInstanceSerializer<>(ServiceRegistry.ServiceInfo.class))
                .build();
        
        serviceDiscovery.start();
    }
    
    /**
     * 获取所有服务实例
     */
    public Collection<ServiceInstance<ServiceRegistry.ServiceInfo>> getAllInstances(String serviceName) throws Exception {
        return serviceDiscovery.queryForInstances(serviceName);
    }
    
    /**
     * 获取服务实例地址列表
     */
    public List<String> getServiceAddresses(String serviceName) throws Exception {
        return getAllInstances(serviceName).stream()
                .map(instance -> instance.getAddress() + ":" + instance.getPort())
                .collect(Collectors.toList());
    }
    
    /**
     * 获取一个可用的服务实例
     */
    public ServiceInstance<ServiceRegistry.ServiceInfo> getOneInstance(String serviceName) throws Exception {
        Collection<ServiceInstance<ServiceRegistry.ServiceInfo>> instances = getAllInstances(serviceName);
        if (instances.isEmpty()) {
            throw new RuntimeException("未找到可用的服务实例: " + serviceName);
        }
        // 简单的负载均衡:随机选择一个
        return instances.iterator().next();
    }
}

2.7 控制器示例

package com.example.zookeeper.controller;
import com.example.zookeeper.lock.ZookeeperLock;
import com.example.zookeeper.registry.ServiceDiscoveryClient;
import org.apache.curator.x.discovery.ServiceInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/api")
public class DemoController {
    
    @Autowired
    private ServiceDiscoveryClient serviceDiscoveryClient;
    
    @GetMapping("/lock-demo")
    @ZookeeperLock(lockPath = "/lock/order", waitTime = 3)
    public Map<String, Object> lockDemo() {
        Map<String, Object> result = new HashMap<>();
        result.put("code", 200);
        result.put("message", "分布式锁测试成功");
        result.put("timestamp", System.currentTimeMillis());
        return result;
    }
    
    @GetMapping("/services")
    public Map<String, Object> getServices(@RequestParam String serviceName) throws Exception {
        Map<String, Object> result = new HashMap<>();
        
        ServiceInstance<com.example.zookeeper.registry.ServiceRegistry.ServiceInfo> instance = 
            serviceDiscoveryClient.getOneInstance(serviceName);
        
        result.put("code", 200);
        result.put("service", instance);
        result.put("address", instance.getAddress() + ":" + instance.getPort());
        
        return result;
    }
    
    @GetMapping("/health")
    public Map<String, Object> health() {
        Map<String, Object> result = new HashMap<>();
        result.put("status", "UP");
        result.put("timestamp", System.currentTimeMillis());
        return result;
    }
}

2.8 主启动类

package com.example.zookeeper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ZookeeperApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(ZookeeperApplication.class, args);
        System.out.println("SpringBoot整合Zookeeper示例已启动");
    }
}

三、集群部署配置

3.1 Zookeeper集群配置

3.1.1 zoo.cfg配置

# zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
# 集群配置
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888

3.1.2 创建myid文件

# 在每台服务器的dataDir目录下创建myid文件
echo 1 > /data/zookeeper/myid  # 第一台服务器
echo 2 > /data/zookeeper/myid  # 第二台服务器
echo 3 > /data/zookeeper/myid  # 第三台服务器

3.2 SpringBoot应用配置

3.2.1 多环境配置

# application-cluster.yml
zookeeper:
  connect-string: 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
  session-timeout: 5000
  connection-timeout: 5000
  base-sleep-time: 1000
  max-retries: 3
  max-sleep-time: 5000
  namespace: springboot-cluster

3.2.2 启动脚本

#!/bin/bash
# start-cluster.sh
# 启动第一个实例
nohup java -jar springboot-zookeeper-demo.jar \
--spring.profiles.active=cluster \
--server.port=8080 &
# 启动第二个实例
nohup java -jar springboot-zookeeper-demo.jar \
--spring.profiles.active=cluster \
--server.port=8081 &
# 启动第三个实例
nohup java -jar springboot-zookeeper-demo.jar \
--spring.profiles.active=cluster \
--server.port=8082 &
echo "集群启动完成"

四、测试验证

4.1 创建测试类

package com.example.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ZookeeperApplicationTests {
    
    @Autowired
    private CuratorFramework curatorFramework;
    
    @Test
    void testConnection() throws Exception {
        // 测试连接
        String path = "/test";
        String data = "Hello Zookeeper";
        
        // 创建节点
        if (curatorFramework.checkExists().forPath(path) == null) {
            curatorFramework.create().creatingParentsIfNeeded().forPath(path, data.getBytes());
        }
        
        // 读取数据
        byte[] bytes = curatorFramework.getData().forPath(path);
        System.out.println("节点数据: " + new String(bytes));
        
        // 更新数据
        curatorFramework.setData().forPath(path, "Updated Data".getBytes());
        
        // 删除节点
        curatorFramework.delete().forPath(path);
        
        System.out.println("Zookeeper连接测试通过");
    }
}

4.2 分布式锁测试

package com.example.zookeeper;
import com.example.zookeeper.lock.DistributedLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@SpringBootTest
class DistributedLockTest {
    
    @Autowired
    private DistributedLock distributedLock;
    
    private int counter = 0;
    
    @Test
    void testDistributedLock() throws InterruptedException {
        int threadCount = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                InterProcessMutex lock = distributedLock.acquireLock("/test/lock", 5, TimeUnit.SECONDS);
                try {
                    if (lock != null) {
                        // 模拟业务操作
                        System.out.println(Thread.currentThread().getName() + " 获取锁成功");
                        Thread.sleep(100);
                        counter++;
                    } else {
                        System.out.println(Thread.currentThread().getName() + " 获取锁失败");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    distributedLock.releaseLock(lock);
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        executorService.shutdown();
        System.out.println("最终计数器值: " + counter);
        assert counter == threadCount;
    }
}

五、总结

5.1 技术要点总结

  1. Zookeeper核心概念
  • Zookeeper提供分布式协调服务,基于ZAB协议保证数据一致性
  • 采用树形节点结构,支持临时节点、顺序节点等特性
  • 提供Watch机制实现事件通知


  1. Curator框架优势
  • 简化了Zookeeper客户端的操作
  • 提供了丰富的分布式锁、选举、缓存等高级功能
  • 内置了重试机制和连接管理


  1. 分布式锁实现
  • 使用InterProcessMutex实现可重入分布式锁
  • 通过AOP实现注解式锁管理
  • 支持锁超时和自动释放


  1. 服务注册与发现
  • 基于ServiceDiscovery实现服务自动注册
  • 支持服务健康检查和负载均衡
  • 实现配置的动态监听和更新


5.2 最佳实践建议

  1. 连接管理
  • 使用连接池管理Zookeeper连接
  • 合理设置超时时间和重试策略
  • 监控连接状态,实现自动重连


  1. 节点设计
  • 合理规划节点路径,避免节点过多
  • 及时清理临时节点和不再使用的数据
  • 使用命名空间隔离不同环境


  1. 集群部署
  • 建议使用奇数个节点(3或5)组成集群
  • 确保节点间网络通畅
  • 定期备份数据和快照


  1. 性能优化
  • 避免频繁的节点创建和删除
  • 合理使用Watch机制,避免过多监听
  • 使用异步接口处理耗时操作


5.3 适用场景

  1. 分布式锁:秒杀系统、订单处理等需要互斥访问的场景
  2. 配置中心:动态配置管理,配置热更新
  3. 服务注册发现:微服务架构中的服务治理
  4. 分布式协调:分布式任务调度、主从选举
  5. 命名服务:分布式系统中的服务寻址

5.4 注意事项

  1. 数据一致性:Zookeeper保证最终一致性,但不是强一致性
  2. 性能限制:Zookeeper不适合存储大量数据,主要用于协调信息
  3. 脑裂问题:网络分区时可能出现脑裂,需要合理配置超时时间
  4. 版本兼容:注意客户端和服务端版本兼容性

通过以上实现,我们成功将SpringBoot与Zookeeper集成,实现了分布式锁、配置中心、服务注册发现等核心功能,为构建分布式系统提供了可靠的基础设施支持。在实际应用中,可以根据具体需求选择合适的功能模块,并进行适当的优化和扩展。

SpringBoot整合Zookeeper,实现分布式集群部署.png

谢谢你看我的文章,既然看到这里了,如果觉得不错,随手点个赞、转发、在看三连吧,感谢感谢。那我们,下次再见。

您的一键三连,是我更新的最大动力,谢谢

山水有相逢,来日皆可期,谢谢阅读,我们再会

我手中的金箍棒,上能通天,下能探海

相关文章
|
6天前
|
人工智能 JSON 监控
Claude Code 源码泄露:一份价值亿元的 AI 工程公开课
我以为顶级 AI 产品的护城河是模型。读完这 51.2 万行泄露的源码,我发现自己错了。
4245 17
|
16天前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
11677 138
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
4天前
|
人工智能 数据可视化 安全
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
本文详解如何用阿里云Lighthouse一键部署OpenClaw,结合飞书CLI等工具,让AI真正“动手”——自动群发、生成科研日报、整理知识库。核心理念:未来软件应为AI而生,CLI即AI的“手脚”,实现高效、安全、可控的智能自动化。
1443 8
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
|
6天前
|
人工智能 自然语言处理 数据挖掘
零基础30分钟搞定 Claude Code,这一步90%的人直接跳过了
本文直击Claude Code使用痛点,提供零基础30分钟上手指南:强调必须配置“工作上下文”(about-me.md+anti-ai-style.md)、采用Cowork/Code模式、建立标准文件结构、用提问式提示词驱动AI理解→规划→执行。附可复制模板与真实项目启动法,助你将Claude从聊天工具升级为高效执行系统。
|
6天前
|
人工智能 定位技术
Claude Code源码泄露:8大隐藏功能曝光
2026年3月,Anthropic因配置失误致Claude Code超51万行源码泄露,意外促成“被动开源”。代码中藏有8大未发布功能,揭示其向“超级智能体”演进的完整蓝图,引发AI编程领域震动。(239字)
2408 9