zl程序教程

您现在的位置是:首页 >  其他

当前栏目

PHP+memcache实现消息队列案例分享

案例PHP消息队列队列 实现 分享 memcache
2023-06-13 09:15:26 时间

memche消息队列的原理就是在key上做文章,用以做一个连续的数字加上前缀记录序列化以后消息或者日志。然后通过定时程序将内容落地到文件或者数据库。

php实现消息队列的用处比如在做发送邮件时发送大量邮件很费时间的问题,那么可以采取队列。
方便实现队列的轻量级队列服务器是:
starling支持memcache协议的轻量级持久化服务器
https://github.com/starling/starling
Beanstalkd轻量、高效,支持持久化,每秒可处理3000左右的队列
http://kr.github.com/beanstalkd/
php中也可以使用memcache/memcached来实现消息队列。

复制代码代码如下:


   <?php 
   /**
   *Memcache消息队列类
   */ 
   classQMC{ 
   constPREFIX="ASDFASDFFWQKE"; 
   /**
   *初始化mc
   *@staticvarstring$mc
   *@returnMemcache
   */ 
   staticprivatefunctionmc_init(){ 
   static$mc=null; 
   if(is_null($mc)){ 
   $mc=newMemcache; 
   $mc->connect("127.0.0.1",11211); 
   } 
   return$mc; 
   } 
   /**
   *mc计数器,增加计数并返回新的计数
   *@paramstring$key  计数器
   *@paramint$offset  计数增量,可为负数.0为不改变计数
   *@paramint$time    时间
   *@returnint/false   失败是返回false,成功时返回更新计数器后的计数
   */ 
   staticpublicfunctionset_counter($key,$offset,$time=0){ 
   $mc=self::mc_init(); 
   $val=$mc->get($key); 
   if(!is_numeric($val)||$val<0){ 
   $ret=$mc->set($key,0,$time); 
   if(!$ret)returnfalse; 
   $val=0; 
   } 
   $offset=intval($offset); 
   if($offset>0){ 
   return$mc->increment($key,$offset); 
   }elseif($offset<0){ 
   return$mc->decrement($key,-$offset); 
   } 
   return$val; 
   } 
   /**
   *写入队列
   *@paramstring$key
   *@parammixed$value
   *@returnbool
   */ 
   staticpublicfunctioninput($key,$value){ 
   $mc=self::mc_init(); 
   $w_key=self::PREFIX.$key."W"; 
   $v_key=self::PREFIX.$key.self::set_counter($w_key,1); 
   return$mc->set($v_key,$value); 
   } 
   /**
   *读取队列里的数据
   *@paramstring$key
   *@paramint$max 最多读取条数
   *@returnarray
   */ 
   staticpublicfunctionoutput($key,$max=100){ 
   $out=array(); 
   $mc=self::mc_init(); 
   $r_key=self::PREFIX.$key."R"; 
   $w_key=self::PREFIX.$key."W"; 
   $r_p  =self::set_counter($r_key,0);//读指针 
   $w_p  =self::set_counter($w_key,0);//写指针 
   if($r_p==0)$r_p=1; 
   while($w_p>=$r_p){ 
   if(--$max<0)break; 
   $v_key=self::PREFIX.$key.$r_p; 
   $r_p=self::set_counter($r_key,1); 
   $out[]=$mc->get($v_key); 
   $mc->delete($v_key); 
   } 
   return$out; 
   } 
   } 
   /**
   使用方法:
   QMC::input($key,$value);//写入队列
   $list=QMC::output($key);//读取队列
   */ 
   ?> 

基于PHP共享内存实现的消息队列:

复制代码代码如下:


<?php 
/**
*使用共享内存的PHP循环内存队列实现
*支持多进程,支持各种数据类型的存储
*注:完成入队或出队操作,尽快使用unset(),以释放临界区
*
*@authorwangbinandi@gmail.com
*@created2009-12-23
*/ 
classShmQueue 

private$maxQSize=0;//队列最大长度 
private$front=0;//队头指针 
private$rear=0; //队尾指针 
private$blockSize=256; //块的大小(byte) 
private$memSize=25600; //最大共享内存(byte) 
private$shmId=0; 
private$filePtr="./shmq.ptr"; 
private$semId=0; 
publicfunction__construct() 

$shmkey=ftok(__FILE__,"t"); 
$this->shmId=shmop_open($shmkey,"c",0644,$this->memSize); 
$this->maxQSize=$this->memSize/$this->blockSize; 
//申?一个信号量 
$this->semId=sem_get($shmkey,1); 
sem_acquire($this->semId);//申请进入临界区 
$this->init(); 

privatefunctioninit() 

if(file_exists($this->filePtr)){ 
$contents=file_get_contents($this->filePtr); 
$data=explode("|",$contents); 
if(isset($data[0])&&isset($data[1])){ 
$this->front=(int)$data[0]; 
$this->rear =(int)$data[1]; 



publicfunctiongetLength() 

return(($this->rear-$this->front+$this->memSize)%($this->memSize))/$this->blockSize; 

publicfunctionenQueue($value) 

if($this->ptrInc($this->rear)==$this->front){//队满 
returnfalse; 

$data=$this->encode($value); 
shmop_write($this->shmId,$data,$this->rear); 
$this->rear=$this->ptrInc($this->rear); 
returntrue; 

publicfunctiondeQueue() 

if($this->front==$this->rear){//队空 
returnfalse; 

$value=shmop_read($this->shmId,$this->front,$this->blockSize-1); 
$this->front=$this->ptrInc($this->front); 
return$this->decode($value); 

privatefunctionptrInc($ptr) 

return($ptr+$this->blockSize)%($this->memSize); 

privatefunctionencode($value) 

$data=serialize($value)."__eof"; 
echo""; 
echostrlen($data); 
echo""; 
echo$this->blockSize-1; 
echo""; 
if(strlen($data)>$this->blockSize-1){ 
thrownewException(strlen($data)."isoverloadblocksize!"); 

return$data; 

privatefunctiondecode($value) 

$data=explode("__eof",$value); 
returnunserialize($data[0]); 

publicfunction__destruct() 

$data=$this->front."|".$this->rear; 
file_put_contents($this->filePtr,$data); 
sem_release($this->semId);//出临界区,释放信号量 


/*
//进队操作
$shmq=newShmQueue();
$data="testdata";
$shmq->enQueue($data);
unset($shmq);
//出队操作
$shmq=newShmQueue();
$data=$shmq->deQueue();
unset($shmq);
*/ 
?>

对于一个很大的消息队列,频繁进行进行大数据库的序列化和反序列化,有太耗费。下面是我用PHP实现的一个消息队列,只需要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作。但是,这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果消息不是非常的密集,比如几秒钟才一个,还是可以考虑这样使用的。
如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。下面是代码:
代码如下:

复制代码代码如下:
   classMemcache_Queue  
   {  
   private$memcache;  
   private$name;  
   private$prefix;  
   function__construct($maxSize,$name,$memcache,$prefix="__memcache_queue__")  
   {  
   if($memcache==null){  
   thrownewException("memcacheobjectisnull,newtheobjectfirst.");  
   }  
   $this->memcache=$memcache;  
   $this->name=$name;  
   $this->prefix=$prefix;  
   $this->maxSize=$maxSize;  
   $this->front=0;  
   $this->real=0;  
   $this->size=0;  
   }  
   function__get($name)  
   {  
   return$this->get($name);  
   }  
   function__set($name,$value)  
   {  
   $this->add($name,$value);  
   return$this;  
   }  
   functionisEmpty()  
   {  
   return$this->size==0;  
   }  
   functionisFull()  
   {  
   return$this->size==$this->maxSize;  
   }  
   functionenQueue($data)  
   {  
   if($this->isFull()){  
   thrownewException("QueueisFull");  
   }  
   $this->increment("size");  
   $this->set($this->real,$data);  
   $this->set("real",($this->real+1)%$this->maxSize);  
   return$this;  
   }  
   functiondeQueue()  
   {  
   if($this->isEmpty()){  
   thrownewException("QueueisEmpty");  
   }  
   $this->decrement("size");  
   $this->delete($this->front);  
   $this->set("front",($this->front+1)%$this->maxSize);  
   return$this;  
   }  
   functiongetTop()  
   {  
   return$this->get($this->front);  
   }  
   functiongetAll()  
   {  
   return$this->getPage();  
   }  
   functiongetPage($offset=0,$limit=0)  
   {  
   if($this->isEmpty()||$this->size<$offset){  
   returnnull;  
   }  
   $keys[]=$this->getKeyByPos(($this->front+$offset)%$this->maxSize);  
   $num=1;  
   for($pos=($this->front+$offset+1)%$this->maxSize;$pos!=$this->real;$pos=($pos+1)%$this->maxSize)  
   {  
   $keys[]=$this->getKeyByPos($pos);  
   $num++;  
   if($limit>0&&$limit==$num){  
   break;  
   }  
   }  
   returnarray_values($this->memcache->get($keys));  
   }  
   functionmakeEmpty()  
   {  
   $keys=$this->getAllKeys();  
   foreach($keysas$value){  
   $this->delete($value);  
   }  
   $this->delete("real");  
   $this->delete("front");  
   $this->delete("size");  
   $this->delete("maxSize");  
   }  
   privatefunctiongetAllKeys()  
   {  
   if($this->isEmpty())  
   {  
   returnarray();  
   }  
   $keys[]=$this->getKeyByPos($this->front);  
   for($pos=($this->front+1)%$this->maxSize;$pos!=$this->real;$pos=($pos+1)%$this->maxSize)  
   {  
   $keys[]=$this->getKeyByPos($pos);  
   }  
   return$keys;  
   }  
   privatefunctionadd($pos,$data)  
   {  
   $this->memcache->add($this->getKeyByPos($pos),$data);  
   return$this;  
   }  
   privatefunctionincrement($pos)  
   {  
   return$this->memcache->increment($this->getKeyByPos($pos));  
   }  
   privatefunctiondecrement($pos)  
   {  
   $this->memcache->decrement($this->getKeyByPos($pos));  
   }  
   privatefunctionset($pos,$data)  
   {  
   $this->memcache->set($this->getKeyByPos($pos),$data);  
   return$this;  
   }  
   privatefunctionget($pos)  
   {  
   return$this->memcache->get($this->getKeyByPos($pos));  
   }  
   privatefunctiondelete($pos)  
   {  
   return$this->memcache->delete($this->getKeyByPos($pos));  
   }  
   privatefunctiongetKeyByPos($pos)  
   {  
   return$this->prefix.$this->name.$pos;  
   }  
   }