轉載請注明出處
0.目錄
RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ
1.簡介
RabbitMQ中,消息丟失可以簡單的分為兩種:客戶端丟失和服務端丟失。針對這兩種消息丟失,RabbitMQ都給出了相應的解決方案。
2.防止客戶端丟失消息
如圖,生產者P向隊列中生產消息,C1和C2消費隊列中的消息,默認情況下,RabbitMQ會平均的分發消費給C1C2(Round-robin dispatching),假設一個任務的執行時間非常長,在執行過程中,客戶端掛了(連接斷開),那么,該客戶端正在處理且未完成的消息,以及分配給它還沒來得及執行的消息,都將丟失。因為默認情況下,RabbitMQ分發完消息后,就會從內存中把消息刪除掉。
3.消息確認(Message acknowledgment)
為了解決上述問題,RabbitMQ引入了消息確認機制,當消息處理完成后,給Server端發送一個確認消息,來告訴服務端可以刪除該消息了,如果連接斷開的時候,Server端沒有收到消費者發出的確認信息,則會把消息轉發給其他保持在線的消費者。
驗證上述問題
首先,我們驗證上述問題(客戶端丟失消息)是否真的存在,對Consumer進行如下改造。
先生產兩條消息
啟動消費者,在消費者接收到消息,還沒處理完成的時候,強制關掉
這時,觀察控制台,發現兩條消息都沒有了,1條是在執行中丟失的,還有1條,已經分配給這個Consumer,還沒來得及處理,也丟失了
這證明了上述問題是真的存在的,如果發生在生產環境,將產生難以預料的后果
引入消息確認機制
為了方便觀察,我們用CMD來運行Consumer,要通過maven打成可執行的JAR包,需要在pom.xml中增加如下配置
<build> <finalName>Consumer</finalName> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <appendAssemblyId>false</appendAssemblyId> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.liyang.ticktock.rabbitmq.App</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>assembly</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
上述配置描述了最終打包名字、入口類路徑、帶上依賴包、使用1.8版本的JDK進行打包,配置完后,就可以通過maven的install方法,在target目錄生成可執行的jar包,如果包大小很小,應檢查配置,是不是沒有帶上依賴包
再次改造Consummer類
install成可執行jar包,通過cmd開啟兩個consumer
通過Sender發送一條消息,然后用Ctrl+C結束先收到消息的Consumer,發現另外一個Consumer接收到了未處理完的消息
問題得到了解決,現在消費者在執行過程中死掉也不會丟失消息了
看一下發送確認的方法
1 /** 2 * Acknowledge one or several received 3 * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} 4 * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method 5 * containing the received message being acknowledged. 6 * @see com.rabbitmq.client.AMQP.Basic.Ack 7 * @param deliveryTag the tag from the received 這個是RabbitMQ用來區分消息的,文檔在這 8 * @param multiple true to acknowledge all messages up to and 為true的話,確認所有消息,為false只確認當前消息 9 * including the supplied delivery tag; false to acknowledge just 10 * the supplied delivery tag. 11 * @throws java.io.IOException if an error is encountered 12 */ 13 void basicAck(long deliveryTag, boolean multiple) throws IOException;
在官方文檔中,這樣描述deliveryTag
簡單來說,就是RabbitMQ內部用來區分消息的一個標簽,從envelope中獲取就行了
忘記確認將引起內存泄漏
RabbitMQ只有在收到消費者確認后,才會從內存中刪除消息,如果消費者忘了確認(更多情況是因為代碼問題沒有執行到確認的代碼),將會導致內存泄漏
驗證一下
注釋掉Consumer中的確認代碼
運行Sender和Consumer,不停的生產消費消息,發現消費者在正常的消費消息
查看控制台,發現已經被吃掉了43KB的內存,所以,在試用過程中,一定要保證消息確認在任何情況下都可以發出,否則即使消費者處理完成,RabbitMQ也不會把消息在內存中清除,在該消費者斷開連接之后,還會把消息轉發給其他消費者重新處理,將引發難以預計的問題
4.消息的持久化
現在,消費者宕機已經無法影響到我們的消息了,但如果RabbitMQ重啟了,消息依然會丟失。所幸的是,RabbitMQ提供了持久化的機制,將內存中的消息持久化到硬盤上,即使重啟RabbitMQ,消息也不會丟失。但是,仍然有一個非常短暫的時間窗口(RabbitMQ收到消息還沒來得及存到硬盤上)會導致消息丟失,如果需要嚴格的控制,可以參考官方文檔
要使用RabbitMQ的消息持久化,在聲明隊列時設置一個參數即可
注意,RabbitMQ不允許對一個已經存在的隊列用不同的參數重新聲明,對於試圖這么做的程序,會報錯,所以,改動之前代碼之前,要在控制台中把原來的隊列刪除
重新聲明隊列后,發現Durable為true
重啟RabbitMQ
隊列的消息沒有丟失
5.結束語
這一章介紹了RabbitMQ消息的確認和持久化,后面將會繼續深入介紹RabbitMQ的其他特性
http://www.cnblogs.com/4----/p/6526033.html