簡介
使用Erlang語言編寫的一種消息中間件。消息中間件是一種數據傳送的消息傳遞機制,換句話說,是一種軟件應用之間的通訊方式。
舉個栗子
消息中間件的作用之一是應用解耦。
拿取快遞為例,前幾年的快遞收取方式通常是由快遞員上門派件,那么也就是說快遞員需要與顧客建立直接聯系;近年來,快遞行業蒸蒸日上,大量的快件如果由人工派送,不僅效率低,資源耗費也相當大。
而現在不管是小區,還是學校,都專門設置了快遞櫃,菜鳥驛站。這個快遞櫃就可以類比消息中間件,快遞員無需把快件送到顧客手中,只要放入對應區域設置的快遞櫃中,顧客就可以自己來取。這樣就消除了快遞員與顧客的直接聯,也就是應用解耦。
基礎概念
Broker:簡單來說就是消息隊列服務器實體
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列,Exchange有4種類型:direct(默認點對點),fanout(發布訂閱), topic(匹配規則發布訂閱), 和headers,不同類型的Exchange轉發消息的策略有所區別
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列
Binding:綁定,它的作用就是把 exchange和 queue按照路由規則綁定起來
Routing Key:路由關鍵字, exchange根據這個關鍵字進行消息投遞
vhost:虛擬主機,一個 broker里可以開設多個 vhost,用作不同用戶的權限分離
producer:消息生產者,就是投遞消息的程序
consumer:消息消費者,就是接受消息的程序
channel:消息通道,在客戶端的每個連接里,可建立多個 channel,每個 channel代表一個會話任務
Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
消息發送、接受流程
首先生產者把消息投遞到exchange交換機,
exchange接受到消息后,根據消息的key和已經設置好的binging,進行消息路由,將消息投遞到一個或者多個queue里面,
消費者從queue中取出消息,開始執行相應的代碼
RabbitMQ三種分發策略
1.direct 單播
根據消息體中的路由鍵指定分發的隊列
2.fanout 廣播
忽略路由鍵,分發消息到所有隊列
3.topic 多播(模式匹配)
路由鍵可包含模式匹配字符(*代表一個字符,#代表一個或多個字符)
搭建RabbitMQ環境
1.下載安裝RabbitMQ鏡像
docker pull rabbitmq:3.8.1-management #下載鏡像 docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 68055d63a993 #啟動鏡像
-d :后台運行
-p:暴露端口
訪問:http://192.168.0.146:15672/(146服務器地址) 進入rabbitMQ管理頁面,默認用戶名和密碼為guest。
SpringBoot集成RabbitMQ
1.Pom(引入starter)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.配置application.properties
spring.rabbitmq.host=192.168.0.146 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
3.測試單播(點對點),多播(發布訂閱)
@Autowired RabbitTemplate rabbitTemplate; //單播發送 @Test void directTest() { Person person = new Person("mengY",25,"男"); rabbitTemplate.convertAndSend("exchange.direct","queue_01", person); } //單播接收 @Test void directReceive(){ Person person = (Person)rabbitTemplate.receiveAndConvert("queue_01"); System.out.println(person.getClass()); System.out.println(person.toString()); } //廣播發送 @Test void fanoutTest() { Person person = new Person("mengY",25,"男"); rabbitTemplate.convertAndSend("exchange.fanout","", person); } //廣播接收 @Test void fanoutReceive(){ Object obj = rabbitTemplate.receiveAndConvert("queue_03"); System.out.println(obj.getClass()); System.out.println(obj); }
rabboitMQ默認的是一java的方式序列化消息對象
上面的展示方式不太直觀,有時我們需要用json的方式來序列化消息對象,可以通過配置MessageConverter來實現
@Configuration public class RabbitConfig { /** * 消息轉換器 * @return */ @Bean public MessageConverter getMessageConverter(){ return new Jackson2JsonMessageConverter(); } }
消息監聽
@EnableRabbit @SpringBootApplication public class DemoMqApplication { public static void main(String[] args) { SpringApplication.run(DemoMqApplication.class, args); } }
@EnableRabbit:開啟基於注解的rabbitMQ模式
@Service public class PersonService { /** * 監聽接受消息 */ @RabbitListener(queues = "queue_01") public void receive(Person person){ System.out.println("收到消息:"+person.toString()); } }
@RabbitListener:監聽消息隊列
@RabbitListener(queues = {"queue_03","queue_04"}):監聽多個隊列
AmqpAdmin管理組件
創建,刪除交換器,隊列
參考:https://blog.csdn.net/Leon_Jinhai_Sun/article/details/99692190
參考資料:
SpringBoot整合RabbitMQ之 典型應用場景實戰一