PS:近期在南寧出差,工作比較忙,所以更新會比較慢。
說到消息通信,可能我們首先會想到的是郵箱,QQ,微信,短信等等這些通信方式,這些通信方式都有發送者,接收者,還有一個中間存儲離線消息的容器。但是這些通信方式和我們要講的 RabbitMQ 的通信模型是不一樣的,比如和郵件的通信方式相比,郵件服務器基於 POP3/SMTP 協議,通信雙方需要明確指定,並且發送的郵件內容有固定的結構。而 RabbitMQ 服務器基於 AMQP 協議,這個協議是不需要明確指定發送方和接收方的,而且發送的消息也沒有固定的結構,甚至可以直接存儲二進制數據,並且和郵件服務器一樣,也能存儲離線消息,最關鍵的是 RabbitMQ 既能夠以一對一的方式進行路由,還能夠以一對多的方式進行廣播。
下面這張圖是大致展示了 RabbitMQ 消息通信的過程:
ps:看不懂沒關系,后面會通過具體的例子進行講解。
1、生產者和消費者
在 RabbitMQ 的通信過程中,有兩個主要的角色:生產者和消費者。類比於郵件通信的發送方和接收方。
這里首先我們要明確 RabbtiMQ 服務器是不能夠產生數據的,正如同其名字——消息中間件,是一個用來傳遞消息的中間商。生產者產生創建消息,然后發布到代理服務器(RabbitMQ),而消費者則從代理服務器獲取消息(不是直接找生產者要消息),而且在實際應用中,生產者和消費者也是可以角色互相轉換的,所以當我們應用程序連接到 RabbitMQ 服務器時,必須要明確我是生產者呢還是消費者。
2、消息
生產者創建消息,然后發布到 RabbitMQ 服務器中,那么什么是消息?
這里的消息分為兩部分:有效內容和內容標簽。
①、有效內容:可以是任何內容,一個數組,一個集合,甚至二進制數據都可以。RabbitMQ 不會在意你發什么數據,盡管發就行了。
②、內容標簽:描述有效內容,是 RabbitMQ 用來決定誰將獲得消息。前面說的郵件通信,必須明確指定發送方地址和收件方地址,而基於 AMQP 協議的 RabbitMQ 則是通過生產者發送消息附帶的內容標簽將消息發送個感興趣的消費者。
后面我們會詳細解析標簽是什么,這里只需要知道生產者會創建消息並設置標簽。注意最上面的大圖,一般來說生產者創建消息會設置標簽,但是傳輸到消費者那里就沒有標簽了,除非你在有效內容中說明誰是生產者,一般消費者是不知道誰產生的消息的。
3、信道
生產者產生了消息,然后發布到 RabbitMQ 服務器,發布之前肯定要先連接上服務器,也就是要在應用程序和rabbitmq 服務器之間建立一條 TCP 連接,一旦連接建立,應用程序就可以創建一條 AMQP 信道。
信道是建立在“真實的”TCP 連接內的虛擬連接,AMQP 命令都是通過信道發送出去的,每條信道都會被指派一個唯一的ID(AMQP庫會幫你記住ID的),不論是發布消息、訂閱隊列或者接收消息,這些動作都是通過信道來完成的。
可能有人會問,為什么不直接通過 TCP 連接來發送AMQP命令呢?
這里原因是效率問題,因為對於操作系統來說,每次建立和銷毀 TCP 會話是非常昂貴的開銷,而實際系統中,比如電商雙十一,每秒鍾高峰期成千上萬條連接,一般來說操作系統建立TCP連接是有數量限制的,那么這就會遇到瓶頸。
引入信道的概念,我們可以在一條 TCP 連接上創建 N多個信道,這樣既能發送命令,也能夠保證每條信道的私密性,我們可以將其想象為光纖電纜。
4、交換器和隊列
截取上面的一部分圖:
交換器和隊列都是 RabbitMQ 服務器的一部分,我們知道生產者會將消息發送到 RabbitMQ 服務器,而進入該服務器后,首先進入交換機部分,然后由交換器根據消息附帶的內容標簽,將消息綁定到相應的隊列。我們首先來看什么是隊列:
①、容納消息的場所,生產者發送到RabbitMQ服務器的消息會在隊列中等待消費者消費。
②、隊列是 RabbitMQ 服務器中最后的終點(除非消息進入了黑洞,黑洞的概念下面會介紹)。
③、隊列可以實現負載均衡,我們可以增加一堆消費者,然后讓 RabbitMQ 以循環的方式來均勻的分配消息。
搞清楚了隊列是什么了,那么消息是如何到達隊列的呢?沒錯,就是通過交換器。
消息進入RabbitMQ 服務器時,會首先將消息發送到交換器,然后交換器會根據特定的路由算法以及消息的內容標簽將消息綁定到相應的隊列。在 AMQP 協議中有四種交換器:direct、fanout、topic和 headers,每種交換器都實現了不同的路由算法,這也對應 RabbitMQ 工作的幾種不同方式,這是重點,后面博客會進行詳細介紹。
5、虛擬主機
最上面那張大圖,我畫了虛擬主機A以及虛擬主機B,說明在 RabbitMQ 服務器中存在着多個虛擬主機,那么虛擬主機到底是什么?
首先我們拋出這樣一個問題,一個 RabbitMQ 肯定不是只服務一個應用程序,那么多個應用程序同時使用 RabbitMQ 服務器,如何保證彼此之間不會沖突?
答案就是使用虛擬主機,虛擬主機其實就是一個迷你版的RabbitMQ 服務器,它擁有自己的交換器和隊列,更重要的是虛擬主機擁有自己的權限機制,一個服務器能夠創建多個虛擬主機。那么我們在使用RabbitMQ服務器的時候,只需要將一個應用程序對應一個虛擬主機,這種各個實例間邏輯上的分離就能夠保證不同的應用程序安全的傳遞消息。
默認的虛擬主機是“/”。
6、簡單實例
介紹完RabbitMQ 消息通信過程中的一些基本概念后,下面我們通過一個代碼實例來實際感受一下。
這是一個Maven工程,首先我們看 pom.xml 文件:導入 amqp-client 依賴即可
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.ys.rabbitmq</groupId> <artifactId>RabbitMQTest</artifactId> <version>1.0-SNAPSHOT</version> <packaging>war</packaging> <name>RabbitMQTest Maven Webapp</name> <!-- FIXME change it to the project's website --> <url>http://www.example.com</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency> </dependencies> </project>
生產者:
1 package com.ys.simple; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.ys.utils.ConnectionUtil; 6 7 /** 8 * Create by hadoop 9 */ 10 public class Send { 11 private final static String QUEUE_NAME = "hello"; 12 13 public static void main(String[] args) throws Exception{ 14 //1、獲取連接 15 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); 16 //2、聲明通道 17 Channel channel = connection.createChannel(); 18 //3、聲明(創建)隊列 19 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 20 //4、定義消息內容 21 String message = "hello rabbitmq "; 22 //5、發布消息 23 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); 24 System.out.println("[x] Sent'"+message+"'"); 25 //6、關閉通道和連接 26 channel.close(); 27 connection.close(); 28 } 29 }
消費者:
1 package com.ys.simple; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.QueueingConsumer; 6 import com.ys.utils.ConnectionUtil; 7 8 9 /** 10 * Create by hadoop 11 */ 12 public class Recv { 13 14 private final static String QUEUE_NAME = "hello"; 15 16 public static void main(String[] args) throws Exception{ 17 //1、獲取連接 18 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); 19 //2、聲明通道 20 Channel channel = connection.createChannel(); 21 //3、聲明隊列 22 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 23 //4、定義隊列的消費者 24 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); 25 //5、監聽隊列 26 channel.basicConsume(QUEUE_NAME,true,queueingConsumer); 27 //6、獲取消息 28 while (true){ 29 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" [x] Received '" + message + "'"); 32 } 33 } 34 35 }
工具類:ConnectionUtil
1 package com.ys.utils; 2 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 /** 7 * Create by hadoop 8 */ 9 public class ConnectionUtil { 10 11 public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{ 12 //1、定義連接工廠 13 ConnectionFactory factory = new ConnectionFactory(); 14 //2、設置服務器地址 15 factory.setHost(host); 16 //3、設置端口 17 factory.setPort(port); 18 //4、設置虛擬主機、用戶名、密碼 19 factory.setVirtualHost(vHost); 20 factory.setUsername(userName); 21 factory.setPassword(passWord); 22 //5、通過連接工廠獲取連接 23 Connection connection = factory.newConnection(); 24 return connection; 25 } 26 }