队列(think-queue)是tp5.1.x的一个扩展,需要先安装才能使用 以下是基础用法,本文是按照数据库驱动模式进行
一、安装
本文采用composer 方法安装
也可直接复制源码自行引入方式使用
composer require topthink/think-queue
安装前提:
1.composer 已经安装
2.没有安装composer 可以用phpstorm 下载compoer.phar 包方式进行局部项目使用(三种方式任选一种使用)
think-queue 支持多种驱动方式使用(Database、Redis、Sync、Topthink ),本文使用Redis 方式来实现队列功能
二、配置
安装think-queue 之后会在应用根config目录下创建一个queue.php配置文件
上图为已经配置号redis 相关配置,connectior 为使用的驱动类型
三、任务逻辑代码
3.1 创建任务服务类
1.创建控制器服务类[非必须,应用在多场景调用时避免代码冗余,不创建服务类将下面addJob 方法写入对应功能方法即可] app\index\service\PushQueue.php(文件类名可自行拟定)
2.创建一个静态方法addJob
3.示例代码如下
'任务执行类名','class_method'=>'任务执行方法','method_param'=>[]] * @param int $execute_time 执行时间[时间戳] * @param string $jobQueueName 任务名称 [同一服务器多个网站实现队列需要重命名,不然会出现同名任务执行了非当前网站站的任务] * @return array * @author: xiaochun * @Date:2022-10-28 16:43 */ public static function addJob(array $data, int $execute_time = 0, string $jobQueueName = "TaskQueue"): array { try { if (!isset($data['class_name'])) { throw new Exception('请传入任务实际执行类'); } if (!isset($data['class_method'])) { throw new Exception('请传入任务实际执行方法'); } if (!isset($data['method_param'])) { throw new Exception('请传入方法执行参数,没有参数传入空数组'); } // 1.当前任务将由哪个类来负责处理。 // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法 $jobHandlerClassName = 'app\index\job\JobTask'; //命令:php think queue:work --daemon --delay 2 --tries 3 --queue TaskQueue //或者命令:php think queue:listen --delay 2 --tries 3 --queue TaskQueue //重启命令:php think queue:restart //--delay 延时执行参数 单位:秒 //--tries 重试次数 单位:次 // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串 // ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对) if (is_object($data)) { throw new Exception('任务执行数据不能是对象类型'); } $jobData = $data; // 4.将该任务推送到消息队列,等待对应的消费者去执行 if ($execute_time == 0) { //如果执行时间为0的时候,表示立即执行 $isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName); } else { //如果执行时间不为空,则表示需要延时执行 $time2wait = $execute_time - time(); // 定时执行 if ($time2wait 1, 'msg' => '任务推送成功']; } catch (Exception $e) { $result = ['code' => 0, 'msg' => $e->getMessage() ?? '执行失败']; } return $result; } }3.2 创建任务消费类
1.创建类:app\index\job\JobTask.php(文件类名可自行拟定)
2.创建相应方法
3.示例代码如下
startTime = $this->getTimeStr(); $this->jobData = $data; // 此处做一些 check,提前判断是否需要执行 $isJobStillNeedToBeDone = $this->verifyJobNeedToBeDone($data); if (!$isJobStillNeedToBeDone) { $job->delete(); return; } // 执行逻辑处理(即:你需要该消息队列做什么) $isJobDone = $this->doHelloJob($data); if ($isJobDone) { // 如果任务执行成功,记得删除任务 $job->delete(); } else { // 通过这个方法可以检查这个任务已经重试了几次了 if ($job->attempts() > 3) { $job->delete(); // 也可以重新发布这个任务 //$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行 } } } /** * 有些消息在到达消费者时,可能已经不再需要执行了 * @param $data 发布任务时自定义的数据 * @return bool 任务执行的结果 */ private function verifyJobNeedToBeDone($data): bool { return true; } /** * 根据消息中的数据进行实际的业务处理... * @param $data * @return bool */ private function doHelloJob($data): bool { sleep(5); $this->writeLog(); return true; } /** * Function:自定义任务日志方法 * @author: xiaochun * @Date:2022-10-31 10:19 */ private function writeLog(): void { //任务执行日志 $line = str_repeat('=', 80); $this->endTime = $this->getTimeStr(); $str = [ '任务开始执行:' . $this->startTime, var_export($this->jobData, true), '任务结束执行:' . $this->endTime, $line, ]; $path = Env::get('runtime_path') . 'queue' . DIRECTORY_SEPARATOR . date('Ym') . DIRECTORY_SEPARATOR; !is_dir($path) && mkdir($path, 0755, true); $fileName = $path . date('d'); if (PHP_SAPI == 'cli') { $fileName .= '_cli.log'; } else { $fileName .= '.log'; } $this->checkLogSize($fileName); error_log((string)$str, 3, $fileName); } /** * 检查日志文件大小并自动生成备份文件 * @access protected * @param string $destination 日志文件 * @return void */ protected function checkLogSize(string $destination) { $config['file_size'] = 2097152; if (is_file($destination) && floor($config['file_size']) <= filesize($destination)) { try { rename($destination, dirname($destination) . DIRECTORY_SEPARATOR . time() . '-' . basename($destination)); } catch (\Exception $e) { } } } //获取当前时间 private function getTimeStr() { return date('Y-m-d H:i:s'); } }3.3 业务调用任务队列执行方法
1.在对应业务方法中调用PushQueue:addJob() 方法
class JobTest { public function demo() { $jobData = [ 'class_name' => 'app\index\controller\Task',//具体业务执行类 'class_method' => 'run',//业务执行方法 'method_param' => [ 'goods_id' => 1, ],//执行需要的参数 ]; $execTime = time() + 5;//任务执行时间,为0表示立即执行,当前设置为延时5秒执行 $result = PushQueue::addJob($jobData, $execTime); if ($result['code']) { $str = " "; echo $str; } else { echo $result['msg']; } } }2.任务加入
执行http://yvhsse.com//index/job_test/demo 方法加入队列
3.队列命令任务执行
测试队列直接执行下面任务
生产环境下可以将该任务加入守护进程里
php think queue:work --daemon --delay 2 --tries 3 --queue TaskQueue 参数说明: --delay 2 表示任务延2秒钟执行 --tries 3 任务执行失败重试次数
End.