一、什么是RabbitMQ
RabbitMQ,遵循AMQP協議,由內在高並發的erlanng語言開發,用在實時的對可靠性要求比較高的消息傳遞上。
學過websocket的來理解rabbitMQ應該是非常簡單的了,websocket是基於服務器和頁面之間的通信協議,一次握手,多次通信。 而rabbitMQ就像是服務器之間的socket,一個服務器連上MQ監聽,而另一個服務器只要通過MQ發送消息就能被監聽服務器所接收。
但是MQ和socket還是有區別的,socket相當於是頁面直接監聽服務器。而MQ就是服務器之間的中轉站,例如郵箱,一個人投遞信件給郵箱,另一個人去郵箱取,他們中間沒有直接的關系,所以耦合度相比socket小了很多。
上圖是最簡單的MQ關系,生產者-MQ隊列-消費者
二、docker部署RabbitMQ
2.1、獲取鏡像
docker pull rabbitmq:management
2.2、創建並運行容器:
docker run -d --hostname my-rabbit --name rabbit -p 8080:15672 rabbitmq:management
--hostname:指定容器主機名稱
--name:指定容器名稱
-p:將mq端口號映射到本地
或在運行時設置用戶和密碼 docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management 15672:控制台端口號 5672:應用訪問端口號
2.3、查看rabbit運行狀況:
docker logs rabbit
2.4、訪問平台
使用 http://ip:15672訪問rabbit控制台
三、專業術語介紹
1. 生產者: 在現實生活中就好比制造商品的工廠,他們是商品的生產者。生產者只意味着發送。發送消息的程序稱之為一個生產者。
2. 隊列:rabbitMQ就像一個倉庫,一個倉庫里面可以 有很多隊列,每個隊列才是服務器之間消息通信的載體。
3.消費者:消費者就好比是從商店購買或從倉庫取走商品的人,消費的意思就是接收。消費者是一個程序,主要是等待接收消息。
4.交換器:在生產者和消息隊列之間的交換器,功能類似於網絡寬帶的交換機,可以根據不同的關鍵字,將信息發送到不同的隊列。
上圖的E就是交換器,通過關鍵字綁定,如果生產者給的消息中指定類型是ERROR,就給隊列1,如果是INFO或者WARN就給隊列2。當然也可以一個關鍵字綁定兩個隊列。(INFO等字段自己可以定義,也可以用*,#來匹配。*(星號)表示一個單詞#(井號)表示零個或者多個單詞。 比如ok.yes可以被ok.*匹配到)
5.臨時隊列:根據需求臨時創建的一條隊列,在斷開連接后自動刪除。
四、案例演示
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.1</version> </dependency>
// 可靠生產 // https://www.rabbitmq.com/confirms.html public class Producer { public static void main(String[] args) { // 1、創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 2、設置連接屬性 factory.setHost("46.654.44.123"); factory.setUsername("dfg"); factory.setPassword("efgy"); Connection connection = null; Channel channel = null; try { // 3、從連接工廠獲取連接 connection = factory.newConnection("生產者"); // 4、從鏈接中創建通道 channel = connection.createChannel(); // 進入confirm模式, 每次發送消息,rabbtiqm處理之后會返回一個對應的回執消息 AMQP.Confirm.SelectOk selectOk = channel.confirmSelect(); // 增加監聽器 ArrayList<String> queues = new ArrayList<>(); channel.addConfirmListener(new ConfirmListener () { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { // deliveryTag 同一個channel中此條消息的編號 。 // 業務.. System.out.println("受理成功 " + queues.get((int) deliveryTag) + " " + multiple); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { // 失敗重發 // queues.get((int) deliveryTag) System.out.println("受理失敗 " + deliveryTag); } }); // 定義fanout類型的交換器 channel.exchangeDeclare("ps_test", "fanout"); for (int i = 0; i < 10; i++) { // 消息內容 String message = "Hello " + i; queues.add(message); // 發送消息到ps_test交換器上 AMQP.BasicProperties basicProperties = new AMQP.BasicProperties(); channel.basicPublish("ps_test", "", basicProperties, message.getBytes()); System.out.println("消息 " + message + " 已發送!"); } // 等待20秒 Thread.sleep(20 * 1000L); } catch (Exception e) { e.printStackTrace(); } finally { // 7、關閉通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8、關閉連接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
/** * 消息確認機制 */ public class Consumer { private static Runnable receive = new Runnable() { public void run() { // 1、創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 2、設置連接屬性 factory.setHost("46.654.44.123"); factory.setUsername("dfg"); factory.setPassword("efgy"); Connection connection = null; Channel channel = null; final String clientName = Thread.currentThread().getName(); try { // 3、從連接工廠獲取連接 connection = factory.newConnection("消費者"); // ###死信隊列相關:專門用來存儲 出錯 出異常的數據 channel = connection.createChannel(); // 1、 創建一個exchange channel.exchangeDeclare("dlq_exchange", "fanout"); // 2、 創建一個queue,和exchange綁定起來 channel.queueDeclare("dlq_queue1", false, false, false, null); channel.queueBind("dlq_queue1", "dlq_exchange", ""); // ######死信隊列結束 // 4、從鏈接中創建通道 channel = connection.createChannel(); // 代碼定義交換器 channel.exchangeDeclare("ps_test", "fanout"); // 還可以定義一個臨時隊列,連接關閉后會自動刪除,此隊列是一個排他隊列 String queueName = "queue1"; // 隊列中有死信產生時,消息會轉發到交換器 dlq_exchange。 Map<String, Object> args = new HashMap<String, Object> (); args.put("x-dead-letter-exchange", "dlq_exchange"); channel.queueDeclare(queueName, false, false, false, args); // 將隊列和交換器綁定 channel.queueBind(queueName, "ps_test", ""); // 監聽隊列 Channel finalChannel = channel; channel.basicConsume(queueName, false, "消費者-手動回執", new DefaultConsumer (finalChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { System.out.println("收到消息: " + new String(body)); // TODO 業務處理 long deliveryTag = envelope.getDeliveryTag(); // 模擬業務處理耗時 Thread.sleep(1000L); // 正常消費 // finalChannel.basicAck(deliveryTag, false); // 異常消費 finalChannel.basicNack(envelope.getDeliveryTag(), false, false); } catch (InterruptedException e) { // 異常消費, requeue參數 true重發,false不重發(丟棄或者移到DLQ死信隊列) // finalChannel.basicNack(envelope.getDeliveryTag(), false, false); e.printStackTrace(); } } }); System.out.println(clientName + " 開始接收消息"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 8、關閉通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 9、關閉連接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(receive, "c1").start(); } }
五、AMQP 的基本概念
AMQP(RabbitMQ)必須由三部分:交換器、隊列和綁定
1、Message(消息)
消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(傳輸模式,指出該消息可能需要持久化存儲)等。
2、Publisher
消息生產者,也是一個向交換器發布消息的客戶端應用程序,就是投遞消息的程序。
3、Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。消息交換機,它指定消息按什么規則,路由到哪個隊列。
4、Routing Key
路由關鍵字,exchange根據這個關鍵字進行消息投遞。
5、Binding(綁定)
用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。它的作用就是把exchange和queue按照路由規則綁定起來。綁定其實就是關聯了exchange和queue,或者這么說:queue對exchange的內容感興趣,exchange要把它的Message deliver到queue。
6、Queue(消息隊列)
消息的載體,每個消息都會被投到一個或多個隊列,等待消費者連接到這個隊列將其取走。它是消息的容器,也是消息的終點。
7、Connection
網絡連接,例如一個TCP連接。
8、Channel(信道,通道)
消息通道,在客戶端的每個連接里,可建立多個channel。多路復用連接中的一條獨立雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,AMQP命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說建立和銷毀TCP都是非常昂貴的開銷,所以引入了信道的概念以達到復用一條TCP連接的目的。
9、Consumer
消息消費者,表示一個從消息隊列中取得消息的客戶端應用程序,就是接受消息的程序。
10、Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。一個broker里可以有多個vhost,用作不同用戶的權限分離。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個vhost本質上就是一個mini版的rabbitmq服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost是AMQP概念的基礎,必須在連接時指定,rabbitmq默認的vhost是 / 。
11、Broker
表示消息隊列服務器實體。它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸。
六、AMQP中的消息路由
生產者把消息發布到Exchange上,消息最終到達隊列並被消費者接收,而Binding決定交換器的消息應該發送到那個隊列。如下圖所示:
七、Exchange類型
Exchange分發消息時根據類型的不同分發策略有區別,目前共有四種類型:direct、fanout、topic、headers。headers匹配AMQP消息的header而不是路由鍵,此外headers交換器和direct交換器完全一致,但性能差很多,目前幾乎用不到。且看direct、fanout、topic這三種類型。
1、direct類型
消息中的路由鍵routing key如果和Binding中的binding key一致,交換器就將消息發到對應的隊列中去。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換器要求路由鍵為“dog”,則只轉發routing key標記為“dog”的消息,不會轉發“dog.puppy”等等。它是完全匹配、單傳播的模式。
Driect exchange的路由算法非常簡單:通過bindingkey的完全匹配,可以用下圖來說明:
Exchange和兩個隊列綁定在一起,Q1的bindingkey是orange,Q2的binding key是black和green。當Producer publish key是orange時,exchange會把它放到Q1上,如果是black或green就會到Q2上,其余的Message被丟棄。
2、fanout類型
每個發到fanout類型交換器的消息都會分到所有綁定的隊列上去。fanout交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。類似於子網廣播,每台子網內的主機都獲得了一份復制的消息。fanout類型轉發消息是最快的。 如下圖所示:
3、topic類型
topic交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:#
和*
,#
匹配0個或多個單詞,*
只能匹配一個單詞。
對於Message的routing_key是有限制的,不能是任意的。格式是以點號“.”分割的字符表。比如:”stock.usd.nyse”,“nyse.vmw”, “quick.orange.rabbit”。你可以放任意的key在routing_key中,當然最長不能超過255 bytes。對於routing_key,有兩個特殊字符#
和*
,#
匹配0個或多個單詞,*
只能匹配一個單詞。如下圖所示:
Producer發送消息時需要設置routing_key,routing_key包含三個單詞和兩個點號,第一個key描述了celerity(靈巧),第二個是color(色彩),第三個是物種。
在這里我們創建了兩個綁定: Q1 的binding key 是”.orange.“; Q2 是 “..rabbit” 和 “lazy.#”:Q1感興趣所有orange顏色的動物;Q2感興趣所有rabbits和所有的lazy的。
例如:rounting_key 為 “quick.orange.rabbit”將會發送到Q1和Q2中。rounting_key 為”lazy.orange.rabbit.hujj.ddd”會被投遞到Q2中,#匹配0個或多個單詞。
八、ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。
1、Connection
Connection是Rabbitmq的socket連接,它封裝了socket協議相關部分邏輯。
2、ConnectionFactory
ConnectionFactory是connection的制造工廠。
3、Channel
Channel是我們與rabbitmq打交道的最重要的一個接口,大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。
九、任務分發機制
1、Round-robin dispathching 循環分發
RabbbitMQ的分發機制非常適合擴展,而且它是專門為並發程序設計的,如果現在load加重,那么只需要創建更多的Consumer來進行任務處理。
2、Message acknowledgment 消息確認
為了保證數據不被丟失,RabbitMQ支持消息確認機制,為了保證數據能被正確處理而不僅僅是被Consumer收到,這就需要在處理完數據之后發送一個確認ack。
在處理完數據之后發送ack,就是告訴RabbitMQ數據已經被接收並且處理完成,RabbitMQ可以將消息從隊列中移除了。如果Consumer退出了但是沒有發送ack,那么RabbitMQ就會把這個Message發送到下一個Consumer,這樣就保證在Consumer異常退出情況下數據也不會丟失。
RabbitMQ沒有用到超時機制,它僅僅通過Consumer的連接中斷來確認該Message並沒有被正確處理,一個消費者處理消息的時間再長也不會導致該消息被發送給其他消費者,即RabbitMQ給了Consumer足夠長的時間來做數據處理。如果忘記ack,那么當Consumer退出時,Mesage會被重新分發,從而導致隊列中的累積的消息越來越多,然后RabbitMQ會占用越來越多的內存。
3、Message durability 消息持久化
如果我們希望即使在rabbitmq服務重啟的情況下,也不會丟失消息,我們可以將Queue與Message都設置成可持久化的(durable),這樣就可以保證絕大部分情況下我們的rabbitmq消息不會丟失。但依然解決不了小概率丟失事件的發生(例如rabbitmq服務器已經接收到了生產者的消息,但還沒來得及持久化該消息時rabbitmq服務器就斷電了)。如果也要將這種小概率事件管理起來就需要使用到事務了。要持久化隊列需要在聲明時指定durable=True;這里要注意,隊列的名字一定要是Broker中不存在的,不然不能改變此隊列的任何屬性。隊列和交換機有一個創建時候指定的標志durable,durable的唯一含義就是讓具有這個標志的隊列和交換機會在重啟之后重新建立。
消息持久化包括3部分
(1)exchange持久化,在聲明時指定durable => true
channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久的
(2)queue持久化,在聲明時指定durable => true
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久的
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)。
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
你可能也注意到了,分發機制不是那么優雅,默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。n是取余后的,它不管Consumer是否還有unacked Message,只是按照這個默認的機制進行分發。那么如果有個Consumer工作比較重,那么就會導致有的Consumer基本沒事可做,有的Consumer卻毫無休息的機會,那么Rabbit是如何處理這種問題呢?

通過basic.qos方法設置prefetch_count=1,如下設置
channel.basic_qos(prefetch_count=1)
這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message,換句話說,在接收到該Consumer的ack前,它不會將新的Message分發給它。但是這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtualHost來細化你的設計。
5、分發到多個Consumer
Direct Exchange:直接匹配,通過Exchange名稱+RountingKey來發送與接收消息。
Fanout Exchange:廣播訂閱,向所有的消費者發布消息,但是只有消費者將隊列綁定到該路由器才能收到消息,忽略Routing Key。
Topic Exchange:主題匹配訂閱,這里的主題指的是RoutingKey,RoutingKey可以采用通配符,如:*
或#
,RoutingKey命名采用英文句點來分隔多個詞,只有消息將隊列綁定到該路由器且指定RoutingKey符合匹配規則時才能收到消息。
Headers Exchange:消息頭訂閱,消息發布前為消息定義一個或多個鍵值對的消息頭,然后消費者接收消息,同時需要定義類似的鍵值對請求頭(如
x-mactch=all或者x_match=any),只有請求頭與消息頭匹配,才能接收消息,忽略RoutingKey。
默認的exchange:如果用空字符串去聲明一個exchange,那么系統就會使用”amq.direct”這個exchange。我們創建一個queue時,默認的都會有一個和新建queue同名的routingKey綁定到這個默認的exchange上去。如下:
channel.BasicPublish("", "TaskQueue", properties, bytes);
因為在第一個參數選擇了默認的exchange,而我們聲明的隊列叫TaskQueue,所以默認的,它要新建一個也叫TaskQueue的routingKey,並綁定在默認的exchange上,導致了我們可以在第二個參數routingKey中寫TaskQueue,這樣它就會找到定義的同名的queue並把消息放進去。
如果有兩個接收程序都是用了同一個的queue和相同的routingKey去綁定direct exchange的話,分發的行為是負載均衡的,也就是說第一個是程序1收到,第二個是程序2收到,以此類推。
如果有兩個接收程序用了各自的queue,但使用相同的routingKey去綁定direct exchange的話,分發的行為是復制的,即每個程序都會收到這個消息的副本。行為相當於fanout類型的exchange。
多個queue綁定同一個key也是可以的,對於下圖的例子,Q1和Q2都綁定了black,對於routing key是black的Message,會被deliver到Q1和Q2,其余的Message都會被丟棄。
十、web控制台手動演示
1.建立兩個隊列
2.新建一個交換機
點進剛剛新建的交換機里面看,會發現Bindings提醒的是no bindings,他沒有綁定任何東西
那么接下來要做的事就是將之前創建的兩個隊列和這個交換機綁定起來;因為fanout是廣播類型,所以key值就沒有綁定
3.代碼發送消息
public class Producer { public static void main(String[] args) { // 1、創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 2、設置連接屬性 factory.setHost("123.456.654.54"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3、從連接工廠獲取連接 connection = factory.newConnection("生產者"); // 4、從鏈接中創建通道 channel = connection.createChannel(); /** * 5、聲明(創建)隊列 * 如果隊列不存在,才會創建 * RabbitMQ 不允許聲明兩個隊列名相同,屬性不同的隊列,否則會報錯 * * queueDeclare參數說明: * @param queue 隊列名稱 * @param durable 隊列是否持久化 * @param exclusive 是否排他,即是否為私有的,如果為true,會對當前隊列加鎖,其它通道不能訪問,並且在連接關閉時會自動刪除,不受持久化和自動刪除的屬性控制 * @param autoDelete 是否自動刪除,當最后一個消費者斷開連接之后是否自動刪除 * @param arguments 隊列參數,設置隊列的有效期、消息最大長度、隊列中所有消息的生命周期等等 */ // channel.queueDeclare("queue1", false, false, false, null); // 消息內容 String message = "Hello World!"; // 6、發送消息* channel.basicPublish("faout-e", "", null, message.getBytes()); System.out.println("消息已發送!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 7、關閉通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8、關閉連接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
4.web頁面發送消息
5.進入頁面查看測試的消息
上面演示的是fanout類型的交換機類型,下來接着來演示一個direct類型的;
1.進入之前創建好的交換進里,綁定隊列,這里面我取了KEY值
2.發送測試數據
3.查驗數據,會發現只有隊列1收到消息,隊列3沒有消息
我們常用的有三個類型,接下來演示最后一個類型topic類型;
1.和前面一樣,進入對應交換機綁定隊列
2.發送測試數據
3.驗證,發現1和4通過通配符都接收到消息了
最后想了想還是把headers演示下,雖然現在已經棄用了
1.和前面樣進入交換機綁定隊列
2.發送測試數據
3.驗證查看