zl程序教程

您现在的位置是:首页 >  云平台

当前栏目

基于条件变量的消息队列说明介绍

消息队列变量队列 基于 介绍 说明 条件
2023-06-13 09:14:50 时间

条件变量是线程之前同步的另一种机制。条件变量给多线程提供了一种会和的场所。当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件发生。这样大大减少了锁竞争引起的线程调度和线程等待。

    消息队列是服务器端开发过程中绕不开的一道坎,前面,我已经实现了一个基于互斥锁和三队列的消息队列,性能很不错。博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲、互斥锁和条件变量的消息队列;这个大概也参考了一下java的blockingqueue,在前面一个博客中有简单介绍!!基于三缓冲的队列,虽然最大限度上解除了线程竞争,但是在玩家很少,消息很小的时候,需要添加一些buff去填充数据,这大概也是其一个缺陷吧!

    消息队列在服务器开发过程中主要用于什么对象呢?

    1:我想大概就是通信层和逻辑层之间的交互,通信层接受到的网络数据,验证封包之后,通过消息队列传递给逻辑层,逻辑层将处理结果封包再传递给通信层!

    2:逻辑线程和数据库IO线程的分离;数据库IO线程负责对数据库的读写更新,逻辑层对数据库的操作,封装成消息去请求数据库IO线程,数据库IO线程处理完之后,再交回给逻辑层。

    3:日志;处理模式与方式2类似。不过日志大概是不需要返回的!

给出源代码:

BlockingQueue.h文件

复制代码代码如下:


/*
 *BlockingQueue.h
 *
 * Createdon:Apr19,2013
 *     Author:archy_yu
 */

#ifndefBLOCKINGQUEUE_H_
#defineBLOCKINGQUEUE_H_

#include<queue>
#include<pthread.h>

typedefvoid*CommonItem;

classBlockingQueue
{
public:
   BlockingQueue();

   virtual~BlockingQueue();

   intpeek(CommonItem&item);

   intappend(CommonItemitem);

private:

   pthread_mutex_t_mutex;

   pthread_cond_t_cond;

   std::queue<CommonItem>_read_queue;

   std::queue<CommonItem>_write_queue;

};

 
#endif/*BLOCKINGQUEUE_H_*/

BlockingQueue.cpp文件代码
复制代码代码如下:

/*
 *BlockingQueue.cpp
 *
 * Createdon:Apr19,2013
 *     Author:archy_yu
 */

#include"BlockingQueue.h"

BlockingQueue::BlockingQueue()
{
   pthread_mutex_init(&this->_mutex,NULL);
   pthread_cond_init(&this->_cond,NULL);
}

BlockingQueue::~BlockingQueue()
{
   pthread_mutex_destroy(&this->_mutex);
   pthread_cond_destroy(&this->_cond);
}

intBlockingQueue::peek(CommonItem&item)
{

   if(!this->_read_queue.empty())
   {
       item=this->_read_queue.front();
       this->_read_queue.pop();
   }
   else
   {
       pthread_mutex_lock(&this->_mutex);

       while(this->_write_queue.empty())
       {
           pthread_cond_wait(&this->_cond,&this->_mutex);
       }

       while(!this->_write_queue.empty())
       {
           this->_read_queue.push(this->_write_queue.front());
           this->_write_queue.pop();
       }

       pthread_mutex_unlock(&this->_mutex);
   }

 
   return0;
}

intBlockingQueue::append(CommonItemitem)
{
   pthread_mutex_lock(&this->_mutex);
   this->_write_queue.push(item);
   pthread_cond_signal(&this->_cond);
   pthread_mutex_unlock(&this->_mutex);
   return0;
}

测试代码:
复制代码代码如下:
BlockingQueue_queue;

void*process(void*arg)
{

   inti=0;
   while(true)
   {
       int*j=newint();
       *j=i;
       _queue.append((void*)j);
       i++;
   }
   returnNULL;
}

intmain(intargc,char**argv)
{
   pthread_tpid;
   pthread_create(&pid,0,process,0);

   longlongintstart=get_os_system_time();
   inti=0;
   while(true)
   {
       int*j=NULL;
       _queue.peek((void*&)j);

       i++;

       if(j!=NULL&&(*j)==100000)
       {
           longlongintend=get_os_system_time();
           printf("consume%d\n",end-start);
           break;
       }
   }

   return0;
}