thinkphp6 使用redis 實現消息隊列


安裝

composer require topthink/think-queue

配置

配置文件位於 config/queue.php

 

公共配置

[
    'default'=>'sync' //驅動類型,可選擇 sync(默認):同步執行,database:數據庫驅動,redis:Redis驅動//或其他自定義的完整的類名 ]

配置消息隊列的驅動
根據選擇的存儲方式,在\app\config\queue.php這個配置文件中,添加消息隊列對應的驅動配置

 

創建任務類

單模塊項目推薦使用 app\job 作為任務類的命名空間 多模塊項目可用使用 app\module\job 作為任務類的命名空間 也可以放在任意可以自動加載到的地方

任務類不需繼承任何類,如果這個類只有一個任務,那么就只需要提供一個fire方法就可以了,如果有多個小任務,就寫多個方法,下面發布任務的時候會有區別
每個方法會傳入兩個參數 think\queue\Job $job(當前的任務對象) 和 $data(發布任務時自定義的數據)

還有個可選的任務失敗執行的方法 failed 傳入的參數為$data(發布任務時自定義的數據)

例一:消息的消費與刪除
namespace app\job; use think\queue\Job; class Job1{ public function fire(Job $job, $data){ //....這里執行具體的任務 if ($job->attempts() > 3) { //通過這個方法可以檢查這個任務已經重試了幾次了 } //如果任務執行成功后 記得刪除任務,不然這個任務會重復執行,直到達到最大重試次數后失敗后,執行failed方法 $job->delete(); // 也可以重新發布這個任務 $job->release($delay); //$delay為延遲時間 } public function failed($data){ // ...任務達到最大重試次數后,失敗了 } }
例二:
namespace app\lib\job; use think\queue\Job; class Job2{ public function task1(Job $job, $data){ } public function task2(Job $job, $data){ } public function failed($data){ } }

 

例三:多模塊舉例
<?php
/**
* 消息隊列(queue)使用方法
* 使用redis實現消息隊列demo
*/

namespace app\crm\job\export;


use think\facade\Db;
use think\queue\Job;

class QueueJobTest
{

public function fire(Job $job,$data){
// 如有必要,可以根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行.
$isJobStillNeedToBeDone = $this->checkJobNeedIfDone($data);
if(!$isJobStillNeedToBeDone){
//刪除任務
$job->delete();
return;
}
$isJobDone = $this->saveInfo($data);
//如果任務執行完成,刪除此隊列任務,
if($isJobDone){
//刪除任務
$job->delete();
}else{
//通過這個方法可以檢查這個任務已經重試了幾次了
if ($job->attempts() > 3) {
//如果大於設置的次數3,可執行刪除任務或重新發布此任務
//此處直接執行刪除任務
$job->delete();
//--begin-重新發布此任務-------
//$job->release(2); //參數為設置的延遲時間,表示該任務延遲2秒后再執行
//--end-重新發布此任務--------——
}
}
}

/**
* 可選的任務失敗執行的方法,此處只作列舉
* 任務失敗,執行failed方法
* @param $data 發布任務時自定義的數據
*/
//public function failed($data){
//// ...任務達到最大重試次數后,失敗了
//執行失敗的業務代碼
//}

/**
* 有些消息在到達消費者時,可能已經不再需要執行了
* @param array|mixed $data 發布任務時自定義的數據
* @return boolean 任務執行的結果
*/
private function checkJobNeedIfDone($data){
//可查詢數據庫確認是否需要繼續執行,比如訂單支付的狀態更改等后續操作
//此處省略業務邏輯,直接返回true,表示繼續執行
return true;
}

/**
* 任務執行的業務代碼
* 如發送郵件,數據入庫等
*/
private function saveInfo($data){
//此處模擬數據入庫
$res = Db::name('test')->insert(
[
'title' => $data['title'],
'content' => $data['content'],
'add_time' => time()
]
);
if($res){
return true;
}else{
return false;
}
}
}

發布任務

兩個方法

think\facade\Queue::push($job, $data = '', $queue = null);       //立即執行

或 

think\facade\Queue::later($delay, $job, $data = '', $queue = null);  //延時執行,在參數 $delay 秒后執行在$delay秒后執行

$job 是任務名
單模塊的,且命名空間是app\job的,比如上面的例子一,寫Job1類名即可
多模塊的,且命名空間是app\module\job的,寫model/Job1即可
其他的需要些完整的類名,比如上面的例子二,需要寫完整的類名app\lib\job\Job2
如果一個任務類里有多個小任務的話,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1app\lib\job\Job2@task2

$data 是你要傳到任務里的參數

$queue 隊列名,指定這個任務是在哪個隊列上執行,同下面監控隊列的時候指定的隊列名,可不填

例:

文件路徑:

任務名(類名):app\crm\job\export\QueueJobTest

傳到任務里的參數:['title' => '隊列任務入庫demo','content' => 'xxxx內容']

隊列名(可不填):queueJobTest

//生產者業務代碼中把任務push到隊列中:

think\facade\Queue::push('app\crm\job\export\QueueJobTest',  ['title' => '隊列任務入庫demo','content' => 'xxxx內容'], 'queueJobTest')

 

發布任務

 瀏覽器中訪問:http://localhost/index.php/Test/actionQueueJobTest  

監聽任務並執行

消費者進行處理隊列

&> php think queue:listen &> php think queue:work

兩種,具體的可選參數可以輸入命令加 --help 查看

 

處理任務

方式一:

切換當前終端窗口的目錄到項目根目錄下,執行

php think queue:work --queue queueJobTest

php think queue:listen --queue queueJobTest

 

方式二:

supervisor安裝參考:https://www.cnblogs.com/chihuobao/p/15341719.html  

可配合supervisor使用,保證進程常駐

添加queueJobTest.ini文件輸入如下內容


[program:queueJobTest]
user=root
command = php /home/wwwroot/test_crm/think queue:listen --queue queueJobTest --timeout 0 --memory 1024
autostart=true
autorestart=false
stderr_logfile=/tmp/queue_stderr.log
stdout_logfile=/tmp/queue_stdout.log

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM