轉自 http://blog.csdn.net/xiaoxian8023/article/details/48710653
這個官網的第二個例子中的消息應答和消息持久化部分。我把它摘出來作為單獨的一塊兒來分享。
Message acknowledgment(消息應答)
執行一個任務可能需要花費幾秒鍾,你可能會擔心如果一個消費者在執行任務過程中掛掉了。基於現在的代碼,一旦RabbitMQ將消息分發給了消費者,就會從內存中刪除。在這種情況下,如果殺死正在執行任務的消費者,會丟失正在處理的消息,也會丟失已經分發給這個消費者但尚未處理的消息。
但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那么我們應該將分發給它的任務交付給另一個消費者去處理。
為了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收並且處理完畢了。RabbitMQ可以刪除它了。
如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個消息沒有處理完全,然后交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會不丟失任何消息了。
沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。
消息應答是默認打開的。我們明確地把它們關掉了(autoAck=true)。現在將應答打開,一旦我們完成任務,消費者會自動發送消息應答。
- boolean autoAck = false;
- channel.basicConsume(QUEUE_NAME, autoAck, consumer);
修改一下Worker.java
- channel.basicQos(1);//保證一次只分發一個
- // 創建隊列消費者
- final Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());
- try {
- for (char ch: message.toCharArray()) {
- if (ch == '.') {
- Thread.sleep(1000);
- }
- }
- } catch (InterruptedException e) {
- } finally {
- System.out.println(" [x] Done! at " +new Date().toLocaleString());
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- };
我們還是運行1個生產者,2個消費者,在消息處理過程中,人為讓一個消費者掛掉,然后會看到剩下的任務都會被另外的消費者執行。
運行結果如下:
如果你關閉了自動消息應答,手動也未設置應答,這是一個很簡單錯誤,但是后果卻是極其嚴重的。消息在分發出去以后,得不到回應,所以不會在內存中刪除,結果RabbitMQ會越來越占用內存,最終的結果,你懂得。。。
Message durability(消息持久化)
我們已經了解了如何確保即使消費者死亡,任務也不會丟失。但是如果RabbitMQ服務器停止,我們的任務仍將失去!
當RabbitMQ退出或者崩潰,將會丟失隊列和消息。除非你不要隊列和消息。兩件事兒必須保證消息不被丟失:我們必須把“隊列”和“消息”設為持久化。
- boolean durable = true;
- channel.queueDeclare("hello", durable, false, false, null);
盡管這行代碼是正確的,但他不會在我們當前的設置中起作用。因為我們已經定義了一個名叫hello的未持久化的隊列。RabbitMQ不允許使用不同的參數設定重新定義已經存在的隊列,並且會向嘗試如此做的程序返回一個錯誤。一個快速的解決方案——就是聲明一個不同名字的隊列,比如task_queue。
(當然,我們也可以登錄到RabbitMQ的服務管理頁面,RabbitMQ默認的端口是5672,管理頁面默認端口是15672,頁面地址為:http://localhost:15672,使用是用戶名和密碼登錄。RabbitMQ的默認密碼和用戶名都是guest。點開“queue”那欄,可以看到隊列列表,點擊“hello”杜列,會展開隊列的詳細信息。把頁面拉到最后,有一項“Delete / purge”,點開,點擊“Delete”按鈕,就可以把隊列刪除掉了。 然后再運行代碼的時候,就會創建一個支持持久化的hello隊列。)
上述的代碼需要在生產者和消費者都要作出同樣的修改。
在這一點上我們確信task_queue的隊列不會丟失,即使RabbitMQ服務重啟。現在我們需要將消息標記為持久性的——通過設置 MessageProperties(實現BasicProperties)為PERSISTENT_TEXT_PLAIN。
現在你可以啟動RabbitMQ服務器,執行一次生產者NewTask的程序,然后關閉服務器,再重新啟動服務器,運行消費者Work做下實驗。可以發現消費者依舊可以讀出消息來。說明在RabbitMQ服務器關閉后,消息和隊列信息都已經做了持久化。再次啟動后,會重新加載到服務器中,消費者運行后,就可以正常的從隊列中獲取消息了。