TP5 中的redis 隊列


首先我們看一下自己的TP5的框架中的  TP5\vendor\topthink ,這個文件中有沒有think-queue這個文件夾,如果沒有請安裝,

安裝這個是要用到Composer的如果沒有安裝composer,請安裝Composer

1.$ curl -sS https://getcomposer.org/installer | php

2.$ mv composer.phar /usr/local/bin/composer
Linux上安裝 think-queue ,請先進入到框架的根目錄再運行

composer require topthink/think-queue
這個時候再去看就會有 think-queue 這個文件夾了,確定一下看看是否安裝成功,運行

php think queue:work -h
能出現以下 結果 就表示think-queue的 安裝好了

 

 

配置
配置文件位於 application/extra/queue.php

公共配置
 

<?php
return [
'connector' => 'Redis', // Redis 驅動
'expire' => 60, // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null
'default' => 'default', // 默認的隊列名稱
'host' => '127.0.0.1', // redis 主機ip
'port' => 6379, // redis 端口
'password' => '', // redis 密碼
'select' => 0, // 使用哪一個 db,默認為 db0
'timeout' => 0, // redis連接的超時時間
'persistent' => false, // 是否是長連接

// 'connector' => 'Database', // 數據庫驅動
// 'expire' => 60, // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null
// 'default' => 'default', // 默認的隊列名稱
// 'table' => 'jobs', // 存儲消息的表名,不帶前綴
// 'dsn' => [],

// 'connector' => 'Topthink', // ThinkPHP內部的隊列通知服務平台 ,本文不作介紹
// 'token' => '',
// 'project_id' => '',
// 'protocol' => 'https',
// 'host' => 'qns.topthink.com',
// 'port' => 443,
// 'api_version' => 1,
// 'max_retries' => 3,
// 'default' => 'default',

// 'connector' => 'Sync', // Sync 驅動,該驅動的實際作用是取消消息隊列,還原為同步執行
];
 

1.4 消息的創建與推送

我們在業務控制器中創建一個新的消息,並推送到 helloJobQueue 隊列

新增 \application\index\controller\JobTest.php 控制器,在該控制器中添加 actionWithHelloJob 方法

<?php
/**
* 文件路徑: \application\index\controller\JobTest.php
* 該控制器的業務代碼中借助了thinkphp-queue 庫,將一個消息推送到消息隊列
*/
namespace app\index\controller;
use think\Exception;

use think\Queue;

class JobTest {
/**
* 一個使用了隊列的 action
*/
public function actionWithHelloJob(){

// 1.當前任務將由哪個類來負責處理。
// 當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法
$jobHandlerClassName = 'application\index\job\Hello';
// 2.當前任務歸屬的隊列名稱,如果為新隊列,會自動創建
$jobQueueName = "helloJobQueue";
// 3.當前任務所需的業務數據 . 不能為 resource 類型,其他類型最終將轉化為json形式的字符串
// ( jobData 為對象時,需要在先在此處手動序列化,否則只存儲其public屬性的鍵值對)
$jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ;
// 4.將該任務推送到消息隊列,等待對應的消費者去執行
$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
// database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字符串|false
if( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
}else{
echo 'Oops, something went wrong.';
}
}
}
注意: 在這個例子當中,我們是手動指定的 $jobHandlerClassName ,更合理的做法是先定義好消息名稱與消費者類名的映射關系,然后由某個可以獲取該映射關系的類來推送這個消息。這樣,生產者只需要知道消息的名稱,而無需指定哪個消費者類來處理。

除了 Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );這種方式之外,還可以直接傳入 Queue::push( $jobHandlerObject ,null , $jobQueueName ); 這時,需要在 $jobHandlerObject 中定義一個 handle() 方法,消息隊列在執行到該任務時會自動反序列化該對象,並調用其 handle()方法。 該方式的缺點是無法傳入自定義數據。

1.5 消息的消費與刪除

編寫 Hello 消費者類,用於處理 helloJobQueue 隊列中的任務

新增 \application\index\job\Hello.php 消費者類,並編寫其 fire() 方法

<?php
/**
* 文件路徑: \application\index\job\Hello.php
* 這是一個消費者類,用於處理 helloJobQueue 隊列中的任務
*/
namespace app\index\job;

use think\queue\Job;

class Hello {

/**
* fire方法是消息隊列默認調用的方法
* @param Job $job 當前的任務對象
* @param array|mixed $data 發布任務時自定義的數據
*/
public function fire(Job $job,$data){
         // 如有必要,可以根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行.
         $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
         if(!isJobStillNeedToBeDone){
$job->delete();
return;
}
       
$isJobDone = $this->doHelloJob($data);

if ($isJobDone) {
//如果任務執行成功, 記得刪除任務
$job->delete();
print("<info>Hello Job has been done and deleted"."</info>\n");
}else{
if ($job->attempts() > 3) {
//通過這個方法可以檢查這個任務已經重試了幾次了
print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
$job->delete();
// 也可以重新發布這個任務
//print("<info>Hello Job will be availabe again after 2s."."</info>\n");
//$job->release(2); //$delay為延遲時間,表示該任務延遲2秒后再執行
}
}
}

/**
      * 有些消息在到達消費者時,可能已經不再需要執行了
* @param array|mixed $data 發布任務時自定義的數據
* @return boolean 任務執行的結果
      */
     private function checkDatabaseToSeeIfJobNeedToBeDone($data){
return true;
}

/**
* 根據消息中的數據進行實際的業務處理
* @param array|mixed $data 發布任務時自定義的數據
* @return boolean 任務執行的結果
*/
private function doHelloJob($data) {
// 根據消息中的數據進行實際的業務處理...

print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");
print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");
print("<info>Hello Job is Done!"."</info> \n");

return true;
}
}

至此,所有的代碼都已准備完畢,在運行消息隊列之前,我們先看一下現在的目錄結構:

 

1.6 發布任務

在瀏覽器中訪問 http://your.project.domain/index/job_test/actionWithHelloJob ,可以看到消息推送成功。

 

這個時候去Linux中鏈接redis,查看redis的隊列任務,就可以看到有數據在里面

連接redis:/usr/local/redis/bin/redis-cli -h 12.131.12.12 -p 6379

redis對列的命令,顯示helloJobQueue對列中的數據:LRANGE queues:helloJobQueue 0 -1  

顯示對列中有幾條數據:Llen queues:helloJobQueue

 

看到這樣就說明已經加入到隊列了,截圖是加入了好多條

 

 

1.7 處理任務

切換當前終端窗口的目錄到項目根目錄下,執行

php think queue:work --queue helloJobQueue
可以看到執行的結果類似如下:

 

​由於php think queue:work --queue helloJobQueue這個命令只能在TP5框架的根目錄才能運行成功,所以,shell腳本要先cd到框架的根目錄,具體見下面的shell腳本截圖

至此,我們成功地經歷了一個消息的 創建 -> 推送 -> 消費 -> 刪除 的基本流程

但是!!! 但是!!! 但是!!!,重要的事說3遍

雖然現在是可以將消息的創建 -> 推送 -> 消費 -> 刪除 的基本流程全部跑通,但是每次執行 php think queue:work --queue helloJobQueue 這個命令都是進行了一次,也就是說,在對列 helloJobQueue 中有5條要處理的數據,每次執行 php think queue:work --queue helloJobQueue 都是只執行了一條數據,還有4條數據沒有處理,我們要的是執行一次可以直接將對列中的數據全部處理掉,於是,我們想到定時任務去處理

首先我們寫兩個shell腳本

1.monitorHandleQueue.sh,作用是檢查隊列的進程是否在運行


pid=$(ps -ef| grep handleQueue |grep -v grep | awk ' NR==1 {print $2}')

if [ -z $pid ]
then
sh /home/wwwroot/default/www/thinkphp5/handleQueue.sh &>/dev/null 2>&1
fi
mysqld是進程名稱

檢查進程是否存在,如果不存在啟動handleQueue.sh腳本,注意:monitorHandleQueue.sh腳本中的啟動handleQueue.sh的路徑寫自己的,NR==1表示只取第一個進程,|grep -v grep 過濾掉自己的進程

2.handleQueue.sh 腳本

cd /home/wwwroot/default/www/thinkphp5
while [ 2 > 0 ]
do
len=`/usr/local/redis/bin/redis-cli -h 1.1.1.1 -p 6379 Llen queues:helloJobQueue`
if [ $((len + 0 )) -gt 0 ];then
php think queue:work --queue helloJobQueue
else
sleep 3
php think queue:work --queue helloJobQueue
fi
done
此腳本中的cd /home/wwwroot/default/www/thinkphp5,一定要切換到框架的根目錄,解釋一下handleQueue.sh腳本的邏輯:先切換到框架的根目錄,while判斷2大於0為真,所以會一直執行,連接到redis,獲取隊列的長度,if判斷,如果隊列的長度大於0直接執行隊列,否則就停3秒再執行隊列,很簡單,寫了很長時間,還有一點要注意,shell腳本最好不要在編輯器編輯,直接在Linux上編輯,因為如果在編輯器上編輯上傳到Linux上會產生意想不到的問題(我在這里耽誤了很長時間),找不到問題所在就直接在Linux上編寫好了,省的麻煩

上面的shell腳本是我第一次寫,碰到了很多問題

1.[ 2 > 0 ]格式為[空格判斷表達式空格]

2.len=`/usr/local/redis/bin/redis-cli -h 47.101.54.26 -p 6379 Llen queues:helloJobQueue`,等於號的左右兩邊不能有空格,反引號 `` 不知道怎么輸出,在1是我左邊的那個鍵

3. if 的格式,if [ $((len + 0 )) -gt 0 ];then 變量大於0,大於號用 -gt 表示

創建定時任務

*/1 * * * *  /home/wwwroot/default/www/thinkphp5/monitorHandleQueue.sh &>/dev/null 2>&1

好了,至此一個循環請求隊列就寫好了

 

 

 

參考:https://blog.csdn.net/will5451/article/details/80434174

參考:https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645

原文:https://blog.csdn.net/dabao87/article/details/82414839 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM