zl程序教程

您现在的位置是:首页 >  云平台

当前栏目

实现posix消息队列示例分享

消息队列 实现 示例 分享 posix
2023-06-13 09:15:17 时间

mqueue.h

代码如下:


//
// mqueue.h
// UNIX_C
//
// Created by 周凯 on 14-2-9.
// Copyright (c) 2014年 zk. All rights reserved.
//

#ifndef __PS_MQUEUE_H
#define __PS_MQUEUE_H

#include <unistd.h>
#include <sys/types.h>

/*
 * Public interface of the file-backed message queue.
 *
 * mqd_t is an opaque handle: struct mq_info is only defined inside the
 * implementation file.  struct mq_attr is not defined in this header; it
 * must be made visible by whoever needs to access its members.
 */
typedef struct mq_info *mqd_t;
typedef struct mq_attr mq_attr;

/* Only ever used through a pointer below, so a forward declaration is enough. */
struct sigevent;

#ifdef __cplusplus
extern "C" {
#endif

    /*
     * Open (and, with O_CREAT, possibly create) the queue backed by `name`.
     * When O_CREAT is set two extra arguments are read: a mode and an
     * mq_attr pointer (may be a null *pointer*, not a bare integer 0).
     * Returns (mqd_t)-1 on failure.
     */
    mqd_t mq_open(const char *name, int flag, ... /* mode_t mode, struct mq_attr *attr */);
    int   mq_close(mqd_t mqdes);
    int   mq_unlink(const char *name);

    int   mq_getattr(mqd_t mqdes, mq_attr *attr);
    int   mq_setattr(mqd_t mqdes, const mq_attr *attr, mq_attr *old);

    int   mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
    int   mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);

    /*
     * Register (non-null) or remove (null) the calling process as the
     * notification target.  Declared here so callers do not rely on an
     * implicit declaration.
     */
    int   mq_notify(mqd_t mqdes, const struct sigevent *notification);

    /* Debug helper: dumps the queue's message-slot bookkeeping to stderr. */
    void  mq_info_test(mqd_t mqdes);

#ifdef __cplusplus
}
#endif
#endif

多进程、多线程创建同一个队列的测试

复制代码代码如下:


#include<wrap_ext.h>
#include<mqueue.h>

void*create_mq(void*name){
   mqd_tmq;
   mq=mq_open("/tmp/mqfile",O_CREAT,FILE_MODE,0);

   if(mq==(mqd_t)-1){
       err_ret(errno,"mq_open()error");
       return0;
   }

   mq_info_test(mq);

   mq_close(mq);

   return0;
}

/*
 * Test driver: one forked child and two detached threads all run
 * create_mq() on the same path.
 */
int main()
{
    int spawned;

    /* Remove any queue file left over from a previous run. */
    mq_unlink("/tmp/mqfile");

    if (Fork() == 0) {
        /* Child process: run the opener once and leave. */
        create_mq("/tmp/mqfile");
        exit(0);
    }

    /* Parent: two more openers, each on its own detached thread. */
    for (spawned = 0; spawned < 2; spawned++) {
        Create_detach_thread(create_mq, "/tmp/mqfile");
    }

    /* The threads are detached, so just give them time to finish. */
    sleep(50);

    /* The queue file is deliberately left in place (no mq_unlink here). */

    return 0;
}

测试结果

复制代码代码如下:
create,startcreate...
create,startinit...
exists,waitget...
exists,waitget...
create,endinit...
mq_hdr.mqh_free:116bytes
msghdrsize:268bytesmapfilesize:3332bytes
nextmsgoffsetandmsglength:
[384,0];[652,0];[920,0];[1188,0];[1456,0];
[1724,0];[1992,0];[2260,0];[2528,0];exists,startget...
[2796,0];
[3064,0];[0,0];
end,startget...
exists,startget...
mq_hdr.mqh_free:116bytes
msghdrsize:268bytesmapfilesize:3332bytes
nextmsgoffsetandmsglength:
[384,0];[652,0];[920,0];[1188,0];[1456,0];
[1724,0];[1992,0];[2260,0];[2528,0];[2796,0];
[3064,0];[0,0];
end,startget...
mq_hdr.mqh_free:116bytes
msghdrsize:268bytesmapfilesize:3332bytes
nextmsgoffsetandmsglength:
[384,0];[652,0];[920,0];[1188,0];[1456,0];
[1724,0];[1992,0];[2260,0];[2528,0];[2796,0];
[3064,0];[0,0];
Programendedwithexitcode:0

属性设置、获取测试

复制代码代码如下:
#include<wrap_ext.h>
#include<mqueue.h>

/*
 * Print the four fields of *attr through err_msg().
 *
 * POSIX specifies the mq_attr members as long; the struct definition is not
 * visible from here, so each value is cast explicitly to the type named by
 * its conversion specifier.  That keeps the call well-defined whether the
 * fields are int or long.
 */
void print_attr(mq_attr *attr)
{
    assert(attr);

    err_msg("mq_attrmq_flag:0x%0lx"
            "mq_curmsgs:%ld"
            "mq_msgsize:%ld"
            "mq_maxmsg:%ld"
            , (unsigned long)attr->mq_flags
            , (long)attr->mq_curmsgs
            , (long)attr->mq_msgsize
            , (long)attr->mq_maxmsg);
}

void*create_mq(void*name){
   pthread_ttid;
   mq_attrattr,old;
   mqd_tmq;
   intflag;

   flag=O_CREAT;

   tid=pthread_self();

   if((long)tid%2!=0){
       flag=O_NONBLOCK;
   }

   mq=mq_open("/tmp/mqfile",flag|O_CREAT,FILE_MODE,0);

   if(mq==(mqd_t)-1){
       err_ret(errno,"mq_open()error");
       return0;
   }

   if((long)tid%2==0){
       attr.mq_flags=O_NONBLOCK;
       mq_setattr(mq,&attr,&old);
   }
   else
       mq_getattr(mq,&old);

   print_attr(&old);

   //mq_info_test(mq);

   mq_close(mq);

   return0;
}

/*
 * Test driver for attribute get/set: the child and the parent each run
 * create_mq() once directly and twice on detached threads.
 */
int main()
{
    pid_t child;

    /* Remove any queue file left over from a previous run. */
    mq_unlink("/tmp/mqfile");

    child = Fork();
    if (child == 0) {
        /* Child: direct call first, then two detached threads. */
        create_mq("/tmp/mqfile3");
        Create_detach_thread(create_mq, "/tmp/mqfile1");
        Create_detach_thread(create_mq, "/tmp/mqfile2");
        sleep(1);
        exit(0);
    }

    /* Parent: two detached threads first, then a direct call. */
    Create_detach_thread(create_mq, "/tmp/mqfile1");
    Create_detach_thread(create_mq, "/tmp/mqfile2");
    create_mq("/tmp/mqfile3");

    /* Reap the child, then leave time for the detached threads. */
    wait(0);

    sleep(5);

    /* The queue file is deliberately left in place (no mq_unlink here). */

    return 0;
}

测试注册通知规则

复制代码代码如下:
#include<wrap_ext.h>
#include<mqueue.h>

intmain(){
   pid_tpid;
   Init_wait();
   mqd_tmq;

   sigevent_tsige;

   mq_unlink("/tmp/mqfile");
   mq=mq_open("/tmp/mqfile",O_CREAT,FILE_MODE,0);

   Signal(SIGCHLD,SIG_DFL);

   if(mq==(mqd_t)-1){
       err_sys(errno,"mq_open()error");
   }
   if((pid=Fork())==0){

       if(mq_notify(mq,&sige)==-1)
           err_ret(errno,"mq_notify()error");
       Tell_parent();

       Wait_parent();

       End_wait();
       sleep(1);
       exit(0);
   }

   Wait_child();
   /*子进程已注册,测试是否能注册、取消通知*/
   if(mq_notify(mq,0)==-1)
       err_ret(errno,"mq_notify()error");
   if(mq_notify(mq,&sige)==-1)
       err_ret(errno,"mq_notify()error");
   Tell_child(pid);
   End_wait();

   wait(0);

   sleep(1);
   /*子进程已结束,测试是否能注册通知*/
   if(mq_notify(mq,&sige)==-1)
       err_ret(errno,"mq_notify()error");

   //mq_unlink("/tmp/mqfile");

   return0;
}

mqueue.c

复制代码代码如下:
//
// File.c
// UNIX_C
//
// Created by 周凯 on 14-2-9.
// Copyright (c) 2014年 zk. All rights reserved.
//

#include"mqueue.h"
#include<wrap_ext.h>

/*
 * Type used to fetch the `mode` argument of mq_open() with va_arg():
 * int unless _LINUX_ is defined, mode_t otherwise.
 */
#if defined(_LINUX_)
#define va_mode_t   mode_t
#else
#define va_mode_t   int
#endif

/* Short names for the implementation structures (mq_attr comes from mqueue.h). */
typedef struct mq_info  mq_info;
typedef struct mq_hdr   mq_hdr;
typedef struct mq_msg   mq_msg;

structmq_hdr{
   mq_attrmqh_attr;
   long   mqh_head;
   long   mqh_free;

   pthread_cond_t mqh_conn;
   pthread_mutex_tmqh_mutex;
   sigevent_t     mqh_sigevent;
   pid_t  mqh_pid;
};

/* Per-message header that precedes each message slot in the mapping. */
struct mq_msg {
    long    msg_next;   /* offset of the next message, measured from the start of the mapping */
    ssize_t msg_len;    /* message length */
    int     msg_prio;   /* message priority */
};

structmq_info{
   mq_hdr*mqi_hdr;
   longlong  mqi_magic;
   int    mqi_flag;
};

#defineMQ_MAXMSG  12
#defineMQ_MSGSIZE 256
#defineMQ_MAGIC   0x9235167840
/*
 防止以下情况:
   一个进程或线程以创建模式打开一个队列,
   随后CPU切换当前进程或线程到另一个正
   在打开此前创建的队列,但是该队列并未
   初始化完毕,故使用一个记录锁加一个线
   程锁,进行同步。
 注:
   该实现不是异步调用安全,即不能在信号处理函数中调用队列打开(创建)函数
 */
#defineMQ_LOCK_FILE   "/tmp/mq_lock_file"
staticstructmq_attrdef_attr={0,MQ_MAXMSG,MQ_MSGSIZE,0};
staticpthread_once_t__mq_once=PTHREAD_ONCE_INIT;
staticpthread_mutex_t__mq_lock;
staticpthread_key_t__mq_key;

staticvoid__mq_once_init();
staticint __mq_get_filelock();
staticvoid*__mq_mmap_file(intfd,mq_attr*attr);
staticint __mq_init_mmap(void*ptr,mq_attr*attr);
staticvoid__mq_unmap(constchar*name,void*ptr);

 

/*
 * One-time setup, run through Pthread_once() from mq_open(): creates the
 * recursive mutex __mq_lock and the thread-specific key __mq_key (no
 * destructor is registered for the key).
 */
static void __mq_once_init()
{
    pthread_mutexattr_t lock_attr;

    Pthread_mutexattr_init(&lock_attr);
    /* Recursive, because mq_open() holds it while calling __mq_get_filelock(). */
    Pthread_mutexattr_settype(&lock_attr, PTHREAD_MUTEX_RECURSIVE);
    Pthread_mutex_init(&__mq_lock, &lock_attr);
    Pthread_mutexattr_destroy(&lock_attr);

    Pthread_key_create(&__mq_key, 0);
}

staticint __mq_get_filelock(){
   intfd,tmp;

   Pthread_mutex_lock(&__mq_lock);
   if((fd=(int)Pthread_getspecific(__mq_key))==0){
       fd=open(MQ_LOCK_FILE,O_CREAT|O_EXCL|O_WRONLY,FILE_MODE);
       if(fd==-1&&errno!=EEXIST)
           err_sys(errno,"mq_open(),__mq_get_filelock()error");
       else
           fd=Open(MQ_LOCK_FILE,O_WRONLY,0);
       if(fd==0){
           tmp=Open(MQ_LOCK_FILE,O_WRONLY,0);
           close(fd);
           fd=tmp;
       }
       Pthread_setspecific(__mq_key,(void*)fd);
   }
   Pthread_mutex_unlock(&__mq_lock);

   returnfd;
}

staticvoid*__mq_mmap_file(intfd,mq_attr*attr){
   size_tfilesize;
   void*ptr;

   if(attr==0){
       attr=&def_attr;
   }

   if(attr->mq_maxmsg<=0||attr->mq_msgsize<=0){
       errno=EINVAL;
       returnMAP_FAILED;
   }

   filesize=sizeof(mq_hdr)+(sizeof(mq_msg)+ALIGN_VAL(attr->mq_msgsize,sizeof(long)))*attr->mq_maxmsg;

   
   if(lseek(fd,filesize-1,SEEK_SET)<0)
       returnMAP_FAILED;
   if(write(fd,"",1)!=1)
       returnMAP_FAILED;

   ptr=mmap(NULL,filesize,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);

   returnptr;
}

staticvoid__mq_unmap(constchar*name,void*ptr){
   size_tfilesize;
   stat_tfstat;

   assert(name);

   
   if(stat(name,&fstat)==-1){
       return;
   }

   filesize=(size_t)fstat.st_size;
   unlink(name);
   if(ptr==MAP_FAILED){
       return;
   }
   munmap(ptr,filesize);

   return;
}

/*
 * Initialise a freshly mapped queue file: attributes, the process-shared
 * condition variable and mutex, and the free list of message slots.
 *
 * ptr:  start of the mapping (must be non-null).
 * attr: queue attributes, or a null pointer to use def_attr.
 * Returns 0 on success, -1 with errno set on failure.
 */
static int __mq_init_mmap(void *ptr, mq_attr *attr)
{
    char *base;
    size_t slot, offset;
    long i;
    int rc, rc_destroy;
    mq_hdr *mqhdr;
    mq_msg *mqmsg;
    pthread_condattr_t cattr;
    pthread_mutexattr_t mattr;

    assert(ptr);
    if (attr == 0) {
        attr = &def_attr;
    }

    if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
        errno = EINVAL;
        return -1;
    }

    base = ptr;
    mqhdr = (mq_hdr *)base;
    mqhdr->mqh_attr.mq_flags = 0;
    mqhdr->mqh_attr.mq_curmsgs = 0;
    mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
    mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
    /* Set explicitly instead of relying on the new file being zero-filled. */
    mqhdr->mqh_head = 0;
    mqhdr->mqh_pid = 0;

    /*
     * Process-shared condition variable.  The attribute object is
     * destroyed on every path once it has been initialised.
     */
    rc = pthread_condattr_init(&cattr);
    if (rc == 0) {
        rc = pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
        if (rc == 0) {
            rc = pthread_cond_init(&mqhdr->mqh_conn, &cattr);
        }
        rc_destroy = pthread_condattr_destroy(&cattr);
        if (rc == 0) {
            rc = rc_destroy;
        }
    }
    if (rc) {
        errno = rc;
        return -1;
    }

    /* Process-shared mutex, same pattern. */
    rc = pthread_mutexattr_init(&mattr);
    if (rc == 0) {
        rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
        if (rc == 0) {
            rc = pthread_mutex_init(&mqhdr->mqh_mutex, &mattr);
        }
        rc_destroy = pthread_mutexattr_destroy(&mattr);
        if (rc == 0) {
            rc = rc_destroy;
        }
    }
    if (rc) {
        errno = rc;
        return -1;
    }

    /*
     * Chain all slots into the free list.  Each slot is a message header
     * followed by the aligned payload, so consecutive slot headers are
     * `slot` bytes apart.  Stepping the pointer by sizeof(mq_msg) would
     * write the links into the first slot's payload instead of at the
     * offsets stored in msg_next.
     */
    slot = sizeof(mq_msg) + ALIGN_VAL(attr->mq_msgsize, sizeof(long));
    offset = sizeof(mq_hdr);
    mqhdr->mqh_free = (long)offset;

    for (i = 0; i < attr->mq_maxmsg; i++) {
        mqmsg = (mq_msg *)(base + offset);
        offset += slot;
        /* The last slot terminates the list with offset 0. */
        mqmsg->msg_next = (i + 1 < attr->mq_maxmsg) ? (long)offset : 0;
    }

    return 0;
}

 

mqd_t  mq_open(constchar*name,intflag,...){
   intfd,nonblock,lockfile_fd,err;
   void*ptr;
   mq_attr*mqattr;
   mqd_tmqdesc;
   stat_tfilestat;

   debug_assert("Invalidpointer","mq_open()",name);

   Pthread_once(&__mq_once,__mq_once_init);

   nonblock=flag&O_NONBLOCK;
   mqattr=NULL;
   mqdesc=NULL;
   ptr=MAP_FAILED;
__again:
   if(flag&O_CREAT){
       va_listvp;
       mode_tmode;

       /*分析可变参数*/
       va_start(vp,flag);
       mode=va_arg(vp,va_mode_t);
       mqattr=va_arg(vp,mq_attr*);
       va_end(vp);

       Pthread_mutex_lock(&__mq_lock);
       lockfile_fd=__mq_get_filelock();
       write_lock_wait(lockfile_fd,SEEK_SET,0,0);

       fd=open(name,flag|O_CREAT|O_EXCL|O_RDWR,mode);
       if(fd<0){
           /*如果指定了O_EXCL,并且文件已存在,则等待其他进程或线程完成初始化*/
           if(errno==EEXIST&&(flag&O_EXCL)==1){
               return(mqd_t)-1;
           }
           goto__exists_wait_init;
       }
       /*初始化内存映射文件*/

       err_msg("create,startinit...");
       /*初始化映射文件大小(注意必须使文件长度达到映射的大小),且映射文件到内存*/
       ptr=__mq_mmap_file(fd,mqattr);
       //sleep(1);
       if(ptr==MAP_FAILED){
           goto__err;
       }

       /*初始化映射内存的内容*/
       if(__mq_init_mmap(ptr,mqattr)<0){
           goto__err;
       }

       mqdesc=(mqd_t)calloc(1,sizeof(mq_hdr));
       if(mqdesc==0){
           goto__err;
       }

       mqdesc->mqi_hdr=(mq_hdr*)ptr;
       mqdesc->mqi_flag=nonblock;
       mqdesc->mqi_magic=MQ_MAGIC;

       err_msg("create,endinit...");

       file_unlock(lockfile_fd,SEEK_SET,0,0);
       Pthread_mutex_unlock(&__mq_lock);

       returnmqdesc;
   }
__exists_wait_init:
   fd=open(name,O_RDWR,0);
   if(fd<0){
       if(errno==ENOENT&&(flag&O_CREAT)){
           goto__again;
       }
       goto__err;
   }

   err_msg("exists,startget...");

   if(stat(name,&filestat)==-1){
       if(errno==ENOENT&&(flag&O_CREAT)){
           goto__again;
       }
       goto__err;
   }

   ptr=mmap(0,(size_t)filestat.st_size,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);

   if(ptr==MAP_FAILED){
       goto__err;
   }

   mqdesc=(mqd_t)calloc(1,sizeof(mq_hdr));
   if(mqdesc==0){
       goto__err;
   }

   mqdesc->mqi_hdr=(mq_hdr*)ptr;
   mqdesc->mqi_flag=nonblock;
   mqdesc->mqi_magic=MQ_MAGIC;

   close(fd);

   file_unlock(lockfile_fd,SEEK_SET,0,0);
   Pthread_mutex_unlock(&__mq_lock);

   err_msg("end,startget...");

   returnmqdesc;

__err:
   file_unlock(lockfile_fd,SEEK_SET,0,0);
   Pthread_mutex_unlock(&__mq_lock);

   err=errno;
   __mq_unmap(name,ptr);
   close(fd);
   if(mqdesc)
       free(mqdesc);
   errno=err;
   return(mqd_t)-1;
}

int    mq_close(mqd_tmqdes){
   size_tfilesize;
   mq_attr*mattr;
   intflag;

   assert(mqdes);

   if(mqdes->mqi_magic!=MQ_MAGIC){
       errno=EBADF;
       return-1;
   }

   mattr=&mqdes->mqi_hdr->mqh_attr;
   filesize=mattr->mq_maxmsg*(sizeof(mq_msg)*ALIGN_VAL(mattr->mq_msgsize,sizeof(long)))+sizeof(mq_hdr);
   flag=munmap((void*)mqdes->mqi_hdr,filesize);

   mqdes->mqi_magic=0;
   free(mqdes);

   returnflag;
}

/*
 * Remove the queue file `name`.
 *
 * name: path of the queue file; must be non-null.
 * Returns whatever unlink() returns (0 on success, -1 with errno set).
 */
int mq_unlink(char const *name)
{
    assert(name);
    return unlink(name);
}

int    mq_getattr(mqd_tmqdes,mq_attr*attr){
   intflag;
   mq_attr*tmp;

   assert(mqdes);
   assert(attr);

   if(mqdes->mqi_magic!=MQ_MAGIC){
       errno=EBADF;
       return-1;
   }

   tmp=&mqdes->mqi_hdr->mqh_attr;

   /*防止其他进程或线程在改变属性值*/
   flag=pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);
   if(flag>0){
       errno=flag;
       return-1;
   }

   bcopy(&mqdes->mqi_hdr->mqh_attr,attr,sizeof(mq_attr));
   attr->mq_flags=mqdes->mqi_flag;

   flag=pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);
   if(flag>0){
       errno=flag;
       return-1;
   }

   return0;
}

int    mq_setattr(mqd_tmqdes,constmq_attr*attr,mq_attr*old){
   intflag;
   mq_attr*tmp;

   assert(mqdes);
   assert(attr);

   if(mqdes->mqi_magic!=MQ_MAGIC){
       errno=EBADF;
       return-1;
   }

   tmp=&mqdes->mqi_hdr->mqh_attr;

   /*防止其他进程或线程在读取属性值*/
   flag=pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);
   if(flag>0){
       errno=flag;
       return-1;
   }
   if(old!=NULL){
       bcopy(&mqdes->mqi_hdr->mqh_attr,old,sizeof(mq_attr));
       old->mq_flags=mqdes->mqi_flag;
   }
   /*创建后,只有文件标识可以改变*/
   //bcopy(attr,&mqdes->mqi_hdr->mqh_attr,sizeof(mq_attr));

   /*只有O_NONBLOCK标志可以存储*/
   if(attr->mq_flags&O_NONBLOCK){
       mqdes->mqi_flag|=O_NONBLOCK;
   }
   else{
       mqdes->mqi_flag&=~O_NONBLOCK;
   }

   flag=pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);
   if(flag>0){
       errno=flag;
       return-1;
   }

   return0;
}

int    mq_notify(mqd_tmqdes,conststructsigevent*notification){
   sigevent_t*old;
   pid_tpid;
   intflag;

   assert(mqdes);

   if(mqdes->mqi_magic!=MQ_MAGIC){
       errno=EBADF;
       return-1;
   }

   flag=pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);
   if(flag>0){
       errno=flag;
       return-1;
   }

   pid=mqdes->mqi_hdr->mqh_pid;

   /*已设置*/
   if(pid!=0){
       /*发送一个0信号给注册的进程,如果能发送,或者不能发送但不是返回没有进程的错误(可能权限不够),则不能再次注册通知*/

       /*有效进程*/
       if(kill(pid,0)!=-1||errno!=ESRCH){

           if(notification==0){
               if(pid!=getpid()){
                   errno=EPERM;
                   flag=-1;
               }
               else{
                   mqdes->mqi_hdr->mqh_pid=0;
                   flag=0;
               }
           }
           else{
               errno=EBUSY;
               flag=-1;
           }
           goto__return;
       }
       /*无效进程*/
   }
   /*未设置*/
   if(notification!=0){
       mqdes->mqi_hdr->mqh_pid=getpid();
       old=&mqdes->mqi_hdr->mqh_sigevent;
       bcopy(notification,old,sizeof(sigevent_t));
   }

   flag=0;

__return:
   pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);

   returnflag;
}

void   mq_info_test(mqd_tmqdes){
   size_ti,msgsize,index;
   mq_msg*msg;
   mq_attr*mattr;
   assert(mqdes);

   mattr=&mqdes->mqi_hdr->mqh_attr;
   msgsize=sizeof(mq_msg)+ALIGN_VAL(mattr->mq_msgsize,sizeof(long));
   index=mqdes->mqi_hdr->mqh_free;
   err_msg("mq_hdr.mqh_free:%ldbytes\n"
           "msghdrsize:%ubytes"
           "mapfilesize:%ubytes"
           ,index
           ,msgsize
           ,mattr->mq_maxmsg*msgsize+index);
   err_msg("nextmsgoffsetandmsglength:");
   msg=(mq_msg*)&((char*)mqdes->mqi_hdr)[index];
   for(i=0;i<mattr->mq_maxmsg;i++){
       fprintf(stderr,"[%ld,%ld];",msg->msg_next,msg->msg_len);
       if((i+1)%5==0){
           fprintf(stderr,"\n");
       }
       msg++;
   }
   if((i+1)%5!=0){
       fprintf(stderr,"\n");
   }

   return;
}