1.創建Maven工程,pom.xml引入依賴:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
2.生產者:
public class MyProducer { private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 連接IP factory.setHost("192.168.254.137"); // 連接端口 factory.setPort(5672); // 虛擬機 factory.setVirtualHost("/"); // 用戶 factory.setUsername("guest"); factory.setPassword("guest"); // 建立連接 Connection conn = factory.newConnection(); // 創建消息通道 Channel channel = conn.createChannel(); // 發送消息 String msg = "Hello world, Rabbit MQ"; // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish(EXCHANGE_NAME, "wuzz.test", null, msg.getBytes()); channel.close(); conn.close(); } }
3.消費者:
public class MyConsumer { private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE"; private final static String QUEUE_NAME = "SIMPLE_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 連接IP factory.setHost("192.168.254.137"); // 默認監聽端口 factory.setPort(5672); // 虛擬機 factory.setVirtualHost("/"); // 設置訪問的用戶 factory.setUsername("guest"); factory.setPassword("guest"); // 建立連接 Connection conn = factory.newConnection(); // 創建消息通道 Channel channel = conn.createChannel(); // 聲明交換機 // String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null); // 聲明隊列 // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" Waiting for message...."); // 綁定隊列和交換機 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"wuzz.test"); // 創建消費者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("Received message : '" + msg + "'"); System.out.println("consumerTag : " + consumerTag ); System.out.println("deliveryTag : " + envelope.getDeliveryTag() ); } }; // 開始獲取消息 // String queue, boolean autoAck, Consumer callback channel.basicConsume(QUEUE_NAME, true, consumer); } }
參數說明:
聲明交換機的參數:
String type:交換機的類型,direct, topic, fanout中的一種。
boolean durable:是否持久化,代表交換機在服務器重啟后是否還存在。
boolean autoDelete:是否自動刪除。
聲明隊列的參數:
boolean durable:是否持久化,代表隊列在服務器重啟后是否還存在。
boolean exclusive:是否排他性隊列。排他性隊列只能在聲明它的Connection中使用,連接斷開時自動刪除。
boolean autoDelete:是否自動刪除。如果為true,至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,隊列會自動刪除。
Map<String, Object> arguments:隊列的其他屬性,例如x-message-ttl、x-expires、x-max-length、x-maxlength-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority。
消息屬性BasicProperties:
消息的全部屬性有14個,以下列舉了一些主要的參數:
Map<String,Object> headers 消息的其他自定義參數
Integer deliveryMode 2持久化,其他:瞬態
Integer priority 消息的優先級
String correlationId 關聯ID,方便RPC相應與請求關聯
String replyTo 回調隊列
String expiration TTL,消息過期時間,單位毫秒
登陸 RabbitMQ 后台管理界面,先運行消費端,建立連接,創建通道,聲明交換機,隊列,創建消費者監聽隊列,打開后台管理頁面可以看到如下信息:
交換機的信息:
后台管理界面可以看到當前的連接,Channel,Exchange,Queue等信息。
由消費者來創建對象(交換機、隊列、綁定關系)。
RabbitMQ常見配置:
1、TTL(Time To Live)消息的過期時間:
有兩種設置方式:通過隊列屬性設置消息過期時間,設置單條消息的過期時間。
// 通過隊列屬性設置消息過期時間 Map<String, Object> argss = new HashMap<String, Object>(); argss.put("x-message-ttl",6000); // 聲明隊列(默認交換機AMQP default,Direct) // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss); // 對每條消息設置過期時間 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 持久化消息 .contentEncoding("UTF-8") .expiration("10000") // TTL .build(); // 此處兩種方式設置消息過期時間的方式都使用了,將以較小的數值為准 // 發送消息 channel.basicPublish("", "TEST_DLX_QUEUE", properties, msg.getBytes());
2.死信隊列:
有三種情況消息會進入DLX(Dead Letter Exchange)死信交換機。
1、(NACK || Reject ) && requeue == false 設置手動應答ACK,且沒有應答並且拒絕了消息,禁止消息入隊。
2、消息過期
3、隊列達到最大長度(先入隊的消息會被發送到DLX)
// 創建消費者,並接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("Received message : '" + msg + "'"); if (msg.contains("拒收")){ // 拒絕消息 // requeue:是否重新入隊列,true:是;false:直接丟棄,相當於告訴隊列可以直接刪除掉 // TODO 如果只有這一個消費者,requeue 為true 的時候會造成消息重復消費 channel.basicReject(envelope.getDeliveryTag(), false); } else if (msg.contains("異常")){ // 批量拒絕 // requeue:是否重新入隊列 // TODO 如果只有這一個消費者,requeue 為true 的時候會造成消息重復消費 channel.basicNack(envelope.getDeliveryTag(), true, false); } else { // 手工應答 // 如果不應答,隊列中的消息會一直存在,重新連接的時候會重復消費 channel.basicAck(envelope.getDeliveryTag(), true); } } }; // 開始獲取消息,注意這里開啟了手工應答 // String queue, boolean autoAck, Consumer callback channel.basicConsume(QUEUE_NAME, false, consumer);
可以設置一個死信隊列(Dead Letter Queue)與DLX()死信交換機綁定,即可以存儲Dead Letter,消費者可以監聽這個隊列取走消息。
Map<String,Object> arguments = new HashMap<String,Object>(); arguments.put("x-dead-letter-exchange","DLX_EXCHANGE"); // 指定了這個隊列的死信交換機 channel.queueDeclare("TEST_DLX_QUEUE", false, false, false, arguments); // 聲明死信交換機 channel.exchangeDeclare("DLX_EXCHANGE","topic", false, false, false, null); // 聲明死信隊列 channel.queueDeclare("DLX_QUEUE", false, false, false, null); // 綁定 channel.queueBind("DLX_QUEUE","DLX_EXCHANGE","#");
3.優先級隊列,可以讓消息優先得到消費:
可以通過創建隊列的時候設置一個隊列的優先級的最大值,然后設置指定消息的優先級的值。優先級高的消息可以優先被消費,但是:只有消息堆積(消息的發送速度大於消費者的消費速度)的情況下優先級才有意義。
Map<String, Object> argss = new HashMap<String, Object>(); argss.put("x-max-priority",10); // 隊列最大優先級 channel.queueDeclare("ORIGIN_QUEUE", false, false, false, argss); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("name", "gupao"); headers.put("level", "top"); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 2代表持久化 .contentEncoding("UTF-8") // 編碼 .expiration("10000") // TTL,過期時間 .headers(headers) // 自定義屬性 .priority(5) // 優先級,默認為5,配合隊列的 x-max-priority 屬性使用 .messageId(String.valueOf(UUID.randomUUID())) .build(); channel.basicPublish("", "ORIGIN_QUEUE", properties, msg.getBytes());
4.延遲隊列:
RabbitMQ本身不支持延遲隊列。可以使用TTL結合DLX的方式來實現消息的延遲投遞,即把DLX跟某個隊列綁定,到了指定時間,消息過期后,就會從DLX路由到這個隊列,消費者可以從這個隊列取走消息。
另一種方式是使用rabbitmq-delayed-message-exchange插件。當然,將需要發送的信息保存在數據庫,使用任務調度系統掃描然后發送也是可以實現的。
5.服務端流控(Flow Control):
RabbitMQ 會在啟動時檢測機器的物理內存數值。默認當 MQ 占用 40% 以上內存時,MQ 會主動拋出一個內存警告並阻塞所有連接(Connections)。可以通過修改 rabbitmq.config 文件來調整內存閾值,默認值是 0.4,如下所示: [{rabbit, [{vm_memory_high_watermark, 0.4}]}].
默認情況,如果剩余磁盤空間在 1GB 以下,RabbitMQ 主動阻塞所有的生產者。這個閾值也是可調的。
注意:調整隊列長度只在消息堆積的情況下有意義,而且會刪除先入隊的消息,不能實現服務端限流。
6.消費端限流:
在AutoACK為false的情況下,如果一定數目的消息(通過基於consumer或者channel設置Qos的值)未被確認前,不進行消費新的消息。類似ActiveMQ的 FetchSize預取大小,這里也有這么一個概念。
channel.basicQos(2); // 如果超過2條消息沒有發送ACK,當前消費者不再接受隊列消息 channel.basicConsume(QUEUE_NAME, false, consumer);
Spring 集成 RabbitMQ:
ConnectionFactory:
Spring AMQP 的連接工廠接口,用於創建連接。CachingConnectionFactory 是ConnectionFactory 的一個實現類。
RabbitAdmin:
RabbitAdmin 是 AmqpAdmin 的實現,封裝了對 RabbitMQ 的基礎管理操作,比如對交換機、隊列、綁定的聲明和刪除等。為什么我們在配置文件(Spring)或者配置類(SpringBoot)里面定義了交換機、隊列、綁定關系,並沒有直接調用 Channel 的 declare 的方法,Spring 在啟動的時候就可以幫我們創建這些元數據?這些事情就是由 RabbitAdmin 完成的。RabbitAdmin 實 現 了 InitializingBean 接 口 , 里 面 有 唯 一 的 一 個 方 法afterPropertiesSet(),這個方法會在 RabbitAdmin 的屬性值設置完的時候被調用。在 afterPropertiesSet ()方法中,調用了一個 initialize()方法。這里面創建了三個Collection,用來盛放交換機、隊列、綁定關系。最后依次聲明返回類型為 Exchange、Queue 和 Binding 這些 Bean,底層還是調用了 Channel 的 declare 的方法。
Message:
Message 是 Spring AMQP 對消息的封裝。兩個重要的屬性:
- body:消息內容。
- messageProperties:消息屬性。
RabbitTemplate 消息模板:
RabbitTemplate 是 AmqpTemplate 的一個實現(目前為止也是唯一的實現),用來簡化消息的收發,支持消息的確認(Confirm)與返回(Return)。跟 JDBCTemplate一 樣 , 它 封 裝 了 創 建 連 接 、 創 建 消 息 信 道 、 收 發 消 息 、 消 息 格 式 轉 換(ConvertAndSend→Message)、關閉信道、關閉連接等等操作。針對於多個服務器連接,可以定義多個 Template。可以注入到任何需要收發消息的地方使用。
Messager Listener 消息 偵聽:
MessageListener 是 Spring AMQP 異步消息投遞的監聽器接口,它只有一個方法onMessage,用於處理消息隊列推送來的消息,作用類似於 Java API 中的 Consumer。
MessageListenerContainer:
MessageListenerContainer可以理解為MessageListener的容器,一個Container只有一個 Listener,但是可以生成多個線程使用相同的 MessageListener 同時消費消息。Container 可以管理 Listener 的生命周期,可以用於對於消費者進行配置。例如:動態添加移除隊列、對消費者進行設置,例如 ConsumerTag、Arguments、並發、消費者數量、消息確認模式等等。
轉換器 MessageConvertor:
MessageConvertor 的 作用?RabbitMQ 的消息在網絡傳輸中需要轉換成 byte[](字節數組)進行發送,消費者需要對字節數組進行解析。在 Spring AMQP 中,消息會被封裝為 org.springframework.amqp.core.Message對象。消息的序列化和反序列化,就是處理 Message 的消息體 body 對象。如果消息已經是 byte[]格式,就不需要轉換。如果是 String,會轉換成 byte[]。如果是 Java 對象,會使用 JDK 序列化將對象轉換為 byte[](體積大,效率差)。在 調 用 RabbitTemplate 的 convertAndSend() 方 法 發 送 消 息 時 , 會 使 用MessageConvertor 進行消息的序列化,默認使用 SimpleMessageConverter。在某些情況下,我們需要選擇其他的高效的序列化工具。如果我們不想在每次發送消息時自己處理消息,就可以直接定義一個 MessageConvertor。如何 自定義 MessageConverter ?例如:我們要使用 Gson 格式化消息:創建一個類,實現 MessageConverter 接口,重寫 toMessage()和 fromMessage()方法。
1.在導入Spring 的相關依賴之外,導入 RabbitMQ的依賴:
<!--rabbitmq依賴 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.5.RELEASE</version> </dependency>
2. applicationContext.xml :
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <import resource="classpath*:rabbitMQ.xml" /> <!-- 掃描指定package下所有帶有如 @Controller,@Service,@Resource 並把所注釋的注冊為Spring Beans --> <context:component-scan base-package="com.wuzz.*" /> <!-- 激活annotation功能 --> <context:annotation-config /> <!-- 激活annotation功能 --> <context:spring-configured /> </beans>
3.配置rabbitMQ.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd"> <!--配置connection-factory,指定連接rabbit server參數 --> <rabbit:connection-factory id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory" virtual-host="/" username="guest" password="guest" host="127.0.0.1" port="5672" /> <!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 --> <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" /> <!--######分隔線######--> <!--定義queue --> <rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" /> <!--定義direct exchange,綁定MY_FIRST_QUEUE --> <rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin"> <rabbit:bindings> <rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey"> </rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!--定義rabbit template用於數據的接收和發送 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="MY_DIRECT_EXCHANGE" /> <!--消息接收者 --> <bean id="messageReceiver" class="com.gupaoedu.consumer.FirstConsumer"></bean> <!--queue listener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver" /> </rabbit:listener-container> <!--定義queue --> <rabbit:queue name="MY_SECOND_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" /> <!-- 將已經定義的Exchange綁定到MY_SECOND_QUEUE,注意關鍵詞是key --> <rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin"> <rabbit:bindings> <rabbit:binding queue="MY_SECOND_QUEUE" key="SecondKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- 消息接收者 --> <bean id="receiverSecond" class="com.gupaoedu.consumer.SecondConsumer"></bean> <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="MY_SECOND_QUEUE" ref="receiverSecond" /> </rabbit:listener-container> <!--######分隔線######--> <!--定義queue --> <rabbit:queue name="MY_THIRD_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" /> <!-- 定義topic exchange,綁定MY_THIRD_QUEUE,注意關鍵詞是pattern --> <rabbit:topic-exchange name="MY_TOPIC_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin"> <rabbit:bindings> <rabbit:binding queue="MY_THIRD_QUEUE" pattern="#.Third.#"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--定義rabbit template用於數據的接收和發送 --> <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="MY_TOPIC_EXCHANGE" /> <!-- 消息接收者 --> <bean id="receiverThird" class="com.gupaoedu.consumer.ThirdConsumer"></bean> <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="MY_THIRD_QUEUE" ref="receiverThird" /> </rabbit:listener-container> <!--######分隔線######--> <!--定義queue --> <rabbit:queue name="MY_FOURTH_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" /> <!-- 定義fanout exchange,綁定MY_FIRST_QUEUE 和 MY_FOURTH_QUEUE --> <rabbit:fanout-exchange name="MY_FANOUT_EXCHANGE" auto-delete="false" durable="true" declared-by="connectAdmin" > <rabbit:bindings> <rabbit:binding queue="MY_FIRST_QUEUE"></rabbit:binding> <rabbit:binding queue="MY_FOURTH_QUEUE"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange> <!-- 消息接收者 --> <bean id="receiverFourth" class="com.gupaoedu.consumer.FourthConsumer"></bean> <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="MY_FOURTH_QUEUE" ref="receiverFourth" /> </rabbit:listener-container> </beans>
4.生產者:
@Service public class MessageProducer { private Logger logger = LoggerFactory.getLogger(MessageProducer.class); @Autowired @Qualifier("amqpTemplate") private AmqpTemplate amqpTemplate; @Autowired @Qualifier("amqpTemplate2") private AmqpTemplate amqpTemplate2; /** * 演示三種交換機的使用 * * @param message */ public void sendMessage(Object message) { logger.info("Send message:" + message); // amqpTemplate 默認交換機 MY_DIRECT_EXCHANGE // amqpTemplate2 默認交換機 MY_TOPIC_EXCHANGE // Exchange 為 direct 模式,直接指定routingKey amqpTemplate.convertAndSend("FirstKey", "[Direct,FirstKey] "+message); amqpTemplate.convertAndSend("SecondKey", "[Direct,SecondKey] "+message); // Exchange模式為topic,通過topic匹配關心該主題的隊列 amqpTemplate2.convertAndSend("msg.Third.send","[Topic,msg.Third.send] "+message); // 廣播消息,與Exchange綁定的所有隊列都會收到消息,routingKey為空 amqpTemplate2.convertAndSend("MY_FANOUT_EXCHANGE",null,"[Fanout] "+message); } }
5.消費者,需要實現 MessageListener 接口:
public class FirstConsumer implements MessageListener { private Logger logger = LoggerFactory.getLogger(FirstConsumer.class); public void onMessage(Message message) { logger.info("The first consumer received message : " + message.getBody()); } }
SpringBoot 集成 RabbitMQ:
基於下面這個結構圖去集成:
消費者的配置:
1.構建Springboot項目導入依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.Rabbit配置類,生成交換機,隊列,以及匹配規則:
@Configuration public class RabbitConfig { //1.定義三個交換機 @Bean public DirectExchange directExchange(){ return new DirectExchange("DIRECT_EXCHANGE"); } @Bean public TopicExchange topicExchange(){ return new TopicExchange("TOPIC_EXCHANGE"); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("FANOUT_EXCHANGE"); } //2.定義四個隊列 @Bean public Queue firstQueue(){ return new Queue("FIRST_QUEUE"); } @Bean public Queue secondQueue(){ return new Queue("SECOND_DQUEUE"); } @Bean public Queue thirdQueue(){ return new Queue("THIRD_DQUEUE"); } @Bean public Queue fourthQueue(){ return new Queue("FOURTH_DQUEUE"); } //3.定義四個綁定關系 @Bean public Binding bindFirst(@Qualifier("firstQueue") Queue queue, @Qualifier("directExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("wuzz.test"); } @Bean public Binding bindSecond(@Qualifier("secondQueue") Queue queue, @Qualifier("topicExchange") TopicExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("*.wuzz.*"); } @Bean public Binding bindThird(@Qualifier("thirdQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } @Bean public Binding bindFourth(@Qualifier("fourthQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } }
3.定義4個消費者:
@Configuration @RabbitListener(queues = "FIRST_QUEUE") public class FirstConsumer { @RabbitHandler public void process(String msg){ System.out.println("First Queue received msg : " + msg); } } @Configuration @RabbitListener(queues = "SECOND_DQUEUE") public class SecondConsumer { @RabbitHandler public void process(String msg){ System.out.println("second Queue received msg : " + msg); } } @Configuration @RabbitListener(queues = "THIRD_DQUEUE") public class ThirdConsumer { @RabbitHandler public void process(String msg){ System.out.println("third Queue received msg : " + msg); } } @Configuration @RabbitListener(queues = "FOURTH_DQUEUE") public class FourthConsumer { @RabbitHandler public void process(String msg){ System.out.println("fourth Queue received msg : " + msg); } }
4.配置application.properties,定義鏈接地址等信息:
spring.application.name=spirng-boot-rabbitmq-consumer spring.rabbitmq.host=192.168.254.137 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
啟動消費者,可以通過管理頁面看到有4個鏈接通道及交換機隊列信息。
生產者:
1.構建SpringBoot項目導入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.通過 RabbitTemplate 構建自己的生產者類,發送4條消息:
@Component public class MyProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(){ rabbitTemplate.convertAndSend("DIRECT_EXCHANGE","wuzz.test","DIRECT_EXCHANGE message"); rabbitTemplate.convertAndSend("TOPIC_EXCHANGE","hangzhou.wuzz.test","TOPIC_EXCHANGE hangzhou message"); rabbitTemplate.convertAndSend("TOPIC_EXCHANGE","wenzhou.wuzz.test","TOPIC_EXCHANGE wenzhou message"); rabbitTemplate.convertAndSend("FANOUT_EXCHANGE","","FANOUT_EXCHANGE message"); } }
3.編寫測試類測試,並且啟動:
@RunWith(SpringRunner.class) @SpringBootTest public class AppTest { @Autowired private MyProducer myProducer; @Test public void send(){ myProducer.send(); } }
同樣需要配置application.properties,順利的話消費者方會收到消息如下,這樣就集成完了: