实现posix消息队列示例分享
mqueue.h
//
// mqueue.h
// UNIX_C
//
// Createdby周凯on14-2-9.
// Copyright(c)2014年zk.Allrightsreserved.
//
#ifndef__PS_MQUEUE_H
#define__PS_MQUEUE_H
#include<unistd.h>
#include<sys/types.h>
typedefstructmq_info *mqd_t;
typedefstructmq_attr mq_attr;
#ifdef__cplusplus
extern"C"{
#endif
mqd_t mq_open(constchar*name,intflag,.../*mode_tmode,structmq_attr*attr*/);
int mq_close(mqd_tmqdes);
int mq_unlink(constchar*name);
int mq_getattr(mqd_tmqdes,mq_attr*attr);
int mq_setattr(mqd_tmqdes,constmq_attr*attr,mq_attr*old);
int mq_send(mqd_tmqdes,constchar*ptr,size_tlen,unsignedintprio);
int mq_receive(mqd_tmqdes,char*ptr,size_tlen,unsignedint*priop);
//
void mq_info_test(mqd_tmqdes);
#ifdef__cplusplus
}
#endif
#endif
多进程,多线程创建同一个队列测试
#include<wrap_ext.h>
#include<mqueue.h>
void*create_mq(void*name){
mqd_tmq;
mq=mq_open("/tmp/mqfile",O_CREAT,FILE_MODE,0);
if(mq==(mqd_t)-1){
err_ret(errno,"mq_open()error");
return0;
}
mq_info_test(mq);
mq_close(mq);
return0;
}
intmain(){
mq_unlink("/tmp/mqfile");
if(Fork()==0){
create_mq("/tmp/mqfile");
exit(0);
}
Create_detach_thread(create_mq,"/tmp/mqfile");
Create_detach_thread(create_mq,"/tmp/mqfile");
sleep(50);
//mq_unlink("/tmp/mqfile");
return0;
}
测试结果
属性设置、获取测试 voidprint_attr(mq_attr*attr){ void*create_mq(void*name){ if((long)tid%2==0){ intmain(){ 测试注册通知规则 intmain(){ mqueue.c #include"mqueue.h" #if!defined(_LINUX_) typedefstructmq_info mq_info; structmq_hdr{ structmq_msg{ structmq_info{ #defineMQ_MAXMSG 12 staticvoid__mq_once_init(); staticvoid__mq_once_init(){ staticint __mq_get_filelock(){ staticvoid*__mq_mmap_file(intfd,mq_attr*attr){ staticvoid__mq_unmap(constchar*name,void*ptr){ staticint __mq_init_mmap(void*ptr,mq_attr*attr){ flag=pthread_mutexattr_init(&mattr); flag=pthread_mutexattr_setpshared(&mattr,PTHREAD_PROCESS_SHARED); flag=pthread_mutexattr_destroy(&mattr); mqd_t mq_open(constchar*name,intflag,...){ int mq_close(mqd_tmqdes){ int mq_unlink(charconst*name){ int mq_getattr(mqd_tmqdes,mq_attr*attr){ int mq_setattr(mqd_tmqdes,constmq_attr*attr,mq_attr*old){ int mq_notify(mqd_tmqdes,conststructsigevent*notification){ void mq_info_test(mqd_tmqdes){
create,startcreate...
create,startinit...
exists,waitget...
exists,waitget...
create,endinit...
mq_hdr.mqh_free:116bytes
msghdrsize:268bytesmapfilesize:3332bytes
nextmsgoffsetandmsglength:
[384,0];[652,0];[920,0];[1188,0];[1456,0];
[1724,0];[1992,0];[2260,0];[2528,0];exists,startget...
[2796,0];
[3064,0];[0,0];
end,startget...
exists,startget...
mq_hdr.mqh_free:116bytes
msghdrsize:268bytesmapfilesize:3332bytes
nextmsgoffsetandmsglength:
[384,0];[652,0];[920,0];[1188,0];[1456,0];
[1724,0];[1992,0];[2260,0];[2528,0];[2796,0];
[3064,0];[0,0];
end,startget...
mq_hdr.mqh_free:116bytes
msghdrsize:268bytesmapfilesize:3332bytes
nextmsgoffsetandmsglength:
[384,0];[652,0];[920,0];[1188,0];[1456,0];
[1724,0];[1992,0];[2260,0];[2528,0];[2796,0];
[3064,0];[0,0];
Programendedwithexitcode:0
#include<wrap_ext.h>
#include<mqueue.h>
assert(attr);
err_msg("mq_attrmq_flag:0x%0x"
"mq_curmsgs:%d"
"mq_msgsize:%d"
"mq_maxmsg:%d"
,attr->mq_flags
,attr->mq_curmsgs
,attr->mq_msgsize
,attr->mq_maxmsg);
}
pthread_ttid;
mq_attrattr,old;
mqd_tmq;
intflag;
flag=O_CREAT;
tid=pthread_self();
if((long)tid%2!=0){
flag=O_NONBLOCK;
}
mq=mq_open("/tmp/mqfile",flag|O_CREAT,FILE_MODE,0);
if(mq==(mqd_t)-1){
err_ret(errno,"mq_open()error");
return0;
}
attr.mq_flags=O_NONBLOCK;
mq_setattr(mq,&attr,&old);
}
else
mq_getattr(mq,&old);
print_attr(&old);
//mq_info_test(mq);
mq_close(mq);
return0;
}
pid_tpid;
mq_unlink("/tmp/mqfile");
if((pid=Fork())==0){
create_mq("/tmp/mqfile3");
Create_detach_thread(create_mq,"/tmp/mqfile1");
Create_detach_thread(create_mq,"/tmp/mqfile2");
sleep(1);
exit(0);
}
Create_detach_thread(create_mq,"/tmp/mqfile1");
Create_detach_thread(create_mq,"/tmp/mqfile2");
create_mq("/tmp/mqfile3");
wait(0);
sleep(5);
//mq_unlink("/tmp/mqfile");
return0;
}
#include<wrap_ext.h>
#include<mqueue.h>
pid_tpid;
Init_wait();
mqd_tmq;
sigevent_tsige;
mq_unlink("/tmp/mqfile");
mq=mq_open("/tmp/mqfile",O_CREAT,FILE_MODE,0);
Signal(SIGCHLD,SIG_DFL);
if(mq==(mqd_t)-1){
err_sys(errno,"mq_open()error");
}
if((pid=Fork())==0){
if(mq_notify(mq,&sige)==-1)
err_ret(errno,"mq_notify()error");
Tell_parent();
Wait_parent();
End_wait();
sleep(1);
exit(0);
}
Wait_child();
/*子进程已注册,测试是否能注册、取消通知*/
if(mq_notify(mq,0)==-1)
err_ret(errno,"mq_notify()error");
if(mq_notify(mq,&sige)==-1)
err_ret(errno,"mq_notify()error");
Tell_child(pid);
End_wait();
wait(0);
sleep(1);
/*子进程已结束,测试是否能注册通知*/
if(mq_notify(mq,&sige)==-1)
err_ret(errno,"mq_notify()error");
//mq_unlink("/tmp/mqfile");
return0;
}
//
// File.c
// UNIX_C
//
// Createdby周凯on14-2-9.
// Copyright(c)2014年zk.Allrightsreserved.
//
#include<wrap_ext.h>
#defineva_mode_t int
#else
#defineva_mode_t mode_t
#endif
typedefstructmq_hdr mq_hdr;
//typedefstructmq_attr mq_attr;
typedefstructmq_msg mq_msg;
mq_attrmqh_attr;
long mqh_head;
long mqh_free;
pthread_cond_t mqh_conn;
pthread_mutex_tmqh_mutex;
sigevent_t mqh_sigevent;
pid_t mqh_pid;
};
long msg_next;/*从映射内存的地址起,到下一个消息的偏移值*/
ssize_tmsg_len;
int msg_prio;
};
mq_hdr*mqi_hdr;
longlong mqi_magic;
int mqi_flag;
};
#defineMQ_MSGSIZE 256
#defineMQ_MAGIC 0x9235167840
/*
防止以下情况:
一个进程或线程以创建模式打开一个队列,
随后CPU切换当前进程或线程到另一个正
在打开此前创建的队列,但是该队列并未
初始化完毕,故使用一个记录锁加一个线
程锁,进行同步。
注:
该实现不是异步调用安全,即不能在信号处理函数中调用队列打开(创建)函数
*/
#defineMQ_LOCK_FILE "/tmp/mq_lock_file"
staticstructmq_attrdef_attr={0,MQ_MAXMSG,MQ_MSGSIZE,0};
staticpthread_once_t__mq_once=PTHREAD_ONCE_INIT;
staticpthread_mutex_t__mq_lock;
staticpthread_key_t__mq_key;
staticint __mq_get_filelock();
staticvoid*__mq_mmap_file(intfd,mq_attr*attr);
staticint __mq_init_mmap(void*ptr,mq_attr*attr);
staticvoid__mq_unmap(constchar*name,void*ptr);
pthread_mutexattr_tmattr;
Pthread_mutexattr_init(&mattr);
Pthread_mutexattr_settype(&mattr,PTHREAD_MUTEX_RECURSIVE);
Pthread_mutex_init(&__mq_lock,&mattr);
Pthread_mutexattr_destroy(&mattr);
Pthread_key_create(&__mq_key,0);
}
intfd,tmp;
Pthread_mutex_lock(&__mq_lock);
if((fd=(int)Pthread_getspecific(__mq_key))==0){
fd=open(MQ_LOCK_FILE,O_CREAT|O_EXCL|O_WRONLY,FILE_MODE);
if(fd==-1&&errno!=EEXIST)
err_sys(errno,"mq_open(),__mq_get_filelock()error");
else
fd=Open(MQ_LOCK_FILE,O_WRONLY,0);
if(fd==0){
tmp=Open(MQ_LOCK_FILE,O_WRONLY,0);
close(fd);
fd=tmp;
}
Pthread_setspecific(__mq_key,(void*)fd);
}
Pthread_mutex_unlock(&__mq_lock);
returnfd;
}
size_tfilesize;
void*ptr;
if(attr==0){
attr=&def_attr;
}
if(attr->mq_maxmsg<=0||attr->mq_msgsize<=0){
errno=EINVAL;
returnMAP_FAILED;
}
filesize=sizeof(mq_hdr)+(sizeof(mq_msg)+ALIGN_VAL(attr->mq_msgsize,sizeof(long)))*attr->mq_maxmsg;
if(lseek(fd,filesize-1,SEEK_SET)<0)
returnMAP_FAILED;
if(write(fd,"",1)!=1)
returnMAP_FAILED;
ptr=mmap(NULL,filesize,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
returnptr;
}
size_tfilesize;
stat_tfstat;
assert(name);
if(stat(name,&fstat)==-1){
return;
}
filesize=(size_t)fstat.st_size;
unlink(name);
if(ptr==MAP_FAILED){
return;
}
munmap(ptr,filesize);
return;
}
char*tmp;
size_tindex,i;
intflag;
mq_hdr*mqhdr;
mq_msg*mqmsg;
pthread_condattr_tcattr;
pthread_mutexattr_tmattr;
assert(ptr);
if(attr==0){
attr=&def_attr;
}
if(attr->mq_maxmsg<=0||attr->mq_msgsize<=0){
errno=EINVAL;
return-1;
}
tmp=ptr;
mqhdr=(mq_hdr*)tmp;
mqhdr->mqh_attr.mq_flags=0;
mqhdr->mqh_attr.mq_curmsgs=0;
mqhdr->mqh_attr.mq_maxmsg=attr->mq_maxmsg;
mqhdr->mqh_attr.mq_msgsize=attr->mq_msgsize;
flag=pthread_condattr_init(&cattr);
if(flag){
errno=flag;
return-1;
}
flag=pthread_condattr_setpshared(&cattr,PTHREAD_PROCESS_SHARED);
if(flag){
errno=flag;
return-1;
}
flag=pthread_cond_init(&mqhdr->mqh_conn,&cattr);
if(flag){
errno=flag;
return-1;
}
flag=pthread_condattr_destroy(&cattr);
if(flag){
errno=flag;
return-1;
}
if(flag){
errno=flag;
return-1;
}
if(flag){
errno=flag;
return-1;
}
flag=pthread_mutex_init(&mqhdr->mqh_mutex,&mattr);
if(flag){
errno=flag;
return-1;
}
if(flag){
errno=flag;
return-1;
}
index=mqhdr->mqh_free=sizeof(mq_hdr);
mqmsg=(mq_msg*)(tmp+index);
for(i=0;i<attr->mq_maxmsg-1;i++){
mqmsg->msg_next=sizeof(mq_msg)+ALIGN_VAL(attr->mq_msgsize,sizeof(long))+index;
index=mqmsg->msg_next;
mqmsg++;
//mqmsg=(mq_msg*)(tmp+index);
}
mqmsg->msg_next=0;
return0;
}
intfd,nonblock,lockfile_fd,err;
void*ptr;
mq_attr*mqattr;
mqd_tmqdesc;
stat_tfilestat;
debug_assert("Invalidpointer","mq_open()",name);
Pthread_once(&__mq_once,__mq_once_init);
nonblock=flag&O_NONBLOCK;
mqattr=NULL;
mqdesc=NULL;
ptr=MAP_FAILED;
__again:
if(flag&O_CREAT){
va_listvp;
mode_tmode;
/*分析可变参数*/
va_start(vp,flag);
mode=va_arg(vp,va_mode_t);
mqattr=va_arg(vp,mq_attr*);
va_end(vp);
Pthread_mutex_lock(&__mq_lock);
lockfile_fd=__mq_get_filelock();
write_lock_wait(lockfile_fd,SEEK_SET,0,0);
fd=open(name,flag|O_CREAT|O_EXCL|O_RDWR,mode);
if(fd<0){
/*如果指定了O_EXCL,并且文件已存在,则等待其他进程或线程完成初始化*/
if(errno==EEXIST&&(flag&O_EXCL)==1){
return(mqd_t)-1;
}
goto__exists_wait_init;
}
/*初始化内存映射文件*/
err_msg("create,startinit...");
/*初始化映射文件大小(注意必须使文件长度达到映射的大小),且映射文件到内存*/
ptr=__mq_mmap_file(fd,mqattr);
//sleep(1);
if(ptr==MAP_FAILED){
goto__err;
}
/*初始化映射内存的内容*/
if(__mq_init_mmap(ptr,mqattr)<0){
goto__err;
}
mqdesc=(mqd_t)calloc(1,sizeof(mq_hdr));
if(mqdesc==0){
goto__err;
}
mqdesc->mqi_hdr=(mq_hdr*)ptr;
mqdesc->mqi_flag=nonblock;
mqdesc->mqi_magic=MQ_MAGIC;
err_msg("create,endinit...");
file_unlock(lockfile_fd,SEEK_SET,0,0);
Pthread_mutex_unlock(&__mq_lock);
returnmqdesc;
}
__exists_wait_init:
fd=open(name,O_RDWR,0);
if(fd<0){
if(errno==ENOENT&&(flag&O_CREAT)){
goto__again;
}
goto__err;
}
err_msg("exists,startget...");
if(stat(name,&filestat)==-1){
if(errno==ENOENT&&(flag&O_CREAT)){
goto__again;
}
goto__err;
}
ptr=mmap(0,(size_t)filestat.st_size,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
if(ptr==MAP_FAILED){
goto__err;
}
mqdesc=(mqd_t)calloc(1,sizeof(mq_hdr));
if(mqdesc==0){
goto__err;
}
mqdesc->mqi_hdr=(mq_hdr*)ptr;
mqdesc->mqi_flag=nonblock;
mqdesc->mqi_magic=MQ_MAGIC;
close(fd);
file_unlock(lockfile_fd,SEEK_SET,0,0);
Pthread_mutex_unlock(&__mq_lock);
err_msg("end,startget...");
returnmqdesc;
__err:
file_unlock(lockfile_fd,SEEK_SET,0,0);
Pthread_mutex_unlock(&__mq_lock);
err=errno;
__mq_unmap(name,ptr);
close(fd);
if(mqdesc)
free(mqdesc);
errno=err;
return(mqd_t)-1;
}
size_tfilesize;
mq_attr*mattr;
intflag;
assert(mqdes);
if(mqdes->mqi_magic!=MQ_MAGIC){
errno=EBADF;
return-1;
}
mattr=&mqdes->mqi_hdr->mqh_attr;
filesize=mattr->mq_maxmsg*(sizeof(mq_msg)*ALIGN_VAL(mattr->mq_msgsize,sizeof(long)))+sizeof(mq_hdr);
flag=munmap((void*)mqdes->mqi_hdr,filesize);
mqdes->mqi_magic=0;
free(mqdes);
returnflag;
}
assert(name);
returnunlink(name);
}
intflag;
mq_attr*tmp;
assert(mqdes);
assert(attr);
if(mqdes->mqi_magic!=MQ_MAGIC){
errno=EBADF;
return-1;
}
tmp=&mqdes->mqi_hdr->mqh_attr;
/*防止其他进程或线程在改变属性值*/
flag=pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);
if(flag>0){
errno=flag;
return-1;
}
bcopy(&mqdes->mqi_hdr->mqh_attr,attr,sizeof(mq_attr));
attr->mq_flags=mqdes->mqi_flag;
flag=pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);
if(flag>0){
errno=flag;
return-1;
}
return0;
}
intflag;
mq_attr*tmp;
assert(mqdes);
assert(attr);
if(mqdes->mqi_magic!=MQ_MAGIC){
errno=EBADF;
return-1;
}
tmp=&mqdes->mqi_hdr->mqh_attr;
/*防止其他进程或线程在读取属性值*/
flag=pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);
if(flag>0){
errno=flag;
return-1;
}
if(old!=NULL){
bcopy(&mqdes->mqi_hdr->mqh_attr,old,sizeof(mq_attr));
old->mq_flags=mqdes->mqi_flag;
}
/*创建后,只有文件标识可以改变*/
//bcopy(attr,&mqdes->mqi_hdr->mqh_attr,sizeof(mq_attr));
/*只有O_NONBLOCK标志可以存储*/
if(attr->mq_flags&O_NONBLOCK){
mqdes->mqi_flag|=O_NONBLOCK;
}
else{
mqdes->mqi_flag&=~O_NONBLOCK;
}
flag=pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);
if(flag>0){
errno=flag;
return-1;
}
return0;
}
sigevent_t*old;
pid_tpid;
intflag;
assert(mqdes);
if(mqdes->mqi_magic!=MQ_MAGIC){
errno=EBADF;
return-1;
}
flag=pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);
if(flag>0){
errno=flag;
return-1;
}
pid=mqdes->mqi_hdr->mqh_pid;
/*已设置*/
if(pid!=0){
/*发送一个0信号给注册的进程,如果能发送,或者不能发送但不是返回没有进程的错误(可能权限不够),则不能再次注册通知*/
/*有效进程*/
if(kill(pid,0)!=-1||errno!=ESRCH){
if(notification==0){
if(pid!=getpid()){
errno=EPERM;
flag=-1;
}
else{
mqdes->mqi_hdr->mqh_pid=0;
flag=0;
}
}
else{
errno=EBUSY;
flag=-1;
}
goto__return;
}
/*无效进程*/
}
/*未设置*/
if(notification!=0){
mqdes->mqi_hdr->mqh_pid=getpid();
old=&mqdes->mqi_hdr->mqh_sigevent;
bcopy(notification,old,sizeof(sigevent_t));
}
flag=0;
__return:
pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);
returnflag;
}
size_ti,msgsize,index;
mq_msg*msg;
mq_attr*mattr;
assert(mqdes);
mattr=&mqdes->mqi_hdr->mqh_attr;
msgsize=sizeof(mq_msg)+ALIGN_VAL(mattr->mq_msgsize,sizeof(long));
index=mqdes->mqi_hdr->mqh_free;
err_msg("mq_hdr.mqh_free:%ldbytes\n"
"msghdrsize:%ubytes"
"mapfilesize:%ubytes"
,index
,msgsize
,mattr->mq_maxmsg*msgsize+index);
err_msg("nextmsgoffsetandmsglength:");
msg=(mq_msg*)&((char*)mqdes->mqi_hdr)[index];
for(i=0;i<mattr->mq_maxmsg;i++){
fprintf(stderr,"[%ld,%ld];",msg->msg_next,msg->msg_len);
if((i+1)%5==0){
fprintf(stderr,"\n");
}
msg++;
}
if((i+1)%5!=0){
fprintf(stderr,"\n");
}
return;
}相关文章