RabbitMQ簡介

在介紹RabbitMQ之前,我們需要了解一些最基礎的概念,相信使用過或者聽說過RabbitMQ的人都不會陌生,但筆者還是不厭其煩地在這里講述,因為筆者的理念是self contained。
Queue: 隊列。計算機數據結構中的一種基本類型,遵循“先入先出”(FIFO)的原則,比如我們日常生活中常見的排隊時的隊伍就是一個隊列。Message Queue: 消息隊列,簡稱MQ。消息隊列本質上也是隊列,只不過隊列中的元素為Message(消息),而消息則是服務之間最常見的通信方式。流行的MQ框架主要有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里開源的RocketMQ。AMQP:Advanced Message Queuing Protocol,是一個提供統一消息服務的應用層標准高級消息隊列協議,簡單來說,它就是一個消息列隊的協議,其標准高,要求嚴。Erlang:Erlang是一種通用的面向並發的編程語言,它由瑞典電信設備制造商愛立信所轄的CS-Lab開發,目的是創造一種可以應對大規模並發活動的編程語言和運行環境。RabbitMQ:RabbitMQ是一個實現了AMQP高級消息隊列協議的消息隊列服務,用Erlang語言實現。RabbitMQ的運行原理如下圖(后續我們會解釋其中的含義,現階段只作為瀏覽):

以上是我們對RabbitMQ的最初認識。接下來我們還需要了解RabbitMQ的下載與安裝,如下:
- RabbitMQ的下載頁面:https://www.rabbitmq.com/download.html ;
- RabbitMQ的安裝過程:https://www.rabbitmq.com/download.html#installation-guides ;
- RabbitMQ入門教程: https://www.rabbitmq.com/getstarted.html ;
說了這么多,我們為什么要選擇RabbitMQ,也就是說它的優勢又是什么呢?RabbitMQ的強大之處在於:
- 可靠性:RabbitMQ使用一些機制來保證可靠性,如持久化、傳輸確認及發布確認等。
- 靈活的路由:在消息進入隊列之前,通過交換器來路由消息。對於典型的路由功能,RabbitMQ己經提供了一些內置的交換器來實現。針對更復雜的路由功能,可以將多個交換器綁定在一起,也可以通過插件機制來實現自己的交換器。
- 擴展性:多個RabbitMQ節點可以組成一個集群,也可以根據實際業務情況動態地擴展集群中節點。
- 高可用性:隊列可以在集群中的機器上設置鏡像,使得在部分節點出現問題的情況下隊仍然可用。
- 多種協議:RabbitMQ除了原生支持AMQP協議,還支持STOMP,MQTT等多種消息中間件協議。
- 多語言客戶端:RabbitMQ幾乎支持所有常用語言,比如Java、Python、Ruby、PHP、C#、JavaScript等。
- 管理界面:RabbitMQ提供了一個易用的用戶界面,使得用戶可以監控和管理消息、集群中的節點等。
- 插件機制:RabbitMQ提供了許多插件,以實現從多方面進行擴展,當然也可以編寫自己的插件。
帶着對RabbitMQ的初次見面,我們不妨再了解下如何簡單地使用RabbitMQ。
RabbitMQ入門之Hello World
在計算機領域中,每次學習一個新事物的驚喜,往往都是伴隨着Hello World。在編程語言中,會有輸出“Hello World”;在大數據中,“Hello World”就是統計單詞的詞頻;在Docker中,就是使用“Hello World”鏡像;在RabbitMQ,這次的“Hello World”就是生產者發送“Hello World”,而消費者輸出“Hello World”。
RabbitMQ就是消息代理,它接受並推動消息流動。你可以把它想象成一個郵局:當你把一封信塞進郵箱,你需要確保它能送到收信人的手里。而RabbitMQ就是一個郵箱,郵局,郵遞員。不同於真實的郵局(處理信件),RabbitMQ處理接受、存儲、推動消息。
在RabbitMQ,或者消息隊列領域中,有如下術語。
生產者(Producer):生產者僅產生消息,也就說一個產生消息的程序就是生產者。對應於郵局的例子,生產者就是寄信人,因為他們產生信件。

隊列(Queue): 一個隊列就是RabbitMQ中的郵箱。盡管消息會在RabbitMQ和應用程序之間流動,但是它們只會在隊列中存儲。一個隊列僅受限於硬盤和內存大小,它是一個大的消息緩存區。許多生產者產生消息后會進入一個隊列,許多消費者也會從同一個隊列中獲取消息。以下是我們如何表示一個隊列:

消費者(Consumer):消費消息與接收消息的意思是一致的。一個消費者往往會等待接收消息。在郵局的例子中,消費者也許就是收信人。

介紹完生產者、隊列、消費者后,我們將會來學習RabbitMQ中的Hello World。
我們使用Python的Pika模塊來操作RabbitMQ。在本文中,我們將會編寫兩個小程序:一個生產者(Producer)發送一條消息,而一個消費者(Consumer)將會接收這個消息並將它輸出。這就是消息通信的“Hello World”。
在下圖中,P代表生產者,C代表消費者,中間的盒子代表隊列——消息緩存區。我們總的設計圖如下:

生產者會將消息發送至“hello”隊列,消費者從從該隊列中獲取消息。
發送消息
在這一部分中,我們將會讓生產者來發送消息。

我們的第一個程序send.py將會發送一個消息至隊列。首先我們要做的是建立與RabbitMQ Server的連接。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
我們連接到了本地機器(localhost)的一個代理。如果我們需要連接不同機器的代理,我們只需要聲明機器名稱以及IP地址即可。
接着,在我們發送消息之前,我們需要確認隊列是否存在。如果我們發送消息到一個不存在的地方,RabbitMQ將會丟失這條消息。因此,我們需要創建一個hello隊列,這里將是消息傳遞的地方。
channel.queue_declare(queue='hello')
我們已經准備好發送消息了。我們的第一條消息是字符串“Hello World!”,我們將它發送至hello隊列。
在RabbitMQ中,消息不會被直接發送至隊列,它需要通過exchange才能做到。在這里我們不需要了解exchange的原理,我們只需要知道,空字符串就代表默認的exchange。該exchange很特殊——它規定了我們的消息往哪個隊列走。隊列名稱需要用routing_key這個參數來聲明:
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
在退出程序前,我們需要確保網絡緩存被清空並且我們的消息確實被傳送至RabbitMQ。一般我們通過關閉連接來實現。
connection.close()
接受消息
在這一部分中,我們將會讓消費者來接受消息。

我們的第二個程序receive.py將會從隊列中接受消息,並把其輸出出來。
同樣地,第一步是連接到RabbitMQ Server。這部分的代碼與之前的部分相同。
下一步,更之前一樣,需要確保隊列存在。使用queue_declare來創建隊列是冪等的(idempoten) —— 我們可以運行這條命令很多次,但只會創建一個隊列。
channel.queue_declare(queue='hello')
也許你會好奇我們為什么要再一次聲明這個列隊,明明我們在之前的代碼中已經聲明過了。這里這么做主要是為了確保隊列已經存在。舉例來說,這邊是先運行send.py,但我們不能確定哪一個程序會先運行。因此在這樣的情況下,在兩個程序中反復聲明列隊是不錯的方式。
從隊列中接受消息更加復雜。他需要通過callback函數與列隊關聯。無論什么時候我們接受到消息,這個callback函數都被會Pika模塊調用。在我們的例子中,這個函數將會輸出消息的內容。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
下一步,我們需要告訴RabbitMQ,在hello隊列中,這個特定的callback函數需要接受消息。
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
auto_ack參數的含義會在后面的文章中解釋。
最后我們創建一個永不停止的循環,用於接收消息:
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
實踐出真知
上面的部分介紹了“Hello World”的理論方面,接下來,我們會分別使用Python和Java程序來分別實現這個例子。
Python
sent.py程序如下:
# -*- coding: utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World from Python!')
print(" [x] Sent 'Hello World!'")
connection.close()
receive.py程序如下:
# -*- coding: utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
先啟動receive.py,程序會提示“ [*] Waiting for messages. To exit press CTRL+C”,表明該消費者在等待接收消息。在運行sent.py,該程序會發送“Hello World from Python!”至隊列,同時receive.py會輸出該消息。每運行一次sent.py,receive.py會就會輸出一個該消息,如下圖:

Java
我們使用Gradle來構建這個項目,項目結構如下:

在build.gradle中,我們引入第三方jar包,內容如下:
plugins {
id 'java'
}
group 'rabbitmq'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
// https://mvnrepository.com/artifact/com.rabbitmq/amqp-client
compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.8.0'
// https://mvnrepository.com/artifact/org.slf4j/slf4j-api
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.26'
// https://mvnrepository.com/artifact/org.slf4j/slf4j-simple
testCompile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.26'
}
Send.java代碼如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World from Java!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
Recv.java的代碼如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
具體的操作方法同Python一樣。
Python與Java的交互
如果我們把Python的“Hello World”看成一個簡單的小系統,而Java的“Hello World”也看成一個簡單的小系統,那么RabbitMQ可以溝通這兩個系統,這也是RabbitMQ的一個特定:系統對接。
我們在Python中運行receive.py,而運行Java的Send.java三次,運行Python的sent.py兩次,結果如下:

這樣的測試結果是令人吃驚的,因為我們用RabbitMQ打通了兩個不同語言的系統!
總結
本文作為RabbitMQ入門的第一篇,希望能對大家有所幫助。筆者也是初學RabbitM,文章中肯定有不足之處,懇請大家批評指正。
感謝大家的閱讀~
參考網站
- Python操作rabbitmq系列(一): https://zhuanlan.zhihu.com/p/29800710
- RabbitMQ教程: https://blog.csdn.net/hellozpc/article/details/81436980
- RabbitMQ Tutorials:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
- 透徹rabbitmq: https://zhuanlan.zhihu.com/p/63700605
- MQ和RabbitMQ作用特點: https://blog.csdn.net/weixin_40792878/article/details/82555791
