輕松搞定RabbitMQ(三)——消息應答與消息持久化


轉自 http://blog.csdn.net/xiaoxian8023/article/details/48710653

 

 這個官網的第二個例子中的消息應答和消息持久化部分。我把它摘出來作為單獨的一塊兒來分享。


Message acknowledgment(消息應答)

       執行一個任務可能需要花費幾秒鍾,你可能會擔心如果一個消費者在執行任務過程中掛掉了。基於現在的代碼,一旦RabbitMQ將消息分發給了消費者,就會從內存中刪除。在這種情況下,如果殺死正在執行任務的消費者,會丟失正在處理的消息,也會丟失已經分發給這個消費者但尚未處理的消息。

       但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那么我們應該將分發給它的任務交付給另一個消費者去處理。

       為了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收並且處理完畢了。RabbitMQ可以刪除它了。

       如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個消息沒有處理完全,然后交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會不丟失任何消息了。

       沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。

       消息應答是默認打開的。我們明確地把它們關掉了(autoAck=true)。現在將應答打開,一旦我們完成任務,消費者會自動發送消息應答。

 

[java] view plain copy
 
  1. boolean autoAck = false;  
  2. channel.basicConsume(QUEUE_NAME, autoAck, consumer);  

 

       修改一下Worker.java

 

[java] view plain copy
 
  1. channel.basicQos(1);//保證一次只分發一個  
  2. // 創建隊列消費者  
  3. final Consumer consumer = new DefaultConsumer(channel) {  
  4.       @Override  
  5.       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  6.         String message = new String(body, "UTF-8");  
  7.   
  8.         System.out.println(" [x] Received '" + message + "'");  
  9.         System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());  
  10.         try {  
  11.             for (char ch: message.toCharArray()) {  
  12.                 if (ch == '.') {  
  13.                     Thread.sleep(1000);  
  14.                 }  
  15.             }  
  16.         } catch (InterruptedException e) {  
  17.         } finally {  
  18.           System.out.println(" [x] Done! at " +new Date().toLocaleString());  
  19.           channel.basicAck(envelope.getDeliveryTag(), false);  
  20.         }  
  21.       }  
  22.     };  

       我們還是運行1個生產者,2個消費者,在消息處理過程中,人為讓一個消費者掛掉,然后會看到剩下的任務都會被另外的消費者執行。

       運行結果如下:


       如果你關閉了自動消息應答,手動也未設置應答,這是一個很簡單錯誤,但是后果卻是極其嚴重的。消息在分發出去以后,得不到回應,所以不會在內存中刪除,結果RabbitMQ會越來越占用內存,最終的結果,你懂得。。。


Message durability(消息持久化)

       我們已經了解了如何確保即使消費者死亡,任務也不會丟失。但是如果RabbitMQ服務器停止,我們的任務仍將失去!
       當RabbitMQ退出或者崩潰,將會丟失隊列和消息。除非你不要隊列和消息。兩件事兒必須保證消息不被丟失:我們必須把“隊列”和“消息”設為持久化。      

 

[java] view plain copy
 
  1. boolean durable = true;  
  2. channel.queueDeclare("hello", durable, false, false, null);  

       盡管這行代碼是正確的,但他不會在我們當前的設置中起作用。因為我們已經定義了一個名叫hello的未持久化的隊列。RabbitMQ不允許使用不同的參數設定重新定義已經存在的隊列,並且會向嘗試如此做的程序返回一個錯誤。一個快速的解決方案——就是聲明一個不同名字的隊列,比如task_queue。

 

       (當然,我們也可以登錄到RabbitMQ的服務管理頁面,RabbitMQ默認的端口是5672,管理頁面默認端口是15672,頁面地址為:http://localhost:15672,使用是用戶名和密碼登錄。RabbitMQ的默認密碼和用戶名都是guest。點開“queue”那欄,可以看到隊列列表,點擊“hello”杜列,會展開隊列的詳細信息。把頁面拉到最后,有一項“Delete / purge”,點開,點擊“Delete”按鈕,就可以把隊列刪除掉了。 然后再運行代碼的時候,就會創建一個支持持久化的hello隊列。)

       上述的代碼需要在生產者和消費者都要作出同樣的修改。

       在這一點上我們確信task_queue的隊列不會丟失,即使RabbitMQ服務重啟。現在我們需要將消息標記為持久性的——通過設置 MessageProperties(實現BasicProperties)為PERSISTENT_TEXT_PLAIN。

       現在你可以啟動RabbitMQ服務器,執行一次生產者NewTask的程序,然后關閉服務器,再重新啟動服務器,運行消費者Work做下實驗。可以發現消費者依舊可以讀出消息來。說明在RabbitMQ服務器關閉后,消息和隊列信息都已經做了持久化。再次啟動后,會重新加載到服務器中,消費者運行后,就可以正常的從隊列中獲取消息了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM