PHP使用topthink/think-queue消息队列实例
2023-04-18 14:48:28 时间
常住队列消费命令
sudo nohup php7.2 think queue:work --daemon --queue createAdminLogQueue --tries 2 > out.file 2>&1 &
sudo php7.2 think queue:listen --queue createAdminLogQueue
单次队列消费命令
sudo php7.2 think queue:work --daemon --queue createAdminLogQueue
队列添加php代码快
// 当前队列归属的队列名称
$jobHandlerClassName = 'apphookadminLogjobAdminLogCreateQueueJob';
//队列名称
$jobQueueName = "createAdminLogQueue";
// 插入队列
$isPushed = Queue::push($jobHandlerClassName, $data, $jobQueueName);
if( $isPushed == false ){
Log::error("createAdminLogQueue创建队列失败".$data, []);
}
复制代码
使用tp5勾子实现think-queue消息队列实例,实现后台操作日志到添加到数据库
前提:thinkphp5框架基础上,已包含topthink/think-queue消息队列依赖包,可以用composer下载,这里不懂可以百度,就不说你。
1、创建admin_op_log数据表(字段不要更改)
CREATE TABLE `admin_op_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`op_time` varchar(20) NOT NULL,
`admin_id` int(11) NOT NULL,
`employee_no` varchar(30) NOT NULL,
`op_type` tinyint(4) NOT NULL COMMENT '操作类型:1-新增 2-修改 3-删除',
`op_object` varchar(50) NOT NULL COMMENT '操作对象',
`op_object_id` int(11) NOT NULL DEFAULT '0' COMMENT '操作对象id',
`op_info` varchar(2000) NOT NULL,
`op_status` tinyint(4) unsigned zerofill NOT NULL DEFAULT '0000' COMMENT '操作状态 0-成功 1-失败',
`op_ip` varchar(50) NOT NULL,
`sys` varchar(10) DEFAULT '343' COMMENT '平台判断',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3981 DEFAULT CHARSET=utf8mb4
复制代码
2、配置(配置文件位于 application/extra/queue.php)
return [
'connector' => 'database' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动
];
复制代码
3、创建接口。
//添加操作日志
$context = AdminLog::getInstance()->newAdminLogContext(AdminLogContext::ADMIN_OP_TYPE_UPDATE, $this::ADMIN_OP_TABLE_NAME, AdminLogContext::ADMIN_OP_STATUS_SUCCESS, $authorId, $opInfo);
hinkfacadeHook::listen('admin_log', $context);
复制代码
4、增加任务,创建任务类,这里为了区分模块单独创建了一个模块和类。
application/hook/adminLog/controller/AdminLog.php
<?php
namespace apphookadminLogcontroller;
use apphookadminLogcontextAdminLogContext;
use appluckyadmincontrollerAdminBase;
use appluckycommonInstanceTrait;
use thinkApp;
use thinkQueue;
use thinkRequest;
class AdminLog extends AdminBase
{
use InstanceTrait;
public $sys = '';
public function __construct(App $app = null, Request $request = null)
{
parent::__construct($app, $request);
$this->sys = session('sys') ? session('sys') : 343;
}
/**
* 获取newAdminLogContext
*/
public function newAdminLogContext($operateType, $operateObject, $operateStatus, $operateObjectId, $operateInfo)
{
$operateInfo = json_encode($operateInfo);
return new AdminLogContext([
'operateUserId' => $this->loginInfo['admin_id'],
'operateUserEmployeeNo' => $this->loginInfo['employee_no'],
'operateType' => $operateType,
'operateObject' => $operateObject,
'operateObjectId' => $operateObjectId,
'operateInfo' => $operateInfo,
'operateStatus' => $operateStatus,
'operateIp' => $this->requestIp
]);
}
/**
* @description 日志添加
*/
public function run(AdminLogContext $context)
{
$data = [
"admin_id" => $context->operateUserId,
"employee_no" => $context->operateUserEmployeeNo,
"op_type" => $context->operateType,
"op_object" => $context->operateObject,
"op_object_id" => $context->operateObjectId,
"op_info" => $context->operateInfo,
"op_status" => $context->operateStatus,
"op_ip" => $context->operateIp,
"op_time" => time(),
"sys" => $this->sys
];
// 当前队列归属的队列名称
$jobHandlerClassName = 'apphookadminLogjobAdminLogCreateQueueJob';
//队列名称
$jobQueueName = "createAdminLogQueue";
// 插入队列
$isPushed = Queue::push($jobHandlerClassName, $data, $jobQueueName);
if( $isPushed == false ){
Log::error("createAdminLogQueue创建队列失败".$data, []);
}
}
}
复制代码
application/hook/adminLog/ job/AdminLogCreateQueueJob.php
<?php
namespace apphookadminLogjob;
use apphookadminLogserviceAdminOpLogService;
use thinkqueueJob;
use thinkfacadeLog;
class AdminLogCreateQueueJob
{
// php think queue:work --queue BlogViewSyncJob
//消费队列
public function perform($data)
{
$adminOpLogService = new AdminOpLogService();
$createFlge =$adminOpLogService->create($data);
if (!$createFlge){
Log::error("createAdminLogQueue消费队列失败", []);
}
return $createFlge;
}
/**
* fire是消息队列默认调用的方法
* @param Job $job 当前的任务对象
* @param array|mixed $data 发布任务时自定义的数据
*/
public function fire(Job $job, $data)
{
//消费队列
$isJobDone = $this->perform($data);
if ($isJobDone) {
//如果任务执行成功, 记得删除任务
$job->delete();
} else {
//检查任务重试3次数后删除队列
if ($job->attempts() > 3) {
$job->delete();
}
}
}
}
复制代码
application/hook/adminLog/context/AdminLogContext.php
<?php
namespace apphookadminLogcontext;
class AdminLogContext extends appcommonContext
{
//操作类型:1-新增 2-修改 3-删除
const ADMIN_OP_TYPE_CREATE = 1;
const ADMIN_OP_TYPE_UPDATE = 2;
const ADMIN_OP_TYPE_DELETE = 3;
//操作状态 0-成功 1-失败'
const ADMIN_OP_STATUS_SUCCESS = 0;
const ADMIN_OP_STATUS_FAIL = 1;
//操作类型:1-新增 2-修改 3-删除
public $operateType;
//操作人ID
public $operateUserId;
//操作人EmployeeNo
public $operateUserEmployeeNo;
//操作对象 表名
public $operateObject;
//操作对象id 表名id
public $operateObjectId;
//操作信息
public $operateInfo;
//操作状态 0-成功 1-失败'
public $operateStatus;
//操作人IP
public $operateIp;
//操作时间
public $operateTime = '';
}
复制代码
application/hook/adminLog/model/AdminOpLogModel.php
<?php
namespace apphookadminLogmodel;
use thinkDb;
class AdminOpLogModel
{
/**
* 新增操作记录
* @param $data
*/
public function insert($data)
{
return DB::table("admin_op_log")->insert($data);
}
}
复制代码
application/hook/adminLog/service/AdminOpLogService.php
<?php
namespace apphookadminLogservice;
use apphookadminLogmodelAdminOpLogModel;
use thinkException;
use thinkfacadeLog;
class AdminOpLogService
{
public $model = '';
public function __construct()
{
$this->model = new AdminOpLogModel();
}
/**
* 添加
* @param array $data 数据数组
* @return bool 返回类型
*/
public function create($data)
{
try {
return $this->model->insert($data);
}catch (Exception $e) {
Log::error("添加操作日志失败(admin_op_log表插入失败)" . $e->getMessage(), []);
return false;
}
}
}
复制代码
5、在控制台监听任务并执行消费任务
cd到项目下执行命令 C:wampwww est>php think queue:listen
执行全部 sudo php think queue:listen --queue createAdminLogQueue
执行一次 sudo php think queue:work --queue createAdminLogQueue
复制代码
thinkqueue 后台运行常驻程序
一
进入项目路径,在目录下执行命令
在后台运行两条进程,常驻内存,不断的处理任务消息队列任务,如果要用指定版本php7.2表示使用7.2版本来执行,默认用php就可以来
sudo nohup php think queue:work --daemon --queue jobQueue --tries 2 > out.file 2>&1 &
sudo nohup php7.2 think queue:work --daemon --queue jobQueueSlow --tries 2 > out2.file 2>&1 &
复制代码
相关文章
- 无监督学习的12个最重要的算法介绍及其用例总结
- 云中的容器:你有哪些选择?
- 成功AI需要正确的数据架构
- K8S集群内Pod如何与本地网络打通实现debug
- 为什么如此安全的Https协议却仍然可以被抓包呢?
- 数智时代的绩效管理:现实和未来
- 实现网络基础设施现代化的5个步骤
- 爬虫遇到字体反爬,哭了
- 快速上手 Linkerd v2 Service Mesh(服务网格)
- 有望取代Deepfake?揭秘今年最火的NeRF技术有多牛
- VMware报告发现,网络犯罪正通过完整性和破坏性攻击操纵现实
- 用户投诉大幅下降,运营商为何笑不出来?
- 一人行快,众人行远!亚马逊云科技如何赋能合作伙伴共成长?
- 保障数据安全和推动DevOps应用 F5 助力Cegedim加速医疗数字化进程
- 掌握这5条Tips,部署Wi-Fi 6才够“666”
- 机器学习如何改变数据中心管理
- 用友 YonBIP 采购云,构建采购商业网络,助力强链补链
- 支持信创平台创新 博睿数据担任“IT医生”保驾护航
- IDC报告:中国电子云跻身政务云前列!
- 在供应链不稳定的时代,甲骨文云技术助力仓库管理转型