Zookeeper场景实践:(8) 分布式队列

简介: 1.基本介绍 按照ZooKeeper典型应用场景一览里的说法,分布式队列有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。 第二种队列可以先建立一个/queue,赋值为n,表达队列的大小。然后每个队列成员加入时,就判断是否达到队列要求的大小,如果是可以进行下一步动作,否则继续等待队列成员的加入。比较典型的情况是,当一个大的任务可能需要

1.基本介绍

按照ZooKeeper典型应用场景一览里的说法,分布式队列有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。

第二种队列可以先建立一个/queue,赋值为n,表达队列的大小。然后每个队列成员加入时,就判断是否达到队列要求的大小,如果是可以进行下一步动作,否则继续等待队列成员的加入。比较典型的情况是,当一个大的任务可能需要很多的子任务完成才能开始进行。

比如汇总账单的时候,就必须先将用户的消费数据,积分数据等都统计完成后才能开始。汇总账单的程序建立一个队列/Queue,赋值为2,然后分别统计消费数据和积分数据的程序当完成任务时就往/Queue下创建一个临时节点。而汇总账单程序监测到/Queue的子节点个数为2时,就可以开始执行任务了。

实际上,我们也可以先建立一个数目为2的子节点。当一个子任务完成的时候,就删除一个子节点,当所有子节点都被删除的时候,主任务就可以开始执行了。这个过程可以形象的理解为拆除屏障。因此这种队列还有一个专门的词语描述,叫做屏障(barrier)。

2.场景分析

讲了那么多的关于屏障的认识,但是并不打算就去实现它,并且Zookeeper的官方文档也有相关的知识。这次的主要目标是常规的FIFO队列。我将实现队列的两个主要操作:push和pop。

1). int push(zhandle_t *zkhandle,const char *path,char *element)

  • zkhandlezookeeper_init初始化后的句柄
  • path为队列的路径
  • element为要压入队列的内容

2). int pop(zhandle_t *zkhandle,const char *path,char *element_buffer,int *buffer_len)

  • zkhandlezookeeper_init初始化后的句柄
  • path为队列的路径
  • element_buffer为要弹出的缓冲区
  • buffer_len为指向缓冲区的大小的指针

简单来说,假设队列的路径为/Queue,push就是就是创建一个临时有序的/Queue/queue-节点。pop就是取出/Queue/下序列号最小的节点。
我们知道在C++中stl里有一个queue的类,实现了push,pop等操作,然而它是非线程安全的,即多个线程同时push/pop的时候可能会出现错误。而由于ZooKeeper保证了创建节点和删除节点的一致性,因此可以说利用Zookeeper实现的队列是进程安全的。

3. 场景实践

来看push和pop的具体实现。push的实现很简单,就是在{path}下创建一个有序的{path}/queue-子节点.

int push(zhandle_t *zkhandle,const char *path,char *element)
{
    char child_path[512] = {0};
    char path_buffer[512] = {0};
    int bufferlen = sizeof(path_buffer);

    sprintf(child_path,"%s/queue-",path);
    int ret = zoo_create(zkhandle,child_path,element,strlen(element),  
                     &ZOO_OPEN_ACL_UNSAFE,ZOO_SEQUENCE,  
                     path_buffer,bufferlen);  
    if(ret != ZOK){
        fprintf(stderr,"failed to create the path %s!\n",path);
    }else{
        printf("create path %s successfully!\n",path);
    }

    return ret;
}


pop的功能则是取出{path}下序号最小的子节点,如果没有子节点,则返回-1.

int pop(zhandle_t *zkhandle,const char *path,char *element,int *len)
{
    int i = 0;
    struct String_vector children;
    int ret = zoo_get_children(zkhandle,path,0,&children);


    if(ret != ZOK){
        fprintf(stderr,"failed to create the path %s!\n",path);
    }else if (children.count == 0){
        strcpy(element,"");
        *len = 0;
        ret = -1;
    }else{
        char *min = children.data[0];
        for(i = 0; i < children.count; ++i){
            printf("%s:%s\n",min,children.data[i]);
            if(strcmp(min,children.data[i]) > 0){
                min = children.data[i];
            }
        }
        if(min != NULL){
            char child_path[512]={0};
            sprintf(child_path,"%s/%s",path,min);
            ret = zoo_get(zkhandle,child_path,0,element,len,NULL);

            if(ret != ZOK){
                fprintf(stderr,"failed to get data of the path %s!\n",child_path);
            }else{
                ret = zoo_delete(zkhandle,child_path, -1);

                if(ret != ZOK){
                    fprintf(stderr,"failed to delete the path %s!\n",child_path);
                }
            }
        }
    }

    for(i = 0; i < children.count; ++i){
        free(children.data[i]);
        children.data[i] = NULL;
    }


    return ret;
}


最后,再来看看模拟队列操作的程序。和其他程序类似,它的选项有

  • -p:指定队列的路径
  • -m:指定操作是push还是pop
  • -v:只在push时有用,用与指定要push的元素的值
  • -s:指定Zookeeper的服务器的ip:port.

如:

向队列/Queue中压人一个元素,元素的值为"Hello":

>myqueue -s 172.17.0.36:2181 -p /Queue -m push -v Hello

将队列/Queue弹出一个元素

>myqueue -s 172.17.0.36:2181 -p /Queue -m pop


最后附上完整的源代码:

#include<stdio.h>  
#include<string.h>  
#include<unistd.h>
#include"zookeeper.h"  
#include"zookeeper_log.h"  

char g_host[512]= "172.17.0.36:2181";  
char g_path[512]= "/Queue";
char g_value[512]="msg";
enum MODE{PUSH_MODE,POP_MODE} g_mode;

void print_usage();
void get_option(int argc,const char* argv[]);

/**********unitl*********************/  
void print_usage()
{
    printf("Usage : [myqueue] [-h] [-m mode] [-p path ] [-v value][-s ip:port] \n");
    printf("        -h Show help\n");
    printf("        -p Queue path\n");
    printf("        -m mode:push or pop\n");
    printf("        -v the value you want to push\n");
    printf("        -s zookeeper server ip:port\n");
    printf("For example:\n");
    printf("    push the message \"Hello\" into the queue Queue:\n");
    printf("        >myqueue -s172.17.0.36:2181 -p /Queue -m push -v Hello\n");
    printf("    pop one message from the queue Queue:\n");
    printf("        >myqueue -s172.17.0.36:2181 -p /Queue -m pop\n");
}

void get_option(int argc,const char* argv[])
{
    extern char    *optarg;
    int            optch;
    int            dem = 1;
    const char    optstring[] = "hv:m:p:s:";


    g_mode = PUSH_MODE;
    while((optch = getopt(argc , (char * const *)argv , optstring)) != -1 )
    {
        switch( optch )
        {
        case 'h':
            print_usage();
            exit(-1);
        case '?':
            print_usage();
            printf("unknown parameter: %c\n", optopt);
            exit(-1);
        case ':':
            print_usage();
            printf("need parameter: %c\n", optopt);
            exit(-1);
        case 'm':
            if(strcasecmp(optarg,"push")==0){
                g_mode = PUSH_MODE;
            }else{
                g_mode = POP_MODE;
            }
            break;
        case 's':
            strncpy(g_host,optarg,sizeof(g_host));
            break;
        case 'p':
            strncpy(g_path,optarg,sizeof(g_path));
            break;
        case 'v':
            strncpy(g_value,optarg,sizeof(g_value));
            break;
        default:
            break;
        }
    }
} 

int push(zhandle_t *zkhandle,const char *path,char *element)
{
    char child_path[512] = {0};
    char path_buffer[512] = {0};
    int bufferlen = sizeof(path_buffer);

    sprintf(child_path,"%s/queue-",path);
    int ret = zoo_create(zkhandle,child_path,element,strlen(element),  
                     &ZOO_OPEN_ACL_UNSAFE,ZOO_SEQUENCE,  
                     path_buffer,bufferlen);  
    if(ret != ZOK){
        fprintf(stderr,"failed to create the path %s!\n",path);
    }else{
        printf("create path %s successfully!\n",path);
    }

    return ret;
}

int pop(zhandle_t *zkhandle,const char *path,char *element,int *len)
{
    int i = 0;
    struct String_vector children;
    int ret = zoo_get_children(zkhandle,path,0,&children);


    if(ret != ZOK){
        fprintf(stderr,"failed to create the path %s!\n",path);
    }else if (children.count == 0){
        strcpy(element,"");
        *len = 0;
        ret = -1;
    }else{
        char *min = children.data[0];
        for(i = 0; i < children.count; ++i){
            printf("%s:%s\n",min,children.data[i]);
            if(strcmp(min,children.data[i]) > 0){
                min = children.data[i];
            }
        }
        if(min != NULL){
            char child_path[512]={0};
            sprintf(child_path,"%s/%s",path,min);
            ret = zoo_get(zkhandle,child_path,0,element,len,NULL);

            if(ret != ZOK){
                fprintf(stderr,"failed to get data of the path %s!\n",child_path);
            }else{
                ret = zoo_delete(zkhandle,child_path, -1);

                if(ret != ZOK){
                    fprintf(stderr,"failed to delete the path %s!\n",child_path);
                }
            }
        }
    }

    for(i = 0; i < children.count; ++i){
        free(children.data[i]);
        children.data[i] = NULL;
    }


    return ret;
}

int front(zhandle_t *zkhandle,char *path,char *element,int *len)
{
    int i = 0;
    struct String_vector children;
    int ret = zoo_get_children(zkhandle,path,0,&children);

    if(ret != ZOK){
        fprintf(stderr,"failed to create the path %s!\n",path);
    }else if(children.count == 0){
        strcpy(element,"");
        *len = 0;
        ret = -1;
    }else{
        char *min = NULL;
        for(i = 0; i < children.count; ++i){
            if(strcmp(min,children.data[i]) > 0){
                min = children.data[i];
            }
        }
        if(min != NULL){
            char child_path[512]={0};
            sprintf(child_path,"%s/%s",path,min);
            ret = zoo_get(zkhandle,child_path,0,element,len,NULL);

            if(ret != ZOK){
                fprintf(stderr,"failed to get data of the path %s!\n",child_path);
            }
        }
    }

    for(i = 0; i < children.count; ++i){
        free(children.data[i]);
        children.data[i] = NULL;
    }

    return ret;

}


int main(int argc, const char *argv[])  
{  
    int timeout = 30000;  
    char path_buffer[512];  
    int bufferlen=sizeof(path_buffer);  

    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); //设置日志级别,避免出现一些其他信息  

    get_option(argc,argv);

    zhandle_t* zkhandle = zookeeper_init(g_host,NULL, timeout, 0, (char *)"lock Test", 0);  

    if (zkhandle ==NULL)  
    {  
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");  
        exit(EXIT_FAILURE);  
    }  

    int ret = zoo_exists(zkhandle,g_path,0,NULL); 
    if(ret != ZOK){
        ret = zoo_create(zkhandle,g_path,"1.0",strlen("1.0"),  
                          &ZOO_OPEN_ACL_UNSAFE,0,  
                          path_buffer,bufferlen);  
        if(ret != ZOK){
            fprintf(stderr,"failed to create the path %s!\n",g_path);
        }else{
            printf("create path %s successfully!\n",g_path);
        }
    }

    if(g_mode == PUSH_MODE){
        push(zkhandle,g_path,g_value); 
        printf("push:%s\n",g_value);
    }else{
        int len = sizeof(g_value);
        ret = pop(zkhandle,g_path,g_value,&len) ;

        if(ret == ZOK){
            printf("pop:%s\n",g_value);
        }else if( ret == -1){
            printf("queue is empty\n");
        }
    }



    zookeeper_close(zkhandle); 

    return 0;
}


相关文章
|
6月前
|
消息中间件 分布式计算 资源调度
《聊聊分布式》ZooKeeper与ZAB协议:分布式协调的核心引擎
ZooKeeper是一个开源的分布式协调服务,基于ZAB协议实现数据一致性,提供分布式锁、配置管理、领导者选举等核心功能,具有高可用、强一致和简单易用的特点,广泛应用于Kafka、Hadoop等大型分布式系统中。
|
11月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
850 3
|
11月前
|
存储 NoSQL Java
从扣减库存场景来讲讲redis分布式锁中的那些“坑”
本文从一个简单的库存扣减场景出发,深入分析了高并发下的超卖问题,并逐步优化解决方案。首先通过本地锁解决单机并发问题,但集群环境下失效;接着引入Redis分布式锁,利用SETNX命令实现加锁,但仍存在死锁、锁过期等隐患。文章详细探讨了通过设置唯一标识、续命机制等方法完善锁的可靠性,并最终引出Redisson工具,其内置的锁续命和原子性操作极大简化了分布式锁的实现。最后,作者剖析了Redisson源码,揭示其实现原理,并预告后续关于主从架构下分布式锁的应用与性能优化内容。
490 0
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
406 10
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
381 11
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
579 8
|
调度 数据库
什么场景下要使用分布式锁
分布式锁用于确保多节点环境下的资源互斥访问、避免重复操作、控制并发流量、防止竞态条件及任务调度协调,常见于防止超卖等问题。
458 4
|
NoSQL Java Redis
京东双十一高并发场景下的分布式锁性能优化
【10月更文挑战第20天】在电商领域,尤其是像京东双十一这样的大促活动,系统需要处理极高的并发请求。这些请求往往涉及库存的查询和更新,如果处理不当,很容易出现库存超卖、数据不一致等问题。
494 1
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
489 4