在SpringBoot中使用RabbitMQ


RabbitMQ簡介

wikipedia

RabbitMQ在CentOS上安裝

csdn-blog

配置文件

application.properties

spring.rabbitmq.host=119.29.213.48
spring.rabbitmq.port=5672 # 注意這里
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
spring.rabbitmq.virtual-host=/

pom.xml

我是通過maven來構建項目的。因此在一個SpringBoot項目的基礎上還需要添加一個依賴。

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

實踐

概述

RabbitMQ我學習到的有四種模式:

分別是 DirectTopicFanoutHeaders 模式。

Direct 模式就是直接用隊列的名字來進行綁定,實現點對點的消息傳輸。

Topic 模式是根據 Config 中設置的 RoutineKey 還有發送消息時候的 topic 來判斷是否會傳輸。

Fanout 模式類似於廣播,不用設置路由,只要發送消息設置了對應的 Exchange 就可以對該 Exchange 中的接收者進行廣播。

Headers 模式比較不同於其他三種模式。Headers 是一個鍵值對,可以定義成 Hashtable。發送者在發送的時候定義一些鍵值對,接收者也可以再綁定時候傳入一些鍵值對,兩者匹配的話,則對應的隊列就可以收到消息。匹配有兩種方式all和any。

Demo

下面是一些實踐的代碼。

MQConfig.java

package com.psd.rabbitmq.rabbitmq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

    // 四種模式:Direct,Topic,Fanout,Header

    public static final String DIRECT_QUEUE = "direct_queue";

    public static final String TOPIC_QUEUE_1 = "topic_queue_1";
    public static final String TOPIC_QUEUE_2 = "topic_queue_2";
    public static final String TOPIC_EXCHANGE = "topic_exchange";
    public static final String ROUTINE_KEY_1 = "topic.key1";
    // 可以使用通配符
    public static final String ROUTINE_KEY_2 = "topic.*";

    public static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    public static final String FANOUT_QUEUE_2 = "fanout_queue_2";
    public static final String FANOUT_EXCHANGE = "fanout_exchange";

    public static final String HEADERS_QUEUE = "headers_queue";
    public static final String HEADERS_EXCHANGE = "headers_exchange";

    // Direct模式
    @Bean
    public Queue directQueue() {
        return new Queue(DIRECT_QUEUE, true);
    }

    // Topic模式:根據RoutineKey去綁定接收的消息
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE_1, true);
    }
    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE_2, true);
    }
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(ROUTINE_KEY_1);
    }
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(ROUTINE_KEY_2);
    }

    // Fanout模式:廣播
    @Bean
    public Queue fanoutQueue1() {
        return new Queue(FANOUT_QUEUE_1, true);
    }
    @Bean
    public Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE_2, true);
    }
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }
    @Bean
    public Binding fanoutBinding1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

    // Headers模式:只有檢驗頭部的KV是一致的才會接收到消息
    @Bean
    public HeadersExchange headersExchage(){
        return new HeadersExchange(HEADERS_EXCHANGE);
    }
    @Bean
    public Queue headerQueue() {
        return new Queue(HEADERS_QUEUE, true);
    }
    @Bean
    public Binding headerBinding() {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("h1", "v1");
        map.put("h2", "v2");
        return BindingBuilder.bind(headerQueue()).to(headersExchage()).whereAll(map).match();
    }
}

MQSender.java

package com.psd.rabbitmq.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MQSender {
    private static Logger logger = LoggerFactory.getLogger(MQSender.class);

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendDirect() {
        String msg = "DirectMsg";
        logger.info("send msg : " + msg);
        amqpTemplate.convertAndSend(MQConfig.DIRECT_QUEUE, msg);
    }

    public void sendTopic() {
        String msg = "TopicMsg";
        logger.info("send msg : " + msg);
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg + "_1");
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg + "_2");
    }

    public void sendFanout() {
        String msg = "FanoutMsg";
        logger.info("send msg : " + msg);
        amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);
    }

    public void sendHeaders() {
        String msg = "HeadersMsg";
        logger.info("send msg : " + msg);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("h1", "v1");
        messageProperties.setHeader("h2", "v2");
        Message message = new Message(msg.getBytes(), messageProperties);
        amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", message);
    }
}

MQReceiver.java

package com.psd.rabbitmq.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MQReceiver {
    private static Logger log = LoggerFactory.getLogger(MQReceiver.class);

    @RabbitListener(queues = MQConfig.DIRECT_QUEUE)
    public void receiveDirect(String msg) {
        log.info("Direct Receive : " + msg);
    }

    @RabbitListener(queues = MQConfig.TOPIC_QUEUE_1)
    public void receiveTopic1(String msg) {
        log.info("Topic1 Receive : " + msg);
    }

    @RabbitListener(queues = MQConfig.TOPIC_QUEUE_2)
    public void receiveTopic2(String msg) {
        log.info("Topic2 Receive : " + msg);
    }

    @RabbitListener(queues = MQConfig.FANOUT_QUEUE_1)
    public void receiveFanout1(String msg) {
        log.info("Fanout1 Receive : " + msg);
    }

    @RabbitListener(queues = MQConfig.FANOUT_QUEUE_2)
    public void receiveFanout2(String msg) {
        log.info("Fanout2 Receive : " + msg);
    }

    @RabbitListener(queues = MQConfig.HEADERS_QUEUE)
    public void receiveHeaders(byte[] msg) {
        log.info("Headers Receive : " + new String(msg));
    }
}

RabbitmqApplicationTests.java

package com.psd.rabbitmq;

import com.psd.rabbitmq.rabbitmq.MQSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    @Autowired
    MQSender mqSender;

    @Test
    public void contextLoads() {
        mqSender.sendDirect();
        mqSender.sendTopic();
        mqSender.sendFanout();
        mqSender.sendHeaders();
    }

}

執行結果:

alt

遇到的BUG

啟動異常

org.springframework.amqp.AmqpTimeoutException: java.util.concurrent.TimeoutException
	at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:74) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:476) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:614) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:338) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
	at java.base/java.lang.Thread.run(Thread.java:844) [na:na]
Caused by: java.util.concurrent.TimeoutException: null
	at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:306) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:957) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:847) ~[amqp-client-5.1.2.jar:5.1.2]
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:449) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
	... 9 common frames omitted

這個BUG我是通過修改 application.properties 中的 spring.rabbitmq.port 修復好的,這里我一開始使用了http的端口 15672,發生了上述異常。后面改成了 5672 可以成功連接。

映射2個端口:15672是Web管理界面的端口;5672是MQ訪問的端口。

以下是官方解釋:

5672, 5671: used by AMQP 0-9-1 and 1.0 clients without and with TLS

15672: HTTP API clients and rabbitmqadmin (only if the management plugin is enabled)

無法自動創建隊列

無法創建隊列,或者找不到聲明的隊列。

@Bean
public Queue queue() {
    return new Queue(queue_name, true);
}

這里是忘了注入隊列的Bean了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM