本片文章會介紹以下章節,使大家能夠有一些基本的認知。
- 什么是交換器、路由器、綁定,以及三者的關系
- 交換器類型
- 消費模式
- 交換器、隊列、消息的持久化
- 消息發送的過程以及可靠消息投遞機制
- 消息與隊列的過期事件
- 死信隊列
什么是交換器、路由器、綁定
RabbitMQ的很多強大功能和靈活性來自於AMQP規范。不像HTTP和SMTP協議,AMQP規范不僅定義了一種網絡協議,同時也定義了服務器端的服務和行為。這些信息就是高級消息隊列(Advanced Message Queuing,AMQ)模型。針對代理服務器軟件,AMQ模型在邏輯上定義了三種抽象組件用於指定消息的路由行為:
■ 交換器(Exchange),消息代理服務器中用於把消息路由到隊列的組件。
■ 隊列(Queue),用來存儲消息的數據結構,位於硬盤或內存中。
■ 綁定(Binding),一套規則,用於告訴交換器消息應該被存儲到哪個隊列。
交換器、路由鍵、綁定
交換器:Exchange
生產者將消息,發送到對應的 Exchange,由Exchange將消息路由到一個或者多個隊列中去。如果路由不到,或許返回給生產者或許直接丟棄。
路由鍵:RoutingKey
生產者發送消息時,一般會指定RoutingKey,用於指定路由規則,RoutingKey需要與交換器類型和綁定鍵聯合使用才能生效。
綁定:Binding
通過綁定把交換器與隊列關聯起來,在綁定的時候一般會指定一個bindkey,這樣mq就知道如何正確的將消息路由到隊列了。可以把RoutingKey看做是RoutingKey
交換器類型
Fanout
把發送到此交換機的消息,路由到所有綁定的隊列。
Direct
把發送到此交換機的消息,路由到與BindingKey和RoutingKey完全匹配的隊列中。
如圖,如果發送 RoutingKey為warning的消息,會路由到 Queue1、Queue2中,如果key為 info、debug會路由到Queue2中,如果為其他的key,則不會路由到這兩個隊列中。
Topic
Direct是完全匹配,而Topic在匹配規則上進行了擴展,與Direct相似,也是將消息路由到BindKey和RoutingKey相匹配的隊列中,但是匹配規則不同。
以 點號 “.” 作為分隔符,如 com.rabbitmq.client, com.hidden.client,java.util,concurrent,com.hidden.demo。
BindKey,RoutingKey也是通過 “.”分隔的字符串。
BindKey中可以存在兩個字符 “*”,“#”,其中,# 標識 零至多個單詞,* 標識 匹配一個單詞
假如我們聲明了如下RoutingKey
*.rabbitmq.*
*.*.client
com.#
匹配規則請看下圖
Headers
將消息中的headers與該Exchange相關聯的所有Binging中的參數進行匹配,如果匹配上了,則發送到該Binding對應的Queue中。
消費模式
在Rabbitmq中,有兩種消費模式 :PULL(basic_get),PUSH(basic_consume),兩者的區別是
get方法,每次獲取一條,每次都是一條新的請求;而consume將channel(信道)設置為訂閱模式,服務器會一直推送消息到消費者直到隊列被消費完。
消費確認與拒絕
為了保證消息正確的到達消費者。Rabbitmq提供了消息確認機制。
當消費者在訂閱隊列時,可以指定autoAck參數,等於false時,rabbitmq會顯示的等待消費者回復確認信號后,才從內存或硬盤中刪除消息,當等於true是,會把發送的消息直接刪除,而不管是否真的能到達消費者。
rabbitmq 不會為未確認的消息設置過期時間,它判斷此消息是否需要重新投遞的唯一依據是消費者已經斷開。
確認消息
$channel->basic_ack($deliveryTag,$requeue);
拒絕消息
$channel->basic_reject();
批量拒絕消息
$channel->basic_nack($deliveryTag,$multiple,$requeue);
操作與實踐(PHP版本)
添加composer
"php-amqplib/php-amqplib": ">=2.9.0”
生產者
// 鏈接
$connection = new AMQPStreamConnection('localhost', '5672', 'guest', 'guest', '/');
$channel = $connection->channel(); // 聲明 交換機 指定類型 $channel->exchange_declare('exchange-direct','direct’); // 聲明隊列 $channel->queue_declare('queue-direct'); $channel->queue_declare('queue-direct1'); // 綁定隊列-交換機-key,若一個隊列里有多個routingkey,只能在回調里過濾 $channel->queue_bind('queue-direct','exchange-direct','a’); $channel->queue_bind('queue-direct','exchange-direct’,'b’); $channel->queue_bind('queue-direct1','exchange-direct’,'c’); // 發送消息 $msg = new \PhpAmqpLib\Message\AMQPMessage('hello'); // 消息會到queue-direct1隊列 $channel->basic_publish($msg, ‘exchange-direct’,’c');
消費者
include_once '../vendor/autoload.php';
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $callback = function ($msg) { echo 'routing_key', $msg->delivery_info['routing_key’],"\n"; echo ' [x] Received ', $msg->body, "\n"; }; $channel->basic_consume('queue-direct1', '', false, false, false, false, $callback); // 采用 推模式 while ($channel->is_consuming()) { $channel->wait(); }
持久化
持久化可以提高rabbitmq的可靠性,防止異常情況下(重啟、關閉、宕機)的數據丟失。
rabbitmq的持久化分為3部分:交換機、隊列、消息。
交換機、隊列的持久化通過在聲明時把durable設置為true來實現,但只能保證交換機、隊列的元數據不會丟失,不能保證隊列消息不回丟失。
消息的持久化需要設置 deliveryMode = 2,所以在使用持久化的時候,不能單單對某一項設置持久化。
$msg = new AMQPMessage('這是exchange的消息',['deliveryMode'=>2]);
需要注意的是,並不是設置為持久化,就100%確保數據不會丟失。
比如在autoAck時,消費者收到消息,還沒有來得及處理發生宕機這樣也算消息丟失,可以采用手動ack解決。
其次,持久化是需要寫入硬盤的,但不是為每一條都進行同步存盤(參考fsync)操作,可能暫時保存在了緩沖區中,如果這個時間段發生異常,也會消息丟失。可以使用 鏡像隊列解決。
還可以使用 事務機制和確認機制來保證消息已經正確發送且存儲。
消息發送的過程以及可靠消息投遞機制
確認機制有兩種方式
1. 事務確認機制
2. 發送方確認機制
事務確認機制
- channel.tx_select() 聲明事務
- channel.tx_commit() 提交事務
- channel.tx_rollback() 回滾事務
可以看下圖,AMQP的協議流轉
發送方確認機制
RABBITMQ 可能會遇到一個問題,發送方並不知道消息是否真正到達MQ,在除了事務機制之外,引入了confirm模式,比事務機制更輕量。
生產者將信道設置成 confirm(確認)模式,一旦信道進入 confirm 模式,所有在該信道上面發布的消息都會被指派一個唯一的 ID(從 1 開始),一旦消息被投遞到所有匹配的隊列之后,RabbitMQ 就會發送一個確認(Basic.Ack)給生產者(包含消息的唯一 ID),這就使得生產者知曉消息已經正確到達了目的地了。如果消息和隊列是可持久化的,那么確認消息會在消息寫入磁盤之后發出。RabbitMQ 回傳給生產者的確認消息中的 deliveryTag 包含了確認消息的序號,此外 RabbitMQ 也可以設置 channel.basicAck 方法中的 multiple 參數,表示到這個序號之前的所有消息都已經得到了處理。
事務機制在一條消息發送之后會使發送端阻塞,以等待 RabbitMQ 的回應,之后才能繼續發送下一條消息。相比之下,發送方確認機制最大的好處在於它是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用程序便可以通過回調方法來處理該確認消息,如果 RabbitMQ 因為自身內部錯誤導致消息丟失,就會發送一條 nack(Basic.Nack)命令,生產者應用程序同樣可以在回調方法中處理該 nack 命令。
- channel.confirm() 將 信道 設置為confirm模式
示例:
// 生產者
// 鏈接
$connection = new AMQPStreamConnection('localhost', '5672', 'guest', 'guest', '/');
$channel = $connection->channel(); // 聲明 交換機 指定類型 $channel->exchange_declare('exchange-direct','direct’); // 聲明隊列 $channel->queue_declare('queue-direct'); // 綁定隊列-交換機-key,若一個隊列里有多個routingkey,只能在回調里過濾 $channel->queue_bind('queue-direct','exchange-direct','a’); // 開啟confirm模式 $channel->confirm_select(); // 設置nack 回調 $channel->set_nack_handler(function ($msg) { var_dump($msg->body); echo 'nack'; }); // 設置ack 回調 $channel->set_ack_handler(function ($msg) { var_dump($msg->body); echo 'ack'; }); // 發送消息 $msg = new \PhpAmqpLib\Message\AMQPMessage('hello’); // 等待 $channel->wait_for_pending_acks_returns(); // 消息會到queue-direct隊列 $channel->basic_publish($msg, ‘exchange-direct’,’a');
消息與隊列的過期時間
1.設置消息的TTL
目前有兩種方法設置,第一種是設置在隊列上即隊列上所有消息一起過期,第二種是 單獨對消息進行設置過期時間,當兩者一起使用時以 過期時間最小的為准,消息一旦過期就會變成 死信 (后邊講)。
第一種:在聲明隊列時,添加x-message-tt信息
$set = new AMQPTable();
$set->set('x-message-ttl', 3000); $channel->queue_declare("queue-1", false, false, false, true, false, $set, null);
第二種:在發送消息時,添加expiration信息
$msg = new AMQPMessage('這是exchange-dlx的消息',['expiration'=>3000]);
2.設置隊列的TTL
控制隊列被自動刪除前處於未使用狀態的時間。
未使用的意思是:隊列上沒有任何消費者,隊列沒有重新聲明,並且在過期時間段內沒有被get過。
Rabbitmq會確保 隊列在過期后刪除隊列,但不能保證很及時,另外Rabbitmq重啟后,持久化隊列的過期時間也會被重新計算。
$set = new AMQPTable();
$set->set('x-expire', 3000); $channel->queue_declare("queue-1", false, false, false, true, false, $set, null);
死信
消息變成死信有以下幾種情況:
1.消息被拒絕(basic.reject/basic.nack),且設置requeue為false。
2.消息過期
3.隊列到達最大長度
基於消息的過期時間添加,x-dead-letter-exchange或x-dead-letter-routingkey。
需要注意,在設置參數時,交換機可以不存在,但是在死信到達的時候,交換機及隊列一定要存在,不然會收不到消息。
$set = new AMQPTable();
$set->set('x-message-ttl', 3000); $set->set('x-dead-letter-exchange', 'exchange'); $set->set('x-dead-letter-routingkey', ‘routingkey'); $channel->queue_declare("queue-1", false, false, false, true, false, $set, null);