1.創建Maven工程,pom.xml引入依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
2.生產者:
public class MyProducer {
private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 連接IP
factory.setHost("192.168.254.137");
// 連接端口
factory.setPort(5672);
// 虛擬機
factory.setVirtualHost("/");
// 用戶
factory.setUsername("guest");
factory.setPassword("guest");
// 建立連接
Connection conn = factory.newConnection();
// 創建消息通道
Channel channel = conn.createChannel();
// 發送消息
String msg = "Hello world, Rabbit MQ";
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish(EXCHANGE_NAME, "wuzz.test", null, msg.getBytes());
channel.close();
conn.close();
}
}
3.消費者:
public class MyConsumer {
private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";
private final static String QUEUE_NAME = "SIMPLE_QUEUE";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 連接IP
factory.setHost("192.168.254.137");
// 默認監聽端口
factory.setPort(5672);
// 虛擬機
factory.setVirtualHost("/");
// 設置訪問的用戶
factory.setUsername("guest");
factory.setPassword("guest");
// 建立連接
Connection conn = factory.newConnection();
// 創建消息通道
Channel channel = conn.createChannel();
// 聲明交換機
// String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);
// 聲明隊列
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" Waiting for message....");
// 綁定隊列和交換機
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"wuzz.test");
// 創建消費者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("Received message : '" + msg + "'");
System.out.println("consumerTag : " + consumerTag );
System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
}
};
// 開始獲取消息
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
參數說明:
聲明交換機的參數:
String type:交換機的類型,direct, topic, fanout中的一種。
boolean durable:是否持久化,代表交換機在服務器重啟后是否還存在。
boolean autoDelete:是否自動刪除。
聲明隊列的參數:
boolean durable:是否持久化,代表隊列在服務器重啟后是否還存在。
boolean exclusive:是否排他性隊列。排他性隊列只能在聲明它的Connection中使用,連接斷開時自動刪除。
boolean autoDelete:是否自動刪除。如果為true,至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,隊列會自動刪除。
Map<String, Object> arguments:隊列的其他屬性,例如x-message-ttl、x-expires、x-max-length、x-maxlength-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority。
消息屬性BasicProperties:
消息的全部屬性有14個,以下列舉了一些主要的參數:
Map<String,Object> headers 消息的其他自定義參數
Integer deliveryMode 2持久化,其他:瞬態
Integer priority 消息的優先級
String correlationId 關聯ID,方便RPC相應與請求關聯
String replyTo 回調隊列
String expiration TTL,消息過期時間,單位毫秒
登陸 RabbitMQ 后台管理界面,先運行消費端,建立連接,創建通道,聲明交換機,隊列,創建消費者監聽隊列,打開后台管理頁面可以看到如下信息:
交換機的信息:

后台管理界面可以看到當前的連接,Channel,Exchange,Queue等信息。
由消費者來創建對象(交換機、隊列、綁定關系)。
RabbitMQ常見配置:
1、TTL(Time To Live)消息的過期時間:
有兩種設置方式:通過隊列屬性設置消息過期時間,設置單條消息的過期時間。
// 通過隊列屬性設置消息過期時間
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-message-ttl",6000);
// 聲明隊列(默認交換機AMQP default,Direct)
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);
// 對每條消息設置過期時間
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化消息
.contentEncoding("UTF-8")
.expiration("10000") // TTL
.build();
// 此處兩種方式設置消息過期時間的方式都使用了,將以較小的數值為准
// 發送消息
channel.basicPublish("", "TEST_DLX_QUEUE", properties, msg.getBytes());
2.死信隊列:
有三種情況消息會進入DLX(Dead Letter Exchange)死信交換機。
1、(NACK || Reject ) && requeue == false 設置手動應答ACK,且沒有應答並且拒絕了消息,禁止消息入隊。
2、消息過期
3、隊列達到最大長度(先入隊的消息會被發送到DLX)
// 創建消費者,並接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("Received message : '" + msg + "'");
if (msg.contains("拒收")){
// 拒絕消息
// requeue:是否重新入隊列,true:是;false:直接丟棄,相當於告訴隊列可以直接刪除掉
// TODO 如果只有這一個消費者,requeue 為true 的時候會造成消息重復消費
channel.basicReject(envelope.getDeliveryTag(), false);
} else if (msg.contains("異常")){
// 批量拒絕
// requeue:是否重新入隊列
// TODO 如果只有這一個消費者,requeue 為true 的時候會造成消息重復消費
channel.basicNack(envelope.getDeliveryTag(), true, false);
} else {
// 手工應答
// 如果不應答,隊列中的消息會一直存在,重新連接的時候會重復消費
channel.basicAck(envelope.getDeliveryTag(), true);
}
}
};
// 開始獲取消息,注意這里開啟了手工應答
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, false, consumer);
可以設置一個死信隊列(Dead Letter Queue)與DLX()死信交換機綁定,即可以存儲Dead Letter,消費者可以監聽這個隊列取走消息。
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("x-dead-letter-exchange","DLX_EXCHANGE");
// 指定了這個隊列的死信交換機
channel.queueDeclare("TEST_DLX_QUEUE", false, false, false, arguments);
// 聲明死信交換機
channel.exchangeDeclare("DLX_EXCHANGE","topic", false, false, false, null);
// 聲明死信隊列
channel.queueDeclare("DLX_QUEUE", false, false, false, null);
// 綁定
channel.queueBind("DLX_QUEUE","DLX_EXCHANGE","#");
3.優先級隊列,可以讓消息優先得到消費:
可以通過創建隊列的時候設置一個隊列的優先級的最大值,然后設置指定消息的優先級的值。優先級高的消息可以優先被消費,但是:只有消息堆積(消息的發送速度大於消費者的消費速度)的情況下優先級才有意義。
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-max-priority",10); // 隊列最大優先級
channel.queueDeclare("ORIGIN_QUEUE", false, false, false, argss);
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("name", "gupao");
headers.put("level", "top");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2代表持久化
.contentEncoding("UTF-8") // 編碼
.expiration("10000") // TTL,過期時間
.headers(headers) // 自定義屬性
.priority(5) // 優先級,默認為5,配合隊列的 x-max-priority 屬性使用
.messageId(String.valueOf(UUID.randomUUID()))
.build();
channel.basicPublish("", "ORIGIN_QUEUE", properties, msg.getBytes());
4.延遲隊列:
RabbitMQ本身不支持延遲隊列。可以使用TTL結合DLX的方式來實現消息的延遲投遞,即把DLX跟某個隊列綁定,到了指定時間,消息過期后,就會從DLX路由到這個隊列,消費者可以從這個隊列取走消息。
另一種方式是使用rabbitmq-delayed-message-exchange插件。當然,將需要發送的信息保存在數據庫,使用任務調度系統掃描然后發送也是可以實現的。
5.服務端流控(Flow Control):
RabbitMQ 會在啟動時檢測機器的物理內存數值。默認當 MQ 占用 40% 以上內存時,MQ 會主動拋出一個內存警告並阻塞所有連接(Connections)。可以通過修改 rabbitmq.config 文件來調整內存閾值,默認值是 0.4,如下所示: [{rabbit, [{vm_memory_high_watermark, 0.4}]}].
默認情況,如果剩余磁盤空間在 1GB 以下,RabbitMQ 主動阻塞所有的生產者。這個閾值也是可調的。
注意:調整隊列長度只在消息堆積的情況下有意義,而且會刪除先入隊的消息,不能實現服務端限流。
6.消費端限流:
在AutoACK為false的情況下,如果一定數目的消息(通過基於consumer或者channel設置Qos的值)未被確認前,不進行消費新的消息。類似ActiveMQ的 FetchSize預取大小,這里也有這么一個概念。
channel.basicQos(2); // 如果超過2條消息沒有發送ACK,當前消費者不再接受隊列消息 channel.basicConsume(QUEUE_NAME, false, consumer);
Spring 集成 RabbitMQ:
ConnectionFactory:
Spring AMQP 的連接工廠接口,用於創建連接。CachingConnectionFactory 是ConnectionFactory 的一個實現類。
RabbitAdmin:
RabbitAdmin 是 AmqpAdmin 的實現,封裝了對 RabbitMQ 的基礎管理操作,比如對交換機、隊列、綁定的聲明和刪除等。為什么我們在配置文件(Spring)或者配置類(SpringBoot)里面定義了交換機、隊列、綁定關系,並沒有直接調用 Channel 的 declare 的方法,Spring 在啟動的時候就可以幫我們創建這些元數據?這些事情就是由 RabbitAdmin 完成的。RabbitAdmin 實 現 了 InitializingBean 接 口 , 里 面 有 唯 一 的 一 個 方 法afterPropertiesSet(),這個方法會在 RabbitAdmin 的屬性值設置完的時候被調用。在 afterPropertiesSet ()方法中,調用了一個 initialize()方法。這里面創建了三個Collection,用來盛放交換機、隊列、綁定關系。最后依次聲明返回類型為 Exchange、Queue 和 Binding 這些 Bean,底層還是調用了 Channel 的 declare 的方法。
Message:
Message 是 Spring AMQP 對消息的封裝。兩個重要的屬性:
- body:消息內容。
- messageProperties:消息屬性。
RabbitTemplate 消息模板:
RabbitTemplate 是 AmqpTemplate 的一個實現(目前為止也是唯一的實現),用來簡化消息的收發,支持消息的確認(Confirm)與返回(Return)。跟 JDBCTemplate一 樣 , 它 封 裝 了 創 建 連 接 、 創 建 消 息 信 道 、 收 發 消 息 、 消 息 格 式 轉 換(ConvertAndSend→Message)、關閉信道、關閉連接等等操作。針對於多個服務器連接,可以定義多個 Template。可以注入到任何需要收發消息的地方使用。
Messager Listener 消息 偵聽:
MessageListener 是 Spring AMQP 異步消息投遞的監聽器接口,它只有一個方法onMessage,用於處理消息隊列推送來的消息,作用類似於 Java API 中的 Consumer。
MessageListenerContainer:
MessageListenerContainer可以理解為MessageListener的容器,一個Container只有一個 Listener,但是可以生成多個線程使用相同的 MessageListener 同時消費消息。Container 可以管理 Listener 的生命周期,可以用於對於消費者進行配置。例如:動態添加移除隊列、對消費者進行設置,例如 ConsumerTag、Arguments、並發、消費者數量、消息確認模式等等。
轉換器 MessageConvertor:
MessageConvertor 的 作用?RabbitMQ 的消息在網絡傳輸中需要轉換成 byte[](字節數組)進行發送,消費者需要對字節數組進行解析。在 Spring AMQP 中,消息會被封裝為 org.springframework.amqp.core.Message對象。消息的序列化和反序列化,就是處理 Message 的消息體 body 對象。如果消息已經是 byte[]格式,就不需要轉換。如果是 String,會轉換成 byte[]。如果是 Java 對象,會使用 JDK 序列化將對象轉換為 byte[](體積大,效率差)。在 調 用 RabbitTemplate 的 convertAndSend() 方 法 發 送 消 息 時 , 會 使 用MessageConvertor 進行消息的序列化,默認使用 SimpleMessageConverter。在某些情況下,我們需要選擇其他的高效的序列化工具。如果我們不想在每次發送消息時自己處理消息,就可以直接定義一個 MessageConvertor。如何 自定義 MessageConverter ?例如:我們要使用 Gson 格式化消息:創建一個類,實現 MessageConverter 接口,重寫 toMessage()和 fromMessage()方法。
1.在導入Spring 的相關依賴之外,導入 RabbitMQ的依賴:
<!--rabbitmq依賴 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
2. applicationContext.xml :
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<import resource="classpath*:rabbitMQ.xml" />
<!-- 掃描指定package下所有帶有如 @Controller,@Service,@Resource 並把所注釋的注冊為Spring Beans -->
<context:component-scan base-package="com.wuzz.*" />
<!-- 激活annotation功能 -->
<context:annotation-config />
<!-- 激活annotation功能 -->
<context:spring-configured />
</beans>
3.配置rabbitMQ.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--配置connection-factory,指定連接rabbit server參數 -->
<rabbit:connection-factory id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory" virtual-host="/" username="guest" password="guest" host="127.0.0.1" port="5672" />
<!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--######分隔線######-->
<!--定義queue -->
<rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!--定義direct exchange,綁定MY_FIRST_QUEUE -->
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey">
</rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定義rabbit template用於數據的接收和發送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="MY_DIRECT_EXCHANGE" />
<!--消息接收者 -->
<bean id="messageReceiver" class="com.gupaoedu.consumer.FirstConsumer"></bean>
<!--queue listener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver" />
</rabbit:listener-container>
<!--定義queue -->
<rabbit:queue name="MY_SECOND_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 將已經定義的Exchange綁定到MY_SECOND_QUEUE,注意關鍵詞是key -->
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_SECOND_QUEUE" key="SecondKey"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息接收者 -->
<bean id="receiverSecond" class="com.gupaoedu.consumer.SecondConsumer"></bean>
<!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_SECOND_QUEUE" ref="receiverSecond" />
</rabbit:listener-container>
<!--######分隔線######-->
<!--定義queue -->
<rabbit:queue name="MY_THIRD_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 定義topic exchange,綁定MY_THIRD_QUEUE,注意關鍵詞是pattern -->
<rabbit:topic-exchange name="MY_TOPIC_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_THIRD_QUEUE" pattern="#.Third.#"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定義rabbit template用於數據的接收和發送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="MY_TOPIC_EXCHANGE" />
<!-- 消息接收者 -->
<bean id="receiverThird" class="com.gupaoedu.consumer.ThirdConsumer"></bean>
<!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_THIRD_QUEUE" ref="receiverThird" />
</rabbit:listener-container>
<!--######分隔線######-->
<!--定義queue -->
<rabbit:queue name="MY_FOURTH_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 定義fanout exchange,綁定MY_FIRST_QUEUE 和 MY_FOURTH_QUEUE -->
<rabbit:fanout-exchange name="MY_FANOUT_EXCHANGE" auto-delete="false" durable="true" declared-by="connectAdmin" >
<rabbit:bindings>
<rabbit:binding queue="MY_FIRST_QUEUE"></rabbit:binding>
<rabbit:binding queue="MY_FOURTH_QUEUE"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- 消息接收者 -->
<bean id="receiverFourth" class="com.gupaoedu.consumer.FourthConsumer"></bean>
<!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_FOURTH_QUEUE" ref="receiverFourth" />
</rabbit:listener-container>
</beans>
4.生產者:
@Service
public class MessageProducer {
private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
@Autowired
@Qualifier("amqpTemplate")
private AmqpTemplate amqpTemplate;
@Autowired
@Qualifier("amqpTemplate2")
private AmqpTemplate amqpTemplate2;
/**
* 演示三種交換機的使用
*
* @param message
*/
public void sendMessage(Object message) {
logger.info("Send message:" + message);
// amqpTemplate 默認交換機 MY_DIRECT_EXCHANGE
// amqpTemplate2 默認交換機 MY_TOPIC_EXCHANGE
// Exchange 為 direct 模式,直接指定routingKey
amqpTemplate.convertAndSend("FirstKey", "[Direct,FirstKey] "+message);
amqpTemplate.convertAndSend("SecondKey", "[Direct,SecondKey] "+message);
// Exchange模式為topic,通過topic匹配關心該主題的隊列
amqpTemplate2.convertAndSend("msg.Third.send","[Topic,msg.Third.send] "+message);
// 廣播消息,與Exchange綁定的所有隊列都會收到消息,routingKey為空
amqpTemplate2.convertAndSend("MY_FANOUT_EXCHANGE",null,"[Fanout] "+message);
}
}
5.消費者,需要實現 MessageListener 接口:
public class FirstConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(FirstConsumer.class);
public void onMessage(Message message) {
logger.info("The first consumer received message : " + message.getBody());
}
}
SpringBoot 集成 RabbitMQ:
基於下面這個結構圖去集成:

消費者的配置:
1.構建Springboot項目導入依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.Rabbit配置類,生成交換機,隊列,以及匹配規則:
@Configuration
public class RabbitConfig {
//1.定義三個交換機
@Bean
public DirectExchange directExchange(){
return new DirectExchange("DIRECT_EXCHANGE");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("TOPIC_EXCHANGE");
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("FANOUT_EXCHANGE");
}
//2.定義四個隊列
@Bean
public Queue firstQueue(){
return new Queue("FIRST_QUEUE");
}
@Bean
public Queue secondQueue(){
return new Queue("SECOND_DQUEUE");
}
@Bean
public Queue thirdQueue(){
return new Queue("THIRD_DQUEUE");
}
@Bean
public Queue fourthQueue(){
return new Queue("FOURTH_DQUEUE");
}
//3.定義四個綁定關系
@Bean
public Binding bindFirst(@Qualifier("firstQueue") Queue queue,
@Qualifier("directExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("wuzz.test");
}
@Bean
public Binding bindSecond(@Qualifier("secondQueue") Queue queue,
@Qualifier("topicExchange") TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("*.wuzz.*");
}
@Bean
public Binding bindThird(@Qualifier("thirdQueue") Queue queue,
@Qualifier("fanoutExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding bindFourth(@Qualifier("fourthQueue") Queue queue,
@Qualifier("fanoutExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
}
3.定義4個消費者:
@Configuration
@RabbitListener(queues = "FIRST_QUEUE")
public class FirstConsumer {
@RabbitHandler
public void process(String msg){
System.out.println("First Queue received msg : " + msg);
}
}
@Configuration
@RabbitListener(queues = "SECOND_DQUEUE")
public class SecondConsumer {
@RabbitHandler
public void process(String msg){
System.out.println("second Queue received msg : " + msg);
}
}
@Configuration
@RabbitListener(queues = "THIRD_DQUEUE")
public class ThirdConsumer {
@RabbitHandler
public void process(String msg){
System.out.println("third Queue received msg : " + msg);
}
}
@Configuration
@RabbitListener(queues = "FOURTH_DQUEUE")
public class FourthConsumer {
@RabbitHandler
public void process(String msg){
System.out.println("fourth Queue received msg : " + msg);
}
}
4.配置application.properties,定義鏈接地址等信息:
spring.application.name=spirng-boot-rabbitmq-consumer spring.rabbitmq.host=192.168.254.137 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
啟動消費者,可以通過管理頁面看到有4個鏈接通道及交換機隊列信息。
生產者:
1.構建SpringBoot項目導入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.通過 RabbitTemplate 構建自己的生產者類,發送4條消息:
@Component
public class MyProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE","wuzz.test","DIRECT_EXCHANGE message");
rabbitTemplate.convertAndSend("TOPIC_EXCHANGE","hangzhou.wuzz.test","TOPIC_EXCHANGE hangzhou message");
rabbitTemplate.convertAndSend("TOPIC_EXCHANGE","wenzhou.wuzz.test","TOPIC_EXCHANGE wenzhou message");
rabbitTemplate.convertAndSend("FANOUT_EXCHANGE","","FANOUT_EXCHANGE message");
}
}
3.編寫測試類測試,並且啟動:
@RunWith(SpringRunner.class)
@SpringBootTest
public class AppTest {
@Autowired
private MyProducer myProducer;
@Test
public void send(){
myProducer.send();
}
}
同樣需要配置application.properties,順利的話消費者方會收到消息如下,這樣就集成完了:

