thinkphp5.0 queue隊列的使用


queue隊列的使用


 一、開發環境:

ThinkPHP 5.0框架。

參考文檔:

簡書:https://www.jianshu.com/p/f5e33215c13c

packagist官網:https://packagist.org/packages/topthink/think-queue#v1.1.6

github:https://github.com/top-think/think-queue

的《thinkPHP5的隊列使用》。


 二、安裝queue擴展,有的下載后框架可能自帶queue擴展。

composer require topthink/think-queue

這個命令時下載最新的queue擴展,但是最新的是針對ThinkPHP6.0的,而開發框架是5.0,所以要在php包擴展官網找對應的版本。

1、打開php擴展列表官網https://packagist.org/

2、搜think-queue,打開第一個https://packagist.org/packages/topthink/think-queue

3、查看每個版本對框架和環境的要求,1.1.6要求thinkphp是5.0.0,所以選擇這個。

composer require topthink/think-queue 1.1.6

三、配置(服務器需要安裝有redis):

配置文件位於 application/extra/queue.php

return [
//    'connector' => 'Sync'
    'connector'  => 'Redis',        // Redis 驅動
    'expire'     => 60,        // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null
    'default'    => 'default',        // 默認的隊列名稱
    'host'       => '127.0.0.1',    // redis 主機ip
    'port'       => 6379,        // redis 端口
    'password'   => '',        // redis 密碼
    'select'     => 1,        // 使用哪一個 db,默認為 db0
    'timeout'    => 0,        // redis連接的超時時間
    'persistent' => false,        // 是否是長連接
];

 四、代碼:

創建一個控制器,執行隊列里的任務,例如創建一個app\message\controller\DoJob.php。

namespace app\message\controller;
use think\Exception;
use think\Queue;
use think\Queue\Job;
use think\Db;

class DoJob{


    /**
     * fire方法是消息隊列默認調用的方法
     * @param Job $job 當前的任務對象
     * @param $data 發布任務時自定義的數據
     * @return int
     */
    public function fire(Job $job,$data){
        //這里$data定義格式為:$data = [ 'type'=>1, 'data_id' => 123,'ts' => time()]
        if(empty($data)){
            return 0;
        }
        // 有些消息在到達消費者時,可能已經不再需要執行了
//        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
//        if(!$isJobStillNeedToBeDone){
//            $job->delete();
//            return 0;
//        }
        
        if(is_array($data) && isset($data['type'])){
            $type = $data['type'];
            if($type == 1){
                //執行發送郵件業務
                $isJobDone = $this->sendEmail($data['data_id']);
            }else if($type == 2){
                //執行APP推送消息業務
                $isJobDone = $this->sendAppMessage($data['data_id']);
            }else if($type == 3){
                //執行訂單業務
                $isJobDone = $this->orderService($data['data_id']);
            }else{
                return 0;
            }
        }else{
            return 0;
        }
        if ($isJobDone) {
            // 如果任務執行成功,刪除任務
            $job->delete();
        }else{
            if ($job->attempts() > 3) {
                //通過這個方法可以檢查這個任務已經重試了幾次了
                $job->delete();
                // 也可以重新發布這個任務
                //$job->release(2); //$delay為延遲時間,表示該任務延遲2秒后再執行
            }
        }
    }
    
    //發郵件業務
    private function sendEmail($id){
        
    }

    //發App消息業務
    private function sendAppMessage($id){

    }

    //處理訂單業務
    private function orderService($id){

    }
}

發布任務到隊列中,創建一個測試代碼,例如創建一個app\message\controller\addJob.php:

namespace app\message\controller;
use think\Exception;
use think\Controller;
use think\Queue;
use think\Db;

class addJob extends Controller{

  public function index(){
        // 1.當前任務將由哪個類來負責處理。
        //   當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法
        $jobHandlerClassName  = 'app\message\controller\DoJob';
        // 2.當前任務歸屬的隊列名稱,如果為新隊列,會自動創建
        $jobQueueName  = "JobQueue";
        // 3.當前任務所需的業務數據 . 不能為 resource 類型,其他類型最終將轉化為json形式的字符串
        //   ( jobData 為對象時,存儲其public屬性的鍵值對 )
        $jobData = [ 'type'=>1, 'data_id' => 12,'ts' => time()] ;
        // 4.將該任務推送到消息隊列,等待對應的消費者去執行
        $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
//        dump($isPushed);//字符串
        if( $isPushed !== false ){
            return '消息已發出';
        }else{
            return '消息發送出錯';
        }
    }
}

五、監聽任務並執行

在服務器上,執行框架的根目錄下的think命令:

nohup /usr/local/bin/php /home/www/tp5/think queue:listen --queue JobQueue &

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM