SpringBoot整合RabbitMQ


SpringBoot整合RabbitMQ

一、引入相關依賴

<dependencies>
    <!--amqp-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--web-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!--test-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

二、配置RabbitMQ

首先應當確保你已安裝了RabbitMQ,如果你沒有安裝,請參考:Docker 安裝 RabbitMq

查看RabbitMQ自動配置類RabbitAutoConfiguration:

@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {

其中@EnableConfigurationProperties(RabbitProperties.class)是RabbitMQ的相關屬性配置。

點進去RabbitProperties.class

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {

	private static final int DEFAULT_PORT = 5672;

	private static final int DEFAULT_PORT_SECURE = 5671;

	/**
	 * RabbitMQ host. Ignored if an address is set.
	 */
	private String host = "localhost";

	/**
	 * RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is
	 * enabled.
	 */
	private Integer port;

	/**
	 * Login user to authenticate to the broker.
	 */
	private String username = "guest";

	/**
	 * Login to authenticate against the broker.
	 */
	private String password = "guest";

我們可以通過spring.rabbitmq,在application.yml文件中配置相關的屬性,比如host、port、username、password。

在application.yml配置RabbitMQ:

spring:
  #rabbitmq的相關配置
  rabbitmq:
    host: 192.168.204.131
    port: 5672
    username: guest
    password: guest

繼續查看RabbitAutoConfiguration:

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitOperations.class)
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    configurer.configure(template, connectionFactory);
    return template;
}

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
}

發現其向容器中注入了兩個組件:RabbitTemplate和AmqpAdmin,這兩個組件有什么作用呢?

RabbitTemplate:可以發送消息、接收消息。

AmqpAdmin操作Exchange、Queue、Binding等,比如創建、刪除、解綁。

1、測試RabbitTemplate

首先在容器中通過自動注入的方式獲取RabbitTemplate,然后在測試類中測試:

@SpringBootTest
class SpringBoot02AmqpApplicationTests {
    @Autowired
    RabbitTemplate rabbitTemplate;
}

(1)使用RabbitTemplate測試發送消息

  • send(String exchange, String routingKey, Message message):需要自己定義一個Message,比較麻煩。
  • convertAndSend(String exchange, String routingKey, Object object):只需要傳入一個Object,自動序列化發送給rabbitmq,object默認被當成消息體。
//單播(點對點)發送。
@Test
public void testRabbitTemplate() {
    HashMap<String, Object> map = new HashMap<>();
    map.put("name", "zhangsan");
    map.put("age", 22);
    rabbitTemplate.convertAndSend("exchange.direct","aiguigu.news",map);
}

這種方式在接收端接收的數據是這樣式的:

rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAADdAAEbmFtZXQA CHpoYW5nc2FudAAEbGlzdHNyABpqYXZhLnV0aWwuQXJyYXlzJEFycmF5TGlzdNmkPL7NiAbSAgABWwABYXQAE1tMamF2YS9sYW5nL09iamVjdDt4cHVyABdb TGphdmEuaW8uU2VyaWFsaXphYmxlO67QCaxT1+1JAgAAeHAAAAADdAAEaGFoYXNyABFqYXZhLmxhbmcuSW50ZWdlchLioKT3gYc4AgABSQAFdmFsdWV4cgAQ amF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAKac3IAEWphdmEubGFuZy5Cb29sZWFuzSBygNWc+u4CAAFaAAV2YWx1ZXhwAXQAA2FnZXNxAH4ACwAA
ABZ4

這是由於默認使用的是jdk的序列化方式,那么如何將消息轉化為json格式的數據發送出去?接下來自定義使用Jackson2JsonMessageConverter的消息轉化器。

自定義MessageConverter配置:

@Configuration
@EnableRabbit   //開啟基於注解的rabbitmq
public class MyAMQPConfig {

    /**
     * 設置自定義的 MessageConverter
     * 使用Jackson2JsonMessageConverter消息轉換器
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

然后再次測試,在接收端接收的數據如下:

{"name":"zhangsan","list":["haha",666,true],"age":22}

再發送個對象試試:

Book book = new Book("西游記", "吳承恩");
rabbitTemplate.convertAndSend("exchange.direct", "aiguigu.news", book);

使用自定義的消息轉化器之后,接收端數據:

{"bookName":"西游記","author":"吳承恩"}

(2)使用RabbitTemplate測試接收消息

  • receiveAndConvert(String queueName):接收隊列名稱為queueName的消息。
//接收數據
@Test
public void testReceive() {
    Object o = rabbitTemplate.receiveAndConvert("aiguigu.news");
    System.out.println(o.getClass());
    System.out.println(o);

    // 接收map
    // class java.util.HashMap
    // {name=zhangsan, list=[haha, 666, true], age=22}

    // 接收book對象
    // class com.example.bean.Book
    // Book{bookName='西游記', author='吳承恩'}
}

2、測試AmqpAdmin

  • removeBinding(Binding binding):解除某個bingding

    @Test
    public void testRemoveBinding() {
        //解除某個bingding
        amqpAdmin.removeBinding(new Binding("declaredQueue", Binding.DestinationType.QUEUE,"amqpAdmin_direct.exchange", "amqp.haha", null));
    }
    
  • deleteExchange(String s):刪除指定的exchange.

    boolean deleteExchange = amqpAdmin.deleteExchange("amqpAdmin_direct.exchange");
    System.out.println(deleteExchange); //true
    
  • deleteQueue(String s):刪除指定Queue

    boolean deleteQueue = amqpAdmin.deleteQueue("declaredQueue");
    System.out.println("deleteQueue:"+deleteQueue); //true
    
  • getQueueInfo(String s),獲取指定隊列的信息。

    @Test
    public void getQueueInformation() {
        QueueInformation queueInformation = amqpAdmin.getQueueInfo("declaredQueue");
        int consumerCount = queueInformation.getConsumerCount();
        int messageCount = queueInformation.getMessageCount();
        String name = queueInformation.getName();
        System.out.println("consumerCount:" + consumerCount);   //0
        System.out.println("messageCount:" + messageCount); //0
        System.out.println("name:" + name); //declaredQueue
    }
    
  • declareExchange(Exchange exchange):聲明一個exchange.

    /**
         * 以declare開頭的是創建組件。
         * declareExchange(Exchange exchange):聲明一個exchange
         * Exchange是一個接口,其實現類有:
         * 1.DirectExchange
         * 2.FanoutExchange
         * 3.TopicExchange
         * 4.HeadersExchange
         * 5.CustomExchange
         */
    @Test
    public void testCreateExchange() {
        //創建一個Exchange
        amqpAdmin.declareExchange(new DirectExchange("amqpAdmin_direct.exchange"));
        System.out.println("創建完成!");
    
        //創建一個queue
        String declaredQueue = amqpAdmin.declareQueue(new Queue("declaredQueue"));
        System.out.println("declaredQueue:" + declaredQueue);   //declaredQueue
    
        //創建綁定規則
        amqpAdmin.declareBinding(new Binding("declaredQueue", Binding.DestinationType.QUEUE,
                                             "amqpAdmin_direct.exchange", "amqp.haha", null));
    } 
    

三、監聽消息隊列中的內容

使用@EnableRabbit+@RabbitListener監聽消息隊列中的內容。

@EnableRabbit:表示開啟基於注解的rabbitmq。

@RabbitListener:表示監聽某個隊列的內容。

@Service
public class BookServiceImpl implements BookService {
    /**
     * 注解:@RabbitListener(queues = "aiguigu.news"),表示監聽aigui.news這個隊列的內容。
     *
     * @param book
     */
    @RabbitListener(queues = "aiguigu.news")
    @Override
    public void receive(Book book) {
        System.out.println("收到aiguigu.news消息:" + book);
    }

    /**
     * 接收消息的第二種方式:
     *
     * @param message
     */
    @RabbitListener(queues = "aiguigu")
    @Override
    public void receive(Message message) {
        //獲取消息體
        byte[] body = message.getBody();
        System.out.println(body);   //[B@fe4bdc2
        //獲得消息屬性
        MessageProperties properties = message.getMessageProperties();
        System.out.println(properties);
        /*
        MessageProperties [headers={__TypeId__=com.example.bean.Book}, contentType=application/json,
        contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0,
        redelivered=false, receivedExchange=exchange.direct, receivedRoutingKey=aiguigu, deliveryTag=1,
        consumerTag=amq.ctag-ynkD05MwnffSCo9h7W5DGA, consumerQueue=aiguigu]
         */
    }
}

實現的效果,當給某個exchange發送消息的之后,exchange按照binding規則將消息分發給對應的隊列,使用 @RabbitListener可以監聽到這個隊列的消息,就可以獲取消息進行相應的操作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM