AMQP與RabbitMQ簡介


MQ(Message Queue,消息隊列)是一種應用系統之間的通信方法。是通過讀寫出入隊列的消息來通信(RPC則是通過直接調用彼此來通信的)。

1.AMQP協議

在了解RabbitMQ之前,首先要了解AMQP協議,AMQP,即Advanced Message Queuing Protocol高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP是一個提供統一消息服務的應用層標准協議,基於此協議的客戶端與消息中間件可傳遞消息,並不受不同客戶端/中間件產品,不同開發語言等條件的限制。

AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。

1.1 AMQP的實現有:

OpenAMQ

AMQP的開源實現,用C語言編寫,運行於Linux、AIX、Solaris、Windows、OpenVMS

Apache Qpid

Apache的開源項目,支持C++、Ruby、Java、JMS、Python和.NET

Redhat Enterprise MRG

實現了AMQP的最新版本0-10,提供了豐富的特征集,比如完全管理、聯合、Active-Active集群,有Web控制台,還有許多企業級特征,客戶端支持C++、Ruby、Java、JMS、Python和.NET

RabbitMQ

一個獨立的開源實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ發布在Ubuntu、FreeBSD平台

AMQP

InfrastructureLinux下,包括Broker、管理工具、Agent和客戶端

ØMQ

一個高性能的消息平台,在分布式消息網絡可作為兼容AMQP的Broker節點,綁定了多種語言,包括Python、C、C++、Lisp、Ruby等

Zyre

是一個Broker,實現了RestMS協議和AMQP協議,提供了RESTful HTTP訪問網絡AMQP的能力

1.2 以下是AMQP中的核心概念:

Broker

消息服務器的實體

虛擬主機(Virtual Host)

一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。客戶端應用程序在登錄到服務器之后,可以選擇一個虛擬主機。每個連接(包括所有channel)都必須關聯至一個虛擬主機

交換器(Exchange)

服務器中的實體,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列

消息隊列(Message Queue)

服務器中的實體,用來保存消息直到發送給消費者

生產者(Producer)

一個向交換器發布消息的客戶端應用程序

消費者(Consumer)

一個從消息隊列中請求消息的客戶端應用程序

綁定器(Binding)

將交換器和隊列連接起來,並且封裝消息的路由信息

所有這些組件的屬性各不相同,但是只有交換器和隊列被命名。客戶端可以通過交換器的名字來發送消息,也可以通過隊列的名字收取信息。因為AMQ 協議沒有一個通用的標准方法來獲得所有組件的名稱,所以客戶端對隊列和交換器的訪問被限制在僅能使用熟知的或者只有自己知道的名字。

綁定器沒有名字,它們的生命期依賴於所緊密連接的交換器和隊列。如果這兩者任意一個被刪除掉,那么綁定器便失效了。這就說明,若要知道交換器和隊列的名字,還需要設置消息路由

消息是一個不透明的數據包,這些包有如下性質:

  •  元數據,例如內容的編碼或者表明來源的字段
  •  標志位,標記消息投遞時候的一些保障機制
  •  一個特殊的字段叫做routing key

發送消息是一個非常簡單的過程。客戶端聲明一個它想要發送消息的目的交換器,然后將消息傳遞給交換器。

接受消息的最簡單辦法是設置一個訂閱。客戶端需要聲明一個隊列,並且使用一個綁定器將之前的交換器和隊列綁定起來,這樣的話,訂閱就設置完畢。

交換器的類型;

fanout交換器

不會解釋任何東西:它只是將消息投遞到所有綁定到它的隊列中

direct交換器

將消息根據其routing-key屬性投遞到包含對應key屬性的綁定器上

topic交換器

模式匹配分析消息的routing-key屬性。它將routing-key和binding-key的字符串切分成單詞。這些單詞之間用點隔開。它同樣也會識別兩個通配符:#匹配0個或者多個單詞,*匹配一個單詞。例如,binding key *.stock.#匹配routing-key usd.stcok和eur.stock.db,但是不匹配stock.nasdaq

header交換器

根據應用程序消息的特定屬性進行匹配failover和system交換器當前RabbitMQ版本中均未實現

沒有綁定器,哪怕是最簡單的消息,交換器也不能將其投遞到隊列中,只能拋棄它。通過訂閱一個隊列,消費者能夠從隊列中獲取消息,然后在使用過后將其在隊列中刪除。

不同於隊列的是,交換器有相應的類型,表明它們的投遞方式(通常是在和綁定器協作的時候)。因為交換器是命名實體,所以聲明一個已經存在的交換器, 但是試圖賦予不同類型是會導致錯誤。客戶端需要刪除這個已經存在的交換器,然后重新聲明並且賦予新的類型。

交換器也有一些性質:

  • 持久性:如果啟用,交換器將會在Broker重啟前都有效
  •  自動刪除:如果啟用,那么交換器將會在其綁定的隊列都被刪除掉之后自動刪除掉自身
  •  惰性:如果沒有聲明交換器,那么在執行到使用的時候會導致異常,並不會主動聲明

AMQP Broker都會對其支持的每種交換器類型(為每一個虛擬主機)聲明一個實例。這些交換器的命名規則是amq.前綴加上類型名。例如 amq.fanout。空的交換器名稱等於amq.direct。對這個默認的direct交換器(也僅僅是對這個交換器),Broker將會聲明一個綁定了系統中所有隊列的綁定器。

這個特點告訴我們,在系統中,任意隊列都可以和默認的direct交換器綁定在一起,只要其routing-key等於隊列名字。

默認綁定器的行為揭示了多綁定器的存在,將一個或者多個隊列和一個或者多個交換器綁定起來。這使得可以將發送到不同交換器的具有不同routing key(或者其他屬性)的消息發送到同一個隊列中。

隊列也有以下屬性,這些屬性和交換器所具有的屬性類似。

  •  持久性:如果啟用,隊列將會在Broker重啟前都有效
  •  自動刪除:如果啟用,那么隊列將會在所有的消費者停止使用之后自動刪除掉自身
  •  惰性:如果沒有聲明隊列,那么在執行到使用的時候會導致異常,並不會主動聲明
  •  排他性:如果啟用,隊列只能被聲明它的消費者使用

這些性質可以用來創建例如排他和自刪除的transient或者私有隊列。這種隊列將會在所有鏈接到它的客戶端斷開連接之后被自動刪除掉 – 它們只是短暫地連接到Broker,但是可以用於實現例如RPC或者在AMQ上的對等通信。

AMQP上的RPC是這樣的:RPC客戶端聲明一個回復隊列,唯一命名(例如用UUID19), 並且是自刪除和排他的。然后它發送請求給一些交換器,在消息的reply-to字段中包含了之前聲明的回復隊列的名字。RPC服務器將會回答這些請求,使用消息的reply-to作為routing key(之前提到過默認綁定器會綁定所有的隊列到默認交換器)發送到默認交換器。注意僅僅是慣例而已。根據和RPC服務器的約定,它可以解釋消息的任何屬性(甚至數據體)來決定回復給誰。

隊列也可以是持久的,可共享,非自動刪除以及非排他的。使用同一個隊列的多個用戶接收到的並不是發送到這個隊列的消息的一份拷貝,而是這些用戶共享這隊列中的一份數據,然后在使用完之后刪除掉。

2. RabbitMQ

RabbitMQ是一個遵循AMQP協議的消息中間件,它從生產者接收消息並遞送給消費者,在這個過程中,根據規則進行路由,緩存與持久化。

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

幾個概念說明(完全遵循AMQP中的概念):

  •  Broker:簡單來說就是消息隊列服務器實體
  •  Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列
  •  Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列
  •  Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來
  •  Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞
  •  vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離
  •  producer:消息生產者,就是投遞消息的程序
  •  consumer:消息消費者,就是接受消息的程序
  •  channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務

消息隊列的使用過程大概如下:

  1. 客戶端連接到消息隊列服務器,打開一個channel
  2. 客戶端聲明一個exchange,並設置相關屬性
  3. 客戶端聲明一個queue,並設置相關屬性
  4. 客戶端使用routing key,在exchange和queue之間建立好綁定關系
  5. 客戶端投遞消息到exchange

exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。

exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。

RabbitMQ支持消息的持久化,消息隊列持久化包括3個部分:

  •  exchange持久化,在聲明時指定durable為true
  •  queue持久化,在聲明時指定durable為true
  •  消息持久化,在投遞時指定delivery_mode 為2(1是非持久化)

如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

RabbitMQ的特性:

  •  可靠性:包括消息持久化,消費者和生產者的消息確認
  •  靈活路由:遵循AMQP協議,支持多種Exchange類型實現不同路由策略
  •  分布式:集群的支持,包括本地網絡與遠程網絡
  •  高可用性:支持主從備份與鏡像隊列
  •  多語言支持:支持多語言的客戶端
  •  WEB界面管理:可以管理用戶權限,exhange,queue,binding,與實時監控
  •  訪問控制:基於vhosts實現訪問控制
  •  調試追蹤:支持tracing,方便調試

 

下面通過生產者代碼來解釋一下RabbitMQ中涉及到的概念。

public class MsgSender {  
    private final static String QUEUE_NAME = "hello";  
  
    public static void main(String[] args) throws IOException {  
        /** 
         * 創建連接連接到MabbitMQ 
         */  
        ConnectionFactory factory = new ConnectionFactory();  
        // 設置MabbitMQ所在主機ip或者主機名  
        factory.setHost("127.0.0.1");  
        // 創建一個連接  
        Connection connection = factory.newConnection();  
        // 創建一個頻道  
        Channel channel = connection.createChannel();  
        // 指定一個隊列  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        // 發送的消息  
        String message = "hello world!";  
        // 往隊列中發出一條消息  
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
        System.out.println(" [x] Sent '" + message + "'");  
        // 關閉頻道和連接  
        channel.close();  
        connection.close();  
    }  
}  

ConnectionFactory、Connection、Channel這三個都是RabbitMQ對外提供的API中最基本的對象。不管是服務器端還是客戶端都會首先創建這三類對象。

       ConnectionFactory為Connection的制造工廠。

       Connection是與RabbitMQ服務器的socket鏈接,它封裝了socket協議及身份驗證相關部分邏輯。

       Channel是我們與RabbitMQ打交道的最重要的一個接口,大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。

 Queue(隊列)是RabbitMQ的內部對象,用於存儲消息,如下圖所示:

 RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息並最終投遞到Queue中,消費者(下圖中的C)可以從Queue中獲取消息並消費。

隊列是有Channel聲明的,而且這個操作是冪等的。同名的隊列多次聲明也只會創建一次。我們發送消息就是想這個聲明的隊列里發送消息。

然后看一下消費者的代碼:

public class MsgReceiver {  
    private final static String QUEUE_NAME = "hello";  
  
    public static void main(String[] argv) throws IOException, InterruptedException {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("127.0.0.1");  
        // 打開連接和創建頻道,與發送端一樣  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
  
        // 聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        // 創建隊列消費者  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
        // 指定消費隊列  
        channel.basicConsume(QUEUE_NAME, true, consumer);  
        while (true) {  
            // nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            System.out.println(" [x] Received '" + message + "'");  
        }  
    }  
}  

從上述代碼中,我們可以看到ConnectionFactory、Connection、Channel這三個對象都還是會創建。而隊列在消費者這里又聲明了一遍。這是為了防止先啟動消費者,當為消費者指定隊列時,如果RabbitMQ服務器上未聲明過隊列,就會拋出IO異常。

 隊列消費者,用於監聽隊列中的消息。調用nextDelivery方法時,內部實現就是調用隊列的take方法。該方法的作用:獲取並移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)。說白了就是如果沒有消息,就處於阻塞狀態。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM