什么是消息隊列機制
消息(Message):傳輸的數據。
隊列(Queue):隊列是一種先進先出的數據結構。
消息隊列從字面的含義來看就是一個存放消息的容器。
消息隊列可以簡單理解為:把要傳輸的數據放在隊列中。
把數據放到消息隊列叫做生產者
從消息隊列里邊取數據叫做消費者
一般來說,消息隊列是一種異步的服務間通信方式,是分布式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構
消息隊列使用場景
為什么要用消息隊列,也就是在問:用了消息隊列有什么好處。
消息隊列在實際應用中包括如下三個場景:解耦,異步,削峰
1.應用耦合:多應用間通過消息隊列對同一消息進行處理,避免調用接口失敗導致整個過程失敗;
2.異步處理:多應用對消息隊列中同一消息進行處理,應用間並發處理消息,相比串行處理,減少處理時間;
3.限流削峰:廣泛應用於秒殺或搶購活動中,避免流量過大導致應用系統掛掉的情況;
TP6 基於 redis 實現消息隊列和延遲隊列
TP6 中使用 think-queue 來實現redis的消息隊列和延遲隊列的過程以及其中出現的問題
think-queue:是thinkphp 官方提供的一個消息隊列服務,它支持消息隊列的一些基本特性:
- 消息的發布,獲取,執行,刪除,重發,失敗處理,延遲執行,超時控制等
- 隊列的多隊列, 內存限制 ,啟動,停止,守護等
- 消息隊列可降級為同步執行
think-queue 安裝
composer require topthink/think-queue
安裝需要在項目的根目錄下
安裝完成以后可以在項目根目錄下 vendor > topthink > think-queue進行查看
創建驅動配置文件
如果config下沒有queue.php,我們需要在安裝好的 think-queue > src 下找到 config.php 復制里面的內容,然后在根目錄下 config 目錄下創建 queue.php文件,將復制的內容粘貼進去
<?php
/**
* 消息隊列配置
*/
return [
'default' => 'redis',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => env('redis.host', '127.0.0.1'),
'port' => env('redis.port', 6379),
'password' => env('redis.password', ''),
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
# env 指的就是我在外部的資源文件中設置了值
消息隊列實現過程
1、通過生產者推送消息到消息隊列服務中
2、消息隊列服務將收到的消息存入redis隊列中
3、消費者進行監聽隊列,當監聽到隊列有新的消息時,獲取隊列第一條
4、處理獲取下來的消息調用業務類進行處理相關業務
5、業務處理后,需要從隊列中刪除消息
功能實現
創建一個生產者
<?php
namespace app\api\controller;
use app\BaseController;
use think\facade\Queue;
class Index extends BaseController
{
public function index()
{
// 1.當前任務由哪個類來負責處理
// 當輪到該任務時,系統將生成該類的實例,並調用其fire方法
$jobHandlerClassName = 'app\api\controller\Job1';
// 2.當任務歸屬的隊列名稱,如果為新隊列,會自動創建
$jobQueueName = "helloJobQueue";
// 3.當前任務所需業務數據,不能為resource類型,其他類型最終將轉化為json形式的字符串
$jobData = ['ts' => time(), 'bizId' => uniqid(), 'a' => 1];
// 4.將該任務推送到消息列表,等待對應的消費者去執行
// 入隊列,later延遲執行,單位秒,push立即執行
$isPushed = Queue::later(10, $jobHandlerClassName, $jobData, $jobQueueName);
// database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字符串|false
if ($isPushed !== false) {
echo '推送成功';
} else {
echo '推送失敗';
}
}
}
創建一個消費者
<?php
namespace app\api\controller;
use think\facade\Log;
use think\queue\Job;
class Job1
{
/**
* fire方法是消息隊列默認調用的方法
* @param Job $job 當前的任務對象
* @param array $data 發布任務時自定義的數據
*/
public function fire(Job $job, array $data)
{
// 有些任務在到達消費者時,可能已經不再需要執行了
$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if (!$isJobStillNeedToBeDone) {
$job->delete();
return;
}
$isJobDone = $this->doHelloJob($data);
if ($isJobDone){
$job->delete();
echo "刪除任務" . $job->attempts() . '\n';
}else{
if ($job->attempts() > 3){
$job->delete();
echo "超時任務刪除" . $job->attempts() . '\n';
}
}
}
/**
* 有些消息在到達消費者時,可能已經不再需要執行了
* @param array $data
* @return bool
*/
private function checkDatabaseToSeeIfJobNeedToBeDone(array $data) {
return true;
}
/**
* 根據消息中的數據進行實際的業務處理...
* @param array $data
* @return bool
*/
private function doHelloJob(array $data)
{
echo '執行業務邏輯:' . $data['bizId'] . '\n';
return true;
}
}
通過瀏覽器訪問,開始觸發消息隊列
此時我們需要在項目根目錄下運行命令創建工作進程來處理隊列中的消息
php think queue:work --queue helloJobQueue
通過上述可以看到,當我們開啟了work進程時,就會從隊列中獲取任務,然后找到消費者執行后續的業務邏輯。
因為這里我采用的push 表示立即執行,所以只要隊列中有就會立馬執行,如果我們需要使用到延時場景,例如訂單支付超時,這時我們就可以使用later即可