導入RabbitMQ相關依賴,當然要確保RabbitMQ環境搭建成功了。
pom.xml:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
* 1 自動配置 * RabbitAutoConfiguration ---rabiitmq的自動配置類 * 自動配置了連接工廠ConnectionFactory * RabbitProperties封裝了RabbitMQ的配置,默認就有一寫配置 * RabbitTemplate:給RabbitMQ發送和接受消息。 * AmqpAdmin:RabbitMQ系統管理功能組件。
我們可以在測試中對RabbitMQ進行測試:
package com.xt.springbootamqp; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringBootRabbitAmqpApplicationTests { //自動注入RabbitTemplate,由他來發送和接受消息 @Autowired private RabbitTemplate rabbitTemplate; /** *@Description: 單播模式,點對點 *@date: 2019/6/17 * * RabbitTemplate中發送消息send()的重載方法: * @Override * public void send(Message message) throws AmqpException { * send(this.exchange, this.routingKey, message); * } * * @Override * public void send(String routingKey, Message message) throws AmqpException { * send(this.exchange, routingKey, message); * } * * @Override * public void send(final String exchange, final String routingKey, final Message message) throws AmqpException { * send(exchange, routingKey, message, null); * } * */ @Test public void contextLoads() {
// 使用send方法 rabbitTemplate.send(exchange,rountKey,message);
String str = "hello";
Message message = new Message(str.getBytes(),null);
rabbitTemplate.send("","",message);
} }
但是我們一般發送消息常用的不是send方法,而是convertAndSend(),當然該方法也有很多重載方法。如圖:
發送消息的過程:當生產者發送消息時,首先會根據指定的交換機exchange找到所屬的多個隊列,又因為與隊列綁定的路由鍵rountKey不同,消息會根據路由鍵發送至相對應的隊列。而消費者直接面對的是隊列,只需要根據隊列名字獲取隊列相應信息就能得到消息。
由於測試,我事先已經在RabbitMQ客戶端上新增了多個交換機個隊列。#表示匹配多個單詞 .號表示匹配單個單詞
Exchanges:wqq.direct(單播) ---> rountingkey: key.direct ---> queue.direct
---> rountingKey: key.direct2 ---> queue.direct2
wqq.fanout(廣播) ---> rountingkey: 廣播模式與路由鍵無關 ---> queue.fanout
---> rountingkey: ---> queue.fanout2
wqq.topic(發布訂閱) ---> rountingkey: #.topic ---> ceshi.topic 只要是路由鍵是xxx.topic都能匹配
---> rountingkey: queue.# ---> queue.#
單播模式:
單播模式,也就是點對點。一個消息只能發送到一個交換機下的一個隊列上去。
例如:將消息發送至exchange為wqq.direct所綁定的rountkey為key.direct的消息隊列上,消息體是msg
/** *@Description: 測試單播模式 *@date: 2019/6/17 */ @Test public void test1(){ //常用的發送消息的方法 // rabbitTemplate.convertAndSend(String exchange,String rountKey,object message); String msg = "測試單播directRabbitMQ"; rabbitTemplate.convertAndSend("wqq.direct","key.direct",msg); }
消費者獲取消息:
@Test public void getMsg(){ Object o = rabbitTemplate.receiveAndConvert("queue.direct"); System.out.println(o); }
廣播模式:
交換機廣播模式下與路由鍵無關,因為會直接向所屬的隊列全部發送該消息。
/** *@Description: 廣播模式 *@date: 2019/6/17 * 在廣播模式下,即使我設置了路由鍵,但是該交換機下的所欲隊列都會受到該消息,證明路由鍵無效 */ @Test public void fanoutTest(){ rabbitTemplate.convertAndSend("wqq.fanout","queue.fanout","測試廣播模式"); }
消費者接受消息:
@Test public void getFanoutMsg(){ Object o = rabbitTemplate.receiveAndConvert("queue.fanout"); System.out.println(o); }
發布訂閱模式:
發布訂閱模式它會根據消息的路由鍵去匹配路由鍵綁定的隊列,只要能匹配上,就能將消息發送至隊列上。
/** *@Description: 發布訂閱模式 *@date: 2019/6/17 */ @Test public void topicMsg(){ // 匹配#.topic的路由鍵或者adc.#的路由鍵 匹配..topic的路由鍵或者adc..的路由鍵 rabbitTemplate.convertAndSend("wqq.topic","abc.topic","發布訂閱topic模式"); } @Test public void topicMsg1(){ // 匹配#.topic的路由鍵或者queue.#的路由鍵 匹配..topic的路由鍵或者queue..的路由鍵 rabbitTemplate.convertAndSend("wqq.topic","queue.topic","發布訂閱topic模式"); }
雖然生產者發送消息和消費者接受消息測試通過了,但是還有一個·問題是:我們一般發送消息都不太可能是字符串,一般是對象或者集合。但是經過測試后,如果發送對象,那么消息是序列換之后的結果,不利於查看。
這是由於RabbitMQ使用的是默認的jdk序列化器,所以我們可以自定義序列化器,將對象類型轉換為JSON類型。
編寫配置類:
@Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter(){ MessageConverter messageConverter = new Jackson2JsonMessageConverter(); return messageConverter; } }
監聽消息隊列:
一般在實際開發中一般使用監聽消息隊列當有消息發送至RabbitMQ中時,通過監聽器監聽到隊列中存在消息,然后將消息進行消費。
例子:如果有兩個系統,訂單系統、庫存系統,當訂單系統產生一個訂單,發送一個消息至RabbitMQ,庫存系統監聽到RabbitMQ隊列中產生了新的消息,然后對該消息進行消費,也就是減庫存操作。
步驟:
開啟基於RabbitMQ的注解:
package com.xt.springbootamqp; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** *@Description: *@date: 2019/6/17 * 1 自動配置 * RabbitAutoConfiguration ---rabiitmq的自動配置類 * 自動配置了連接工廠ConnectionFactory * RabbitProperties封裝了RabbitMQ的配置,默認就有一寫配置 * RabbitTemplate:給RabbitMQ發送和接受消息。 * AmqpAdmin:RabbitMQ系統管理功能組件。 * * */ @SpringBootApplication @EnableRabbit public class SpringBootRabbitAmqpApplication { public static void main(String[] args) { SpringApplication.run(SpringBootRabbitAmqpApplication.class, args); } }
模擬訂單系統:
package com.xt.springbootamqp.controller; import com.xt.springbootamqp.service.OrderServcieImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Date: 2019/6/17 16:16 * @Description: 模擬用戶產生訂單 */ @RestController public class UserController { @Autowired private OrderServcieImpl orderServcie; @RequestMapping(value = "/addOrder") public void addOrder(){ orderServcie.addOrder(); } }
package com.xt.springbootamqp.service; import com.xt.springbootamqp.pojo.Order; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @Date: 2019/6/17 16:08 * @Description: 訂單系統 */ @Service public class OrderServcieImpl { /** *@Description: 模擬訂單系統產生訂單 *@date: 2019/6/17 */ @Autowired private RabbitTemplate rabbitTemplate; public void addOrder(){ //產生一個訂單 Order order = new Order(); order.setOrderId(88); order.setOrderName("小馬的訂單"); //將訂單發送至RabbitMQ中; rabbitTemplate.convertAndSend("wqq.topic","queue.topic",order); } }
模擬庫存系統:
package com.xt.springbootamqp.service; import com.xt.springbootamqp.pojo.Order; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.util.function.ObjDoubleConsumer; /** * @Date: 2019/6/17 16:18 * @Description: 模擬庫存系統,監聽消息隊列,當訂單系統查詢消息時,庫存系統自動減庫存 */ @Service public class StockServiceImpl { @RabbitListener(queues = {"queue.topic"}) //隊列可以寫多個,也就是可以監聽多個隊列 public void reduceStock(Order order){ //監聽到消息,自動減庫存 //獲取訂單信息 System.out.println(order.toString()); }
@RequestMapping(value = "/addOrder")
public void addOrderMsg(Message message){
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
}
}
雖然監聽消息成功了,但是我們有沒有發現,我們都是以RabbitMQ中已經有了交換器和隊列在測試,那么如果RabbitMQ中沒有交換器和隊列。而是需要我們自己手動添加組件,那該怎么辦呢?
AmpqAdmin 管理組件:
/** *@Description: 測試AmqpAdmin管理組件 *@date: 2019/6/17 */ @Autowired private AmqpAdmin amqpAdmin; /** *@Description: 創建exchange *@date: 2019/6/17 */ @Test public void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("")); amqpAdmin.declareExchange(new FanoutExchange("")); amqpAdmin.declareExchange(new TopicExchange("")); }
自定義創建Exchange、隊列、綁定關系:
@Autowired private AmqpAdmin amqpAdmin; /** *@Description: 創建exchange、隊列、綁定規則 *@date: 2019/6/17 */ @Test public void test3(){ //創建exchange amqpAdmin.declareExchange(new DirectExchange("amqp.direct_exchange")); /* amqpAdmin.declareExchange(new FanoutExchange("")); amqpAdmin.declareExchange(new TopicExchange("")); */ //創建隊列 amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));//是否持久化 //創建綁定關系 amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqp.direct_exchange","amapadmin_key",null)); }