zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

Zookeeper场景实践:(8) 分布式队列

2023-09-14 09:01:04 时间
按照ZooKeeper典型应用场景一览里的说法,分布式队列有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。 第二种队列可以先建立一个/queue,赋值为n,表达队列的大小。然后每个队列成员加入时,就判断是否达到队列要求的大小,如果是可以进行下一步动作,否则继续等待队列成员的加入。比较典型的情况是,当一个大的任务可能需要
按照ZooKeeper典型应用场景一览里的说法,分布式队列有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。

第二种队列可以先建立一个/queue,赋值为n,表达队列的大小。然后每个队列成员加入时,就判断是否达到队列要求的大小,如果是可以进行下一步动作,否则继续等待队列成员的加入。比较典型的情况是,当一个大的任务可能需要很多的子任务完成才能开始进行。

比如汇总账单的时候,就必须先将用户的消费数据,积分数据等都统计完成后才能开始。汇总账单的程序建立一个队列/Queue,赋值为2,然后分别统计消费数据和积分数据的程序当完成任务时就往/Queue下创建一个临时节点。而汇总账单程序监测到/Queue的子节点个数为2时,就可以开始执行任务了。

实际上,我们也可以先建立一个数目为2的子节点。当一个子任务完成的时候,就删除一个子节点,当所有子节点都被删除的时候,主任务就可以开始执行了。这个过程可以形象的理解为拆除屏障。因此这种队列还有一个专门的词语描述,叫做屏障(barrier)。

2.场景分析 讲了那么多的关于屏障的认识,但是并不打算就去实现它,并且Zookeeper的官方文档也有相关的知识。这次的主要目标是常规的FIFO队列。我将实现队列的两个主要操作:push和pop。

int push(zhandle_t *zkhandle,const char *path,char *element)


int pop(zhandle_t *zkhandle,const char *path,char *element_buffer,int *buffer_len)


简单来说,假设队列的路径为/Queue,push就是就是创建一个临时有序的/Queue/queue-节点。pop就是取出/Queue/下序列号最小的节点。
我们知道在C++中stl里有一个queue的类,实现了push,pop等操作,然而它是非线程安全的,即多个线程同时push/pop的时候可能会出现错误。而由于ZooKeeper保证了创建节点和删除节点的一致性,因此可以说利用Zookeeper实现的队列是进程安全的。

3. 场景实践 来看push和pop的具体实现。push的实现很简单,就是在{path}下创建一个有序的{path}/queue-子节点.

int push(zhandle_t *zkhandle,const char *path,char *element)

 char child_path[512] = {0};

 char path_buffer[512] = {0};

 int bufferlen = sizeof(path_buffer);

 sprintf(child_path,"%s/queue-",path);

 int ret = zoo_create(zkhandle,child_path,element,strlen(element), 

 ZOO_OPEN_ACL_UNSAFE,ZOO_SEQUENCE, 

 path_buffer,bufferlen); 

 if(ret != ZOK){

 fprintf(stderr,"failed to create the path %s!\n",path);

 }else{

 printf("create path %s successfully!\n",path);

 return ret;

}


pop的功能则是取出{path}下序号最小的子节点,如果没有子节点,则返回-1.

int pop(zhandle_t *zkhandle,const char *path,char *element,int *len)

 int i = 0;

 struct String_vector children;

 int ret = zoo_get_children(zkhandle,path,0, children);


fprintf(stderr,"failed to create the path %s!\n",path); }else if (children.count == 0){ strcpy(element,""); *len = 0; ret = -1; }else{ char *min = children.data[0]; for(i = 0; i children.count; ++i){ printf("%s:%s\n",min,children.data[i]); if(strcmp(min,children.data[i]) 0){ min = children.data[i]; if(min != NULL){ char child_path[512]={0}; sprintf(child_path,"%s/%s",path,min); ret = zoo_get(zkhandle,child_path,0,element,len,NULL); if(ret != ZOK){ fprintf(stderr,"failed to get data of the path %s!\n",child_path); }else{ ret = zoo_delete(zkhandle,child_path, -1); if(ret != ZOK){ fprintf(stderr,"failed to delete the path %s!\n",child_path); for(i = 0; i children.count; ++i){ free(children.data[i]); children.data[i] = NULL;
printf("Usage : [myqueue] [-h] [-m mode] [-p path ] [-v value][-s ip:port] \n"); printf(" -h Show help\n"); printf(" -p Queue path\n"); printf(" -m mode:push or pop\n"); printf(" -v the value you want to push\n"); printf(" -s zookeeper server ip:port\n"); printf("For example:\n"); printf(" push the message \"Hello\" into the queue Queue:\n"); printf(" myqueue -s172.17.0.36:2181 -p /Queue -m push -v Hello\n"); printf(" pop one message from the queue Queue:\n"); printf(" myqueue -s172.17.0.36:2181 -p /Queue -m pop\n"); void get_option(int argc,const char* argv[]) extern char *optarg; int optch; int dem = 1; const char optstring[] = "hv:m:p:s:";
g_mode = PUSH_MODE; while((optch = getopt(argc , (char * const *)argv , optstring)) != -1 ) switch( optch ) case h: print_usage(); exit(-1); case ?: print_usage(); printf("unknown parameter: %c\n", optopt); exit(-1); case :: print_usage(); printf("need parameter: %c\n", optopt); exit(-1); case m: if(strcasecmp(optarg,"push")==0){ g_mode = PUSH_MODE; }else{ g_mode = POP_MODE; break; case s: strncpy(g_host,optarg,sizeof(g_host)); break; case p: strncpy(g_path,optarg,sizeof(g_path)); break; case v: strncpy(g_value,optarg,sizeof(g_value)); break; default: break; int push(zhandle_t *zkhandle,const char *path,char *element) char child_path[512] = {0}; char path_buffer[512] = {0}; int bufferlen = sizeof(path_buffer); sprintf(child_path,"%s/queue-",path); int ret = zoo_create(zkhandle,child_path,element,strlen(element), ZOO_OPEN_ACL_UNSAFE,ZOO_SEQUENCE, path_buffer,bufferlen); if(ret != ZOK){ fprintf(stderr,"failed to create the path %s!\n",path); }else{ printf("create path %s successfully!\n",path); return ret; int pop(zhandle_t *zkhandle,const char *path,char *element,int *len) int i = 0; struct String_vector children; int ret = zoo_get_children(zkhandle,path,0, children);
fprintf(stderr,"failed to create the path %s!\n",path); }else if (children.count == 0){ strcpy(element,""); *len = 0; ret = -1; }else{ char *min = children.data[0]; for(i = 0; i children.count; ++i){ printf("%s:%s\n",min,children.data[i]); if(strcmp(min,children.data[i]) 0){ min = children.data[i]; if(min != NULL){ char child_path[512]={0}; sprintf(child_path,"%s/%s",path,min); ret = zoo_get(zkhandle,child_path,0,element,len,NULL); if(ret != ZOK){ fprintf(stderr,"failed to get data of the path %s!\n",child_path); }else{ ret = zoo_delete(zkhandle,child_path, -1); if(ret != ZOK){ fprintf(stderr,"failed to delete the path %s!\n",child_path); for(i = 0; i children.count; ++i){ free(children.data[i]); children.data[i] = NULL;
fprintf(stderr,"failed to create the path %s!\n",path); }else if(children.count == 0){ strcpy(element,""); *len = 0; ret = -1; }else{ char *min = NULL; for(i = 0; i children.count; ++i){ if(strcmp(min,children.data[i]) 0){ min = children.data[i]; if(min != NULL){ char child_path[512]={0}; sprintf(child_path,"%s/%s",path,min); ret = zoo_get(zkhandle,child_path,0,element,len,NULL); if(ret != ZOK){ fprintf(stderr,"failed to get data of the path %s!\n",child_path); for(i = 0; i children.count; ++i){ free(children.data[i]); children.data[i] = NULL; return ret;
zhandle_t* zkhandle = zookeeper_init(g_host,NULL, timeout, 0, (char *)"lock Test", 0); if (zkhandle ==NULL) fprintf(stderr, "Error when connecting to zookeeper servers...\n"); exit(EXIT_FAILURE); int ret = zoo_exists(zkhandle,g_path,0,NULL); if(ret != ZOK){ ret = zoo_create(zkhandle,g_path,"1.0",strlen("1.0"), ZOO_OPEN_ACL_UNSAFE,0, path_buffer,bufferlen); if(ret != ZOK){ fprintf(stderr,"failed to create the path %s!\n",g_path); }else{ printf("create path %s successfully!\n",g_path); if(g_mode == PUSH_MODE){ push(zkhandle,g_path,g_value); printf("push:%s\n",g_value); }else{ int len = sizeof(g_value); ret = pop(zkhandle,g_path,g_value, len) ; if(ret == ZOK){ printf("pop:%s\n",g_value); }else if( ret == -1){ printf("queue is empty\n");
【ZooKeeper】① 分布式的基本概念 一个 SpringBoot 项目(apple.jar)被部署到了服务器上运行。可向其发送网络请求获取网络资源。随着请求数量的逐渐增多,服务器宕机(死机)的可能性也越来越高。 若一个服务器宕机会导致该服务器上的某个系统直接无法被访问,则不是高可用的项目,便产生了单点故障。单点故障:服务器与项目共生。服务器生,项目活;服务器挂,项目死。
Zookeeper基本原理与运用场景 Zookeeper基本原理与运用场景一、什么是Zookeeper? zookeeper是一个分布式的一致性协调服务。 换句话说,也可以把zookeeper看成一个小型的分布式文件系统。但是和FastDFS不同,zookeeper只适合用来存储一些小型的数据或者配置信息。
【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列 | 9月18号栖夜读 今天的首篇文章,讲述了:上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。
【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列 前言上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。
Zookeeper应用之——队列(Queue) 为了在Zookeeper中实现分布式队列,首先需要设计一个znode来存放数据,这个节点叫做队列节点,我们的例子中这个节点是/zookeeper/queue。
1. 基本介绍 在分布式的环境中,可能会有多个对等的程序读取同样的配置文件,程序可以部署在多台机器上,如果配置采用文件的话,则需要为部署该程序的机器也部署一个配置文件,一旦要修改配置的时候就会非常麻烦,需要修改多个配置文件,而且容易产生不一致。