前言
本文中主要記錄TP6 中使用 think-queue 來實現redis的消息隊列和延遲隊列的過程以及其中出現的問題
think-queue:是thinkphp 官方提供的一個消息隊列服務,它支持消息隊列的一些基本特性:
- 消息的發布,獲取,執行,刪除,重發,失敗處理,延遲執行,超時控制等
- 隊列的多隊列, 內存限制 ,啟動,停止,守護等
- 消息隊列可降級為同步執行
環境准備(以下是本人的環境)
- WAMP(win10主要是為了方便本地測試使用)
- PHP 7.3.5 thinkphp 6.0.5
- mysql 5.7.26
- apache 2.4.39
- Centos7 redis 6.0.5(部署在線上)
安裝
在此我就不再略過TP6的項目創建過程了,大致就是安裝composer工具,安裝成功以后,再使用composer去創建項目即可。
think-queue 安裝
composer require topthink/think-queue
我所安裝的是目前最新的 3.0 版本
# 安裝需要在項目的根目錄下
安裝完成以后可以在項目根目錄下 vendor > topthink > think-queue
項目中添加驅動配置
我們需要在安裝好的 think-queue > src 下找到 config.php 復制里面的內容,然后在根目錄下 config 目錄下創建 queue.php文件,將復制的內容粘貼進去
<?php /** * 消息隊列配置 */ return [ 'default' => 'redis', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => env('redis.host', '127.0.0.1'), 'port' => env('redis.port', 6379), 'password' => env('redis.password', ''), 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ];
# env 指的就是我在外部的資源文件中設置了值
除此之外還需要安裝redis擴展,沒有安裝的就自行去下載安裝即可。
消息隊列實現過程流程圖
1、通過生產者推送消息到消息隊列服務中
2、消息隊列服務將收到的消息存入redis隊列中(zset)
3、消費者進行監聽隊列,當監聽到隊列有新的消息時,獲取隊列第一條
4、處理獲取下來的消息調用業務類進行處理相關業務
5、業務處理后,需要從隊列中刪除消息
功能實現
創建一個生產者
<?php namespace app\api\controller; use app\BaseController; use think\facade\Queue; class Index extends BaseController { public function index() { // echo phpinfo();exit(); // 1.當前任務由哪個類來負責處理 // 當輪到該任務時,系統將生成該類的實例,並調用其fire方法 $jobHandlerClassName = 'app\api\controller\Job1'; // 2.當任務歸屬的隊列名稱,如果為新隊列,會自動創建 $jobQueueName = "helloJobQueue"; // 3.當前任務所需業務數據,不能為resource類型,其他類型最終將轉化為json形式的字符串 $jobData = ['ts' => time(), 'bizId' => uniqid(), 'a' => 1]; // 4.將該任務推送到消息列表,等待對應的消費者去執行 // 入隊列,later延遲執行,單位秒,push立即執行 $isPushed = Queue::later(10, $jobHandlerClassName, $jobData, $jobQueueName); // database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字符串|false if ($isPushed !== false) { echo '推送成功'; } else { echo '推送失敗'; } } }
創建一個消費者
<?php namespace app\api\controller; use think\facade\Log; use think\queue\Job; class Job1 { /** * fire方法是消息隊列默認調用的方法 * @param Job $job 當前的任務對象 * @param array $data 發布任務時自定義的數據 */ public function fire(Job $job, array $data) { // 有些任務在到達消費者時,可能已經不再需要執行了 $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if (!$isJobStillNeedToBeDone) { $job->delete(); return; } $isJobDone = $this->doHelloJob($data); if ($isJobDone){ $job->delete(); echo "刪除任務" . $job->attempts() . '\n'; }else{ if ($job->attempts() > 3){ $job->delete(); echo "超時任務刪除" . $job->attempts() . '\n'; } } } /** * 有些消息在到達消費者時,可能已經不再需要執行了 * @param array $data * @return bool */ private function checkDatabaseToSeeIfJobNeedToBeDone(array $data) { return true; } /** * 根據消息中的數據進行實際的業務處理... * @param array $data * @return bool */ private function doHelloJob(array $data) { echo '執行業務邏輯:' . $data['bizId'] . '\n'; return true; } }
通過瀏覽器訪問
# 我這里是在本地配置了一個域名解析,實際訪問的是127.0.0.1
訪問后,可以看到已經向消息隊列服務推送了消息,此時我們需要在項目根目錄下運行命令創建工作進程來處理隊列中的消息
通過上述可以看到,當我們開啟了work進程時,就會從隊列中獲取任務,然后找到消費者執行后續的業務邏輯。
因為這里我采用的push 表示立即執行,所以只要隊列中有就會立馬執行,如果我們需要使用到延時場景,例如訂單支付超時,這時我們就可以使用later即可
多模塊多功能實現
修改生產者代碼
<?php namespace app\api\controller; use app\BaseController; use think\facade\Queue; class Index extends BaseController { public function index() { // echo phpinfo();exit(); // 1.當前任務由哪個類來負責處理 // 當輪到該任務時,系統將生成該類的實例,並調用其fire方法 $jobHandlerClassName = 'app\api\controller\Job1'; // 2.當任務歸屬的隊列名稱,如果為新隊列,會自動創建 $jobQueueName = "helloJobQueue"; // 3.當前任務所需業務數據,不能為resource類型,其他類型最終將轉化為json形式的字符串 $jobData = ['ts' => time(), 'bizId' => uniqid(), 'a' => 1]; // 4.將該任務推送到消息列表,等待對應的消費者去執行 // 入隊列,later延遲發送,單位秒,push立即發送 $isPushed = Queue::later(10, $jobHandlerClassName, $jobData, $jobQueueName); // database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字符串|false if ($isPushed !== false) { echo '推送成功'; } else { echo '推送失敗'; } } /** * 多模塊延遲隊列實現 */ public function pay(){ $orderData = [ "orderId" => uniqid() ]; $isPushed = Queue::later(60, "app\api\controller\PayMessage", json_encode($orderData), "helloJobQueue"); if ($isPushed)echo "\n 訂單支付成功 \n"; $email = [ "email" => "1234567890@qq.com" ]; $isPushed = Queue::later(120, "app\api\controller\EmailMessage", json_encode($email), "helloJobQueue"); if ($isPushed)echo "\n 郵件發送成功 \n"; } }
新增支付消息消費者
<?php namespace app\api\controller; use think\queue\Job; class PayMessage { public function fire(Job $job, $data){ $data = json_decode($data, true); if ($this->doJob($data)){ $job->delete(); }else{ if ($job->attempts() > 3){ print_r("訂單超時:" . $data['orderId']); $job->delete(); } } } public function doJob($data){ print_r("發送支付成功通知:" . $data['orderId'] ); return true; } }
新增郵箱發送消費者
<?php namespace app\api\controller; use think\queue\Job; class EmailMessage { public function fire(Job $job, $data){ $data = json_decode($data, true); if ($this->doJob($data)){ $job->delete(); }else{ if ($job->attempts() > 3){ print_r("\n 郵件發送超時:" . $data['orderId'] . '\n '); $job->delete(); } } } public function doJob($data){ print_r("\n 發送郵件:" . $data['email'] .'\n '); return true; } }
通過瀏覽器模擬訪問
因為本次我們使用的是延時隊列所以我們可以到redis中查看
127.0.0.1:6379> keys * 1) "{queues:helloJobQueue}:delayed"
當延時時間到了后,我們可以繼續看到工作進程及時的進行消費
參考地址
think-queue官網文檔:https://github.com/tp5er/think-queue/tree/master/doc
redis參考地址:https://www.runoob.com/redis/redis-tutorial.html