RabbitMQ入門之Hello World


RabbitMQ簡介

  在介紹RabbitMQ之前,我們需要了解一些最基礎的概念,相信使用過或者聽說過RabbitMQ的人都不會陌生,但筆者還是不厭其煩地在這里講述,因為筆者的理念是self contained。

  • Queue: 隊列。計算機數據結構中的一種基本類型,遵循“先入先出”(FIFO)的原則,比如我們日常生活中常見的排隊時的隊伍就是一個隊列。
  • Message Queue: 消息隊列,簡稱MQ。消息隊列本質上也是隊列,只不過隊列中的元素為Message(消息),而消息則是服務之間最常見的通信方式。流行的MQ框架主要有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里開源的RocketMQ。
  • AMQP:Advanced Message Queuing Protocol,是一個提供統一消息服務的應用層標准高級消息隊列協議,簡單來說,它就是一個消息列隊的協議,其標准高,要求嚴。
  • Erlang:Erlang是一種通用的面向並發的編程語言,它由瑞典電信設備制造商愛立信所轄的CS-Lab開發,目的是創造一種可以應對大規模並發活動的編程語言和運行環境。
  • RabbitMQ:RabbitMQ是一個實現了AMQP高級消息隊列協議的消息隊列服務,用Erlang語言實現。RabbitMQ的運行原理如下圖(后續我們會解釋其中的含義,現階段只作為瀏覽):

RabbitMQ運行原理

以上是我們對RabbitMQ的最初認識。接下來我們還需要了解RabbitMQ的下載與安裝,如下:

  1. RabbitMQ的下載頁面:https://www.rabbitmq.com/download.html
  2. RabbitMQ的安裝過程:https://www.rabbitmq.com/download.html#installation-guides
  3. RabbitMQ入門教程: https://www.rabbitmq.com/getstarted.html

  說了這么多,我們為什么要選擇RabbitMQ,也就是說它的優勢又是什么呢?RabbitMQ的強大之處在於:

  • 可靠性:RabbitMQ使用一些機制來保證可靠性,如持久化、傳輸確認及發布確認等。
  • 靈活的路由:在消息進入隊列之前,通過交換器來路由消息。對於典型的路由功能,RabbitMQ己經提供了一些內置的交換器來實現。針對更復雜的路由功能,可以將多個交換器綁定在一起,也可以通過插件機制來實現自己的交換器。
  • 擴展性:多個RabbitMQ節點可以組成一個集群,也可以根據實際業務情況動態地擴展集群中節點。
  • 高可用性:隊列可以在集群中的機器上設置鏡像,使得在部分節點出現問題的情況下隊仍然可用。
  • 多種協議:RabbitMQ除了原生支持AMQP協議,還支持STOMP,MQTT等多種消息中間件協議。
  • 多語言客戶端:RabbitMQ幾乎支持所有常用語言,比如Java、Python、Ruby、PHP、C#、JavaScript等。
  • 管理界面:RabbitMQ提供了一個易用的用戶界面,使得用戶可以監控和管理消息、集群中的節點等。
  • 插件機制:RabbitMQ提供了許多插件,以實現從多方面進行擴展,當然也可以編寫自己的插件。

  帶着對RabbitMQ的初次見面,我們不妨再了解下如何簡單地使用RabbitMQ。

RabbitMQ入門之Hello World

  在計算機領域中,每次學習一個新事物的驚喜,往往都是伴隨着Hello World。在編程語言中,會有輸出“Hello World”;在大數據中,“Hello World”就是統計單詞的詞頻;在Docker中,就是使用“Hello World”鏡像;在RabbitMQ,這次的“Hello World”就是生產者發送“Hello World”,而消費者輸出“Hello World”
  RabbitMQ就是消息代理,它接受並推動消息流動。你可以把它想象成一個郵局:當你把一封信塞進郵箱,你需要確保它能送到收信人的手里。而RabbitMQ就是一個郵箱,郵局,郵遞員。不同於真實的郵局(處理信件),RabbitMQ處理接受、存儲、推動消息。
  在RabbitMQ,或者消息隊列領域中,有如下術語。

  • 生產者(Producer):生產者僅產生消息,也就說一個產生消息的程序就是生產者。對應於郵局的例子,生產者就是寄信人,因為他們產生信件。

生產者

  • 隊列(Queue): 一個隊列就是RabbitMQ中的郵箱。盡管消息會在RabbitMQ和應用程序之間流動,但是它們只會在隊列中存儲。一個隊列僅受限於硬盤和內存大小,它是一個大的消息緩存區。許多生產者產生消息后會進入一個隊列,許多消費者也會從同一個隊列中獲取消息。以下是我們如何表示一個隊列:

隊列

  • 消費者(Consumer):消費消息與接收消息的意思是一致的。一個消費者往往會等待接收消息。在郵局的例子中,消費者也許就是收信人。

消費者

  介紹完生產者、隊列、消費者后,我們將會來學習RabbitMQ中的Hello World。
  我們使用Python的Pika模塊來操作RabbitMQ。在本文中,我們將會編寫兩個小程序:一個生產者(Producer)發送一條消息,而一個消費者(Consumer)將會接收這個消息並將它輸出。這就是消息通信的“Hello World”。
  在下圖中,P代表生產者,C代表消費者,中間的盒子代表隊列——消息緩存區。我們總的設計圖如下:

總設計圖
生產者會將消息發送至“hello”隊列,消費者從從該隊列中獲取消息。

發送消息

  在這一部分中,我們將會讓生產者來發送消息。

發送消息
  我們的第一個程序send.py將會發送一個消息至隊列。首先我們要做的是建立與RabbitMQ Server的連接。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

我們連接到了本地機器(localhost)的一個代理。如果我們需要連接不同機器的代理,我們只需要聲明機器名稱以及IP地址即可。
  接着,在我們發送消息之前,我們需要確認隊列是否存在。如果我們發送消息到一個不存在的地方,RabbitMQ將會丟失這條消息。因此,我們需要創建一個hello隊列,這里將是消息傳遞的地方。

channel.queue_declare(queue='hello')

我們已經准備好發送消息了。我們的第一條消息是字符串“Hello World!”,我們將它發送至hello隊列。
  在RabbitMQ中,消息不會被直接發送至隊列,它需要通過exchange才能做到。在這里我們不需要了解exchange的原理,我們只需要知道,空字符串就代表默認的exchange。該exchange很特殊——它規定了我們的消息往哪個隊列走。隊列名稱需要用routing_key這個參數來聲明:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

在退出程序前,我們需要確保網絡緩存被清空並且我們的消息確實被傳送至RabbitMQ。一般我們通過關閉連接來實現。

connection.close()

接受消息

  在這一部分中,我們將會讓消費者來接受消息。

接受消息

我們的第二個程序receive.py將會從隊列中接受消息,並把其輸出出來。
  同樣地,第一步是連接到RabbitMQ Server。這部分的代碼與之前的部分相同。
  下一步,更之前一樣,需要確保隊列存在。使用queue_declare來創建隊列是冪等的(idempoten) —— 我們可以運行這條命令很多次,但只會創建一個隊列。

channel.queue_declare(queue='hello')

也許你會好奇我們為什么要再一次聲明這個列隊,明明我們在之前的代碼中已經聲明過了。這里這么做主要是為了確保隊列已經存在。舉例來說,這邊是先運行send.py,但我們不能確定哪一個程序會先運行。因此在這樣的情況下,在兩個程序中反復聲明列隊是不錯的方式。
  從隊列中接受消息更加復雜。他需要通過callback函數與列隊關聯。無論什么時候我們接受到消息,這個callback函數都被會Pika模塊調用。在我們的例子中,這個函數將會輸出消息的內容。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

  下一步,我們需要告訴RabbitMQ,在hello隊列中,這個特定的callback函數需要接受消息。

channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

auto_ack參數的含義會在后面的文章中解釋。
  最后我們創建一個永不停止的循環,用於接收消息:

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

實踐出真知

  上面的部分介紹了“Hello World”的理論方面,接下來,我們會分別使用Python和Java程序來分別實現這個例子。

Python

  sent.py程序如下:

# -*- coding: utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World from Python!')
print(" [x] Sent 'Hello World!'")
connection.close()

  receive.py程序如下:

# -*- coding: utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

先啟動receive.py,程序會提示“ [*] Waiting for messages. To exit press CTRL+C”,表明該消費者在等待接收消息。在運行sent.py,該程序會發送“Hello World from Python!”至隊列,同時receive.py會輸出該消息。每運行一次sent.py,receive.py會就會輸出一個該消息,如下圖:

Java

  我們使用Gradle來構建這個項目,項目結構如下:

Java項目結構

  在build.gradle中,我們引入第三方jar包,內容如下:

plugins {
    id 'java'
}

group 'rabbitmq'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    // https://mvnrepository.com/artifact/com.rabbitmq/amqp-client
    compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.8.0'
    // https://mvnrepository.com/artifact/org.slf4j/slf4j-api
    compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.26'
    // https://mvnrepository.com/artifact/org.slf4j/slf4j-simple
    testCompile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.26'

}

  Send.java代碼如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
             String message = "Hello World from Java!";
             channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
             System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

  Recv.java的代碼如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

具體的操作方法同Python一樣。

Python與Java的交互

  如果我們把Python的“Hello World”看成一個簡單的小系統,而Java的“Hello World”也看成一個簡單的小系統,那么RabbitMQ可以溝通這兩個系統,這也是RabbitMQ的一個特定:系統對接。
  我們在Python中運行receive.py,而運行Java的Send.java三次,運行Python的sent.py兩次,結果如下:

Python與Java的交互

這樣的測試結果是令人吃驚的,因為我們用RabbitMQ打通了兩個不同語言的系統!

總結

  本文作為RabbitMQ入門的第一篇,希望能對大家有所幫助。筆者也是初學RabbitM,文章中肯定有不足之處,懇請大家批評指正。
  感謝大家的閱讀~

參考網站

  1. Python操作rabbitmq系列(一): https://zhuanlan.zhihu.com/p/29800710
  2. RabbitMQ教程: https://blog.csdn.net/hellozpc/article/details/81436980
  3. RabbitMQ Tutorials:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
  4. 透徹rabbitmq: https://zhuanlan.zhihu.com/p/63700605
  5. MQ和RabbitMQ作用特點: https://blog.csdn.net/weixin_40792878/article/details/82555791


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM