系列說明
本系列主要講解RabbitMQ,講解其特性,例如消息持久化、消息TTL、消息的優先、延遲消息、消息可靠性、消費模式以及在Spring Boot中使用RabbitMQ,代碼在我的Github上
RabbitMQ介紹
RabbitMQ使用Erlang語言開發基於AQMP協議的開源消息隊列,RabbitMQ主要有以下特點:
- 高可靠性: RabbitMQ依靠消息確認、持久化等實現高可靠性,但其吞吐量不太高
- 高可用: RabbitMQ支持分布式部署,多個RabbitMQ服務器組成一個集群形成一個邏輯Broker
- 多種協議支持: RabbitMQ基於AQMP協議,但是可以通過安裝插件支持其它協議,例如STOMP、MQTT協議等
- 多種客戶端語言支持: RabbitMQ提供Java、C++等多種客戶端語言支持
- 管理頁面: RabbitMQ提供Web管理頁面以便可視化管理
AQMP
RabbitMQ基於AQMP協議開發的消息隊列,AQMP協議在之前消息隊列(一)中已經簡單的介紹了,這里就簡單的介紹一下:
- Broker: Broker指的是代理消息隊列,是一個邏輯概念,指的是RabbitMQ服務器,其可以有多個Vritual Host組成。
- Virtual Host: Vritual Host是一個虛擬概念,類似於權限控制組,一個Vritual Host可以有若干Exchange和Queue,權限控制的最小粒度是Virtual Host
- Exchange: Exchange叫交換機,其可以多個Queue根據路由規則(Routing Key)綁定。Exchange接收生產者發送的消息,根據其類型(ExchangeType)和路由規則(Routing Key)把消息發送給隊列。
- Bingding: Binding聯系Exchange和Queue
- Connection: Connection在RabbitMQ中是一個客戶端和Broker之間的TCP連接
- Channel: Channel在RabbitMQ中叫做信道,有Connection創建,並且一個Connection可以創建多個Channel。在RabbitMQ必須通過Channel才能發送消息。之所以需要Channel,主要因為TCP連接過於昂貴。
需要注意的地方:
- 如果消息發送到Exchange后,Exchange不能通過路由規則找到合適的隊列,該消息將會被刪除
- RabbitMQ建議客戶端線程之間不要共用Channel,而是共用Connection
RabbitMQ使用
RabbitMQ-Java官方提供了簡單的使用教程,這里就簡單的提一下,具體可見其網友翻譯版本:RabbitMQ入門教程
這里展示的是RabbitMQ發送消息
public class Sender {
private static final String EXCHANGE_NAME = "log";
private static final String ROUTING_KEY = "info";
private static final String MESSAGE = "hello world!";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.100.20.186");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 通過連接工廠創建連接
Connection connection = factory.newConnection();
// 通過Connection創建Channel
Channel channel = connection.createChannel();
// 聲明Exchange:名稱及其類型,該操作同樣是冪等的,如何聲明對隊列一樣
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 通過Channel向Exchange發送消息和Routing Key,並且配置了BasicProperties(消息屬性)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_BASIC, MESSAGE.getBytes(StandardCharsets.UTF_8));
// 關閉Channel和Connection
channel.close();
connection.close();
}
}
這里展示使用RabbitMQ接收消息
public class Receiver {
private static final String QUEUE_NAME = "log_info_queue";
private static final String EXCHANGE_NAME = "log";
private static final String ROUTING_KEY = "info";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.100.20.186");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 通過連接工廠創建連接
Connection connection = factory.newConnection();
// 通過Connection創建Channel
Channel channel = connection.createChannel();
// 聲明一個隊列 -- 在RabbitMQ中,隊列聲明是冪等性的
// 一個冪等操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同
// 也就是說,如果不存在,就創建,如果存在,不會對已經存在的隊列產生任何影響
// 但是如果聲明時修改已存在隊列的屬性,則會拋出異常
channel.queueDeclare(QUEUE_NAME, false, false , false, null);
// 把Queue和Exchange通過Routing Key綁定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 設置該消費者預讀取消息數量:這里主要是考慮到慢消費的問題,這里使用PUSH模型,服務器推消息給客戶端,
// 可能會導致消息堆積,設置預讀取數量后,服務器會發送指定數量消息后等待前面消息ACK后才會繼續發送消息
channel.basicQos(1);
// 接收消息:這里使用自動ACK,當然也可以獲取消息后手動ACK
String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
}
});
}
}
RabbitMQ的消息傳遞模型
RabbitMQ主要通過ExchangeType來設置消息傳遞模型,主要有下面4種模型,其中Header模型用的少:
- Direct模型
- Fanout模型
- Topic模型
- Header模型
Direct模型
Direct模型顧名思義指的是直接連接,只有當消息中的Routing Key與Queue綁定到Exchange的Routing Key一致,才會轉發消息給該Queue
Fanout模型
Fanout模型類似於訂閱/發布模型,Exchange會把消息轉發給所有綁定到該Exchange上的Queue
Topic模型
Topic模型類型與Servlet的URL匹配模型,其會匹配消息的Routing Key和Queue綁定到Exchange的Routing Key,使用通配符匹配。有#和兩種通配符,#代表0個或多個字符,代表1個字符
RabbitMQ的持久化
首先RabbitMQ的持久化是異步持久化模型,也就是說在特定情況下,可能造成消息丟失。比如在RabbitMQ Server回調RabbitMQ Producer Client的接口表明已經接收到該消息,但是由於是異步持久化可能還沒有把消息持久化到磁盤中,這時候MQ-Server斷電就會導致消息的丟失
RabbitMQ中消息的持久化需要保證Exchange、Queue、Message都進行持久化操作。需要注意的是:Exchange、Queue的聲明時冪等的。冪等指說多次聲明產生的結果都是一樣,也就是說如果其不存在則創建,存在則返回且不會對其產生任何影響,但是如果聲明已存在的隊列,且其屬性不同則會拋出異常。
Exchange的持久化
RabbitMQ聲明Exchange有幾種方法,但主要使用下面方法,其中第三個參數表示是否將該Exchange持久化
/**
* Actively declare a non-autodelete exchange with no extra arguments
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Queue的持久化
RabbitMQ聲明Queue與Exchange的方法類型,同樣使用durable參數表示是否將該Queue進行持久化操作,下面是其中一個方法
/**
* Declare a queue
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
部分參數說明:
- exclusive:表明該隊列是否是排它隊列。如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:1. 排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一連接創建的排他隊列;2.“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;3.即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。
- autoDelete:表明該隊列是否自動刪除。自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。
Message的持久化
消息的持久化需要在生產者發送消息時設置消息屬性,以表明該消息時持久化消息。下面是消息發送的一個API
/**
* Publish a message.
*
* Publishing to a non-existent exchange will result in a channel-level
* protocol exception, which closes the channel.
*
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param props other properties for the message - routing headers etc
* @param body the message body
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
部分參數說明:
- exchange:表示該消息發送到哪個Exchange
- routingKey:表示該消息的Routing Key
- props:表示該消息的屬性
- body:消息實體
BasicProperties定義如下:
public BasicProperties(
String contentType,//消息類型如:text/plain
String contentEncoding,//編碼
Map<String,Object> headers,
Integer deliveryMode,//1:nonpersistent 2:persistent
Integer priority,//優先級
String correlationId,
String replyTo,//反饋隊列
String expiration,//expiration到期時間
String messageId,
Date timestamp,
String type,
String userId,
String appId,
String clusterId)
其中RabbitMQ提供了屬性實現已經更簡單的配置消息屬性:
/** Empty basic properties, with no fields set */
BasicProperties.MINIMAL_BASIC
/** Empty basic properties, with only deliveryMode set to 2 (persistent) */
BasicProperties.MINIMAL_PERSISTENT_BASIC
/** Content-type "application/octet-stream", deliveryMode 1 (nonpersistent), priority zero */
BasicProperties.BASIC
/** Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero */
BasicProperties.PERSISTENT_BASIC
/** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero */
BasicProperties.TEXT_PLAIN
/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
BasicProperties.PERSISTENT_TEXT_PLAIN
當然可以使用時編程自定義設置消息屬性:
AMQP.BasicProperties.Builder builder = new Builder();
BasicProperties properties = builder
.deliveryMode(2)
.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, MESSAGE.getBytes(StandardCharsets.UTF_8));
消息什么時候刷到磁盤(持久化)
寫入文件前會有一個Buffer,大小為1M,數據在寫入文件時,首先會寫入到這個Buffer,如果Buffer已滿,則會將Buffer寫入到文件(未必刷到磁盤)。
有個固定的刷盤時間:25ms,也就是不管Buffer滿不滿,每個25ms,Buffer里的數據及未刷新到磁盤的文件內容必定會刷到磁盤。
每次消息寫入后,如果沒有后續寫入請求,則會直接將已寫入的消息刷到磁盤:使用Erlang的receive x after 0實現,只要進程的信箱里沒有消息,則產生一個timeout消息,而timeout會觸發刷盤操作。
TTL
TTL(Time To Live)表示存活時間。RabbitMQ中可以對Queue和Message設置TTL,以控制Queue和Message的存活時間。
Queue TTL
隊列的存活時間指的是Queue在自動刪除前可以處於未使用狀態的時間。未使用狀態指的是Queue上沒有Consumer、Queue沒有被重新聲明。隊列的存活時間在隊列第一次聲明時通過指定隊列的屬性"x-expires"指定,單位是毫秒,代碼如下:
Map<String, Object> queueArgs = new HashMap<>();
// 設置1分鍾過期
queueArgs.put("x-expires", 60000);
channel.queueDeclare("queue", false, false, false, queueArgs);
Message TTL
消息的存活時間指的是消息在隊列中的存活時間,超過該時間消息將被刪除或者不能傳遞給消費者。消息的存活時間可以通過設置每條消息的存活時間或者設置某條隊列中的所以存活時間,當兩者都有時,時間小的有效。
設置消息屬性
針對每條消息可以在發送消息時設置消息屬性
// 設置消息屬性-TTL為30s
BasicProperties properties = new BasicProperties.Builder()
.expiration("30000").build();
channel.basicPublish("exchange", "kanyuxia", properties,
"hello".getBytes(StandardCharsets.UTF_8));
設置隊列屬性
通過設置隊列中消息的TTL屬性,然后傳入該隊列的所有消息都有該TTL屬性
Map<String, Object> queueArgs = new HashMap<>();
queueArgs.put("x-message-ttl", 30000);
channel.queueDeclare("queue", false, false, false, queueArgs);
Reference
https://www.jianshu.com/p/64357bf35808?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation
http://blog.csdn.net/wanbf123/article/details/78052419
http://blog.csdn.net/u013256816/article/details/60875666
http://blog.csdn.net/u013256816/article/details/54916011