java使用RabbitMQ要點知識


轉載於:https://blog.csdn.net/LeiXiaoTao_Java/article/details/78924863

1、maven依賴


  
  
  
          
  1. <dependency>
  2. <groupId>commons-lang</groupId>
  3. <artifactId>commons-lang</artifactId>
  4. <version> 2.3</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.rabbitmq</groupId>
  8. <artifactId>amqp-client</artifactId>
  9. <version> 3.4.1</version>
  10. </dependency>


2、RabbitMQ重要方法介紹(基本常用的)

2.1、創建連接


  
  
  
          
  1. // 創建連接工廠
  2. ConnectionFactory cf = new ConnectionFactory();
  3. // 設置rabbitmq服務器IP地址
  4. cf.setHost( "*.*.*.*");
  5. // 設置rabbitmq服務器用戶名
  6. cf.setUsername( "***");
  7. // 設置rabbitmq服務器密碼
  8. cf.setPassword( "***");
  9. // 指定端口,默認5672
  10. cf.setPort(AMQP.PROTOCOL.PORT);
  11. // 獲取一個新的連接
  12. connection = cf.newConnection();
  13. // 創建一個通道
  14. channel = connection.createChannel();
  15. //關閉管道和連接
  16. channel.close();
  17. connection.close();

2.2、聲明隊列


  
  
  
          
  1. /**
  2. * 申明一個隊列,如果這個隊列不存在,將會被創建
  3. * @param queue 隊列名稱
  4. * @param durable 持久性:true隊列會再重啟過后存在,但是其中的消息不會存在。
  5. * @param exclusive 是否只能由創建者使用,其他連接不能使用。
  6. * @param autoDelete 是否自動刪除(沒有連接自動刪除)
  7. * @param arguments 隊列的其他屬性(構造參數)
  8. * @return Queue.DeclareOk:宣告隊列的聲明確認方法已成功聲明。
  9. * @throws java.io.IOException if an error is encountered
  10. */
  11. channel.queueDeclare( "testQueue", true, false, false, null);


此方法一般由Producer調用創建消息隊列。如果由Consumer創建隊列,有可能Producer發布消息的時候Queue還沒有被創建好,會造成消息丟失的情況。

 

2.3、聲明Exchange


  
  
  
          
  1. /**
  2. * 聲明一個 exchange.
  3. * @param exchange 名稱
  4. * @param type exchange type:direct、fanout、topic、headers
  5. * @param durable 持久化
  6. * @param autoDelete 是否自動刪除(沒有連接自動刪除)
  7. * @param arguments 隊列的其他屬性(構造參數)
  8. * @return 成功地聲明了一個聲明確認方法來指示交換。
  9. * @throws java.io.IOException if an error is encountered
  10. */
  11. channel.exchangeDeclare( "leitao", "topic", true, false, null);

2.4、將queue和Exchange進行綁定(Binding)


  
  
  
          
  1. /**
  2. * 將隊列綁定到Exchange,不需要額外的參數。
  3. * @param queue 隊列名稱
  4. * @param exchange 交換機名稱
  5. * @param routingKey 路由關鍵字
  6. * @return Queue.BindOk:如果成功創建綁定,則返回綁定確認方法。
  7. * @throws java.io.IOException if an error is encountered
  8. */
  9. channel.queueBind( "testQueue", "leitao", "testRoutingKey");

2.5、發布消息


  
  
  
          
  1. /**
  2. * 發布一條不用持久化的消息,且設置兩個監聽。
  3. * @param exchange 消息交換機名稱,空字符串將使用直接交換器模式,發送到默認的Exchange=amq.direct。此狀態下,RoutingKey默認和Queue名稱相同
  4. * @param routingKey 路由關鍵字
  5. * @param mandatory 監聽是否有符合的隊列
  6. * @param immediate 監聽符合的隊列上是有至少一個Consumer
  7. * @param BasicProperties 設置消息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN是持久化;MessageProperties.TEXT_PLAIN是非持久化。
  8. * @param body 消息對象轉換的byte[]
  9. * @throws java.io.IOException if an error is encountered
  10. */
  11. channel.basicPublish( "",queueName, true, false,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));

exchange的值為空字符串或者是amq.direct時,此時的交換器類型默認是direct類型可以不用單獨聲明Exchange,也不用單獨進行Binding,系統默認將queue名稱作為RoutingKey進行了綁定。

 

兩個傳入參數的含義

mandatory

當mandatory標志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那么會調用basic.return方法將消息返回給生產者(Basic.Return + Content-Header + Content-Body);當mandatory設置為false時,出現上述情形broker會直接將消息扔掉。

immediate

當immediate標志位設置為true時,如果exchange在將消息路由到queue(s)時發現對於的queue上沒有消費者,那么這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者。

概括來說,mandatory標志告訴服務器至少將該消息route到一個隊列中,否則將消息返還給生產者;immediate標志告訴服務器如果該消息關聯的queue上有消費者,則馬上將消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。

注意:在RabbitMQ3.0以后的版本里,去掉了immediate參數的支持,發送帶immediate=true標記的publish會返回如下錯誤:

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error;protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)。

為什么取消支持:immediate標記會影響鏡像隊列性能,增加代碼復雜性,並建議采用“TTL”和“DLX”等方式替代。

 

2.6、接收消息


  
  
  
          
  1. /**
  2. * 設置消費批量投遞數目,一次性投遞10條消息。當消費者未確認消息累計達到10條時,rabbitMQ將不會向此Channel上的消費者投遞消息,直到未確認數小於10條再投遞
  3. * @param prefetchCount 投遞數目
  4. * @param global 是否針對整個Channel。true表示此投遞數是給Channel設置的,false是給Channel上的Consumer設置的。
  5. * @throws java.io.IOException if an error is encountered
  6. */
  7. channel.basicQos( 10, false);
  8. //整個傳輸管道最多15條,具體分到每個消費者身上又不能大於10條
  9. channel.basicQos( 15, true);
  10. /**
  11. * 開始一個非局部、非排他性消費, with a server-generated consumerTag.
  12. * 執行這個方法會回調handleConsumeOk方法
  13. * @param queue 隊列名稱
  14. * @param autoAck 是否自動應答。false表示consumer在成功消費過后必須要手動回復一下服務器,如果不回復,服務器就將認為此條消息消費失敗,繼續分發給其他consumer。
  15. * @param callback 回調方法類,一般為自己的Consumer類
  16. * @return 由服務器生成的consumertag
  17. * @throws java.io.IOException if an error is encountered
  18. */
  19. channel.basicConsume(queueName, false, Consumer);

2.7、Consumer處理消息


  
  
  
          
  1. /**
  2. * 消費者收到消息的回調函數
  3. * @param consumerTag 消費者標簽
  4. * @param envelope 消息的包裝數據
  5. * @param properties 消息的內容頭數據
  6. * @param body 消息對象的byte[]
  7. * @throws IOException
  8. */
  9. void handleDelivery(String consumerTag,
  10. Envelope envelope,
  11. AMQP.BasicProperties properties,
  12. byte[] body)
  13. throws IOException;

3、Producer消息確認機制

3.1、什么是生產者消息確認機制?

沒有消息確認模式時,生產者不知道消息是不是已經到達了Broker服務器,這對於一些業務嚴謹的系統來說將是災難性的。消息確認模式可以采用AMQP協議層面提供的事務機制實現(此文沒有這種實現方式),但是會降低RabbitMQ的吞吐量。RabbitMQ自身提供了一種更加高效的實現方式:confirm模式。

消息生產者通過調用Channel.confirmSelect()方法將Channel信道設置成confirm模式。一旦信道被設置成confirm模式,該信道上的所有消息都會被指派一個唯一的ID(從1開始),一旦消息被對應的Exchange接收,Broker就會發送一個確認給生產者(其中deliveryTag就是此唯一的ID),這樣消息生產者就知道消息已經成功到達Broker。

confirm模式最大的好處在於他是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息。

在channel 被設置成 confirm 模式之后,所有被 publish 的后續消息都將被 confirm(即 ack) 或者被nack一次。但是沒有對消息被 confirm 的快慢做任何保證,並且同一條消息不會既被 confirm又被nack 。

3.2、開啟confirm模式

如上所說生產者通過調用Channel.confirmSelect()方法將Channel信道設置成confirm模式。

注意:已經在transaction事務模式的channel是不能再設置成confirm模式的,即這兩種模式是不能共存的

3.3、普通confirm模式

普通confirm模式是串行的,即每次發送了一次消息,生產者都要等待Broker的確認消息,然后根據確認標記權衡消息重發還是繼續發下一條。由於是串行的,在效率上是比較低下的。

 

(1)重點方法


  
  
  
          
  1. /**
  2. * 等待Broker返回消息確認標記
  3. * 注意,在非確定的通道,waitforconfirms拋出IllegalStateException。
  4. * @return 是否發送成功
  5. * @throws java.lang.IllegalStateException
  6. */
  7. boolean waitForConfirms() throws InterruptedException;

(2 部分使用代碼如下:


  
  
  
          
  1. //注意:返回的時候Return在前,Confirm在后
  2. channel.confirmSelect();
  3. int i= 1;
  4. while (i<= 50) {
  5. //發布消息
  6. channel.basicPublish( "",queueName, true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
  7. //等待Broker的確認回調
  8. if(channel.waitForConfirms())
  9. System.out.println( "send success!");
  10. else
  11. System.out.println( "send error!");
  12. i++;
  13. }

3.4、批量confirm模式

批量confirm模式是異步的方式,效率要比普通confirm模式高許多,但是此種方式也會造成線程阻塞,想要進行失敗重發就必須要捕獲異常。網絡上還有采用waitForConfirms()實現批量confirm模式的,但是只要一條失敗了,就必須把這批次的消息統統再重發一次,非常的消耗性能,因此此文不予考慮。

 

(1)重點代碼


  
  
  
          
  1. /**
  2. * 等待直到所有消息被確認或者某個消息發送失敗。如果消息發送確認失敗了,
  3. * waitForConfirmsOrDie 會拋出IOException異常。當在非確認通道上調用時
  4. * ,會拋出IllegalStateException異常。
  5. * @throws java.lang.IllegalStateException
  6. */
  7. void waitForConfirmsOrDie() throws IOException, InterruptedException;

(2 )部分代碼如下:


  
  
  
          
  1. //注意:返回的時候Return在前,Confirm在后
  2. channel.confirmSelect();
  3. int i= 1;
  4. while (i<= 50) {
  5. //發布消息
  6. channel.basicPublish( "",queueName, true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
  7. i++;
  8. }
  9. channel.waitForConfirmsOrDie();

3.5、ConfirmListener監聽器模式

RabbitMQ提供了一個ConfirmListener接口專門用來進行確認監聽,我們可以實現ConfirmListener接口來創建自己的消息確認監聽。ConfirmListener接口中包含兩個回調方法:


  
  
  
          
  1. /**
  2. * 生產者發送消息到exchange成功的回調方法
  3. */
  4. void handleAck(long deliveryTag, boolean multiple) throws IOException;
  5. /**
  6. * 生產者發送消息到服務器broker失敗的回調方法,服務器丟失了此消息。
  7. * 注意,丟失的消息仍然可以傳遞給消費者,但broker不能保證這一點。
  8. */
  9. void handleNack(long deliveryTag, boolean multiple) throws IOException;

其中deliveryTagBroker給每條消息指定的唯一ID(從1開始);multiple表示是否接收所有的應答消息,比如multiple=true時,發送100條消息成功過后,我們並不會收到100次handleAck方法調用。

 

(1)重要方法


  
  
  
          
  1. //注冊消息確認監聽器
  2. channel.addConfirmListener( new MyConfirmListener());

(2)部分使用代碼如下:


  
  
  
          
  1. //注意:返回的時候Return在前,Confirm在后
  2. channel.confirmSelect();
  3. //注冊消息確認監聽器
  4. channel.addConfirmListener( new MyConfirmListener());
  5. //注冊消息結果返回監聽器
  6. channel.addReturnListener( new MyReturnListener());
  7. int i= 1;
  8. while (i<= 50) {
  9. //發布消息
  10. channel.basicPublish( "",queueName, true,MessageProperties.TEXT_PLAIN,SerializationUtils.
  11. serialize(object));
  12. i++;
  13. }

 


  
  
  
          
  1. //自定義的消息確認監聽器
  2. public class MyConfirmListener implements ConfirmListener{
  3. /**
  4. * 生產者發送消息到exchange成功的回調方法
  5. * 消息被Exchange接受以后,如果沒有匹配的Queue,則會被丟棄。但是可以設置ReturnListener監聽來監聽有沒有匹配的隊列。
  6. * 因此handleAck執行了,並不能完全表示消息已經進入了對應的隊列,只能表示對應的exchange成功的接收了消息。
  7. * 消息被exchange接收過后,還需要通過一定的匹配規則分發到對應的隊列queue中。
  8. */
  9. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  10. //注意:deliveryTag是broker給消息指定的唯一id(從1開始)
  11. System.out.println( "Exchange接收消息:"+deliveryTag+ "(deliveryTag)成功!multiple="+multiple);
  12. }
  13. /**
  14. * 生產者發送消息到服務器broker失敗的回調方法,服務器丟失了此消息。
  15. * 注意,丟失的消息仍然可以傳遞給消費者,但broker不能保證這一點。(不明白,既然丟失了,為啥還能發送)
  16. */
  17. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  18. System.out.println( "Exchange接收消息:"+deliveryTag+ "(deliveryTag)失敗!服務器broker丟失了消息");
  19. }
  20. }



  
  
  
          
  1. //自定義的結果返回監聽器
  2. /**
  3. * 實現此接口以通知交付basicpublish失敗時,“mandatory”或“immediate”的標志監聽(源代碼注釋翻譯)。
  4. * 在發布消息時設置mandatory等於true,監聽消息是否有相匹配的隊列,
  5. * 沒有時ReturnListener將執行handleReturn方法,消息將返給發送者
  6. */
  7. public class MyReturnListener implements ReturnListener {
  8. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
  9. BasicProperties properties, byte[] body) throws IOException {
  10. System.out.println( "消息發送到隊列失敗:回復失敗編碼:"+replyCode+ ";回復失敗文本:"+replyText+ ";失敗消息對象:"+SerializationUtils.deserialize(body));
  11. }
  12. }


4、Consumer消息確認機制

為了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(message acknowledgment)。消費者在注冊消費者時,可以指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號后才從內存(或磁盤,如果是持久化消息的話)中移去消息。否則,RabbitMQ會在隊列中消息被消費后立即刪除它。

當noAck=false時,對於RabbitMQ服務器端而言,隊列中的消息分成了兩部分:一部分是等待投遞給消費者的消息(web管理界面上的Ready狀態);一部分是已經投遞給消費者,但是還沒有收到消費者ack信號的消息(web管理界面上的Unacked狀態)。如果服務器端一直沒有收到消費者的ack信號,並且消費此消息的消費者已經斷開連接,則服務器端會安排該消息重新進入隊列,等待投遞給下一個消費者(也可能還是原來的那個消費者)。

 

(1)重要方法


  
  
  
          
  1. /**
  2. *1. 開始一個非局部、非排他性消費, with a server-generated consumerTag.
  3. * 注意:執行這個方法會回調handleConsumeOk方法,在此方法中處理消息。
  4. * @param queue 隊列名稱
  5. * @param autoAck 是否自動應答。false表示consumer在成功消費過后必須要手動回復一下服務器,如果不回復,服務器就將認為此條消息消費失敗,繼續分發給其他consumer。
  6. * @param callback 回調方法類
  7. * @return 由服務器生成的consumertag
  8. * @throws java.io.IOException if an error is encountered
  9. */
  10. String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
  11. /**
  12. *2
  13. consumer處理成功后,通知broker刪除隊列中的消息,如果設置multiple=true,表示支持批量確認機制以減少網絡流量。
  14. 例如:有值為5,6,7,8 deliveryTag的投遞
  15. 如果此時channel.basicAck(8, true);則表示前面未確認的5,6,7投遞也一起確認處理完畢。
  16. 如果此時channel.basicAck(8, false);則僅表示deliveryTag=8的消息已經成功處理。
  17. */
  18. void basicAck(long deliveryTag, boolean multiple) throws IOException;
  19. /**3
  20. consumer處理失敗后,例如:有值為5,6,7,8 deliveryTag的投遞。
  21. 如果channel.basicNack(8, true, true);表示deliveryTag=8之前未確認的消息都處理失敗且將這些消息重新放回隊列中。
  22. 如果channel.basicNack(8, true, false);表示deliveryTag=8之前未確認的消息都處理失敗且將這些消息直接丟棄。
  23. 如果channel.basicNack(8, false, true);表示deliveryTag=8的消息處理失敗且將該消息重新放回隊列。
  24. 如果channel.basicNack(8, false, false);表示deliveryTag=8的消息處理失敗且將該消息直接丟棄。
  25. */
  26. void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  27. /**4
  28. 相比channel.basicNack,除了沒有multiple批量確認機制之外,其他語義完全一樣。
  29. 如果channel.basicReject(8, true);表示deliveryTag=8的消息處理失敗且將該消息重新放回隊列。
  30. 如果channel.basicReject(8, false);表示deliveryTag=8的消息處理失敗且將該消息直接丟棄。
  31. */
  32. void basicReject(long deliveryTag, boolean requeue) throws IOException;


(2)部分使用代碼如下:


  
  
  
          
  1. //this表示自己的Consumer
  2. channel.basicConsume(queueName, false, this);
  3. ...
  4. @Override
  5. public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
  6. if (body == null)
  7. return;
  8. Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
  9. /**
  10. * 專門處理奇數消息的消費者
  11. */
  12. int tagId = (Integer) map.get( "tagId");
  13. if (tagId % 2 != 0) {
  14. //處理消息
  15. System.out.println( "接收並處理消息:"+tagId);
  16. //通知服務器此消息已經被處理了
  17. channel.basicAck(envelope.getDeliveryTag(), false);
  18. } else{
  19. //通知服務器消息處理失敗,重新放回隊列。false表示處理失敗消息不放會隊列,直接刪除
  20. channel.basicReject(envelope.getDeliveryTag(), true);
  21. }
  22. }


5、Demo項目整體代碼

此demo就是向RabbitMQ服務器上面發送20個消息,消息體是map,里面裝的是tagId=數字。然后注冊了兩個消費者,分別處理奇數和偶數。

5.1、連接工具類


  
  
  
          
  1. /**
  2. * 連接工具類
  3. */
  4. public class ConnectionUtil {
  5. Channel channel;
  6. Connection connection;
  7. String queueName;
  8. public ConnectionUtil(String queueName) throws IOException {
  9. this.queueName = queueName;
  10. // 創建連接工廠
  11. ConnectionFactory cf = new ConnectionFactory();
  12. // 設置rabbitmq服務器IP地址
  13. cf.setHost( "*.16.0.*");
  14. // 設置rabbitmq服務器用戶名
  15. cf.setUsername( "*");
  16. // 設置rabbitmq服務器密碼
  17. cf.setPassword( "*");
  18. cf.setPort(AMQP.PROTOCOL.PORT);
  19. // 獲取一個新的連接
  20. connection = cf.newConnection();
  21. // 創建一個通道
  22. channel = connection.createChannel();
  23. /**
  24. *申明一個隊列,如果這個隊列不存在,將會被創建
  25. * @param queue 隊列名稱
  26. * @param durable 持久性:true隊列會再重啟過后存在,但是其中的消息不會存在。
  27. * @param exclusive 是否只能由創建者使用
  28. * @param autoDelete 是否自動刪除(沒有連接自動刪除)
  29. * @param arguments 隊列的其他屬性(構造參數)
  30. * @return 宣告隊列的聲明確認方法已成功聲明。
  31. * @throws java.io.IOException if an error is encountered
  32. */
  33. channel.queueDeclare(queueName, true, false, false, null);
  34. }
  35. public void close() throws IOException{
  36. channel.close();
  37. connection.close();
  38. }
  39. }

 

5.2、具體生產者


  
  
  
          
  1. /**
  2. * 消息生產者
  3. */
  4. public class MessageProducer {
  5. private ConnectionUtil connectionUtil;
  6. public MessageProducer(ConnectionUtil connectionUtil){
  7. this.connectionUtil=connectionUtil;
  8. }
  9. /**
  10. * 發送消息到隊列中
  11. */
  12. public void sendMessage(Serializable object) throws IOException{
  13. /**
  14. * Publish a message
  15. * @param exchange 消息交換機名稱,空字符串將使用直接交換器模式,發送到默認的Exchange=amq.direct
  16. * @param routingKey 路由關鍵字
  17. * @param mandatory 監聽是否有符合的隊列
  18. * @param BasicProperties 設置消息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN是持久化;MessageProperties.TEXT_PLAIN是非持久化
  19. * @param body 消息對象
  20. * @throws java.io.IOException if an error is encountered
  21. */
  22. connectionUtil.channel.basicPublish( "", connectionUtil.queueName, true, MessageProperties.TEXT_PLAIN, SerializationUtils.serialize(object));
  23. System.out.println( "MessageProducer發送了一條消息:"+object);
  24. }
  25. }

5.3、公共消費者父類


  
  
  
          
  1. /**
  2. * 消息消費者基礎類
  3. */
  4. public class MessageConsumer implements Consumer {
  5. //消費者標簽,注冊成功時由rabbitmq服務器自動生成
  6. protected String consumerTag;
  7. protected ConnectionUtil connectionUtil;
  8. public MessageConsumer(ConnectionUtil connectionUtil){
  9. this.connectionUtil=connectionUtil;
  10. }
  11. public void basicConsume(){
  12. try {
  13. /**
  14. * 設置消費投遞數目,一次性投遞10條消息。當消費者未確認消息達到10條時,rabbitMQ將不會向此消費者投遞消息,直到未確認數小於10條再投遞
  15. * @param prefetchCount 投遞數目
  16. * @param global 是否針對整個Channel。true表示此投遞數是給Channel設置的,false是給Channel上的Consumer設置的。
  17. * @throws java.io.IOException if an error is encountered
  18. */
  19. connectionUtil.channel.basicQos( 10, false); //表示每個消費者最多10條
  20. connectionUtil.channel.basicQos( 15, true); //整個傳輸管道最多15條,具體分到每個消費者身上又不能大於10條
  21. /**
  22. * 開始一個非局部、非排他性消費, with a server-generated consumerTag.
  23. * 執行這個方法會回調handleConsumeOk方法
  24. * @param queue 隊列名稱
  25. * @param autoAck 是否自動應答。false表示consumer在成功消費過后必須要手動回復一下服務器,如果不回復,服務器就將認為此條消息消費失敗,繼續分發給其他consumer。
  26. * @param callback 回調方法類
  27. * @return 由服務器生成的consumertag
  28. * @throws java.io.IOException if an error is encountered
  29. */
  30. connectionUtil.channel.basicConsume(connectionUtil.queueName, false, this);
  31. } catch (IOException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. /**
  36. * 收到消息時的回調函數
  37. */
  38. public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException {
  39. //子類重寫覆蓋具體操作
  40. }
  41. /**
  42. * 消費者注冊成功回調函數
  43. */
  44. public void handleConsumeOk(String consumerTag) {
  45. this.consumerTag=consumerTag;
  46. System.out.println( "消費者:"+consumerTag+ ",注冊成功!");
  47. }
  48. /**
  49. * 手動取消消費者注冊成功回調函數
  50. * 當調用Channel類的void basicCancel(String consumerTag) throws IOException;方法觸發此回調函數
  51. */
  52. public void handleCancelOk(String consumerTag) {
  53. System.out.println(consumerTag+ " 手動取消消費者注冊成功!");
  54. }
  55. /**
  56. * 當消費者因為其他原因被動取消注冊時調用,比如queue被刪除了。
  57. */
  58. public void handleCancel(String consumerTag) throws IOException {
  59. System.out.println( "因為外部原因消費者:"+consumerTag+ " 取消注冊!");
  60. }
  61. /**
  62. * 當通道或基礎連接被關閉時調用
  63. */
  64. public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
  65. System.out.println( "通道或基礎連接被關閉");
  66. }
  67. /**
  68. * Called when a <code><b>basic.recover-ok</b></code> is received
  69. * in reply to a <code><b>basic.recover</b></code>. All messages
  70. * received before this is invoked that haven't been <i>ack</i>'ed will be
  71. * re-delivered. All messages received afterwards won't be.
  72. * @param consumerTag the <i>consumer tag</i> associated with the consumer
  73. */
  74. public void handleRecoverOk(String consumerTag) {
  75. }
  76. }

 

5.4、具體的消費者


  
  
  
          
  1. /**
  2. * 專門處理偶數消息的消費者
  3. */
  4. public class EvenConsumer extends MessageConsumer {
  5. public EvenConsumer(ConnectionUtil connectionUtil) {
  6. super(connectionUtil);
  7. }
  8. @Override
  9. public void handleConsumeOk(String consumerTag) {
  10. this.consumerTag=consumerTag;
  11. System.out.println( "EvenConsumer消費者:"+consumerTag+ ",注冊成功!");
  12. }
  13. @Override
  14. public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
  15. if (body == null)
  16. return;
  17. Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
  18. int tagId = (Integer) map.get( "tagId");
  19. if (tagId % 2 == 0) {
  20. //處理消息
  21. System.out.println( "EvenConsumer接收並處理消息:"+tagId);
  22. //通知服務器此消息已經被處理了
  23. connectionUtil.channel.basicAck(envelope.getDeliveryTag(), false);
  24. } else{
  25. //通知服務器消息處理失敗,重新放回隊列。false表示處理失敗消息不放會隊列,直接刪除
  26. connectionUtil.channel.basicReject(envelope.getDeliveryTag(), true);
  27. }
  28. }
  29. }

 


  
  
  
          
  1. /**
  2. * 專門處理奇數消息的消費者
  3. */
  4. public class OddConsumer extends MessageConsumer {
  5. public OddConsumer(ConnectionUtil connectionUtil) {
  6. super(connectionUtil);
  7. }
  8. @Override
  9. public void handleConsumeOk(String consumerTag) {
  10. this.consumerTag=consumerTag;
  11. System.out.println( "OddConsumer消費者:"+consumerTag+ ",注冊成功!");
  12. }
  13. @Override
  14. public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
  15. if (body == null)
  16. return;
  17. Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
  18. int tagId = (Integer) map.get( "tagId");
  19. if (tagId % 2 != 0) {
  20. //處理消息
  21. System.out.println( "OddConsumer接收並處理消息:"+tagId);
  22. //通知服務器此消息已經被處理了
  23. connectionUtil.channel.basicAck(envelope.getDeliveryTag(), false);
  24. } else{
  25. //通知服務器消息處理失敗,重新放回隊列。false表示處理失敗消息不放會隊列,直接刪除
  26. connectionUtil.channel.basicReject(envelope.getDeliveryTag(), true);
  27. }
  28. }
  29. }


5.5、監聽器


  
  
  
          
  1. /**
  2. *producer發送確認事件。
  3. */
  4. public class MyConfirmListener implements ConfirmListener{
  5. /**
  6. * 生產者發送消息到exchange成功的回調方法
  7. * 消息被Exchange接受以后,如果沒有匹配的Queue,則會被丟棄。但是可以設置ReturnListener監聽來監聽有沒有匹配的隊列。
  8. * 因此handleAck執行了,並不能完全表示消息已經進入了對應的隊列,只能表示對應的exchange成功的接收了消息。
  9. * 消息被exchange接收過后,還需要通過一定的匹配規則分發到對應的隊列queue中。
  10. */
  11. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  12. //注意:deliveryTag是broker給消息指定的唯一id(從1開始)
  13. System.out.println( "Exchange接收消息:"+deliveryTag+ "(deliveryTag)成功!multiple="+multiple);
  14. }
  15. /**
  16. * 生產者發送消息到服務器broker失敗的回調方法,服務器丟失了此消息。
  17. * 注意,丟失的消息仍然可以傳遞給消費者,但broker不能保證這一點。
  18. */
  19. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  20. System.out.println( "Exchange接收消息:"+deliveryTag+ "(deliveryTag)失敗!服務器broker丟失了消息");
  21. }
  22. }



  
  
  
          
  1. /**
  2. * 實現此接口以通知交付basicpublish失敗時,“mandatory”或“immediate”的標志監聽(源代碼注釋翻譯)。
  3. * 在發布消息時設置mandatory等於true,監聽消息是否有相匹配的隊列,
  4. * 沒有時ReturnListener將執行handleReturn方法,消息將返給發送者 。
  5. * 由於3.0版本過后取消了支持immediate,此處不做過多的解釋。
  6. */
  7. public class MyReturnListener implements ReturnListener {
  8. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
  9. BasicProperties properties, byte[] body) throws IOException {
  10. System.out.println( "消息發送到隊列失敗:回復失敗編碼:"+replyCode+ ";回復失敗文本:"+replyText+ ";失敗消息對象:"+SerializationUtils.deserialize(body));
  11. }
  12. }


5.6、客戶端


  
  
  
          
  1. public class Client {
  2. public static void main(String[] args) {
  3. new Client();
  4. }
  5. public Client(){
  6. try {
  7. //發消息
  8. publishMessage();
  9. //注冊消費者
  10. addConsumer();
  11. } catch (IOException e) {
  12. e.printStackTrace();
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. public void publishMessage() throws IOException, InterruptedException{
  18. ConnectionUtil connectionUtil= new ConnectionUtil( "testqueue");
  19. MessageProducer producer= new MessageProducer(connectionUtil);
  20. connectionUtil.channel.confirmSelect();
  21. //注意:返回的時候Return在前,Confirm在后
  22. connectionUtil.channel.addConfirmListener( new MyConfirmListener());
  23. connectionUtil.channel.addReturnListener( new MyReturnListener());
  24. int i= 1;
  25. while (i<= 10) {
  26. HashMap<String, Object> map= new HashMap<String, Object>();
  27. map.put( "tagId", i);
  28. producer.sendMessage(map);
  29. i++;
  30. }
  31. }
  32. public void addConsumer() throws IOException{
  33. ConnectionUtil connectionUtil= new ConnectionUtil( "testqueue");
  34. OddConsumer odd= new OddConsumer(connectionUtil);
  35. odd.basicConsume();
  36. EvenConsumer even= new EvenConsumer(connectionUtil);
  37. even.basicConsume();
  38. }
  39. }

 

5.7、測試結果


  
  
  
          
  1. MessageProducer發送了一條消息:{tagId= 1}
  2. MessageProducer發送了一條消息:{tagId= 2}
  3. MessageProducer發送了一條消息:{tagId= 3}
  4. Exchange接收消息: 1(deliveryTag)成功!multiple= false
  5. Exchange接收消息: 2(deliveryTag)成功!multiple= false
  6. MessageProducer發送了一條消息:{tagId= 4}
  7. Exchange接收消息: 3(deliveryTag)成功!multiple= false
  8. MessageProducer發送了一條消息:{tagId= 5}
  9. Exchange接收消息: 4(deliveryTag)成功!multiple= false
  10. MessageProducer發送了一條消息:{tagId= 6}
  11. Exchange接收消息: 5(deliveryTag)成功!multiple= false
  12. MessageProducer發送了一條消息:{tagId= 7}
  13. Exchange接收消息: 6(deliveryTag)成功!multiple= false
  14. MessageProducer發送了一條消息:{tagId= 8}
  15. Exchange接收消息: 7(deliveryTag)成功!multiple= false
  16. Exchange接收消息: 8(deliveryTag)成功!multiple= false
  17. MessageProducer發送了一條消息:{tagId= 9}
  18. Exchange接收消息: 9(deliveryTag)成功!multiple= false
  19. MessageProducer發送了一條消息:{tagId= 10}
  20. Exchange接收消息: 10(deliveryTag)成功!multiple= false
  21. OddConsumer消費者:amq.ctag-z8s8LaSgYvo02jktCZrCYA,注冊成功!
  22. OddConsumer接收並處理消息: 1
  23. OddConsumer接收並處理消息: 3
  24. OddConsumer接收並處理消息: 5
  25. OddConsumer接收並處理消息: 7
  26. OddConsumer接收並處理消息: 9
  27. EvenConsumer消費者:amq.ctag-LpN6Q5VvNY3wCof2lXqS4A,注冊成功!
  28. EvenConsumer接收並處理消息: 4
  29. EvenConsumer接收並處理消息: 8
  30. EvenConsumer接收並處理消息: 2
  31. EvenConsumer接收並處理消息: 10
  32. EvenConsumer接收並處理消息: 6

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM