ZooKeeper场景实践:(7) 分布式锁

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 1.基本介绍 分布式锁是控制分布式系统之间同步访问共享资源的一种方式,需要互斥来防止彼此干扰来保证一致性。利用Zookeeper的强一致性可以完成锁服务。Zookeeper的官方文档是列举了两种锁,独占锁和共享锁。独占锁保证任何时候都只有一个进程能或者资源的读写权限。共享锁可以同时有多个读,但是同一时刻最多只能有一个写,读和写是互斥的。 2.场景分析 我们

1.基本介绍

分布式锁是控制分布式系统之间同步访问共享资源的一种方式,需要互斥来防止彼此干扰来保证一致性。利用Zookeeper的强一致性可以完成锁服务。Zookeeper的官方文档是列举了两种锁,独占锁和共享锁。独占锁保证任何时候都只有一个进程能或者资源的读写权限。共享锁可以同时有多个读,但是同一时刻最多只能有一个写,读和写是互斥的。

2.场景分析

我们准备来实现互斥的锁,按照官网的思路,给定一个锁的路径,如/Lock,所有要申请这个锁的进程都在/Lock目录下创建一个/Lock/lock-的临时序列节点,并监控/Lock的子节点变化事件。当子节点发送变化时用get_children()获取子节点的列表,如果发现进程发现自己拥有最小的一个序号,则获得锁。处理业务完毕后需要释放锁,此时只需要删除该临时节点即可。简单来说就是永远是拥有最小序号的进程获得锁。

3.场景实践

使用锁有两个基本的函数,就是lockunlock.定义为

  • Lock *lock(zhandle_t *zkhandle,const char *path)
    lock函数有两个参数,一个是zookeeper_init返回的句柄zkhandle,另一个是锁的路径,如果成功则返回一个Lock的结构体指针,并同时获得锁,否则返回NULL。
  • int unlock(zhandle_t *zkhandle,Lock * *lock)
    unlock函数也有两个参数,一个是zookeeper_init返回的句柄zkhandle,另一个是lock函数返回的结构体指针的指针

接下来在看具体的实现。

Lock *lock(zhandle_t *zkhandle,const char *path)
{
    Lock *lock = create_lock(zkhandle,path);
    if(lock != NULL){
        while(try_lock(zkhandle,lock) == 0){
            sleep(1);
        }
    }else{
        fprintf(stderr,"error when create lock %s.\n",path);
    }

    return lock;
}


  • create_lock:负责锁的初始化,主要功能是负责创建{path}的节点已经{path}/lock-的临时序列节点。{path}如果存在则不再创建。
  • try_lock:尝试加锁,这个函数不会等待,失败和成功都立即返回。其主要功能是获取{path}的子节点列表,并查看自己是否是拥有最小序列号的节点,如果是则返回1,否则返回0;

lock函数初始化锁后,会持续的尝试加锁,直到成功。虽然我是这样实现的,但是过于简单粗暴(哈哈)。如果拿不到锁的话,持续就会阻塞在lock函数。

int unlock(zhandle_t *zkhandle,Lock * *lock)
{
    if(*lock){
        int ret = zoo_delete(zkhandle,(*lock)->selfpath,-1);
        if(ret != ZOK){
            fprintf(stderr,"error when release lock %s.\n",(*lock)->selfpath);
        }
        free(*lock);
        *lock = NULL;

        return ret;
    }

    return ZOK;
}


unlock函数就非常简单了,就是将create_lock中创建的临时序列节点删除就可以了。

接下来在看下模拟程序的功能。

> ./mylock -h
Usage : [mylock] [-h]  [-p path][-s ip:port] 
        -h Show help
        -p lock path
        -s zookeeper server ip:port
For example:
    mylock -s 172.17.0.36:2181 -p /Lock


模拟程序有3个选项。其中
-s:为Zookeeper的服务器的ip:port.
-p: 为锁的路径。

分别同时运行多个mylock程序,就可以看到各个程序之间是如何获取锁的了。

最后是完整的代码:

#include<stdio.h>  
#include<string.h>  
#include<unistd.h>
#include"zookeeper.h"  
#include"zookeeper_log.h"  

char g_host[512]= "172.17.0.36:2181";  
char g_path[512]= "/Lock";

typedef struct Lock
{
    char lockpath[1024];
    char selfpath[1024];
}Lock;

void print_usage();
void get_option(int argc,const char* argv[]);

/**********unitl*********************/  
void print_usage()
{
    printf("Usage : [mylock] [-h]  [-p path][-s ip:port] \n");
    printf("        -h Show help\n");
    printf("        -p lock path\n");
    printf("        -s zookeeper server ip:port\n");
    printf("For example:\n");
    printf("    mylock -s172.17.0.36:2181 -p /Lock\n");
}

void get_option(int argc,const char* argv[])
{
    extern char    *optarg;
    int            optch;
    int            dem = 1;
    const char    optstring[] = "hp:s:";


    while((optch = getopt(argc , (char * const *)argv , optstring)) != -1 )
    {
        switch( optch )
        {
        case 'h':
            print_usage();
            exit(-1);
        case '?':
            print_usage();
            printf("unknown parameter: %c\n", optopt);
            exit(-1);
        case ':':
            print_usage();
            printf("need parameter: %c\n", optopt);
            exit(-1);
        case 's':
            strncpy(g_host,optarg,sizeof(g_host));
            break;
        case 'p':
            strncpy(g_path,optarg,sizeof(g_path));
            break;
        default:
            break;
        }
    }
} 

Lock *create_lock(zhandle_t *zkhandle,const char *path)
{
    char path_buffer[512]={0};
    int bufferlen = sizeof(path_buffer);
    Lock * lock = NULL;

    int ret = zoo_exists(zkhandle,path,0,NULL); 
    if(ret != ZOK){
        ret = zoo_create(zkhandle,path,"1.0",strlen("1.0"),  
                          &ZOO_OPEN_ACL_UNSAFE,0,  
                          path_buffer,bufferlen);  
        if(ret != ZOK){
            fprintf(stderr,"failed to create the path %s!\n",path);
        }else{
            printf("create path %s successfully!\n",path);
        }
    }
    if(ret == ZOK){
        char child_path[512];
        sprintf(child_path,"%s/lock-",path);
        ret = zoo_create(zkhandle,child_path,"1.0",strlen("1.0"),  
                          &ZOO_OPEN_ACL_UNSAFE,ZOO_SEQUENCE|ZOO_EPHEMERAL,  
                          path_buffer,bufferlen);  
        if(ret != ZOK){
            fprintf(stderr,"failed to create the path %s!\n",path);
        }else{
            printf("create path %s successfully!\n",path);
        }
    }
    if(ret == ZOK){
        lock = (Lock *)malloc(sizeof(Lock));

        strcpy(lock->lockpath,path);
        strcpy(lock->selfpath,path_buffer);
    }

    return lock;
}

int try_lock(zhandle_t *zkhandle,Lock *lock)
{
    struct String_vector children;
    int i = 0;
    int ret = zoo_get_children(zkhandle,lock->lockpath,0,&children);

    if(ret != ZOK){
        fprintf(stderr,"error when get children of path %s\n",lock->lockpath);
        ret = -1;
    }else{
        char *myseq = rindex(lock->selfpath,'/');
        if (myseq != NULL) myseq += 1;

        ret = 1;
        for(i = 0; i < children.count; ++i){
            if(strcmp(children.data[i],myseq) < 0){
                ret = 0;
                break;
            }            
        }

        for(i = 0; i < children.count; ++i){
            free(children.data[i]);
            children.data[i] = NULL;
        }
    }

    return ret;
}

Lock *lock(zhandle_t *zkhandle,const char *path)
{
    Lock *lock = create_lock(zkhandle,path);
    if(lock != NULL){
        while(try_lock(zkhandle,lock) == 0){
            sleep(1);
        }
    }else{
        fprintf(stderr,"error when create lock %s.\n",path);
    }

    return lock;
}

int unlock(zhandle_t *zkhandle,Lock * *lock)
{
    if(*lock){
        int ret = zoo_delete(zkhandle,(*lock)->selfpath,-1);
        if(ret != ZOK){
            fprintf(stderr,"error when release lock %s.\n",(*lock)->selfpath);
        }
        free(*lock);
        *lock = NULL;

        return ret;
    }

    return ZOK;
}

int main(int argc, const char *argv[])  
{  
    int timeout = 30000;  
    char path_buffer[512];  
    int bufferlen=sizeof(path_buffer);  

    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); //设置日志级别,避免出现一些其他信息  

    get_option(argc,argv);

    zhandle_t* zkhandle = zookeeper_init(g_host,NULL, timeout, 0, (char *)"lock Test", 0);  

    if (zkhandle ==NULL)  
    {  
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");  
        exit(EXIT_FAILURE);  
    }  

    int ret = zoo_exists(zkhandle,g_path,0,NULL); 
    if(ret != ZOK){
        ret = zoo_create(zkhandle,g_path,"1.0",strlen("1.0"),  
                          &ZOO_OPEN_ACL_UNSAFE,0,  
                          path_buffer,bufferlen);  
        if(ret != ZOK){
            fprintf(stderr,"failed to create the path %s!\n",g_path);
        }else{
            printf("create path %s successfully!\n",g_path);
        }
    }

    if(ret == ZOK ){
       Lock *mylock = lock(zkhandle,g_path);

        if(mylock){
            printf("get lock of %s.\n",g_path);
            printf("self path is %s.\n",mylock->selfpath);

            printf("do something....\n");
            getchar();

            unlock(zkhandle,&mylock);
        }    
    }

    zookeeper_close(zkhandle); 

    return 0;
}



相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
17天前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
56 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
3天前
|
存储 运维 安全
盘古分布式存储系统的稳定性实践
本文介绍了阿里云飞天盘古分布式存储系统的稳定性实践。盘古作为阿里云的核心组件,支撑了阿里巴巴集团的众多业务,确保数据高可靠性、系统高可用性和安全生产运维是其关键目标。文章详细探讨了数据不丢不错、系统高可用性的实现方法,以及通过故障演练、自动化发布和健康检查等手段保障生产安全。总结指出,稳定性是一项系统工程,需要持续迭代演进,盘古经过十年以上的线上锤炼,积累了丰富的实践经验。
|
26天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
1月前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
63 10
|
1月前
|
运维 Kubernetes 调度
阿里云容器服务 ACK One 分布式云容器企业落地实践
阿里云容器服务ACK提供强大的产品能力,支持弹性、调度、可观测、成本治理和安全合规。针对拥有IDC或三方资源的企业,ACK One分布式云容器平台能够有效解决资源管理、多云多集群管理及边缘计算等挑战,实现云上云下统一管理,提升业务效率与稳定性。
|
2月前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
|
1月前
|
调度 数据库
什么场景下要使用分布式锁
分布式锁用于确保多节点环境下的资源互斥访问、避免重复操作、控制并发流量、防止竞态条件及任务调度协调,常见于防止超卖等问题。
49 4
|
1月前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
84 4
|
2月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
87 8
|
3月前
|
NoSQL Java Redis
京东双十一高并发场景下的分布式锁性能优化
【10月更文挑战第20天】在电商领域,尤其是像京东双十一这样的大促活动,系统需要处理极高的并发请求。这些请求往往涉及库存的查询和更新,如果处理不当,很容易出现库存超卖、数据不一致等问题。
85 1

热门文章

最新文章