zl程序教程

您现在的位置是:首页 >  其他

当前栏目

PHP使用topthink/think-queue消息队列实例

2023-04-18 14:48:28 时间

常住队列消费命令

sudo nohup php7.2 think queue:work --daemon --queue createAdminLogQueue --tries 2 > out.file 2>&1 &

sudo php7.2 think queue:listen --queue createAdminLogQueue

单次队列消费命令

sudo php7.2 think queue:work --daemon --queue createAdminLogQueue

队列添加php代码快

       // 当前队列归属的队列名称
        $jobHandlerClassName = 'apphookadminLogjobAdminLogCreateQueueJob';
        //队列名称
        $jobQueueName = "createAdminLogQueue";
        // 插入队列
        $isPushed = Queue::push($jobHandlerClassName, $data, $jobQueueName);
        if( $isPushed == false ){
            Log::error("createAdminLogQueue创建队列失败".$data, []);
        }
复制代码

使用tp5勾子实现think-queue消息队列实例,实现后台操作日志到添加到数据库

前提:thinkphp5框架基础上,已包含topthink/think-queue消息队列依赖包,可以用composer下载,这里不懂可以百度,就不说你。

1、创建admin_op_log数据表(字段不要更改)

CREATE TABLE `admin_op_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `op_time` varchar(20) NOT NULL,
  `admin_id` int(11) NOT NULL,
  `employee_no` varchar(30) NOT NULL,
  `op_type` tinyint(4) NOT NULL COMMENT '操作类型:1-新增 2-修改 3-删除',
  `op_object` varchar(50) NOT NULL COMMENT '操作对象',
  `op_object_id` int(11) NOT NULL DEFAULT '0' COMMENT '操作对象id',
  `op_info` varchar(2000) NOT NULL,
  `op_status` tinyint(4) unsigned zerofill NOT NULL DEFAULT '0000' COMMENT '操作状态 0-成功 1-失败',
  `op_ip` varchar(50) NOT NULL,
  `sys` varchar(10) DEFAULT '343' COMMENT '平台判断',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3981 DEFAULT CHARSET=utf8mb4
复制代码

2、配置(配置文件位于 application/extra/queue.php)

return [
    'connector' => 'database'  //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动
];
复制代码

3、创建接口。

//添加操作日志
$context = AdminLog::getInstance()->newAdminLogContext(AdminLogContext::ADMIN_OP_TYPE_UPDATE, $this::ADMIN_OP_TABLE_NAME, AdminLogContext::ADMIN_OP_STATUS_SUCCESS, $authorId, $opInfo);
	hinkfacadeHook::listen('admin_log', $context);
复制代码

4、增加任务,创建任务类,这里为了区分模块单独创建了一个模块和类。

application/hook/adminLog/controller/AdminLog.php

<?php

namespace apphookadminLogcontroller;

use apphookadminLogcontextAdminLogContext;
use appluckyadmincontrollerAdminBase;
use appluckycommonInstanceTrait;
use thinkApp;
use thinkQueue;
use thinkRequest;

class AdminLog extends AdminBase
{
    use InstanceTrait;

    public $sys = '';

    public function __construct(App $app = null, Request $request = null)
    {
        parent::__construct($app, $request);
        $this->sys = session('sys') ? session('sys') : 343;
    }

    /**
     * 获取newAdminLogContext
     */
    public function newAdminLogContext($operateType, $operateObject, $operateStatus, $operateObjectId, $operateInfo)
    {
        $operateInfo = json_encode($operateInfo);
        return new AdminLogContext([
            'operateUserId' => $this->loginInfo['admin_id'],
            'operateUserEmployeeNo' => $this->loginInfo['employee_no'],
            'operateType' => $operateType,
            'operateObject' => $operateObject,
            'operateObjectId' => $operateObjectId,
            'operateInfo' => $operateInfo,
            'operateStatus' => $operateStatus,
            'operateIp' => $this->requestIp
        ]);
    }


    /**
     * @description 日志添加
     */
    public function run(AdminLogContext $context)
    {
        $data = [
            "admin_id" => $context->operateUserId,
            "employee_no" => $context->operateUserEmployeeNo,
            "op_type" => $context->operateType,
            "op_object" => $context->operateObject,
            "op_object_id" => $context->operateObjectId,
            "op_info" => $context->operateInfo,
            "op_status" => $context->operateStatus,
            "op_ip" => $context->operateIp,
            "op_time" => time(),
            "sys" => $this->sys
        ];

        // 当前队列归属的队列名称
        $jobHandlerClassName = 'apphookadminLogjobAdminLogCreateQueueJob';
        //队列名称
        $jobQueueName = "createAdminLogQueue";
        // 插入队列
        $isPushed = Queue::push($jobHandlerClassName, $data, $jobQueueName);
        if( $isPushed == false ){
            Log::error("createAdminLogQueue创建队列失败".$data, []);
        }
    }

}
复制代码

application/hook/adminLog/ job/AdminLogCreateQueueJob.php

<?php

namespace apphookadminLogjob;

use apphookadminLogserviceAdminOpLogService;
use thinkqueueJob;
use thinkfacadeLog;

class AdminLogCreateQueueJob
{
    // php think queue:work --queue BlogViewSyncJob
    //消费队列
    public function perform($data)
    {
        $adminOpLogService = new AdminOpLogService();
        $createFlge =$adminOpLogService->create($data);
        if (!$createFlge){
            Log::error("createAdminLogQueue消费队列失败", []);
        }
        return $createFlge;
    }

    /**
     * fire是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param array|mixed $data 发布任务时自定义的数据
     */
    public function fire(Job $job, $data)
    {
        //消费队列
        $isJobDone = $this->perform($data);

        if ($isJobDone) {
            //如果任务执行成功, 记得删除任务
            $job->delete();
        } else {
            //检查任务重试3次数后删除队列
            if ($job->attempts() > 3) {
                $job->delete();
            }
        }
    }
}
复制代码

application/hook/adminLog/context/AdminLogContext.php

<?php

namespace apphookadminLogcontext;

class AdminLogContext extends appcommonContext
{
    //操作类型:1-新增 2-修改 3-删除
    const ADMIN_OP_TYPE_CREATE = 1;
    const ADMIN_OP_TYPE_UPDATE = 2;
    const ADMIN_OP_TYPE_DELETE = 3;

    //操作状态 0-成功 1-失败'
    const ADMIN_OP_STATUS_SUCCESS = 0;
    const ADMIN_OP_STATUS_FAIL = 1;

    //操作类型:1-新增 2-修改 3-删除
    public $operateType;

    //操作人ID
    public $operateUserId;

    //操作人EmployeeNo
    public $operateUserEmployeeNo;

    //操作对象 表名
    public $operateObject;

    //操作对象id 表名id
    public $operateObjectId;

    //操作信息
    public $operateInfo;

    //操作状态 0-成功 1-失败'
    public $operateStatus;

    //操作人IP
    public $operateIp;

    //操作时间
    public $operateTime = '';

}
复制代码

application/hook/adminLog/model/AdminOpLogModel.php

<?php

namespace apphookadminLogmodel;

use thinkDb;

class AdminOpLogModel
{

    /**
     * 新增操作记录
     * @param $data
     */
    public function insert($data)
    {
        return DB::table("admin_op_log")->insert($data);
    }

}
复制代码

application/hook/adminLog/service/AdminOpLogService.php

<?php

namespace apphookadminLogservice;

use apphookadminLogmodelAdminOpLogModel;
use thinkException;
use thinkfacadeLog;

class AdminOpLogService
{
    public $model = '';

    public function __construct()
    {
        $this->model = new AdminOpLogModel();
    }

    /**
     * 添加
     * @param array $data 数据数组
     * @return bool 返回类型
     */
    public function create($data)
    {
        try {
            return $this->model->insert($data);
        }catch (Exception $e) {
            Log::error("添加操作日志失败(admin_op_log表插入失败)" . $e->getMessage(), []);
            return false;
        }
    }


}
复制代码

5、在控制台监听任务并执行消费任务

cd到项目下执行命令 C:wampwww	est>php think queue:listen
执行全部 sudo php think queue:listen --queue createAdminLogQueue
执行一次 sudo php think queue:work --queue createAdminLogQueue
复制代码

thinkqueue 后台运行常驻程序

进入项目路径,在目录下执行命令

在后台运行两条进程,常驻内存,不断的处理任务消息队列任务,如果要用指定版本php7.2表示使用7.2版本来执行,默认用php就可以来

sudo nohup php think queue:work --daemon --queue jobQueue --tries 2 >  out.file  2>&1  &
sudo nohup php7.2 think queue:work --daemon --queue jobQueueSlow --tries 2 >  out2.file  2>&1  &
复制代码