消息隊列-一篇讀懂rabbitmq(生命周期,confirm模式,延遲隊列,集群)


 

什么是消息隊列?

就是生產者生產一條消息,發送到這個rabbitmq,消費者連接rabbitmq並且進行消費,生產者和消費者並需要知道對方是如何工作的,從而實現程序之間的解耦,異步和削峰,這也就是消息隊列的作用。

使用的場景也有很多,比如用戶支付購買之后的發送短信,增加用戶積分等等,只要能將業務邏輯抽象出來,就能很好得使用它。

 

下面進入正題:

先來介紹一下基本概念和參與生命周期的各個成員。

publisher:消息生產者,負責創建消息,並發送到代理服務器(rabbitmq)

message:發送的消息,由 有效負載(payload) 和 標簽 (label) 組成

exchange:交換器,負責接收消息並路由給服務器的隊列

queue:消息隊列,就是消息最后要去的地方。然后等待消費者取走並消費

consumer:消息消費者,與生產者對應,程序的另外一方,負責消費信息,並完成相應的業務邏輯

channel:信道,在tcp之上建立的通道,負責傳送消息。隊列的傳輸都是基於信道來完成的。

broker:消息隊列服務器實體

下面來解析一下這張圖,這張圖是網上找的,雖然不夠詳細,但是勉強能用。

 

准備前提,開啟rabbitmq服務,生命隊列和交換器,並將兩者進行綁定。

 

生產邏輯:

publisher 通過 broker服務器ip 和 端口 嘗試建立與 broker 的 tcp 連接,連接成功之后,會嘗試驗證驗證用戶名和密碼,如果錯誤,則拒絕訪問。

驗證成功之后,在tcp上建立一個 信道(channel) ,publisher 通過信道,發送消息到 broker,進入指定的虛擬主機virtual host。然后查找到交換器綁定的隊列,並將消息推送進入隊列。

 

消費邏輯:

consumer 建立連接和驗證和生產者一樣。接下來通過隊列名,直接找到隊列進行監聽並消費。

 

有幾個比較重要的知識點:

1.  rabbitmq的交換器並不是真正意義的交換器,它本質上其實就是一張表,里面存放和交換器名稱和消息隊列的映射關系。所以說隊列的傳輸都是通過信道來完成的。

2.  信道存在的意義在於 創建和銷毀tcp連接非常消耗資源

 

 

交換器的類型

當生產的消息進入虛擬主機時,會去尋找一張表,就是交換器和隊列映射關系的一張表。而交換器的類型也分了好多種,可以根據不同的場景自由選擇。

目前總共分了4種,direct,fanout,topic,headers。其中headers因為性能問題幾乎不在使用,這里就不做過多的討論。

1.direct

direct是直接,完全匹配,單播的模式。

php簡單代碼實現:

//創建隊列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
$q->declare();

//創建交換機對象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$ex->setFlags(AMQP_DURABLE); //持久化
$ex->declare();

//綁定交換機與隊列,並指定路由鍵
$q->bind($e_name, $k_route);

//生產消息
$ex->publish($message, $k_route)

特性: 當生產消息時,未指定交換器,則會默認使用 (AMQP default) 交換器,然后路由到 和路由鍵名稱相同的隊列中去

 

 

2.fanout

如果說 direct 是 單對單 的關系,那么 fanout就是單對多的關系,即一個交換器對應多個隊列。

fanout交換器不通過路由鍵路由到隊列,而是通過將隊列綁定在交換器上,當消息進來時,直接路由到該交換器綁定的隊列去。

 

 

3.topic

topic交換器通過路由鍵,將自動匹配允許匹配的隊列,相比fanout,更加靈活,不過對架構要求更高。如下圖所示

$e_name = 'logs-exchange'; //交換機名
$q_name = 'msg-inbox-errors'; //隊列名

//創建隊列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
$q->declare();

//創建交換機對象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_TOPIC); //direct類型
$ex->setFlags(AMQP_DURABLE); //持久化
$ex->declare();


$q->bind($e_name, '*.msg-inbox');

$ex->publish($message, 'wonima.msg-inbox');

 

高級特性:

為了確保消息可靠性,有兩種處理方式.

1.rabbitmq事務

事務主要是對信道進行設置,示例代碼如下

$channel->startTransaction(); //開始事務
for($i=0; $i<5; ++$i){
    $message = "TEST MESSAGE! 測試消息!";
    $message = $message.$i."---";
    echo "Send Message:".$ex->publish($message, 'xxxx')."\n";

}
$channel->commitTransaction(); //提交事務

經測驗,使用事務之后,性能會造成相當大的影響,與不實用事務相比,性能可以相差百倍以上。

 

2.confirm 模式

當信道設置未 confirm 模式的時候,每一條消息都會獲的唯一的id。當消費者接收到消息的時候,自動發送 或 手動發送消息  進行消息確認。

//創建隊列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo "Message Total:".$q->declare()."\n";

//第一種:自動應答
//$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答

//第二種:手動應答 $q->consume('processMessage'); /** * 消費回調函數 * 處理消息 */ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); sleep(2); $myfile = fopen("newfile2.txt", "a+") or die("Unable to open file!"); $txt = $msg.time()."\n"; fwrite($myfile, $txt); fclose($myfile); echo $msg.time()."\n"; //處理消息 $q->ack($envelope->getDeliveryTag()); //手動發送ACK應答 }

 

應答模式最大的好處是就是異步,執行效率高。事務和應答模式相比,后者使用更加頻繁,前者幾乎沒有見到過。

 

延遲隊列

首先聲明rabbitmq是不支持延遲隊列的,但是我們可以利用死信隊列來完成。

 

 

實現延遲隊列也有多種方式:

第一種:設置死信隊列,並將 過期時間 加到隊列里面

try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {

        echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    $exchange->setFlags(AMQP_DURABLE);//持久化
    $exchange->setName($params['exchangeName'] ?: '');
    $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
    $exchange->declareExchange();

    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName'] ?: '');
    $queue->setFlags(AMQP_DURABLE);
    $queue->setArguments(array(
        'x-dead-letter-exchange' => 'last_exchange',
        'x-dead-letter-routing-key' => 'last_route',
        'x-message-ttl' => 10000,
    ));
    $queue->declareQueue();

    //綁定
    $queue->bind($params['exchangeName'], $params['routeKey']);

    $exchange2 = new AMQPExchange($channel);
    $exchange2->setFlags(AMQP_DURABLE);//持久化
    $exchange2->setName('last_exchange');
    $exchange2->setType(AMQP_EX_TYPE_DIRECT); //direct類型
    $exchange2->declareExchange();

    $queue2 = new AMQPQueue($channel);
    $queue2->setName('last_queue');
    $queue2->setFlags(AMQP_DURABLE);
    $queue2->declareQueue();

    $queue2->bind('last_exchange', 'last_queue');

} catch (Exception $e) {

}

$time = time();
//生成消息
$exchange->publish((string)$time, $params['routeKey'], AMQP_MANDATORY, [
    'delivery_mode' => 2,
]);

 

第二種:設置死信隊列,並將 過期時間 加到消息里面,這一種更加自由。

$msg = [
    'x-message-ttl' => 5,
    'ttl' => 5,
    'body' => time()
];
$msg = json_encode($msg);
$exchange->publish($msg, '', AMQP_MANDATORY, ['delivery_mode' => 2]);

 

第三種:使用延遲插件

 

集群

先來談談rabbitmq的集群是如何運行的

 

當你開啟來兩個rabbitmq(節點)服務,並將其組成為一個集群。每個節點並不會將所有的隊列進行拷貝,元數據依舊保存在單個節點當中,其他節點則是通過指針。

舉個例子:節點a和節點b組成了一個集群,節點a保存着一堆元數據 c 和 元數據d的指針,用來指向節點b,節點b保存一堆元數據d 和 元數據 c的指針,用來指向節點a。

這樣做有兩個原因

1 存儲空間 :如果一個節點存儲了1gb的數據,再添加節點,只會帶來一摸一樣的1gb的數據,非常浪費磁盤空間

2 性能:對於持久化消息來說,每一條消息都會觸發磁盤io,每次新增節點,網路和磁盤負載都會增加,相對於單機來說,性能不但不會提升,反而可能下降。

但是由於交換器只是一張查詢表,並非實際的路由器,因此將交換器在整個集群進行復制也不會損耗太多的性能,所以交換器在每個節點都會保存一份,以便於查詢。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM