PHP 队列 think-queue 了解一下

队列(think-queue)是tp5.1.x的一个扩展,需要先安装才能使用 以下是基础用法,本文是按照数据库驱动模式进行


一、安装

    本文采用composer 方法安装

    也可直接复制源码自行引入方式使用

composer require topthink/think-queue

安装前提:

    1.composer 已经安装

    2.没有安装composer 可以用phpstorm 下载compoer.phar 包方式进行局部项目使用(三种方式任选一种使用)

    635f2cebb7d50.png

think-queue 支持多种驱动方式使用(Database、Redis、Sync、Topthink ),本文使用Redis 方式来实现队列功能

二、配置

    安装think-queue 之后会在应用根config目录下创建一个queue.php配置文件

    635f2e1fdeeb7.png

    上图为已经配置号redis 相关配置,connectior 为使用的驱动类型

三、任务逻辑代码

    3.1 创建任务服务类

    1.创建控制器服务类[非必须,应用在多场景调用时避免代码冗余,不创建服务类将下面addJob 方法写入对应功能方法即可] app\index\service\PushQueue.php(文件类名可自行拟定)

    2.创建一个静态方法addJob

    3.示例代码如下

'任务执行类名','class_method'=>'任务执行方法','method_param'=>[]]
     * @param int $execute_time 执行时间[时间戳]
     * @param string $jobQueueName 任务名称 [同一服务器多个网站实现队列需要重命名,不然会出现同名任务执行了非当前网站站的任务]
     * @return array
     * @author: xiaochun      * @Date:2022-10-28 16:43
     */
    public static function addJob(array $data, int $execute_time = 0, string $jobQueueName = "TaskQueue"): array
    {
        try {
            if (!isset($data['class_name'])) {
                throw new Exception('请传入任务实际执行类');
            }
            if (!isset($data['class_method'])) {
                throw new Exception('请传入任务实际执行方法');
            }
            if (!isset($data['method_param'])) {
                throw new Exception('请传入方法执行参数,没有参数传入空数组');
            }
            // 1.当前任务将由哪个类来负责处理。
            // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
            $jobHandlerClassName = 'app\index\job\JobTask';
            //命令:php think queue:work --daemon --delay 2 --tries  3 --queue TaskQueue
            //或者命令:php think queue:listen --delay 2 --tries 3 --queue TaskQueue
            //重启命令:php think queue:restart
            //--delay 延时执行参数 单位:秒
            //--tries 重试次数 单位:次
            // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
            // ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对)
            if (is_object($data)) {
                throw new Exception('任务执行数据不能是对象类型');
            }
            $jobData = $data;
            // 4.将该任务推送到消息队列,等待对应的消费者去执行
            if ($execute_time == 0) {
                //如果执行时间为0的时候,表示立即执行
                $isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);
            } else {
                //如果执行时间不为空,则表示需要延时执行
                $time2wait = $execute_time - time();  // 定时执行
                if ($time2wait  1, 'msg' => '任务推送成功'];
        } catch (Exception $e) {
            $result = ['code' => 0, 'msg' => $e->getMessage() ?? '执行失败'];
        }
        return $result;
    }

}

3.2 创建任务消费类

    1.创建类:app\index\job\JobTask.php(文件类名可自行拟定)

    2.创建相应方法

    3.示例代码如下

startTime = $this->getTimeStr();
        $this->jobData = $data;
        // 此处做一些 check,提前判断是否需要执行
        $isJobStillNeedToBeDone = $this->verifyJobNeedToBeDone($data);
        if (!$isJobStillNeedToBeDone) {
            $job->delete();
            return;
        }
        // 执行逻辑处理(即:你需要该消息队列做什么)
        $isJobDone = $this->doHelloJob($data);
        if ($isJobDone) {
            // 如果任务执行成功,记得删除任务
            $job->delete();
        } else {
            // 通过这个方法可以检查这个任务已经重试了几次了
            if ($job->attempts() > 3) {
                $job->delete();
                // 也可以重新发布这个任务
                //$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }

    /**
     * 有些消息在到达消费者时,可能已经不再需要执行了
     * @param $data 发布任务时自定义的数据
     * @return bool 任务执行的结果
     */
    private function verifyJobNeedToBeDone($data): bool
    {
        return true;
    }

    /**
     * 根据消息中的数据进行实际的业务处理...
     * @param $data
     * @return bool
     */
    private function doHelloJob($data): bool
    {
        sleep(5);
        $this->writeLog();
        return true;
    }

    /**
     * Function:自定义任务日志方法
     * @author: xiaochun      * @Date:2022-10-31 10:19
     */
    private function writeLog(): void
    {
        //任务执行日志
        $line = str_repeat('=', 80);
        $this->endTime = $this->getTimeStr();
        $str = [
            '任务开始执行:' . $this->startTime,
            var_export($this->jobData, true),
            '任务结束执行:' . $this->endTime,
            $line,
        ];
        $path = Env::get('runtime_path') . 'queue' . DIRECTORY_SEPARATOR . date('Ym') . DIRECTORY_SEPARATOR;
        !is_dir($path) && mkdir($path, 0755, true);

        $fileName = $path . date('d');
        if (PHP_SAPI == 'cli') {
            $fileName .= '_cli.log';
        } else {
            $fileName .= '.log';
        }
        $this->checkLogSize($fileName);
        error_log((string)$str, 3, $fileName);
    }

    /**
     * 检查日志文件大小并自动生成备份文件
     * @access protected
     * @param string $destination 日志文件
     * @return void
     */
    protected function checkLogSize(string $destination)
    {
        $config['file_size'] = 2097152;
        if (is_file($destination) && floor($config['file_size']) <= filesize($destination)) {
            try {
                rename($destination, dirname($destination) . DIRECTORY_SEPARATOR . time() . '-' . basename($destination));
            } catch (\Exception $e) {
            }
        }
    }

    //获取当前时间
    private function getTimeStr()
    {
        return date('Y-m-d H:i:s');
    }

}

3.3 业务调用任务队列执行方法

    1.在对应业务方法中调用PushQueue:addJob() 方法

class JobTest
{

    public function demo()
    {
        $jobData = [
            'class_name'   => 'app\index\controller\Task',//具体业务执行类
            'class_method' => 'run',//业务执行方法
            'method_param' => [
                'goods_id'    => 1,
            ],//执行需要的参数
        ];
        $execTime = time() + 5;//任务执行时间,为0表示立即执行,当前设置为延时5秒执行
        $result = PushQueue::addJob($jobData, $execTime);
        if ($result['code']) {
            $str = "
";
            echo $str;
        } else {
            echo $result['msg'];
        }
    }
}

    2.任务加入

        执行http://yvhsse.com//index/job_test/demo 方法加入队列

    3.队列命令任务执行

        测试队列直接执行下面任务

        生产环境下可以将该任务加入守护进程里

        

php think queue:work --daemon --delay 2 --tries  3 --queue TaskQueue

参数说明:
--delay 2 表示任务延2秒钟执行
--tries 3 任务执行失败重试次数

    635f31e7821bf.jpg

635f3244794d1.png

End.


评论

评论列表