從本節開始稱Sender為生產者 , Recv為消費者
一、消息確認
為了確保消息一定被消費者處理,rabbitMQ提供了消息確認功能,就是在消費者處理完任務之后,就給服務器一個回饋,服務器就會將該消息刪除,如果消費者超時不回饋,那么服務器將就將該消息重新發送給其他消費者
默認是開啟的,在消費者端通過下面的方式開啟消息確認, 首先將autoAck自動確認關閉,等我們的任務執行完成之后,手動的去確認,類似JDBC的autocommit一樣
QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer);
在前面的例子中使用的是channel.basicConsume(channelName, true, consumer) ; 在接收到消息后,就會自動反饋一個消息給服務器。
下面這個例子來測試消息確認的功能。
Sender03.java
1 package com.zf.rabbitmq03; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 9 /** 10 * 發送消息 11 * @author zhoufeng 12 * 13 */ 14 public class Sender03 { 15 16 public static void main(String[] args) throws IOException { 17 18 19 ConnectionFactory connFac = new ConnectionFactory() ; 20 21 //RabbitMQ-Server安裝在本機,所以直接用127.0.0.1 22 connFac.setHost("127.0.0.1"); 23 24 //創建一個連接 25 Connection conn = connFac.newConnection() ; 26 27 //創建一個渠道 28 Channel channel = conn.createChannel() ; 29 30 //定義Queue名稱 31 String queueName = "queue01" ; 32 33 //為channel定義queue的屬性,queueName為Queue名稱 34 channel.queueDeclare( queueName , false, false, false, null) ; 35 36 String msg = "Hello World!"; 37 38 //發送消息 39 channel.basicPublish("", queueName , null , msg.getBytes()); 40 41 System.out.println("send message[" + msg + "] to "+ queueName +" success!"); 42 43 channel.close(); 44 conn.close(); 45 46 } 47 48 }
與Sender01.java一樣,沒有什么區別。
Recv03.java
1 package com.zf.rabbitmq03; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.ConsumerCancelledException; 9 import com.rabbitmq.client.QueueingConsumer; 10 import com.rabbitmq.client.QueueingConsumer.Delivery; 11 import com.rabbitmq.client.ShutdownSignalException; 12 13 /** 14 * 接收消息 15 * @author zhoufeng 16 * 17 */ 18 public class Recv03 { 19 20 public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 21 22 ConnectionFactory connFac = new ConnectionFactory() ; 23 24 connFac.setHost("127.0.0.1"); 25 26 Connection conn = connFac.newConnection() ; 27 28 Channel channel = conn.createChannel() ; 29 30 String channelName = "channel01"; 31 32 channel.queueDeclare(channelName, false, false, false, null) ; 33 34 35 //配置好獲取消息的方式 36 QueueingConsumer consumer = new QueueingConsumer(channel) ; 37 38 39 //取消 autoAck 40 boolean autoAck = false ; 41 42 channel.basicConsume(channelName, autoAck, consumer) ; 43 44 //循環獲取消息 45 while(true){ 46 47 //獲取消息,如果沒有消息,這一步將會一直阻塞 48 Delivery delivery = consumer.nextDelivery() ; 49 50 String msg = new String(delivery.getBody()) ; 51 52 //確認消息,已經收到 53 channel.basicAck(delivery.getEnvelope().getDeliveryTag() 54 , false); 55 56 System.out.println("received message[" + msg + "] from " + channelName); 57 } 58 59 } 60 61 }
注意:一旦將autoAck關閉之后,一定要記得處理完消息之后,向服務器確認消息。否則服務器將會一直轉發該消息
如果將上面的channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);注釋掉, Sender03.java只需要運行一次 , Recv03.java每次運行將都會收到HelloWorld消息
注意:
但是這樣還是不夠的,如果rabbitMQ-Server突然掛掉了,那么還沒有被讀取的消息還是會丟失 ,所以我們可以讓消息持久化。 只需要在定義Queue時,設置持久化消息就可以了,方法如下:
boolean durable = true; channel.queueDeclare(channelName, durable, false, false, null);
這樣設置之后,服務器收到消息后就會立刻將消息寫入到硬盤,就可以防止突然服務器掛掉,而引起的數據丟失了。 但是服務器如果剛收到消息,還沒來得及寫入到硬盤,就掛掉了,這樣還是無法避免消息的丟失。
二、公平調度
上一個例子能夠實現發送一個Message與接收一個Message
從上一個Recv01中可以看出,必須處理完一個消息,才會去接收下一個消息。如果生產者眾多,那么一個消費者肯定是忙不過來的。此時就可以用多個消費者來對同一個Channel的消息進行處理,並且要公平的分配任務給多個消費者。不能部分很忙 部分總是空閑
實現公平調度的方式就是讓每個消費者在同一時刻會分配一個任務。 通過channel.basicQos(1);可以設置