queue隊列的使用
一、開發環境:
ThinkPHP 5.0框架。
參考文檔:
簡書:https://www.jianshu.com/p/f5e33215c13c。
packagist官網:https://packagist.org/packages/topthink/think-queue#v1.1.6
github:https://github.com/top-think/think-queue
weixin_34163553的《thinkPHP5的隊列使用》。
二、安裝queue擴展,有的下載后框架可能自帶queue擴展。
composer require topthink/think-queue
這個命令時下載最新的queue擴展,但是最新的是針對ThinkPHP6.0的,而開發框架是5.0,所以要在php包擴展官網找對應的版本。
1、打開php擴展列表官網https://packagist.org/。
2、搜think-queue,打開第一個https://packagist.org/packages/topthink/think-queue。
3、查看每個版本對框架和環境的要求,1.1.6要求thinkphp是5.0.0,所以選擇這個。
composer require topthink/think-queue 1.1.6
三、配置(服務器需要安裝有redis):
配置文件位於 application/extra/queue.php
return [ // 'connector' => 'Sync' 'connector' => 'Redis', // Redis 驅動 'expire' => 60, // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null 'default' => 'default', // 默認的隊列名稱 'host' => '127.0.0.1', // redis 主機ip 'port' => 6379, // redis 端口 'password' => '', // redis 密碼 'select' => 1, // 使用哪一個 db,默認為 db0 'timeout' => 0, // redis連接的超時時間 'persistent' => false, // 是否是長連接 ];
四、代碼:
創建一個控制器,執行隊列里的任務,例如創建一個app\message\controller\DoJob.php。
namespace app\message\controller; use think\Exception; use think\Queue; use think\Queue\Job; use think\Db; class DoJob{ /** * fire方法是消息隊列默認調用的方法 * @param Job $job 當前的任務對象 * @param $data 發布任務時自定義的數據 * @return int */ public function fire(Job $job,$data){ //這里$data定義格式為:$data = [ 'type'=>1, 'data_id' => 123,'ts' => time()] if(empty($data)){ return 0; } // 有些消息在到達消費者時,可能已經不再需要執行了 // $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); // if(!$isJobStillNeedToBeDone){ // $job->delete(); // return 0; // } if(is_array($data) && isset($data['type'])){ $type = $data['type']; if($type == 1){ //執行發送郵件業務 $isJobDone = $this->sendEmail($data['data_id']); }else if($type == 2){ //執行APP推送消息業務 $isJobDone = $this->sendAppMessage($data['data_id']); }else if($type == 3){ //執行訂單業務 $isJobDone = $this->orderService($data['data_id']); }else{ return 0; } }else{ return 0; } if ($isJobDone) { // 如果任務執行成功,刪除任務 $job->delete(); }else{ if ($job->attempts() > 3) { //通過這個方法可以檢查這個任務已經重試了幾次了 $job->delete(); // 也可以重新發布這個任務 //$job->release(2); //$delay為延遲時間,表示該任務延遲2秒后再執行 } } } //發郵件業務 private function sendEmail($id){ } //發App消息業務 private function sendAppMessage($id){ } //處理訂單業務 private function orderService($id){ } }
發布任務到隊列中,創建一個測試代碼,例如創建一個app\message\controller\addJob.php:
namespace app\message\controller; use think\Exception; use think\Controller; use think\Queue; use think\Db; class addJob extends Controller{ public function index(){ // 1.當前任務將由哪個類來負責處理。 // 當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法 $jobHandlerClassName = 'app\message\controller\DoJob'; // 2.當前任務歸屬的隊列名稱,如果為新隊列,會自動創建 $jobQueueName = "JobQueue"; // 3.當前任務所需的業務數據 . 不能為 resource 類型,其他類型最終將轉化為json形式的字符串 // ( jobData 為對象時,存儲其public屬性的鍵值對 ) $jobData = [ 'type'=>1, 'data_id' => 12,'ts' => time()] ; // 4.將該任務推送到消息隊列,等待對應的消費者去執行 $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); // dump($isPushed);//字符串 if( $isPushed !== false ){ return '消息已發出'; }else{ return '消息發送出錯'; } }
}
五、監聽任務並執行
在服務器上,執行框架的根目錄下的think命令:
nohup /usr/local/bin/php /home/www/tp5/think queue:listen --queue JobQueue &
