RabbitMQ簡介
RabbitMQ在CentOS上安裝
配置文件
application.properties
spring.rabbitmq.host=119.29.213.48
# 注意:這裡要填 AMQP 端口 5672,不是 Web 管理界面的 15672;.properties 不支持行尾註釋,註釋必須單獨寫一行
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
spring.rabbitmq.virtual-host=/
pom.xml
我是通過maven來構建項目的。因此在一個SpringBoot項目的基礎上還需要添加一個依賴。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
實踐
概述
RabbitMQ我學習到的有四種模式:
分別是 Direct,Topic,Fanout,Headers 模式。
Direct 模式就是直接用隊列的名字來進行綁定,實現點對點的消息傳輸。
Topic 模式是根據 Config 中綁定時設置的 Routing Key(代碼裡的 ROUTINE_KEY_*,可以使用通配符)與發送消息時指定的 Routing Key 是否匹配,來判斷消息是否會投遞到對應的隊列。
Fanout 模式類似於廣播,不用設置路由,只要發送消息設置了對應的 Exchange 就可以對該 Exchange 中的接收者進行廣播。
Headers 模式比較不同於其他三種模式。Headers 是一組鍵值對,可以定義成 Hashtable。發送者在發送的時候定義一些鍵值對,接收者也可以在綁定的時候傳入一些鍵值對,兩者匹配的話,對應的隊列就可以收到消息。匹配有兩種方式:all(全部鍵值對都要匹配)和 any(任意一個鍵值對匹配即可)。
Demo
下面是一些實踐的代碼。
MQConfig.java
package com.psd.rabbitmq.rabbitmq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that declares every queue, exchange and binding used by
 * the four demo modes: Direct, Topic, Fanout and Headers.
 */
@Configuration
public class MQConfig {

    // Queue, exchange and routing-key names, shared with the sender and the listeners.
    public static final String DIRECT_QUEUE = "direct_queue";
    public static final String TOPIC_QUEUE_1 = "topic_queue_1";
    public static final String TOPIC_QUEUE_2 = "topic_queue_2";
    public static final String TOPIC_EXCHANGE = "topic_exchange";
    public static final String ROUTINE_KEY_1 = "topic.key1";
    // Wildcards are allowed in a topic binding key.
    public static final String ROUTINE_KEY_2 = "topic.*";
    public static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    public static final String FANOUT_QUEUE_2 = "fanout_queue_2";
    public static final String FANOUT_EXCHANGE = "fanout_exchange";
    public static final String HEADERS_QUEUE = "headers_queue";
    public static final String HEADERS_EXCHANGE = "headers_exchange";

    /** Builds a queue with the given name, passing {@code true} as the durable flag. */
    private static Queue durableQueue(String name) {
        return new Queue(name, true);
    }

    // ---------------------------------------------------------------- Direct

    /** Queue for the Direct demo; no explicit exchange or binding is declared for it. */
    @Bean
    public Queue directQueue() {
        return durableQueue(DIRECT_QUEUE);
    }

    // ----------------------------------------------------------------- Topic
    // Messages are routed to a queue according to the key it was bound with.

    @Bean
    public Queue topicQueue1() {
        return durableQueue(TOPIC_QUEUE_1);
    }

    @Bean
    public Queue topicQueue2() {
        return durableQueue(TOPIC_QUEUE_2);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    /** Binds the first topic queue with the exact key {@link #ROUTINE_KEY_1}. */
    @Bean
    public Binding topicBinding1() {
        Queue target = topicQueue1();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(target).to(exchange).with(ROUTINE_KEY_1);
    }

    /** Binds the second topic queue with the wildcard key {@link #ROUTINE_KEY_2}. */
    @Bean
    public Binding topicBinding2() {
        Queue target = topicQueue2();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(target).to(exchange).with(ROUTINE_KEY_2);
    }

    // ---------------------------------------------------------------- Fanout
    // Broadcast: both queues are bound to the exchange without a routing key.

    @Bean
    public Queue fanoutQueue1() {
        return durableQueue(FANOUT_QUEUE_1);
    }

    @Bean
    public Queue fanoutQueue2() {
        return durableQueue(FANOUT_QUEUE_2);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    @Bean
    public Binding fanoutBinding1() {
        Queue target = fanoutQueue1();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(target).to(exchange);
    }

    @Bean
    public Binding fanoutBinding2() {
        Queue target = fanoutQueue2();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(target).to(exchange);
    }

    // --------------------------------------------------------------- Headers
    // A message is delivered only when its header key/value pairs match the binding.

    @Bean
    public HeadersExchange headersExchage() {
        return new HeadersExchange(HEADERS_EXCHANGE);
    }

    @Bean
    public Queue headerQueue() {
        return durableQueue(HEADERS_QUEUE);
    }

    /** Binds the headers queue with the "all" rule over the pairs h1=v1 and h2=v2. */
    @Bean
    public Binding headerBinding() {
        Map<String, Object> requiredHeaders = new HashMap<>();
        requiredHeaders.put("h1", "v1");
        requiredHeaders.put("h2", "v2");
        return BindingBuilder
                .bind(headerQueue())
                .to(headersExchage())
                .whereAll(requiredHeaders)
                .match();
    }
}
MQSender.java
package com.psd.rabbitmq.rabbitmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Publishes one demo message for each of the modes declared in {@link MQConfig}:
 * Direct, Topic, Fanout and Headers.
 */
@Component
public class MQSender {

    private static final Logger logger = LoggerFactory.getLogger(MQSender.class);

    @Autowired
    AmqpTemplate amqpTemplate;

    /** Sends a text message using the direct queue's name as the routing key. */
    public void sendDirect() {
        String msg = "DirectMsg";
        logger.info("send msg : {}", msg);
        amqpTemplate.convertAndSend(MQConfig.DIRECT_QUEUE, msg);
    }

    /** Sends two text messages to the topic exchange, one per routing key. */
    public void sendTopic() {
        String msg = "TopicMsg";
        logger.info("send msg : {}", msg);
        // "topic.key1": equal to ROUTINE_KEY_1 and also covered by the "topic.*" pattern.
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, MQConfig.ROUTINE_KEY_1, msg + "_1");
        // "topic.key2": has no exact binding, only the "topic.*" pattern covers it.
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg + "_2");
    }

    /** Sends a text message to the fanout exchange with an empty routing key. */
    public void sendFanout() {
        String msg = "FanoutMsg";
        logger.info("send msg : {}", msg);
        amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);
    }

    /**
     * Sends a raw {@link Message} to the headers exchange carrying the headers
     * h1=v1 and h2=v2, the same pairs the headers binding in {@link MQConfig} uses.
     */
    public void sendHeaders() {
        String msg = "HeadersMsg";
        logger.info("send msg : {}", msg);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("h1", "v1");
        messageProperties.setHeader("h2", "v2");
        // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default
        // charset, which would make the payload depend on the producer's locale.
        byte[] body = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = new Message(body, messageProperties);
        amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", message);
    }
}
MQReceiver.java
package com.psd.rabbitmq.rabbitmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on every queue declared in {@link MQConfig} and logs each message received.
 */
@Component
public class MQReceiver {

    private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);

    /** Logs messages arriving on the direct queue. */
    @RabbitListener(queues = MQConfig.DIRECT_QUEUE)
    public void receiveDirect(String msg) {
        log.info("Direct Receive : {}", msg);
    }

    /** Logs messages arriving on the first topic queue. */
    @RabbitListener(queues = MQConfig.TOPIC_QUEUE_1)
    public void receiveTopic1(String msg) {
        log.info("Topic1 Receive : {}", msg);
    }

    /** Logs messages arriving on the second topic queue. */
    @RabbitListener(queues = MQConfig.TOPIC_QUEUE_2)
    public void receiveTopic2(String msg) {
        log.info("Topic2 Receive : {}", msg);
    }

    /** Logs messages arriving on the first fanout queue. */
    @RabbitListener(queues = MQConfig.FANOUT_QUEUE_1)
    public void receiveFanout1(String msg) {
        log.info("Fanout1 Receive : {}", msg);
    }

    /** Logs messages arriving on the second fanout queue. */
    @RabbitListener(queues = MQConfig.FANOUT_QUEUE_2)
    public void receiveFanout2(String msg) {
        log.info("Fanout2 Receive : {}", msg);
    }

    /**
     * Logs messages arriving on the headers queue. The payload is received as raw
     * bytes and decoded here.
     */
    @RabbitListener(queues = MQConfig.HEADERS_QUEUE)
    public void receiveHeaders(byte[] msg) {
        // Decode explicitly as UTF-8: new String(byte[]) uses the platform default
        // charset, which may differ between the producer and consumer machines.
        String text = new String(msg, java.nio.charset.StandardCharsets.UTF_8);
        log.info("Headers Receive : {}", text);
    }
}
RabbitmqApplicationTests.java
package com.psd.rabbitmq;
import com.psd.rabbitmq.rabbitmq.MQSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Smoke test that starts the Spring Boot test context and triggers each send
 * method of {@link MQSender} once. It makes no assertions of its own.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
// Sender under test, injected from the Spring test context.
@Autowired
MQSender mqSender;
/**
 * Sends one message per mode, in the order Direct, Topic, Fanout, Headers.
 * NOTE(review): the method returns right after sending and does not wait for
 * the listeners, so delivery is not verified here.
 */
@Test
public void contextLoads() {
mqSender.sendDirect();
mqSender.sendTopic();
mqSender.sendFanout();
mqSender.sendHeaders();
}
}
執行結果:
遇到的BUG
啟動異常
org.springframework.amqp.AmqpTimeoutException: java.util.concurrent.TimeoutException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:74) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:476) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:614) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:338) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at java.base/java.lang.Thread.run(Thread.java:844) [na:na]
Caused by: java.util.concurrent.TimeoutException: null
at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:306) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:957) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.2.jar:5.1.2]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:847) ~[amqp-client-5.1.2.jar:5.1.2]
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:449) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
... 9 common frames omitted
這個BUG我是通過修改 application.properties 中的 spring.rabbitmq.port 修復好的:這裡我一開始使用了 HTTP 管理界面的端口 15672,於是發生了上述異常;後面改成 AMQP 的端口 5672 就可以成功連接了。
映射2個端口:15672是Web管理界面的端口;5672是MQ訪問的端口。
以下是官方解釋:
5672, 5671: used by AMQP 0-9-1 and 1.0 clients without and with TLS
15672: HTTP API clients and rabbitmqadmin (only if the management plugin is enabled)
無法自動創建隊列
無法創建隊列,或者找不到聲明的隊列。
// Declares the queue as a Spring bean; the article attributes the
// "queue cannot be created / not found" error to this declaration being missing.
@Bean
public Queue queue() {
    final boolean durable = true;
    return new Queue(queue_name, durable);
}
原因是這裡忘了把隊列注入為 Bean;像上面這樣用 @Bean 聲明隊列即可。
