Spring Boot 整合RabbitMQ


導入RabbitMQ相關依賴,當然要確保RabbitMQ環境搭建成功了。

pom.xml:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
 *  1 自動配置
 *      RabbitAutoConfiguration ---rabiitmq的自動配置類
 *      自動配置了連接工廠ConnectionFactory
 *      RabbitProperties封裝了RabbitMQ的配置,默認就有一寫配置
 * RabbitTemplate:給RabbitMQ發送和接受消息。
 *      AmqpAdmin:RabbitMQ系統管理功能組件。

 

我們可以在測試中對RabbitMQ進行測試:

package com.xt.springbootamqp;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootRabbitAmqpApplicationTests {

    //自動注入RabbitTemplate,由他來發送和接受消息
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *@Description:  單播模式,點對點
     *@date: 2019/6/17
     *   
     *   RabbitTemplate中發送消息send()的重載方法:
     *          @Override
     *        public void send(Message message) throws AmqpException {
     *                 send(this.exchange, this.routingKey, message);
     *          }
     *
     *           @Override
     *       public void send(String routingKey, Message message) throws AmqpException {
     *                 send(this.exchange, routingKey, message);
     *          }
     *
     *          @Override
     *      public void send(final String exchange, final String routingKey, final Message message) throws AmqpException {
     *                 send(exchange, routingKey, message, null);
     *       }
     * 
     */
    @Test
    public void contextLoads() {
    // 使用send方法 rabbitTemplate.send(exchange,rountKey,message);
      String str = "hello";
      Message message = new Message(str.getBytes(),null);
      rabbitTemplate.send("","",message);
} }

但是我們一般發送消息常用的不是send方法,而是convertAndSend(),當然該方法也有很多重載方法。如圖:

 

發送消息的過程:當生產者發送消息時,首先會根據指定的交換機exchange找到所屬的多個隊列,又因為與隊列綁定的路由鍵rountKey不同,消息會根據路由鍵發送至相對應的隊列。而消費者直接面對的是隊列,只需要根據隊列名字獲取隊列相應信息就能得到消息。

由於測試,我事先已經在RabbitMQ客戶端上新增了多個交換機個隊列。#表示匹配多個單詞   .號表示匹配單個單詞

Exchanges:wqq.direct(單播)     ---> rountingkey:  key.direct  --->  queue.direct    

               ---> rountingKey: key.direct2 ---> queue.direct2

      wqq.fanout(廣播)   ---> rountingkey: 廣播模式與路由鍵無關   ---> queue.fanout

               ---> rountingkey:                                  ---> queue.fanout2

      wqq.topic(發布訂閱)  ---> rountingkey: #.topic  ---> ceshi.topic           只要是路由鍵是xxx.topic都能匹配

                ---> rountingkey: queue.# ---> queue.#

 

單播模式:

單播模式,也就是點對點。一個消息只能發送到一個交換機下的一個隊列上去。

例如:將消息發送至exchange為wqq.direct所綁定的rountkey為key.direct的消息隊列上,消息體是msg

    /**
     *@Description: 測試單播模式
     *@date: 2019/6/17
     */
    @Test
    public void test1(){
        //常用的發送消息的方法
       // rabbitTemplate.convertAndSend(String exchange,String rountKey,object message);
        String msg = "測試單播directRabbitMQ";
 rabbitTemplate.convertAndSend("wqq.direct","key.direct",msg);

    }

消費者獲取消息:

    @Test
    public void getMsg(){
        Object o = rabbitTemplate.receiveAndConvert("queue.direct");
        System.out.println(o);
    }

 

廣播模式:

交換機廣播模式下與路由鍵無關,因為會直接向所屬的隊列全部發送該消息。

    /**
     *@Description: 廣播模式
     *@date: 2019/6/17
     * 在廣播模式下,即使我設置了路由鍵,但是該交換機下的所欲隊列都會受到該消息,證明路由鍵無效
     */
    @Test
    public void fanoutTest(){
        rabbitTemplate.convertAndSend("wqq.fanout","queue.fanout","測試廣播模式");
    }

消費者接受消息:

    @Test
    public void getFanoutMsg(){
        Object o = rabbitTemplate.receiveAndConvert("queue.fanout");
        System.out.println(o);
    }

 

發布訂閱模式:

發布訂閱模式它會根據消息的路由鍵去匹配路由鍵綁定的隊列,只要能匹配上,就能將消息發送至隊列上。

    /**
     *@Description: 發布訂閱模式
     *@date: 2019/6/17
     */
    @Test
    public void topicMsg(){
        // 匹配#.topic的路由鍵或者adc.#的路由鍵     匹配..topic的路由鍵或者adc..的路由鍵
        rabbitTemplate.convertAndSend("wqq.topic","abc.topic","發布訂閱topic模式");  
    }
    @Test
    public void topicMsg1(){
        // 匹配#.topic的路由鍵或者queue.#的路由鍵    匹配..topic的路由鍵或者queue..的路由鍵
        rabbitTemplate.convertAndSend("wqq.topic","queue.topic","發布訂閱topic模式");
    }

 


 

雖然生產者發送消息和消費者接受消息測試通過了,但是還有一個·問題是:我們一般發送消息都不太可能是字符串,一般是對象或者集合。但是經過測試后,如果發送對象,那么消息是序列換之后的結果,不利於查看。

這是由於RabbitMQ使用的是默認的jdk序列化器,所以我們可以自定義序列化器,將對象類型轉換為JSON類型。

編寫配置類:

@Configuration
public class MyRabbitConfig {

 @Bean public MessageConverter messageConverter(){ MessageConverter messageConverter = new Jackson2JsonMessageConverter(); return messageConverter; }

}

 

 

 


 

監聽消息隊列:

一般在實際開發中一般使用監聽消息隊列當有消息發送至RabbitMQ中時,通過監聽器監聽到隊列中存在消息,然后將消息進行消費。

例子:如果有兩個系統,訂單系統、庫存系統,當訂單系統產生一個訂單,發送一個消息至RabbitMQ,庫存系統監聽到RabbitMQ隊列中產生了新的消息,然后對該消息進行消費,也就是減庫存操作。

步驟:

開啟基於RabbitMQ的注解:

package com.xt.springbootamqp;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 *@Description:
 *@date: 2019/6/17
 *  1 自動配置
 *      RabbitAutoConfiguration   ---rabiitmq的自動配置類
 *      自動配置了連接工廠ConnectionFactory
 *      RabbitProperties封裝了RabbitMQ的配置,默認就有一寫配置
 *      RabbitTemplate:給RabbitMQ發送和接受消息。
 *      AmqpAdmin:RabbitMQ系統管理功能組件。
 *
 *
 */
@SpringBootApplication
@EnableRabbit public class SpringBootRabbitAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootRabbitAmqpApplication.class, args);
    }

}

模擬訂單系統:

package com.xt.springbootamqp.controller;

import com.xt.springbootamqp.service.OrderServcieImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Date: 2019/6/17 16:16
 * @Description:  模擬用戶產生訂單
 */
@RestController
public class UserController {

    @Autowired
    private OrderServcieImpl orderServcie;


    @RequestMapping(value = "/addOrder")
    public void addOrder(){
        orderServcie.addOrder();
    }

}
package com.xt.springbootamqp.service;

import com.xt.springbootamqp.pojo.Order;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Date: 2019/6/17 16:08
 * @Description:  訂單系統
 */
@Service
public class OrderServcieImpl {

    /**
     *@Description: 模擬訂單系統產生訂單
     *@date: 2019/6/17
     */

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void addOrder(){
        //產生一個訂單
        Order order = new Order();
        order.setOrderId(88);
        order.setOrderName("小馬的訂單");

        //將訂單發送至RabbitMQ中;
        rabbitTemplate.convertAndSend("wqq.topic","queue.topic",order);
    }

}

模擬庫存系統:

package com.xt.springbootamqp.service;

import com.xt.springbootamqp.pojo.Order;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.function.ObjDoubleConsumer;

/**
 * @Date: 2019/6/17 16:18
 * @Description:   模擬庫存系統,監聽消息隊列,當訂單系統查詢消息時,庫存系統自動減庫存
 */
@Service
public class StockServiceImpl {


 @RabbitListener(queues = {"queue.topic"}) //隊列可以寫多個,也就是可以監聽多個隊列 public void reduceStock(Order order){
        //監聽到消息,自動減庫存
        //獲取訂單信息
        System.out.println(order.toString());
    }
  @RequestMapping(value = "/addOrder")
  public void addOrderMsg(Message message){
  byte[] body = message.getBody();
  MessageProperties messageProperties = message.getMessageProperties();
  }
}

 

 雖然監聽消息成功了,但是我們有沒有發現,我們都是以RabbitMQ中已經有了交換器和隊列在測試,那么如果RabbitMQ中沒有交換器和隊列。而是需要我們自己手動添加組件,那該怎么辦呢?


 

AmpqAdmin 管理組件:

    /**
     *@Description: 測試AmqpAdmin管理組件
     *@date: 2019/6/17
     */
    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     *@Description: 創建exchange
     *@date: 2019/6/17
     */
    @Test
    public void createExchange(){
        amqpAdmin.declareExchange(new DirectExchange(""));
        amqpAdmin.declareExchange(new FanoutExchange(""));
        amqpAdmin.declareExchange(new TopicExchange(""));
    }

 

 自定義創建Exchange、隊列、綁定關系:

    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     *@Description: 創建exchange、隊列、綁定規則
     *@date: 2019/6/17
     */
    @Test
    public void test3(){
        //創建exchange
        amqpAdmin.declareExchange(new DirectExchange("amqp.direct_exchange"));
       /*
        amqpAdmin.declareExchange(new FanoutExchange(""));
        amqpAdmin.declareExchange(new TopicExchange(""));
        */

       //創建隊列
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));//是否持久化

        //創建綁定關系
        amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqp.direct_exchange","amapadmin_key",null));

    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM