人生終將是場單人旅途,孤獨之前是迷茫,孤獨過后是成長。
楔子
本篇是消息隊列RabbitMQ
的第三彈。
RabbitMQ的入門和RabbitMQ+SpringBoot的整合可以點此鏈接進去回顧,今天要講的是RabbitMQ
的交換機。
本篇是理解RabbitMQ
很重要的一篇,交換機是消息的第一站,只有理解了交換機的分發模式,我們才能知道不同交換機根據什么規則分發消息,才能明白在面對不同業務需求的時候應采用哪種交換機。
1. 🔍Exchange
先來放上幾乎每篇都要出現一遍的我畫了好久的RabbitMQ
架構圖。
前兩篇文中我們一直沒有顯式的去使用Exchange
,都是使用的默認Exchange
,其實Exchange
是一個非常關鍵的組件,有了它才有了各種消息分發模式。
我先簡單說說Exchange
有哪幾種類型:
-
fanout:
Fanout-Exchange
會將它接收到的消息發往所有與他綁定的Queue中。 -
direct:
Direct-Exchange
會把它接收到的消息發往與它有綁定關系且Routingkey
完全匹配的Queue中(默認)。 -
topic:
Topic-Exchange
與Direct-Exchange相似,不過Topic-Exchange不需要全匹配,可以部分匹配,它約定:Routingkey
為一個句點號“. ”分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞)。 -
header:
Header-Exchange
不依賴於RoutingKey或綁定關系來分發消息,而是根據發送的消息內容中的headers屬性進行匹配。此模式已經不再使用,本文中也不會去講,大家知道即可。
本文中我們主要講前三種Exchange
方式,相信憑借着我簡練的文字和靈魂的畫技給大家好好講講,爭取老嫗能解。
Tip:本文的代碼演示直接使用SpringBoot+RabbitMQ的模式。
2. 📕Fanout-Exchange
先來看看Fanout-Exchange
,Fanout-Exchange
又稱扇形交換機,這個交換機應該是最容易理解的。
Exchange
和Queue
建立一個綁定關系,Exchange
會分發給所有和它有綁定關系的Queue
中,綁定了十個Queue
就把消息復制十份進行分發。
這種綁定關系為了效率肯定都會維護一張表,從算法效率上來說一般是O(1),所以Fanout-Exchange
是這幾個交換機中查找需要被分發隊列最快的交換機。
下面是一段代碼演示:
@Bean
public Queue fanout1() {
return new Queue("fanout1");
}
@Bean
public Queue fanout2() {
return new Queue("fanout2");
}
@Bean
public FanoutExchange fanoutExchange() {
// 三個構造參數:name durable autoDelete
return new FanoutExchange("fanoutExchange", false, false);
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(fanout1()).to(fanoutExchange());
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(fanout2()).to(fanoutExchange());
}
為了清晰明了,我新建了兩個演示用的隊列,然后建了一個FanoutExchange
,最后給他們都設置上綁定關系,這樣一組隊列和交換機的綁定設置就算完成了。
緊接着編寫一下生產者和消費者:
public void sendFanout() {
Client client = new Client();
// 應讀者要求,以后代碼打印的地方都會改成log方式,這是一種良好的編程習慣,用System.out.println一般是不推薦的。
log.info("Message content : " + client);
rabbitTemplate.convertAndSend("fanoutExchange",null,client);
System.out.println("消息發送完畢。");
}
@Test
public void sendFanoutMessage() {
rabbitProduce.sendFanout();
}
@Slf4j
@Component("rabbitFanoutConsumer")
public class RabbitFanoutConsumer {
@RabbitListener(queues = "fanout1")
public void onMessage1(Message message, Channel channel) throws Exception {
log.info("Message content : " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消息已確認");
}
@RabbitListener(queues = "fanout2")
public void onMessage2(Message message, Channel channel) throws Exception {
log.info("Message content : " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消息已確認");
}
}
這兩段代碼都很好理解,不再贅述,有遺忘的可以去看RabbitMQ第一彈的內容。
其中發送消息的代碼有三個參數,第一個參數是Exchange
的名稱,第二個參數是routingKey
的名稱,這個參數在扇形交換機里面用不到,在其他兩個交換機類型里面會用到。
代碼的准備到此結束,我們可以運行發送方法之后run一下了~
項目啟動后,我們可以先來觀察一下隊列與交換機的綁定關系有沒有生效,我們在RabbitMQ控制台使用rabbitmqctl list_bindings
命令查看綁定關系。
關鍵部分我用紅框標記了起來,這就代表着名叫fanoutExchange
的交換機綁定着兩個隊列,一個叫fanout1
,另一個叫fanout2
。
緊接着,我們來看控制台的打印情況:
可以看到,一條信息發送出去之后,兩個隊列都接收到了這條消息,緊接着由我們的兩個消費者消費。
Tip: 如果你的演示應用啟動之后沒有消費信息,可以嘗試重新運行一次生產者的方法發送消息。
3. 📗Direct-Exchange
Direct-Exchange
是一種精准匹配的交換機,我們之前一直使用默認的交換機,其實默認的交換機就是Direct類型。
如果將Direct交換機都比作一所公寓的管理員,那么隊列就是里面的住戶。(綁定關系)
管理員每天都會收到各種各樣的信件(消息),這些信件的地址不光要標明地址(ExchangeKey)還需要標明要送往哪一戶(routingKey),不然消息無法投遞。
以上圖為例,准備一條消息發往名為SendService
的直接交換機中去,這個交換機主要是用來做發送服務,所以其綁定了兩個隊列,SMS隊列和MAIL隊列,用於發送短信和郵件。
我們的消息除了指定ExchangeKey
還需要指定routingKey
,routingKey
對應着最終要發送的是哪個隊列,我們的示例中的routingKey
是sms,這里這條消息就會交給SMS隊列。
聽了上面這段,可能大家對routingKey
還不是很理解,我們上段代碼實踐一下,大家應該就明白了。
准備工作:
@Bean
public Queue directQueue1() {
return new Queue("directQueue1");
}
@Bean
public Queue directQueue2() {
return new Queue("directQueue2");
}
@Bean
public DirectExchange directExchange() {
// 三個構造參數:name durable autoDelete
return new DirectExchange("directExchange", false, false);
}
@Bean
public Binding directBinding1() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms");
}
@Bean
public Binding directBinding2() {
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail");
}
新建兩個隊列,新建了一個直接交換機,並設置了綁定關系。
這里的示例代碼和上面扇形交換機的代碼很像,唯一可以說不同的就是綁定的時候多調用了一個with
將routingKey
設置了上去。
所以是交換機和隊列建立綁定關系的時候設置的routingKey
,一個消息到達交換機之后,交換機通過消息上帶來的routingKey
找到自己與隊列建立綁定關系時設置的routingKey
,然后將消息分發到這個隊列去。
生產者:
public void sendDirect() {
Client client = new Client();
log.info("Message content : " + client);
rabbitTemplate.convertAndSend("directExchange","sms",client);
System.out.println("消息發送完畢。");
}
消費者:
@Slf4j
@Component("rabbitDirectConsumer")
public class RabbitDirectConsumer {
@RabbitListener(queues = "directQueue1")
public void onMessage1(Message message, Channel channel) throws Exception {
log.info("Message content : " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消息已確認");
}
@RabbitListener(queues = "directQueue2")
public void onMessage2(Message message, Channel channel) throws Exception {
log.info("Message content : " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消息已確認");
}
}
效果圖如下:
只有一個消費者進行了消息,符合我們的預期。
4. 📙Topic-Exchange
Topic-Exchange
是直接交換機的模糊匹配版本,Topic類型的交換器,支持使用"*"和"#"通配符定義模糊bindingKey,然后按照routingKey
進行模糊匹配隊列進行分發。
-
*
:能夠模糊匹配一個單詞。 -
#
:能夠模糊匹配零個或多個單詞。
因為加入了兩個通配定義符,所以Topic交換機的routingKey
也有些變化,routingKey
可以使用.
將單詞分開。
這里我們直接來用一個例子說明會更加的清晰:
准備工作:
// 主題交換機示例
@Bean
public Queue topicQueue1() {
return new Queue("topicQueue1");
}
@Bean
public Queue topicQueue2() {
return new Queue("topicQueue2");
}
@Bean
public TopicExchange topicExchange() {
// 三個構造參數:name durable autoDelete
return new TopicExchange("topicExchange", false, false);
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#");
}
新建兩個隊列,新建了一個Topic交換機,並設置了綁定關系。
這里的示例代碼我們主要看設置routingKey
,這里的routingKey
用上了通配符,且中間用.
隔開,這就代表topicQueue1
消費sms
開頭的消息,topicQueue2
消費mail
開頭的消息,具體不同往下看。
生產者:
public void sendTopic() {
Client client = new Client();
log.info("Message content : " + client);
rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client);
System.out.println("消息發送完畢。");
}
消費者:
@Slf4j
@Component("rabbitTopicConsumer")
public class RabbitTopicConsumer {
@RabbitListener(queues = "topicQueue1")
public void onMessage1(Message message, Channel channel) throws Exception {
log.info("Message content : " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消息已確認");
}
@RabbitListener(queues = "topicQueue2")
public void onMessage2(Message message, Channel channel) throws Exception {
log.info("Message content : " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消息已確認");
}
}
這里我們的生產者發送的消息routingKey
是sms.liantong
,它就會被發到topicQueue1
隊列中去,這里消息的routingKey
也需要用.
隔離開,用其他符號無法正確識別。
如果我們的routingKey
是sms.123.liantong
,那么它將無法找到對應的隊列,因為topicQueue1
的模糊匹配用的通配符是*
而不是#
,只有#
是可以匹配多個單詞的。
Topic-Exchange
和Direct-Exchange
很相似,我就不再贅述了,通配符*
和#
的區別也很簡單,大家可以自己試一下。
后記
周一沒更文實在慚愧,去醫院抽血了,抽了三管,吃多少才能補回來
RabbitMQ已經更新了三篇了,這三篇的內容有些偏基礎,下一篇將會更新高級部分內容:包括防止消息丟失,防止消息重復消費等等內容,希望大家持續關注。
最近這段時間壓力挺大,優狐令我八月底之前升級到三級,所以各位讀者的贊對我很重要,希望大家能夠高抬貴手,幫我一哈~
好了,以上就是本期的全部內容,感謝你能看到這里,歡迎對本文點贊收藏與評論,👍你們的每個點贊都是我創作的最大動力。
我是耳朵,一個一直想做知識輸出的偽文藝程序員,我們下期見。