RabbitMQ-從基礎到實戰(4)— 消息的交換(中)


轉載請注明出處

0.目錄

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ

RabbitMQ-從基礎到實戰(2)— 防止消息丟失

RabbitMQ-從基礎到實戰(3)— 消息的交換(上)

RabbitMQ-從基礎到實戰(5)— 消息的交換(下)

RabbitMQ-從基礎到實戰(6)— 與Spring集成

1.簡介

本章節和官方教程相似度較高,英文好的可以移步官方教程

在上一章的例子中,我們創建了一個消費者,生產日志消息,廣播給兩個消費者,對消息進行不同的處理。這一節,我們將對它進行擴展,實現一些更加高級的功能,例如:使消費者A只接受error級別的日志保存到硬盤,消費者B接收所有級別的消息進行打印。

本文中涉及到的所有概念(包括前面幾章),都將摒棄個人經驗,以官方文檔為基礎進行講解,在書寫本文的同時,也是我對RabbitMQ的重新學習。

2.綁定

回顧一下上一章的隊列綁定代碼

// 把剛剛獲取的隊列綁定到logs這個交換中心上,
channel.queueBind(queueName, "logs", "");

這段代碼在消費者中,為什么生產者沒有?因為在RabbitMQ中消息是發送到交換中心(exchange)的,這在上一張已經重點強調過。

上述代碼可以理解成,queueName這個隊列對logs這個exchange中的消息感興趣,routingKey是""

在發送消息的basicPublish方法中,也有一個參數叫做routingKey,沒錯,他們是有關聯的,下面會介紹

在不同的exchange類型中,routingKey扮演的角色也相應的不同,比如上一章我們使用的fanout(扇出,多貼切的名字,想象一下WOW中盜賊的刀扇)將忽略routingKey,所有綁定在fanout類型的exchange上的隊列,都將接收到該exchange上的所有消息。

3.Direct Exchange

fanout類型的exchange沒有給我們太多的靈活性,direct類型的echange非常簡單,會匹配消息發布時的routingKey和queue的routingKey,完全相等則把消息放入該隊列。

image

如上圖,Q1綁定了orange,Q2綁定了black和green,就可以實現不同級別的日志用不同的消費者進行處理

我們看到Q2綁定了兩個routingKey,難道第二次綁定不會把第一次綁定覆蓋掉嗎?

實踐出真正,我們來試一下

image

  1. 聲明一個名為logs的exchange,類型換為direct,讓它通過routingKey的完全匹配去分發消息
  2. 然后把消息發送到名為logs的exchange上,routingKey是外面傳進來的

改造一下發送方法,輪流發送info和error信息

image

給Consummer隊列綁定兩個routingKey

image

激動人心的時刻到來了,跑一把

Duang,報錯了

image

報錯信息:

inequivalent arg 'type' for exchange 'logs' in vhost '/': received 'direct' but current is 'fanout', class-id=40, method-id=10)

大意就是logs exchange已經被聲明稱fanout了,不能再聲明成direct類型,RabbitMQ的隊列聲明方法和exchange聲明方法都是冪等的,如果沒有,就創建,如果有,參數相同,就不管,如果有了還用不同的參數重新聲明,就報錯

進入RabbitMQ控制台把logs刪除,重新執行

image

逆襲成功,消費一下看看

image

成功了,一個隊列可以綁定多個routingKey,這里注意先啟動消費者,因為前面的代碼里我們用的是臨時隊列,斷開連接后,隊列就刪除了,如果先啟動生產者,exchange接到消息后發現沒有隊列對它感興趣,就任性的把消息給丟掉了。

一個隊列可以綁定多個routingKey,反之,一個routingKey也可以綁定多個隊列,如下圖,感興趣的朋友可以自己試一下

image

如果綁定在一個direct類型的exchange上的隊列都使用同一個routingKey,那它就是一個fanout

4.實戰

要實現本章的需求,即Q1只接收error級別的日志寫到硬盤上,Q2接收error和info級別的日志打印出來

用direct類型的exchange來實現這個需求非常簡單,Q1綁定error,Q2綁定error和info即可,缺點是Q2需要綁定N個routingKey,N=日志級別數量,我們可以用一些編程的技巧來規避它

Sender的代碼上面已經改好了,把exchange換為direct,注意刪除原exchange,不再贅述

Q2綁定所有日志級別,我們用一個Enum來規避手動綁定

定義一個Enum

1 public enum LogType{
2     error,info;
3 }

用foreach語法糖進行循環綁定

1 //綁定所有類型
2 for(LogType logType: LogType.values()){
3     channel.queueBind(queueName, "logs", logType.name());
4 }

foreach是單線程的,這里也可以裝個逼用一下JAVA8的lambda,由於lambda是並行處理,所以外圍的try catch無效,需要在內層重新抓取異常,而且不能拋出,反而顯得代碼很不美好,裝逼失敗

1 IntStream.range(0, LogType.values().length).forEach(n->{
2     try {
3         channel.queueBind(queueName, "logs", LogType.values()[n].name());
4     } catch (IOException e) {
5         e.printStackTrace();
6     }
7 });

把另外一個Consumer改成只綁定error隊列

1 channel.queueBind(queueName, "logs", LogType.error.name());

然后,改造一下發送消息的地方,一開始我們用了一個while還有一組if else,看起來比較挫,別人看你代碼的時候,就不會覺得你很厲害,這樣和你不寫博客一樣,對你的工作是沒有好處的,我們把它改的高端一點

1 while(true){
2     boolean info = ++i%2==0;
3     String type = info?LogType.info.name():LogType.error.name();
4     sender.sendMessage(type +" message: "+i, type);
5     Thread.sleep(1000);
6 }

對比一下

image

是不是覺得自己厲害了很多?這就是編碼的藝術(得意臉)

好了,這一章沒有太多內容,跑一下看看結果

左邊的Consumer1,消費了info和error級別的日志,右邊的Consumer2,只消費了error級別的日志

5.結束語

這一章主要是介紹了RabbitMQ中direct類型的exchange,下一章將跟着官方教程的進度繼續介紹topic類型的exchange,以及下下章介紹用RabbitMQ實現RPC調用。之后則會介紹RabbitMQ與Spring的集成等與真實開發環境更相關的技術。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM