github地址:https://github.com/showkawa/springBoot_2017/tree/master/spb-demo/spb-brian-query-service
1.RabbitMQ簡介
AMQP(高級消息隊列協議)是一個異步消息傳遞所使用應用層協議規范,為面向消息中間件設計,基於此協議的客戶端與消息中間件可以無視消息來源傳遞消息,不受客戶端、消息中間件、不同的開發語言環境等條件的限制;
涉及概念解釋:
Server(Broker):接收客戶端連接,實現AMQP協議的消息隊列和路由功能的進程;
Virtual Host:虛擬主機的概念,類似權限控制組,一個Virtual Host里可以有多個Exchange和Queue。
Exchange:交換機,接收生產者發送的消息,並根據Routing Key將消息路由到服務器中的隊列Queue。(這個有點類似於Nginx服務器的概念)
ExchangeType:交換機類型決定了路由消息行為,RabbitMQ中有三種類型Exchange,分別是fanout、direct、topic;
Message Queue:消息隊列,用於存儲還未被消費者消費的消息;
Message:由Header和body組成,Header是由生產者添加的各種屬性的集合,包括Message是否被持久化、優先級是多少、由哪個Message Queue接收等;body是真正需要發送的數據內容;
BindingKey:綁定關鍵字,將一個特定的Exchange和一個特定的Queue綁定起來。
2.RabbitMQ運行機制
2.1 Exchange類型
生產者發送消息不會向傳統方式直接將消息投遞到隊列中,而是先將消息投遞到交換機中,在由交換機轉發到具體的隊列,隊列在將消息以推送或者拉取方式給消費者進行消費,這點Nginx有點類似。
交換機的作用根據具體的路由策略分發到不同的隊列中,交換機有四種類型。
Direct exchange(直連交換機)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的
Fanout exchange(扇型交換機)將消息路由給綁定到它身上的所有隊列
Topic exchange(主題交換機)隊列通過路由鍵綁定到交換機上,然后,交換機根據消息里的路由值,將消息路由給一個或多個綁定隊列
Headers exchange(頭交換機)類似主題交換機,但是頭交換機使用多個消息屬性來代替路由鍵建立路由規則。通過判斷消息頭的值能否與指定的綁定相匹配來確立路由規則
Headers 匹配 AMQP 消息的 header 而不是路由鍵, headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:
2.1.1 Direct Exchange
消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。
路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的消息,
不會轉發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全匹配、單播的模式。
2.1.2 Fanout Exchange
每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。
fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。
很像子網廣播,每台子網內的主機都獲得了一份復制的消息。fanout 類型轉發消息是最快的。
2.1.3 Topic Exchange
topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。
它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“*”。
#匹配0個或多個單詞,*匹配一個單詞。
3.RabbitMQ安裝
基於docker的國內鏡像安裝(3-management帶管理界面的rabbitmq),關於docker三分鍾上手和常用指令這篇博客有匯總:https://www.cnblogs.com/hlkawa/p/9742015.html
> docker pull registry.docker-cn.com/library/rabbitmq:3-management
啟動rabbitmq(-d 后台啟動 -p 端口映射 5672 連接rabbirmq的端口 15672訪問rabbitmq web管理界面的端口)
> docker run -d -p 5672:5672 -p 15672:15672 --name brian_rabbitmq xxxxxx(鏡像name或者鏡像ID)
rabbitmq的管理web訪問url: ip:15672,默認的賬戶密碼guest/guest
4.RabbitTemplate發送接受消息&序列化機制
4.1 引用依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
4.2 AmqpAdmin創建和刪除 Queue, Exchange,Binding
ManageMQService.java
package com.kawa.mq; import com.kawa.config.Contents; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class ManageMQService { @Autowired AmqpAdmin amqpAdmin; public void createExchange(String exchangeName,String mqType){ if(mqType.equals(Contents.DIRECT_EXCHANGE)){ amqpAdmin.declareExchange(new DirectExchange(exchangeName)); } if(mqType.equals(Contents.FANOUT_EXCHANGE)){ amqpAdmin.declareExchange(new FanoutExchange(exchangeName)); } if(mqType.equals(Contents.TOPIC_EXCHANGE)){ amqpAdmin.declareExchange(new TopicExchange(exchangeName)); } } public void removeExchange(String exchangeName){ amqpAdmin.deleteExchange(exchangeName); } public void createQueue(String queueName){ amqpAdmin.declareQueue(new Queue(queueName,true)); } public void removeQueue(String queueName){ amqpAdmin.deleteQueue(queueName); } public void createBinding(String queueName, String exchangeName, String routingKey){ amqpAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE,exchangeName,routingKey,null)); } public void removeBinding(String queueName, String exchangeName, String routingKey){ amqpAdmin.removeBinding(new Binding(queueName, Binding.DestinationType.QUEUE,exchangeName,routingKey,null)); } }
4.3 使用RabbitTemplate發送消息
SendMessageService.java
package com.kawa.mq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class SendMessageService { @Autowired RabbitTemplate rabbitTemplate; public void sendMessage(String exchange,String routingKey,Object obj){ //Message需要自己構造一個;定義消息體和消息頭 //rabbitTemplate.send(exchange,routingKey,message); //object默認當成消息體,只需要傳入發送對象,自動序列化發送給rabbitmq rabbitTemplate.convertAndSend(exchange,routingKey,obj); } }
4.4 測試創建 Queue, Exchange,Binding和發送消息
SpbDemoApplicationTests.java
@Test public void sendMessage() { manageMQService.createQueue("brian.test"); manageMQService.createExchange("brian",Contents.DIRECT_EXCHANGE); manageMQService.createBinding("brian.test","brian","mymq"); Brian brian = new Brian(); User user = new User(); user.setId((long) 12345678); user.setUsername("cassiel"); user.setPassword("#fyds"); List<String> list = new ArrayList<>(); list.add("我"); list.add("愛"); list.add("你"); list.add("中"); list.add("國"); Map<String,Object> map = new HashMap<>(); map.put("123","包郵"); brian.setKawadate(new Date()); brian.setLists(list); brian.setObj(map); brian.setUser(user); sendMessageService.sendMessage("brian","mymq",brian); }
查看結果
4.5 添加@RabbitListener監聽和處理消息
在使用RabbitListener注解接收消息時,需要在啟動類上加上數據@EnableRabbit
BrianService.java
package com.kawa.sercice; import com.kawa.pojo.Brian; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class BrianService { @RabbitListener(queues = "brian.test") public void receiveMessage(Brian brian){ System.out.println("接收到的消息體:" + brian); } }
4.6 啟動工程查看測試結果
查看到隊列里面消息已經沒有了