1.依賴 SpringBoot 2.1.6.RELEASE 版本
<!--rabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.配置信息
#設置端口 server.port=80 #安裝的RabbitMq的服務器IP spring.rabbitmq.host=192.168.***.** #安裝的RabbitMq的服務器端口 spring.rabbitmq.port=5672 #安裝的RabbitMq的用戶名 spring.rabbitmq.username=xxx #安裝的RabbitMq的密碼 spring.rabbitmq.password=xxx #消息確認機制 spring.rabbitmq.publisher-confirms=true #與消息確認機制聯合使用,保證能夠收到回調 spring.rabbitmq.publisher-returns=true #消息確認模式 MANUAL:手動確認 NONE:不確認 AUTO:自動確認 spring.rabbitmq.listener.simple.acknowledge-mode=auto #消費者 spring.rabbitmq.listener.simple.concurrency=10 spring.rabbitmq.listener.simple.max-concurrency=10 #發布后重試 spring.rabbitmq.listener.simple.retry.enabled=true spring.rabbitmq.listener.simple.retry.initial-interval=5000 spring.rabbitmq.listener.simple.retry.max-attempts=5 #每隔多久進行重試 spring.rabbitmq.template.retry.multiplier=1.0 #消費失敗后重新消費 spring.rabbitmq.listener.simple.default-requeue-rejected=false #自定義的vhost spring.rabbitmq.dev-virtual-host=devVir spring.rabbitmq.test-virtual-host=testVir
3.配置信息:此處為多個Vhost配置,單個可直接使用,無需另外配置,只需聲明隊列信息即可
package com.rabbit.config;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* 2019年7月7日15:43:38 Joelan整合 RabbitConfig 概念介紹:
* 1.Queue:隊列,是RabbitMq的內部對象,用於存儲消息,RabbitMq的多個消費者可以訂閱同一個隊列,此時隊列會以輪詢的方式給多個消費者消費,而非多個消費者都收到所有的消息進行消費
* 注意:RabbitMQ不支持隊列層面的廣播消費,如果需要廣播消費,可以采用一個交換器通過路由Key綁定多個隊列,由多個消費者來訂閱這些隊列的方式。
* 2.Exchange:交換器,在RabbitMq中,生產者並非直接將消息投遞到隊列中。真實情況是,生產者將消息發送到Exchange(交換器),由交換器將消息路由到一個或多個隊列中。
* 注意:如果路由不到,或返回給生產者,或直接丟棄,或做其它處理。
* 3.RoutingKey:路由Key,生產者將消息發送給交換器的時候,一般會指定一個RoutingKey,用來指定這個消息的路由規則。這個路由Key需要與交換器類型和綁定鍵(BindingKey)聯合使用才能
* 最終生效。在交換器類型和綁定鍵固定的情況下,生產者可以在發送消息給交換器時通過指定RoutingKey來決定消息流向哪里。
* 4.Binding:RabbitMQ通過綁定將交換器和隊列關聯起來,在綁定的時候一般會指定一個綁定鍵,這樣RabbitMQ就可以指定如何正確的路由到隊列了。
*/
@Configuration
public class RabbitConfig {
/**
* RabbitMq的主機地址
*/
@Value("${spring.rabbitmq.host}")
private String host;
/**
* RabbitMq的端口
*/
@Value("${spring.rabbitmq.port}")
private Integer port;
/**
* 用戶賬號
*/
@Value("${spring.rabbitmq.username}")
private String username;
/**
* 用戶密碼
*/
@Value("${spring.rabbitmq.password}")
private String password;
/**
* 消息確認,回調機制
*/
@Value("${spring.rabbitmq.publisher-confirms}")
private boolean confirms;
@Value("${spring.rabbitmq.publisher-returns}")
private boolean returns;
/**
* vhost:dev
*/
@Value("${spring.rabbitmq.dev-virtual-host}")
private String hrmDevVirtualHost;
/**
* vhost:test
*/
@Value("${spring.rabbitmq.test-virtual-host}")
private String hrmTestVirtualHost;
/**
* 若一個項目只使用一個virtualHost的話,默認只需要在配置文件中配置其屬性即可
* 若項目中使用到多個virtualHost,那么可以以通過創建ConnectionFactory的方式指定不同的virtualHost
*/
public ConnectionFactory createConnectionFactory(String host, Integer port, String username, String password,
String vHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setSimplePublisherConfirms(confirms);
connectionFactory.setPublisherReturns(returns);
connectionFactory.setVirtualHost(vHost);
return connectionFactory;
}
// ----------------------------------------------------------------------------------------第一步,創建消息連接,第一個VirtualHost
/**
* 創建指定vhost:test的連接工廠
*/
@Primary
@Bean(name = "devConnectionFactory")
public ConnectionFactory devConnectionFactory() {
return createConnectionFactory(host, port, username, password, hrmDevVirtualHost);
}
/**
* 若有多個vhost則自定義RabbitMqTemplate 通過名稱指定對應的vhost
*/
@Primary
@Bean(name = "devRabbitTemplate")
public RabbitTemplate devRabbitTemplate(
@Qualifier(value = "devConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 消息確認機制,ConnectionFactory中必須設置回調機制(publisher-confirms,publisher-returns)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息id為: " + correlationData + "的消息,已經被ack成功");
} else {
System.out.println("消息id為: " + correlationData + "的消息,消息nack,失敗原因是:" + cause);
}
}
});
return rabbitTemplate;
}
// ----------------------------------------------------------------------------------------第二個VirtualHost,以此類推
/**
* 創建指定vhost:test的連接工廠
*/
@Bean(name = "testConnectionFactory")
public ConnectionFactory testConnectionFactory() {
return createConnectionFactory(host, port, username, password, hrmTestVirtualHost);
}
/**
* 若有多個vhost則自定義RabbitMqTemplate 通過名稱指定對應的vhost,此處未使用回調
*/
@Bean(name = "testRabbitTemplate")
public RabbitTemplate testRabbitTemplate(
@Qualifier(value = "testConnectionFactory") ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
// ----------------------------------------------------------------------------------------引入:死信隊列
/**
* 所謂死信:即(1)消息被拒絕(basic.reject 或者 basic.nack),並且requeue=false;(2)消息的過期時間到期了;
* (3)隊列長度限制超過了 等三個因素造成。
* 我們會將以上原因造成的隊列存入死信隊列,死信隊列其實也是一個普通的隊列,我們可以根據自身需要,可以對死信進行操作。
* 以下為死信隊列的演示(將正常隊列監聽關閉並設置超時):首先聲明一個正常的隊列,並設置死信隊列的相關聲明【死信交換器(與正常隊列一致即可),死信路由Key等】
* 設置完后,准備一個新的隊列,此隊列用於接收上一個正常隊列發生死信后,將由此隊列代替(即候補隊列),然后將新隊列通過上一個交換器以及正常隊列中聲明的死信路由Key進行綁定
* 該操作與正常聲明一致(聲明交換器(可使用正常隊列的交換器,無需另外聲明),隊列,將隊列綁定到交換器)
*/
/**
* 聲明交換器(此處正常的與死信的交換器一致)
*/
@Bean
public Exchange testExchange() {
return new DirectExchange("test_exchange", true, false);
}
/**
* 聲明一個正常的隊列,並設置死信相關信息(交換器,路由Key),確保發生死信后會將死信存入交換器
*/
@Bean
public Queue testQueue() {
Map<String, Object> args = new HashMap<>(4);
// x-dead-letter-exchange 聲明 死信交換機
args.put("x-dead-letter-exchange", "test_exchange");
// x-dead-letter-routing-key 聲明死信路由鍵
args.put("x-dead-letter-routing-key", "test_dead_rout");
return new Queue("test_queue", true, false, false, args);
}
/**
* 將隊列綁定到指定交換器並設置路由
*/
@Bean
public Binding testBinding() {
return BindingBuilder.bind(testQueue()).to(testExchange()).with("test_rout").noargs();
}
/**
* 死信隊列(候補隊列) 若上面的正常隊列發生死信時,需將發生死信的隊列信息路由到此隊列中
* 路由過程:正常隊列發送->信息到交換器->交換器路由到正常隊列->監聽,發生死信->死信回到指定的交換器->再由交換器路由到死信隊列->死信監聽
*/
@Bean
public Queue testDeadQueue() {
return new Queue("test_dead_queue", true, false, false);
}
/**
* 綁定死信的隊列到候補隊列
*/
@Bean
public Binding testDeadBinding() {
return BindingBuilder.bind(testDeadQueue()).to(testExchange()).with("test_dead_rout").noargs();
}
// ----------------------------------------------------------------------------------------第二步,聲明隊列信息,Fanout模式
/**
* 此處使用第一個正常隊列來示例完整隊列過程 創建隊列 參數name:隊列的名稱,不能為空;設置為“”以使代理生成該名稱。
* 參數durable:true表示為持久隊列,該隊列將在服務器重新啟動后繼續存在
* 參數exclusive:如果聲明獨占隊列,則為true,該隊列將僅由聲明者的連接使用
* 參數autoDelete:如果服務器不再使用隊列時應將其刪除,則自動刪除為true 參數arguments:用於聲明隊列的參數
*/
@Bean
public Queue testFanoutQueue() {
/*
* 1.new Queue(name); return new Queue("test_fanout_queue");
*/
/*
* 2.new Queue(name,durable);
*/
return new Queue("test_fanout_queue", true, false, false);
/*
* 3.new Queue(name,durable,exclusive,autoDelete); return new
* Queue("test_fanout_queue", true, false, false);
*/
/*
* 4.new Queue(name,durable,exclusive,autoDelete,arguments); return new
* Queue("test_fanout_queue", true, true, true, null);
*/
}
/**
* 創建交換機 1.fanout:扇形交換器,它會把發送到該交換器的消息路由到所有與該交換器綁定的隊列中,如果使用扇形交換器,則不會匹配路由Key
* 白話:一個交換機可以綁定N個隊列,此模式會將寫入的隊列發送到一個交換機,由此交換機發送到N個隊列中,那么監聽該隊列的消費者都能收到對應的消息
*/
@Bean
@Primary
public Exchange testFanoutExchange() {
return new FanoutExchange("test_fanout_exchange");
}
/**
* 綁定隊列到交換機 Fanout模式不需要RoutingKey
*/
@Bean
public Binding testFanoutBinding() {
return BindingBuilder.bind(testFanoutQueue()).to(testFanoutExchange()).with("").noargs();
}
// ----------------------------------------------------------------------------------------Direct模式
/**
* 創建隊列
*/
@Bean
public Queue testDirectQueue() {
return new Queue("test_direct_queue", true, false, false);
}
/**
* 創建交換機 2.direct交換器 直連模式,會把消息路由到RoutingKey與BindingKey完全匹配的隊列中。
* 白話:直連模式在綁定隊列到交換機的時候,RoutingKey與發送隊列的RoutingKey要完全保持一致
*/
@Bean
public Exchange testDirectExchange() {
return new TopicExchange("test_direct_exchange");
}
/**
* 綁定隊列到交換機並指定一個路由,此處的RoutingKey為test,發送隊列時也必須使用test
*/
@Bean
public Binding testDirectBinding() {
return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("test").noargs();
}
// ----------------------------------------------------------------------------------------Topic模式
/**
* 創建隊列
*/
@Bean
public Queue testTopicQueue() {
return new Queue("test_topic_queue", true, false, false);
}
/**
* 創建交換機 2.topic 匹配模式(個人)與直連模式區別:RoutingKey可以模糊匹配,兩種匹配風格: *匹配 #匹配
* 我們的RoutingKey和BindKey為一個點分隔的字符串,例:test.routing.client
* 那么我們的模糊匹配,*可以匹配一個單詞,即:*.routing.* 可以匹配到 test.routing.client,
* #可以匹配多個單詞,即:#.client 可以匹配到 test.routing.client,以此類推
*/
@Bean
public Exchange testTopicExchange() {
return new TopicExchange("test_topic_exchange");
}
/**
* 綁定隊列到交換機並指定一個路由
*/
@Bean
public Binding testTopicBinding() {
return BindingBuilder.bind(testTopicQueue()).to(testTopicExchange()).with("test.*").noargs();
}
// ----------------------結束強調:第一步創建連接,第二步聲明隊列,交換器,路由Key信息,第三步發送隊列,第四步監聽隊列
}
4.發送隊列
package com.rabbit.send;
import java.util.UUID;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
* RabbitSend
*/
@Component
public class RabbitSend {
@Autowired
@Qualifier(value = "devRabbitTemplate")
private RabbitTemplate rabbitTemplate;
/**
* 發送死信隊列
*/
public void sendDeadMsg(String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 聲明消息處理器 這個對消息進行處理 可以設置一些參數 對消息進行一些定制化處理 我們這里 來設置消息的編碼 以及消息的過期時間
// 因為在.net 以及其他版本過期時間不一致 這里的時間毫秒值 為字符串
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 設置編碼
messageProperties.setContentEncoding("utf-8");
// 設置過期時間10*1000毫秒
messageProperties.setExpiration("10000");
return message;
};
// 向test_queue 發送消息 10*1000毫秒后過期 形成死信,具體的時間可以根據自己的業務指定
rabbitTemplate.convertAndSend("test_exchange", "test_rout", msg, messagePostProcessor, correlationData);
}
/**
* 發送一條Fanout扇形隊列
*/
public void sendTestFanoutMsg(String msg) {
rabbitTemplate.convertAndSend("test_fanout_exchange", "", msg, new CorrelationData("2"));
}
/**
* 發送一條Direct直連隊列 若有開啟回調機制,必須傳此參數new CorrelationData("1"),用於聲明ID
*/
public void sendTestDirectMsg(String msg) {
rabbitTemplate.convertAndSend("test_direct_exchange", "test", msg, new CorrelationData("1"));
}
/**
* 發送一條Topic消息隊列
*/
public void sendTestMsg(String msg) {
rabbitTemplate.convertAndSend("test_topic_exchange", "test.mq", msg);
}
}
5.監聽隊列
package com.rabbit.receiver;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* RabbitReceiver
*/
@Component
public class RabbitReceiver {
/**
* 若死信隊列監聽到信息,表示我們的死信隊列設置是沒有問題的
*/
@RabbitHandler
@RabbitListener(queues = "test_dead_queue")
public void redirect(Message message, Channel channel) throws IOException {
System.out.println("監聽到死信隊列有消息進來");
}
/**
* 為了測試死信隊列效果,此處注銷監聽
*/
//@RabbitHandler
//@RabbitListener(queues = "test_queue")
//public void handlerTestQueue(Message message, Channel channel) throws IOException {
// System.out.println("監聽到正常隊列有消息進來");
//}
@RabbitHandler
@RabbitListener(queues = "test_fanout_queue")
public void handlerFanout(String msg) {
System.out.println("RabbitReceiver:" + msg + "test_fanout_queue");
}
@RabbitHandler
@RabbitListener(queues = "test_direct_queue")
public void handlerDirect(String msg) {
System.out.println("RabbitReceiver:" + msg + "test_direct_queue");
}
@RabbitHandler
@RabbitListener(queues = "test_topic_queue")
public void handlerTopic(String msg) {
System.out.println("RabbitReceiver:" + msg + "test_topic_queue");
}
}
