安裝
composer require topthink/think-queue
配置
配置文件位於
config/queue.php
公共配置
[
'default'=>'sync' //驅動類型,可選擇 sync(默認):同步執行,database:數據庫驅動,redis:Redis驅動//或其他自定義的完整的類名 ]
配置消息隊列的驅動
根據選擇的存儲方式,在\app\config\queue.php這個配置文件中,添加消息隊列對應的驅動配置
創建任務類
單模塊項目推薦使用
app\job
作為任務類的命名空間 多模塊項目可用使用app\module\job
作為任務類的命名空間 也可以放在任意可以自動加載到的地方任務類不需繼承任何類,如果這個類只有一個任務,那么就只需要提供一個
fire
方法就可以了,如果有多個小任務,就寫多個方法,下面發布任務的時候會有區別
每個方法會傳入兩個參數think\queue\Job $job
(當前的任務對象) 和$data
(發布任務時自定義的數據)還有個可選的任務失敗執行的方法
failed
傳入的參數為$data
(發布任務時自定義的數據)
例一:消息的消費與刪除
namespace app\job;
use think\queue\Job; class Job1{ public function fire(Job $job, $data){ //....這里執行具體的任務 if ($job->attempts() > 3) { //通過這個方法可以檢查這個任務已經重試了幾次了 } //如果任務執行成功后 記得刪除任務,不然這個任務會重復執行,直到達到最大重試次數后失敗后,執行failed方法 $job->delete(); // 也可以重新發布這個任務 $job->release($delay); //$delay為延遲時間 } public function failed($data){ // ...任務達到最大重試次數后,失敗了 } }
例二:
namespace app\lib\job;
use think\queue\Job; class Job2{ public function task1(Job $job, $data){ } public function task2(Job $job, $data){ } public function failed($data){ } }
例三:多模塊舉例
<?php
/**
* 消息隊列(queue)使用方法
* 使用redis實現消息隊列demo
*/
namespace app\crm\job\export;
use think\facade\Db;
use think\queue\Job;
class QueueJobTest
{
public function fire(Job $job,$data){
// 如有必要,可以根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行.
$isJobStillNeedToBeDone = $this->checkJobNeedIfDone($data);
if(!$isJobStillNeedToBeDone){
//刪除任務
$job->delete();
return;
}
$isJobDone = $this->saveInfo($data);
//如果任務執行完成,刪除此隊列任務,
if($isJobDone){
//刪除任務
$job->delete();
}else{
//通過這個方法可以檢查這個任務已經重試了幾次了
if ($job->attempts() > 3) {
//如果大於設置的次數3,可執行刪除任務或重新發布此任務
//此處直接執行刪除任務
$job->delete();
//--begin-重新發布此任務-------
//$job->release(2); //參數為設置的延遲時間,表示該任務延遲2秒后再執行
//--end-重新發布此任務--------——
}
}
}
/**
* 可選的任務失敗執行的方法,此處只作列舉
* 任務失敗,執行failed方法
* @param $data 發布任務時自定義的數據
*/
//public function failed($data){
//// ...任務達到最大重試次數后,失敗了
//執行失敗的業務代碼
//}
/**
* 有些消息在到達消費者時,可能已經不再需要執行了
* @param array|mixed $data 發布任務時自定義的數據
* @return boolean 任務執行的結果
*/
private function checkJobNeedIfDone($data){
//可查詢數據庫確認是否需要繼續執行,比如訂單支付的狀態更改等后續操作
//此處省略業務邏輯,直接返回true,表示繼續執行
return true;
}
/**
* 任務執行的業務代碼
* 如發送郵件,數據入庫等
*/
private function saveInfo($data){
//此處模擬數據入庫
$res = Db::name('test')->insert(
[
'title' => $data['title'],
'content' => $data['content'],
'add_time' => time()
]
);
if($res){
return true;
}else{
return false;
}
}
}
發布任務
兩個方法
think\facade\Queue::push($job, $data = '', $queue = null);
//立即執行或
think\facade\Queue::later($delay, $job, $data = '', $queue = null);
//延時執行,在參數$delay
秒后執行在$delay
秒后執行
$job
是任務名
單模塊的,且命名空間是app\job
的,比如上面的例子一,寫Job1
類名即可
多模塊的,且命名空間是app\module\job
的,寫model/Job1
即可
其他的需要些完整的類名,比如上面的例子二,需要寫完整的類名app\lib\job\Job2
如果一個任務類里有多個小任務的話,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1
、app\lib\job\Job2@task2
$data
是你要傳到任務里的參數
$queue
隊列名,指定這個任務是在哪個隊列上執行,同下面監控隊列的時候指定的隊列名,可不填例:
文件路徑:
任務名(類名):app\crm\job\export\QueueJobTest
傳到任務里的參數:['title' => '隊列任務入庫demo','content' => 'xxxx內容']
隊列名(可不填):queueJobTest
//生產者業務代碼中把任務push到隊列中:
think\facade\Queue::push('app\crm\job\export\QueueJobTest', ['title' => '隊列任務入庫demo','content' => 'xxxx內容'], 'queueJobTest')
發布任務
瀏覽器中訪問:http://localhost/index.php/Test/actionQueueJobTest
監聽任務並執行
消費者進行處理隊列
&> php think queue:listen &> php think queue:work
兩種,具體的可選參數可以輸入命令加 --help
查看
處理任務
方式一:
切換當前終端窗口的目錄到項目根目錄下,執行
php think queue:work --queue queueJobTest
或
php think queue:listen --queue queueJobTest
方式二:
supervisor安裝參考:https://www.cnblogs.com/chihuobao/p/15341719.html
可配合supervisor使用,保證進程常駐
添加queueJobTest.ini文件輸入如下內容
[program:queueJobTest]
user=root
command = php /home/wwwroot/test_crm/think queue:listen --queue queueJobTest --timeout 0 --memory 1024
autostart=true
autorestart=false
stderr_logfile=/tmp/queue_stderr.log
stdout_logfile=/tmp/queue_stdout.log