1.簡介
thinkphp-queue是thinkphp的一個第三方擴展, 內置了 Redis,Database,Topthink ,Sync這四種驅動,推薦使用redis
2. 下載 和安裝
composer require topthink/think-queue
配置目錄在: application/extra/queue.php
return [ 'connector' => 'Redis', // Redis 驅動 'expire' => 60, // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null 'default' => 'default', // 默認的隊列名稱 'host' => '127.0.0.1', // redis 主機ip 'port' => 6379, // redis 端口 'password' => '', // redis 密碼 'select' => 0, // 使用哪一個 db,默認為 db0 'timeout' => 0, // redis連接的超時時間 'persistent' => false, // 是否是長連接 // 'connector' => 'Database', // 數據庫驅動 // 'expire' => 60, // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null // 'default' => 'default', // 默認的隊列名稱 // 'table' => 'jobs', // 存儲消息的表名,不帶前綴 // 'dsn' => [], // 'connector' => 'Topthink', // ThinkPHP內部的隊列通知服務平台 ,本文不作介紹 // 'token' => '', // 'project_id' => '', // 'protocol' => 'https', // 'host' => 'qns.topthink.com', // 'port' => 443, // 'api_version' => 1, // 'max_retries' => 3, // 'default' => 'default', // 'connector' => 'Sync', // Sync 驅動,該驅動的實際作用是取消消息隊列,還原為同步執行 ];
3.入隊,創建隊列的代碼
/** * 文件路徑: \application\index\controller\JobTest.php * 該控制器的業務代碼中借助了thinkphp-queue 庫,將一個消息推送到消息隊列 */ namespace application\index\controller; use think\Exception; use think\Queue; class JobTest { /** * 一個使用了隊列的 action */ public function actionWithHelloJob(){ // 1.當前任務將由哪個類來負責處理。 // 當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法 $jobHandlerClassName = 'application\index\job\Hello'; // 2.當前任務歸屬的隊列名稱,如果為新隊列,會自動創建 $jobQueueName = "helloJobQueue"; // 3.當前任務所需的業務數據 . 不能為 resource 類型,其他類型最終將轉化為json形式的字符串 // ( jobData 為對象時,需要在先在此處手動序列化,否則只存儲其public屬性的鍵值對) $jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ; // 4.將該任務推送到消息隊列,等待對應的消費者去執行 $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); // database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字符串|false if( $isPushed !== false ){ echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>"; }else{ echo 'Oops, something went wrong.'; } } }
如果是多個任務:如果一個任務類里有多個小任務的話,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1
、app\lib\job\Job2@task2
4.消費隊列的代碼:
<?php namespace app\test\job; use think\queue\Job; class Hello { /** * fire方法是消息隊列默認調用的方法 * @param Job $job 當前的任務對象 * @param array|mixed $data 發布任務時自定義的數據 */ public function fire(Job $job,$data){ // 如有必要,可以根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行. $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if(!isJobStillNeedToBeDone){ $job->delete(); return; } $isJobDone = $this->doHelloJob($data); if ($isJobDone) { //如果任務執行成功, 記得刪除任務 $job->delete(); print("<info>Hello Job has been done and deleted"."</info>\n"); }else{ if ($job->attempts() > 3) { //通過這個方法可以檢查這個任務已經重試了幾次了 print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n"); $job->delete(); // 也可以重新發布這個任務 //print("<info>Hello Job will be availabe again after 2s."."</info>\n"); //$job->release(2); //$delay為延遲時間,表示該任務延遲2秒后再執行 } } } /** * 有些消息在到達消費者時,可能已經不再需要執行了 * @param array|mixed $data 發布任務時自定義的數據 * @return boolean 任務執行的結果 */ private function checkDatabaseToSeeIfJobNeedToBeDone($data){ return true; } /** * 根據消息中的數據進行實際的業務處理 * @param array|mixed $data 發布任務時自定義的數據 * @return boolean 任務執行的結果 */ private function doHelloJob($data) { // 根據消息中的數據進行實際的業務處理... var_dump($data); // print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n"); // print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n"); // print("<info>Hello Job is Done!"."</info> \n"); return true; } }
執行代碼:
php think queue:work --queue helloJobQueue
5.總結:
5.1 命名:
-
queue:subscribe 命令 [截至2017-02-15,作者暫未實現該模式,略過]
-
queue:work 命令
work 命令: 該命令將啟動一個 work 進程來處理消息隊列。
php think queue:work --queue helloJobQueue
-
queue:listen 命令
listen 命令: 該命令將會創建一個 listen 父進程 ,然后由父進程通過
proc_open(‘php think queue:work’)
的方式來創建一個work 子 進程來處理消息隊列,且限制該work進程的執行時間。php think queue:listen --queue helloJobQueue
queue:restart 重新開啟
參考資料:https://blog.csdn.net/will5451/article/details/80434174
https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645
https://github.com/top-think/think-queue