thinkphp5.0 queue队列的使用


queue队列的使用


 一、开发环境:

ThinkPHP 5.0框架。

参考文档:

简书:https://www.jianshu.com/p/f5e33215c13c

packagist官网:https://packagist.org/packages/topthink/think-queue#v1.1.6

github:https://github.com/top-think/think-queue

的《thinkPHP5的队列使用》。


 二、安装queue扩展,有的下载后框架可能自带queue扩展。

composer require topthink/think-queue

这个命令时下载最新的queue扩展,但是最新的是针对ThinkPHP6.0的,而开发框架是5.0,所以要在php包扩展官网找对应的版本。

1、打开php扩展列表官网https://packagist.org/

2、搜think-queue,打开第一个https://packagist.org/packages/topthink/think-queue

3、查看每个版本对框架和环境的要求,1.1.6要求thinkphp是5.0.0,所以选择这个。

composer require topthink/think-queue 1.1.6

三、配置(服务器需要安装有redis):

配置文件位于 application/extra/queue.php

return [
//    'connector' => 'Sync'
    'connector'  => 'Redis',        // Redis 驱动
    'expire'     => 60,        // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'    => 'default',        // 默认的队列名称
    'host'       => '127.0.0.1',    // redis 主机ip
    'port'       => 6379,        // redis 端口
    'password'   => '',        // redis 密码
    'select'     => 1,        // 使用哪一个 db,默认为 db0
    'timeout'    => 0,        // redis连接的超时时间
    'persistent' => false,        // 是否是长连接
];

 四、代码:

创建一个控制器,执行队列里的任务,例如创建一个app\message\controller\DoJob.php。

namespace app\message\controller;
use think\Exception;
use think\Queue;
use think\Queue\Job;
use think\Db;

class DoJob{


    /**
     * fire方法是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param $data 发布任务时自定义的数据
     * @return int
     */
    public function fire(Job $job,$data){
        //这里$data定义格式为:$data = [ 'type'=>1, 'data_id' => 123,'ts' => time()]
        if(empty($data)){
            return 0;
        }
        // 有些消息在到达消费者时,可能已经不再需要执行了
//        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
//        if(!$isJobStillNeedToBeDone){
//            $job->delete();
//            return 0;
//        }
        
        if(is_array($data) && isset($data['type'])){
            $type = $data['type'];
            if($type == 1){
                //执行发送邮件业务
                $isJobDone = $this->sendEmail($data['data_id']);
            }else if($type == 2){
                //执行APP推送消息业务
                $isJobDone = $this->sendAppMessage($data['data_id']);
            }else if($type == 3){
                //执行订单业务
                $isJobDone = $this->orderService($data['data_id']);
            }else{
                return 0;
            }
        }else{
            return 0;
        }
        if ($isJobDone) {
            // 如果任务执行成功,删除任务
            $job->delete();
        }else{
            if ($job->attempts() > 3) {
                //通过这个方法可以检查这个任务已经重试了几次了
                $job->delete();
                // 也可以重新发布这个任务
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }
    
    //发邮件业务
    private function sendEmail($id){
        
    }

    //发App消息业务
    private function sendAppMessage($id){

    }

    //处理订单业务
    private function orderService($id){

    }
}

发布任务到队列中,创建一个测试代码,例如创建一个app\message\controller\addJob.php:

namespace app\message\controller;
use think\Exception;
use think\Controller;
use think\Queue;
use think\Db;

class addJob extends Controller{

  public function index(){
        // 1.当前任务将由哪个类来负责处理。
        //   当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
        $jobHandlerClassName  = 'app\message\controller\DoJob';
        // 2.当前任务归属的队列名称,如果为新队列,会自动创建
        $jobQueueName  = "JobQueue";
        // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
        //   ( jobData 为对象时,存储其public属性的键值对 )
        $jobData = [ 'type'=>1, 'data_id' => 12,'ts' => time()] ;
        // 4.将该任务推送到消息队列,等待对应的消费者去执行
        $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
//        dump($isPushed);//字符串
        if( $isPushed !== false ){
            return '消息已发出';
        }else{
            return '消息发送出错';
        }
    }
}

五、监听任务并执行

在服务器上,执行框架的根目录下的think命令:

nohup /usr/local/bin/php /home/www/tp5/think queue:listen --queue JobQueue &

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM