queue队列的使用
一、开发环境:
ThinkPHP 5.0框架。
参考文档:
简书:https://www.jianshu.com/p/f5e33215c13c。
packagist官网:https://packagist.org/packages/topthink/think-queue#v1.1.6
github:https://github.com/top-think/think-queue
weixin_34163553的《thinkPHP5的队列使用》。
二、安装queue扩展,有的下载后框架可能自带queue扩展。
composer require topthink/think-queue
这个命令时下载最新的queue扩展,但是最新的是针对ThinkPHP6.0的,而开发框架是5.0,所以要在php包扩展官网找对应的版本。
1、打开php扩展列表官网https://packagist.org/。
2、搜think-queue,打开第一个https://packagist.org/packages/topthink/think-queue。
3、查看每个版本对框架和环境的要求,1.1.6要求thinkphp是5.0.0,所以选择这个。
composer require topthink/think-queue 1.1.6
三、配置(服务器需要安装有redis):
配置文件位于 application/extra/queue.php
return [ // 'connector' => 'Sync' 'connector' => 'Redis', // Redis 驱动 'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 'default' => 'default', // 默认的队列名称 'host' => '127.0.0.1', // redis 主机ip 'port' => 6379, // redis 端口 'password' => '', // redis 密码 'select' => 1, // 使用哪一个 db,默认为 db0 'timeout' => 0, // redis连接的超时时间 'persistent' => false, // 是否是长连接 ];
四、代码:
创建一个控制器,执行队列里的任务,例如创建一个app\message\controller\DoJob.php。
namespace app\message\controller; use think\Exception; use think\Queue; use think\Queue\Job; use think\Db; class DoJob{ /** * fire方法是消息队列默认调用的方法 * @param Job $job 当前的任务对象 * @param $data 发布任务时自定义的数据 * @return int */ public function fire(Job $job,$data){ //这里$data定义格式为:$data = [ 'type'=>1, 'data_id' => 123,'ts' => time()] if(empty($data)){ return 0; } // 有些消息在到达消费者时,可能已经不再需要执行了 // $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); // if(!$isJobStillNeedToBeDone){ // $job->delete(); // return 0; // } if(is_array($data) && isset($data['type'])){ $type = $data['type']; if($type == 1){ //执行发送邮件业务 $isJobDone = $this->sendEmail($data['data_id']); }else if($type == 2){ //执行APP推送消息业务 $isJobDone = $this->sendAppMessage($data['data_id']); }else if($type == 3){ //执行订单业务 $isJobDone = $this->orderService($data['data_id']); }else{ return 0; } }else{ return 0; } if ($isJobDone) { // 如果任务执行成功,删除任务 $job->delete(); }else{ if ($job->attempts() > 3) { //通过这个方法可以检查这个任务已经重试了几次了 $job->delete(); // 也可以重新发布这个任务 //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行 } } } //发邮件业务 private function sendEmail($id){ } //发App消息业务 private function sendAppMessage($id){ } //处理订单业务 private function orderService($id){ } }
发布任务到队列中,创建一个测试代码,例如创建一个app\message\controller\addJob.php:
namespace app\message\controller; use think\Exception; use think\Controller; use think\Queue; use think\Db; class addJob extends Controller{ public function index(){ // 1.当前任务将由哪个类来负责处理。 // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法 $jobHandlerClassName = 'app\message\controller\DoJob'; // 2.当前任务归属的队列名称,如果为新队列,会自动创建 $jobQueueName = "JobQueue"; // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串 // ( jobData 为对象时,存储其public属性的键值对 ) $jobData = [ 'type'=>1, 'data_id' => 12,'ts' => time()] ; // 4.将该任务推送到消息队列,等待对应的消费者去执行 $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); // dump($isPushed);//字符串 if( $isPushed !== false ){ return '消息已发出'; }else{ return '消息发送出错'; } }
}
五、监听任务并执行
在服务器上,执行框架的根目录下的think命令:
nohup /usr/local/bin/php /home/www/tp5/think queue:listen --queue JobQueue &