[资源]PHP使用消息队列
2023-09-14 09:03:20 时间
基于HTTP协议的轻量级开源简单队列服务:HTTPSQS[原创]
Redis队列——PHP操作简单示例
入队操作 <?php $redis = new Redis(); $redis->connect('127.0.0.1',6379); while(True){ try{ $value = 'value_'.date('Y-m-d H:i:s'); $redis->LPUSH('key1',$value); sleep(rand()%3); echo $value."\n"; }catch(Exception $e){ echo $e->getMessage()."\n"; } } ?> 出队操作 <?php $redis = new Redis(); $redis->pconnect('127.0.0.1',6379); while(True){ try{ echo $redis->LPOP('key1')."\n"; }catch(Exception $e){ echo $e->getMessage()."\n"; } sleep(rand()%3); } ?>
<?php /** * 使用共享内存的PHP循环内存队列实现 * 支持多进程, 支持各种数据类型的存储 * 注: 完成入队或出队操作,尽快使用unset(), 以释放临界区 * * @author wangbinandi@gmail.com * @created 2009-12-23 */ class SHMQueue { private $maxQSize = 0; // 队列最大长度 private $front = 0; // 队头指针 private $rear = 0; // 队尾指针 private $blockSize = 256; // 块的大小(byte) private $memSize = 25600; // 最大共享内存(byte) private $shmId = 0; private $filePtr = './shmq.ptr'; private $semId = 0; public function __construct() { $shmkey = ftok(__FILE__, 't'); $this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize ); $this->maxQSize = $this->memSize / $this->blockSize; // 申請一个信号量 $this->semId = sem_get($shmkey, 1); sem_acquire($this->semId); // 申请进入临界区 $this->init(); } private function init() { if ( file_exists($this->filePtr) ){ $contents = file_get_contents($this->filePtr); $data = explode( '|', $contents ); if ( isset($data[0]) && isset($data[1])){ $this->front = (int)$data[0]; $this->rear = (int)$data[1]; } } } public function getLength() { return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize; } public function enQueue( $value ) { if ( $this->ptrInc($this->rear) == $this->front ){ // 队满 return false; } $data = $this->encode($value); shmop_write($this->shmId, $data, $this->rear ); $this->rear = $this->ptrInc($this->rear); return true; } public function deQueue() { if ( $this->front == $this->rear ){ // 队空 return false; } $value = shmop_read($this->shmId, $this->front, $this->blockSize-1); $this->front = $this->ptrInc($this->front); return $this->decode($value); } private function ptrInc( $ptr ) { return ($ptr + $this->blockSize) % ($this->memSize); } private function encode( $value ) { $data = serialize($value) . "__eof"; if ( strlen($data) > $this->blockSize -1 ){ throw new Exception(strlen($data)." is overload block size!"); } return $data; } private function decode( $value ) { $data = explode("__eof", $value); return unserialize($data[0]); } public function __destruct() { $data = $this->front . '|' . $this->rear; file_put_contents($this->filePtr, $data); sem_release($this->semId); // 出临界区, 释放信号量 } } 使用的样例代码如下: // 进队操作 $shmq = new SHMQueue(); $data = 'test data'; $shmq->enQueue($data); unset($shmq); // 出队操作 $shmq = new SHMQueue(); $data = $shmq->deQueue(); unset($shmq);
相关文章
- php 从一个数组中随机获取固定数据
- Github上的PHP资源汇总大全
- 一些PHP性能优化汇总
- PHP根据不同时间段输出不同的问候语
- php去除html代码中img图片标签宽高函数
- php: +1天, +3个月, strtotime(): +1 day, +3 month
- php从memcache读取数据再批量写入mysql的方法
- PHP(面向对象)连接数据库,实现基本的增删改查
- PHP执行批量mysql语句
- PHP不影响正常运行的调试技巧
- php 3DES|DES 加密解密(通用)
- php jquery ajax select 二级联动【get方式】
- Windows下编译使用Aliyun OSS PHP SDK
- Atitit 稳定性提升的艺术 之技术解决之道 目录 1. 2. 为什么会发生稳定性问题11 2. 大原则1 2.1. 尽快释放资源类似php最好的稳定性1 2.2. Nginx 负载均衡
- atitit.提取zip rar文件列表 java php c# 的原理与设计
- PHP 5 Directory 函数
- 使用框架的php假设使用定时服务Cronjob
- 关于 php json float 出现很多位的问题
- 给php-7-1-5添加扩展fileinfo