為什么要使用MQ消息中間件?它解決了什么問題?關於為什么要使用消息中間件?消息中間件是如何做到同步變異步、流量削鋒、應用解耦的?網上已經有很多說明,我這里就不再說明了,讀者可以參考(https://www.jianshu.com/p/2820561158c4)。我在接下來的RabbitMq系列博客里會將官方的講解翻譯過來,同時加以自己的理解整理成博客,希望能和大家共同交流,一起進步。
RabbitMq原理圖
1、RabbitMq簡介
RabbitMq是一個消息中間件:它接收消息、轉發消息。你可以把它理解為一個郵局:當你向郵箱里寄出一封信后,郵遞員們就能最終將信送到收信人手中。類似的,RabbitMq就好比是一個郵箱、郵局和郵遞員。RabbitMq和郵局最大的區別是:RabbitMq接收、轉發的都是二進制數據塊--消息,而不是紙質的數據文件。
RabbitMq、消息相關術語如下:
生產者:生產者只發送消息,發送消息的程序即為生產者:
消息隊列:消息隊列就相當於RabbitMq中的郵箱名稱。盡管消息在你的程序和RabbitMq中流動,但它只能存儲在消息隊列中。隊列本質上是一個大的消息緩存,它能存多少消息,取決於主機的內存和磁盤限制。多個生產者可以往同一個消息隊列中發送消息;多個消費者可以從同一個隊列中獲取數據。我們以下列圖形來表示一個消息隊列:
消費者:消費者是一個等待接收消息的程序:
注意:生產者、消費者和RabbitMq可以在不同的機器上;在很多的應用中,一個生產者同時也可能是消費者。
2、“Hello World!”
在這小節里,我們將寫一個消息生產者用來發送消息、一個消息消費者來消費消息(接收消息並打印出來)。
在下面圖形中,“P”是我們的生產者,“C”是我們的消費者,中間的紅框是我們的消息隊列,保存了從生產者那里接收到的准備轉發到消費方的消息。
Java客戶端類庫說明:
RabbitMq使用多種協議,本指南使用AMQP 0-9-1協議,該協議是一個開源的、通用的消息協議。RabbitMq有多種語言的客戶端,這里我們使用JAVA語言的客戶端做實驗。通過以下地址下載RabbitMq客戶端jar包和依賴包:
把這三個jar包拷貝到你的工作目錄,包括后面教程要新建的java文件。
2.1 發送消息
生產者連接RabbitMq,發送一條簡單的消息”Hello World!“后就退出。
在Send.java類中,需要引入以下依賴包:
1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.Channel;
給隊列起個名字:
1 public class Send { 2 private final static String QUEUE_NAME = "hello"; 3 public static void main(String[] argv) throws Exception { 4 ... 5 } 6 }
創建連接到服務器的連接Collection:
1 onnectionFactory factory = new ConnectionFactory(); 2 factory.setHost("localhost"); 3 try (Connection connection = factory.newConnection(); 4 Channel channel = connection.createChannel()) { 5 6 }
這個連接即套接字連接,為我們處理協議版本協商和身份驗證等。這里我們連接一個本地的RabbitMq:因此是localhost,如果你想要連接一個遠程機器上的RabbitMq,只需要把localhst改成那台機器的計算機名或是IP地址。
創建完連接之后,我們繼續創建一個信道:Channel。我們需要使用try-with-resource表達式,因為Connection和Channel都實現了JAVA接口Closeable,屬於資源,需要關閉,這樣我們就不需要顯示地在我們的代碼中進行關閉了。(關於信道,請參考文章最頂部的RabbitMq原理圖,是TCP里面的虛擬鏈接,例如:電纜相當於一個TCP,信道就是里面的一個獨立光纖,一條TCP上面創建多條信道是沒有問題的;TCP一旦打開就分創建AMQP信道;無論是發布消息、接收消息、訂閱隊列,這些動作都是通過信道完成的)。
為了發送消息,我們還必須要定義一個需要發送到的消息隊列,這些都要使用try-with-resource表達式:
1 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 2 String message = "Hello World!"; 3 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 4 System.out.println(" [x] Sent '" + message + "'");
定義一個消息隊列是冪等的:只有該隊列不存在的時候才能被創建,消息是二進制數組,因此你可以根據需要指定編碼。
完成的Send.java如下:
1 import com.rabbitmq.client.Channel; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.ConnectionFactory; 4 5 public class Send { 6 7 private final static String QUEUE_NAME = "hello"; 8 9 public static void main(String[] argv) throws Exception { 10 ConnectionFactory factory = new ConnectionFactory(); 11 factory.setHost("localhost"); 12 try (Connection connection = factory.newConnection(); 13 Channel channel = connection.createChannel()) { 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 String message = "Hello World!"; 16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); 17 System.out.println(" [x] Sent '" + message + "'"); 18 } 19 } 20 }
2.2 接收消息
消費者監聽RabbitMq中的消息,因此與生產者發送一條消息就退出不同,消費者要保持運行狀態來接收消息並打印出來。
Recv.java同樣需要導入以下依賴包:
1 import com.rabbitmq.client.Channel; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.ConnectionFactory; 4 import com.rabbitmq.client.DeliverCallback;
與生產者相同,我們需要創建Connetcion和Channel、定義隊列(需要監聽並接收消息的隊列):
1 public class Recv { 2 3 private final static String QUEUE_NAME = "hello"; 4 5 public static void main(String[] argv) throws Exception { 6 ConnectionFactory factory = new ConnectionFactory(); 7 factory.setHost("localhost"); 8 Connection connection = factory.newConnection(); 9 Channel channel = connection.createChannel(); 10 11 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 12 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 13 14 } 15 }
注意我們也在這里聲明隊列,因為我們可能在生產者之前啟動消費者,我們想要確保在我們嘗試消費消息的時候隊列就已經存在了。
這里我們為什么不使用try-with-resource表達式自動關閉channl和connection?通過這樣,我們就可以使我們的程序一直保持運行狀態,如果把這些關了,程序也就停止了。這就尷尬了,因為我們需要保持消費者一直處於異步監聽消息過來的狀態。
RabbitMq會將隊列中的消息異步地推送過來,我們需要提供一個回調函數來緩存消息直到我們需要用到這些消息:
1 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 2 String message = new String(delivery.getBody(), "UTF-8"); 3 System.out.println(" [x] Received '" + message + "'"); 4 }; 5 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
Rec.java完整代碼:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
3、測試
在官方手冊中,測試部分他們是將客戶端jar和依賴jar添加到classpath路徑,然后在cmd終端來運行的,我覺得麻煩,因此,我這里放到IDEA中來運行,效果是一樣的。
第一步:首先運行Send.java:
輸出結果:
[x] Sent 'Hello World!'
查看RabbitMq控制台:
說明消息已經發送成功。
第二步:啟動消費者Recv.java:
輸出結果:
[x] Received 'Hello World!'
說明消息已經消費成功了,此時再查看控制台:
消息依然存在在隊列中,但是區別是,在第一張圖中Ready由1變成了0,Unacknowledged由0變成了1;第二張圖中Ready也由1變成0,Unacked由0變成了1。為什么會這樣?按道理,消息消費了之后就應該刪除掉,否則可能造成重復消費。關於這方面知識,將會在后面的章節中再介紹(Ack機制)。
4、用SpringBoot實現
上面雖然實現了功能,但在實際工作中,我們更多的可能是使用SpringBoot、SpringCloud等成熟的框架來實現。本小節就通過SpringBoot來實現以上功能。
創建工程的時候選擇RabbitMq:
工程目錄如下:
Provider和Consumer的配置文件相同,IP請替換成你自己的:
1 #RabbitMq 2 spring.rabbitmq.host=192.168.xx.xx 3 spring.rabbitmq.username=rabbitmq 4 spring.rabbitmq.password=123456 5 6 hello_world.queue=hello
為方便讓系統啟動時就往隊列發送消息,所以寫了一個SenderRunner類:
1 @Component 2 public class SenderRunner implements ApplicationRunner { 3 4 @Autowired 5 private Send send; 6 7 @Override 8 public void run(ApplicationArguments args) throws Exception { 9 send.doSender("Hello RabbitMq"); 10 } 11 }
Send.java
1 @Component 2 public class Send { 3 4 @Value("${hello_world.queue}") 5 private String queueName; 6 7 @Autowired 8 private AmqpTemplate amqpTemplate; 9 10 public void doSender(String msg) { 11 12 amqpTemplate.convertAndSend(queueName,msg); 13 System.out.println("發送消息:" + msg); 14 } 15 }
啟動類:
1 @SpringBootApplication 2 public class ProviderApplication { 3 public static void main(String[] args) { 4 SpringApplication.run(ProviderApplication.class, args); 5 } 6 }
Recv.java
@Component public class Recv { @RabbitListener(queues = "${hello_world.queue}") public void receive(String msg) { System.out.println("接收到消息:" + msg); } }
啟動Provider:
查看控制台:
啟動Consumer:
可見,SpringBoot為我們做了很多封裝,隱藏了很多底層的細節,使用起來簡單多了。
PS:關於SpringBoot的實現涉及到很多的配置,我將在系統的最后專門用一章來講解SpringBoot的實現。