springboot中使用RabbitMq消息隊列使用筆記(使用配置版本)


rabbitmq是什么,怎么搭建我在這就不敘述了,這里只說怎么使用。

說個小插曲,如果有的人連接rabbitmq非常慢,經常超時或者啟動rabbitmq很慢,可以在linux環境下配置一下就行了:

第一步:打開linux輸入以下命令

[root@hadoop1 mq]# hostname
hadoop1

這里的hadoop1就是我的主機名,然后把這個主機名配置進hosts文件即可;

第二步:vim /etc/hosts 命令對hosts文件進行添加一行數據:

主機名hadoop1為例
只需在hosts文件中加入 127.0.0.1 hadoop1
然后wq保存退出即可。

 第三步:重啟rabbitmq應該就可以解決連接,啟動過慢的問題了;

=====================================================

以下是使用rabbitmq的過程。

給大家推薦https://blog.csdn.net/qq_35387940/article/details/100514134這個博客,我的案例也是參照這個寫的,修改部分代碼加上自己的心得體會,有些原作者沒有的注釋我都加上了。可以先去看原作者的,然后再看這個。

 

 

 一個交換機可以連接多個隊列,生產者發送信息給交換機,交換機根據生產者帶來的路由關鍵字來確認傳遞給哪個隊列,消費者可以監聽一個或多個隊列,當隊列里存在消息的時候就會消費。

第一步:添加相關依賴

<dependency>
 <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

第二步:編寫application.yml主配置文件,有的配置看情況而定

server: port: 8888 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl template: retry: #enabled:開啟失敗重試 enabled: true #第一次重試的間隔時長 initial-interval: 10000ms #最長重試間隔,超過這個間隔將不再重試 max-interval: 300000ms #下次重試間隔的倍數,此處是2即下次重試間隔是上次的2倍 multiplier: 2

第三步:編寫配置類

@Configuration public class DirectConfig { /** * 配置一個隊列,可以配置多個
   *第一個參數是隊列的名稱 *第二個參數durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,但並不是消息的持久化 *當消息代理重啟時仍然存在,暫存隊列:當前連接有效 *第三個參數exclusive(獨有的):默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable *第四個參數autoDelete:是否自動刪除,默認是false,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 *return new Queue("TestDirectQueue",true,true,false); *一般設置一下隊列的持久化就好,其余兩個就是默認false
*/ @Bean public Queue myQueue1 () { return new Queue("directQueue111",true); }

  @Bean
    public Queue myQueue2 () {
        return new Queue("directQueue222",true);
    }
 
         
  /** * 給交換機起名字,也可以配置多個交換機
   *第一個參數是交換機的名字,
   *第二個參數durable:是否持久化,默認false,持久化交換機
   *第三個參數autoDelete,默認為false,同隊列解釋
  
    如果你是topic模式的,那么交換機類型就是TopicExchange,如果是廣播(fanout)類型的就是FanoutExchange
   */
 @Bean public DirectExchange myDirectExchange() { return new DirectExchange("directExchange",true,false); } @Bean public DirectExchange lonelyExchange() { return new DirectExchange("lonelyExchange",true,false); } /*綁定 將隊列和交換機綁定, 並設置用於匹配鍵:mykey 隊列在前,交換機在后
  1.如果是fanout的模式的話,就沒有后面的with路由關鍵字
  2.如果是topic模式的話是支持通配符的

    通配符規則:
​      `#`:匹配一個或多個詞
​      `*`:匹配不多不少恰好1個詞
    舉例:
​      `audit.#`:能夠匹配`audit.irs.corporate`隊列 或者 `audit.irs`隊列
​      `audit.*`:只能匹配`audit.irs`隊列

    */

 @Bean public Binding directBinding(){ return BindingBuilder.bind(myQueue()).to(myDirectExchange()).with("mykey");
    //return BindingBuilder.bind(myQueue()).to(myDirectExchange()).with("audit.#"); } }

第四步:寫個接口方法進行測試

   @Autowired
   private AmqpTemplate amqpTemplate;
   @GetMapping("/sendDirectMessage") @ResponseBody public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //將消息攜帶綁定鍵值:路由關鍵字mykey 發送到交換機 directExchange
        amqpTemplate.convertAndSend("directExchange", "mykey", map); return "ok"; }

紅框圈起來的是比較常用的3個方法,分別是:

- 指定交換機、RoutingKey和消息體
- 指定消息
- 指定RoutingKey和消息,會向默認的交換機發送消息

到此,生產者已經編寫完成,一般來說我們生產者和消費者在不同的模塊里,我也是采用這種模式,編寫兩個spring boot模塊。

 

第五步:編寫消費者,配置跟生產者基本一致。

依賴包不變,編寫application.yml,注意端口不能沖突

server: port: 9999 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl

第六步:編寫消費者,也就是一個監聽器,監聽自己隊列是否有數據。

@Component//必須要寫 //@RabbitListener(queues = {"directQueue111","directQueue222"})
public class DirectListener { /** * RabbitListener 注解放在類上就需要使用RabbitHandler注解放在方法上進行配合使用, * 根據傳遞過來的參數類型判斷那個方法執行,如果傳遞過來的是String類型的,就去找帶有@RabbitHandler注解的方法,參數是String的
   * 如果傳遞過來的是map類型的,就找參數是map的方法
   *@RabbitListener也可以直接放在方法上監聽隊列,如果該隊列存在消息,就由這個方法來接收處理
*/ //@RabbitHandler @RabbitListener(queues = {"directQueue111"}) public void process(Map map) { System.out.println("DirectReceiver111消費者收到消息 : " + map.toString()); }
  @RabbitListener(queues = {"directQueue222"})
  public void process(Map map) {
  System.out.println("DirectReceiver222消費者收到消息 : " + map.toString());
  }
}

一般情況下消息隊列的使用已經介紹完畢了,但是有的小伙伴就會問,我怎么知道我的消息是發出去了沒有?消費者收到了沒有?好的,這就涉及到了生產者的確認機制和消費者的確認機制。

我們先來說生產者的確認機制在生產者application.yml中添加消息確認配置

server: port: 8888 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl template: retry: #enabled:開啟失敗重試 enabled: true #第一次重試的間隔時長 initial-interval: 10000ms #最長重試間隔,超過這個間隔將不再重試 max-interval: 300000ms #下次重試間隔的倍數,此處是2即下次重試間隔是上次的2倍 multiplier: 2 #確認消息已發送到交換機(Exchange) publisher-confirms: true #確認消息已發送到隊列(Queue) publisher-returns: true

 在生產者方編寫一個配置文件RabbitConfig:

@Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //Mandatory強制的 //設置開啟Mandatory,才能觸發回調函數,無論消息推送結果怎么樣都強制調用回調函數
        rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("ConfirmCallback:     " + "相關數據:" + correlationData); System.out.println("ConfirmCallback:     " + "確認情況:" + b); System.out.println("ConfirmCallback:     " + "原因:" + s); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("ReturnCallback:     " + "消息:" + message); System.out.println("ReturnCallback:     " + "回應碼:" + replyCode); System.out.println("ReturnCallback:     " + "回應信息:" + replyText); System.out.println("ReturnCallback:     " + "交換機:" + exchange); System.out.println("ReturnCallback:     " + "路由鍵:" + routingKey); } }); return rabbitTemplate; } }

生產者消息確認無非就四種情況:先把結論也寫了出來

①消息推送到server,但是在server里找不到交換機 --》 觸發confirm回調失敗
②消息推送到server,找到交換機了,但是沒找到隊列 --》 先觸發confirm回調返回true,然后再觸發return返回失敗
③消息推送到sever,交換機和隊列啥都沒找到 --》 觸發confirm回調失敗
④消息推送成功

①消息推送到server,但是在server里找不到交換機
寫個測試接口,把消息推送到名為‘non-existent-exchange’的交換機上(這個交換機是沒有創建沒有配置的)
 @GetMapping("/TestMessageAck") public String TestMessageAck() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: non-existent-exchange test message "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map); return "ok"; }

調用接口,查看rabbitmq-provuder項目的控制台輸出情況(原因里面有說,沒有找到交換機'non-existent-exchange'):

2019-09-04 09:37:45.197 ERROR 8172 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40) ConfirmCallback: 相關數據:null ConfirmCallback: 確認情況:false ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40)

結論: ①這種情況觸發的是 ConfirmCallback 回調函數。

 

②消息推送到server,找到交換機了,但是沒找到隊列  
這種情況就是需要新增一個交換機,但是不給這個交換機綁定隊列,我來簡單地在DirectRabitConfig里面新增一個直連交換機,名叫‘lonelyDirectExchange’,但沒給它做任何綁定配置操作

@Bean DirectExchange lonelyDirectExchange() {   return new DirectExchange("lonelyDirectExchange"); }

然后寫個測試接口,把消息推送到名為‘lonelyDirectExchange’的交換機上(這個交換機是沒有任何隊列配置的):

@GetMapping("/TestMessageAck2") public String TestMessageAck2() {   String messageId = String.valueOf(UUID.randomUUID());   String messageData = "message: lonelyDirectExchange test message ";   String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));   Map<String, Object> map = new HashMap<>();   map.put("messageId", messageId);   map.put("messageData", messageData);   map.put("createTime", createTime);   rabbitTemplate.convertAndSend("lonelyDirectExchange", "DirectRouting", map);   return "ok"; }

調用接口,查看rabbitmq-provuder項目的控制台輸出情況:

ReturnCallback: 消息:(Body:'{createTime=2019-09-04 09:48:01, messageId=563077d9-0a77-4c27-8794-ecfb183eac80, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ReturnCallback: 回應碼:312 ReturnCallback: 回應信息:NO_ROUTE ReturnCallback: 交換機:lonelyDirectExchange ReturnCallback: 路由鍵:TestDirectRouting ConfirmCallback: 相關數據:null ConfirmCallback: 確認情況:true ConfirmCallback: 原因:null

可以看到這種情況,兩個函數都被調用了;
這種情況下,消息是推送成功到服務器了的,所以ConfirmCallback對消息確認情況是true;
而在RetrunCallback回調函數的打印參數里面可以看到,消息是推送到了交換機成功了,但是在路由分發給隊列的時候,找不到隊列,所以報了錯誤 NO_ROUTE 。
  結論:②這種情況觸發的是 ConfirmCallback和RetrunCallback兩個回調函數。

 

③消息推送到sever,交換機和隊列啥都沒找到 
這種情況其實一看就覺得跟①很像,沒錯 ,③和①情況回調是一致的,所以不做結果說明了。
  結論: ③這種情況觸發的是 ConfirmCallback 回調函數。

 

 ④消息推送成功
那么測試下,按照正常調用之前消息推送的接口就行,就調用下 /sendFanoutMessage接口,可以看到控制台輸出:

ConfirmCallback: 相關數據:null ConfirmCallback: 確認情況:true ConfirmCallback: 原因:null

結論: ④這種情況觸發的是 ConfirmCallback 回調函數。

以上是生產者推送消息的消息確認 回調函數的使用介紹(可以在回調函數根據需求做對應的擴展或者業務數據處理)。

下面我們繼續說一說消費者的消息確認機制:
把消費者自動確認消息改為手動確認:
server: port: 8888 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl template: retry: #enabled:開啟失敗重試 enabled: true #第一次重試的間隔時長 initial-interval: 10000ms #最長重試間隔,超過這個間隔將不再重試 max-interval: 300000ms #下次重試間隔的倍數,此處是2即下次重試間隔是上次的2倍 multiplier: 2
    #確認消息已發送到交換機(Exchange)
    publisher-confirms: true
    #確認消息已發送到隊列(Queue)
    publisher-returns: true
# 手動確認消息(默認是自動確認): listener: simple: acknowledge-mode: manual
 
        
還是那個在消費者方編寫監聽器:
@Component public class DirectListener {
/**
* 手動確認機制
*/
@RabbitListener(queues = {"directQueue111"})
public void userInsert(Map map, Channel channel, Message message) throws IOException {

try {
     //Message中是rabbitmq管道中的所有信息,包括了消息體,隊列名稱,交換機名稱,唯一標志,狀態碼等基本信息
System.out.println(message);
     
    //這個map就是到接收的消息

System.out.println(map.toString());

    //同一時刻服務器只會發一條消息給消費者(能者多勞模式)

//channel.basicQos(1);

/**
* deliveryTag:該消息的index
* multiple:是否批量.true:將一次性ack所有小於deliveryTag的消息。
*/
//確認簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//簽收失敗
/**
* deliveryTag:該消息的index
* multiple:是否批量.true:將一次性拒絕所有小於deliveryTag的消息。
* requeue:被拒絕的是否重新入隊列
*/
//如果重新放進隊列中還是會放在隊列頭部,繼續消費者消費,如果一直消費一直錯誤就會產生堆積問題,理性使用
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
deliveryTag:該消息的index
requeue:被拒絕的是否重新入隊列


channel.basicNack 與 channel.basicReject 的區別在於basicNack可以拒絕多條消息,而basicReject一次只能拒絕一條消息


如果不手動確認,也不拋出異常,消息不會自動重新推送(包括其他消費者),因為對於rabbitmq來說始終沒有接收到消息消費是否成功的確認,並且Channel是在消費端有緩存的,沒有斷開連接。
如果rabbitmq斷開,連接后會自動重新推送。如果消費端應用重啟,消息會自動重新推送。

東西呢就是這么個東西,大部分都是跟另外一篇博客一致的,只有消費者消息確認這一塊差異多點。over


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM