一.問題的引出
RabbitMQ的一大特色是消息的可靠性,那么它是如何保證消息可靠性的呢?——消息持久化。為了保證RabbitMQ在退出,服務重啟或者crash等異常情況下,也不會丟失消息,我們可以將Queue,Exchange,Message都設置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丟失。當然還是會有一些小概率事件會導致消息丟失。
二.Queue的持久化
1.查看存在的隊列和消息數量
在windows環境下,在rabbitmq的安裝目錄/sbin下,通過rabbitmqctl.bat list_queues查看
這邊啟動了兩個producer,分別生成兩個隊列hello 和 hello1,並且他們都有一個消息存在
重啟RabbitMQ Server,模擬故障
可以看到重啟后兩個隊列都消失了.
2.持久化隊列
Queue的持久化是通過durable=true來實現的。
一般程序中這么使用:
Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //關鍵的是第二個參數設置為true,即durable=true. channel.queueDeclare("queue.persistent.name", true, false, false, null);
- 1
- 2
- 3
- 4
Channel類中queueDeclare的完整定義如下:
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
參數說明:
queue:queue的名稱
exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:1. 排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一連接創建的排他隊列;2.“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;3.即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。
autoDelete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。
queueDeclare相關的有4種方法,分別是:
Queue.DeclareOk queueDeclare() throws IOException; Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
- 1
- 2
- 3
- 4
- 5
- 6
其中需要說明的是queueDeclarePassive(String queue)可以用來檢測一個queue是否已經存在。如果該隊列存在,則會返回true;如果不存在,就會返回異常,但是不會創建新的隊列。
我們就hello隊列持久化,在聲明隊列名稱時,持久化隊列,生產端和消費端都要.
我們重復上面的操作,但是給hello隊列做持久化,而hello1不做,並重啟rabbitmq.
可以看到重啟后,hello隊列還在,hello1隊列消失了,但是原本hello中的一條消息也沒有保存下來。所以在這邊我們僅僅做到了消息隊列的持久化,還沒有做消息持久化。
三.Message的持久化
如果將Queue的持久化標識durable設置為true,則代表是一個持久的隊列,那么在服務重啟之后,也會存在,因為服務會把持久化的queue存放在硬盤上,當服務重啟的時候,會重新加載之前被持久化的queue。隊列是可以被持久化,但是里面的消息是否為持久化那還要看消息的持久化設置。也就是說,重啟之前那個Queue里面還有沒發出去的消息的話,重啟之后那隊列里面是不是還存在原來的消息,這個就要取決於發送者在發送消息時對消息的設置了。
如果要在重啟后保持消息的持久化必須設置消息是持久化的標識。
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
- 1
這里的關鍵是:MessageProperties.PERSISTENT_TEXT_PLAIN
首先看一下basicPublish的方法:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
- 1
- 2
- 3
- 4
- 5
exchange表示exchange的名稱
routingKey表示routingKey的名稱
body代表發送的消息體
有關mandatory和immediate的詳細解釋可以參考:RabbitMQ之mandatory和immediate
這里關鍵的是BasicProperties props這個參數了,這里看下BasicProperties的定義:
public BasicProperties( String contentType,//消息類型如:text/plain String contentEncoding,//編碼 Map<String,Object> headers, //這里的deliveryMode=1代表不持久化,deliveryMode=2代表持久化。 Integer deliveryMode,//1:nonpersistent 2:persistent Integer priority,//優先級 String correlationId, String replyTo,//反饋隊列 String expiration,//expiration到期時間 String messageId, Date timestamp, String type, String userId, String appId, String clusterId)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
上面的實現代碼使用的是MessageProperties.PERSISTENT_TEXT_PLAIN,那么這個又是什么呢?
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2, 0, null, null, null, null, null, null, null, null, null);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
可以看到這其實就是講deliveryMode設置為2的BasicProperties的對象,為了方便編程而出現的一個東東。 換一種實現方式:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());
- 1
- 2
- 3
- 4
設置了隊列和消息的持久化之后,當broker服務重啟的之后,消息依舊存在。單只設置隊列持久化,重啟之后消息會丟失;單只設置消息的持久化,重啟之后隊列消失,既而消息也丟失。單單設置消息持久化而不設置隊列的持久化顯得毫無意義。
再以上面例子,生產端生成一個消息,並重啟rabbitmq.
可以看到,經過隊列和消息持久化后的hello, 在重啟的情況下,隊列和消息都存在,沒有消失,消費端再重啟后也是能正常接收的.
四.Exchange的持久化
上面闡述了隊列的持久化和消息的持久化,如果不設置exchange的持久化對消息的可靠性來說沒有什么影響,但是同樣如果exchange不設置持久化,那么當broker服務重啟之后,exchange將不復存在,那么既而發送方rabbitmq producer就無法正常發送消息。這里博主建議,同樣設置exchange的持久化。exchange的持久化設置也特別簡單,方法如下:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
一般只需要:channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);即在聲明的時候講durable字段設置為true即可。
五.關於Message的持久化的更進一步探討
1.消息什么時候需要持久化?
根據 官方博文(http://www.rabbitmq.com/blog/2011/01/20/rabbitmq-backing-stores-databases-and-disks/) 的介紹,RabbitMQ在兩種情況下會將消息寫入磁盤:
- 消息本身在publish的時候就要求消息寫入磁盤
- 內存緊張,需要將部分內存中的消息轉移到磁盤
2.消息什么時候會刷到磁盤?
- 寫入文件前會有一個Buffer,大小為1M(1048576),數據在寫入文件時,首先會寫入到這個Buffer,如果Buffer已滿,則會將Buffer寫入到文件(未必刷到磁盤)
- 有個固定的刷盤時間:25ms,也就是不管Buffer滿不滿,每隔25ms,Buffer里的數據及未刷新到磁盤的文件內容必定會刷到磁盤
- 每次消息寫入后,如果沒有后續寫入請求,則會直接將已寫入的消息刷到磁盤:使用Erlang的receive x after 0來實現,只要進程的信箱里沒有消息,則產生一個timeout消息,而timeout會觸發刷盤操作
3.消息在磁盤文件中的格式
消息保存於$MNESIA/msg_store_persistent/x.rdq文件中,其中x為數字編號,從0開始,每個文件最大為16M(16777216),超過這個大小會生成新的文件,文件編號加1。消息以以下格式存在於文件中:
<<Size:64, MsgId:16/binary, MsgBody>>
MsgId為RabbitMQ通過rabbit_guid:gen()每一個消息生成的GUID,MsgBody會包含消息對應的exchange,routing_keys,消息的內容,消息對應的協議版本,消息內容格式(二進制還是其它)等等。
4.文件何時刪除?
當所有文件中的垃圾消息(已經被刪除的消息)比例大於閾值(GARBAGE_FRACTION = 0.5)時,會觸發文件合並操作(至少有三個文件存在的情況下),以提高磁盤利用率。
publish消息時寫入內容,ack消息時刪除內容(更新該文件的有用數據大小),當一個文件的有用數據等於0時,刪除該文件。
5.將queue,exchange, message等都設置了持久化之后就能保證100%保證數據不丟失了嚒?
答案是否定的。
首先,從consumer端來說,如果這時autoAck=true,那么當consumer接收到相關消息之后,還沒來得及處理就crash掉了,那么這樣也算數據丟失,這種情況也好處理,只需將autoAck設置為false(方法定義如下),然后在正確處理完消息之后進行手動ack(channel.basicAck).
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
- 1
其次,關鍵的問題是消息在正確存入RabbitMQ之后,還需要有一段時間(這個時間很短,但不可忽視)才能存入磁盤之中,RabbitMQ並不是為每條消息都做fsync的處理,可能僅僅保存到cache中而不是物理磁盤上,在這段時間內RabbitMQ broker發生crash, 消息保存到cache但是還沒來得及落盤,那么這些消息將會丟失。那么這個怎么解決呢?首先可以引入RabbitMQ的mirrored-queue即鏡像隊列,這個相當於配置了副本,當master在此特殊時間內crash掉,可以自動切換到slave,這樣有效的保障了HA, 除非整個集群都掛掉,這樣也不能完全的100%保障RabbitMQ不丟消息,但比沒有mirrored-queue的要好很多,很多現實生產環境下都是配置了mirrored-queue的。還有要在producer引入事務機制或者Confirm機制來確保消息已經正確的發送至broker端,有關RabbitMQ的事務機制或者Confirm機制可以參考:RabbitMQ之消息確認機制(事務+Confirm). 幸虧本文的主題是討論RabbitMQ的持久化而不是可靠性,不然就一發不可收拾了。RabbitMQ的可靠性涉及producer端的確認機制、broker端的鏡像隊列的配置以及consumer端的確認機制,要想確保消息的可靠性越高,那么性能也會隨之而降,魚和熊掌不可兼得,關鍵在於選擇和取舍。
相關參考鏈接:http://jzhihui.iteye.com/blog/1642324
消息中間件收錄集:https://blog.csdn.net/u013256816/article/details/54743481