關於這篇指導文檔
本文提供了AMQP 0-9-1協議的一個概述,它是RabbitMQ所支持的協議之一。
AMQP 0-9-1是什么
AMQP 0-9-1(Advanced Message Queuing Protocol)高級消息隊列協議是一個消息協議,它支持符合標准的客戶端請求程序與符合標准的消息中間件代理進行通信。
broker與他們的角色
消息代理接收來自publishers(發布消息的應用程序,同時也可以稱之為producers)的消息並且將消息路由至consumers(處理消息的應用程序)
因為它是一個網絡協議,publishers,consumers與broker可以存在於不同的機器上
AMQP 0-9-1 Model概要
AMQP 0-9-1 Model的整體視圖如下:
- 消息被發布到exchanges,通常可將exchanges比作郵局或者郵箱
- exchanges將消息副本分發到queues,按照bindings中的規則
- AMQP brokers傳遞消息給與queues關聯的consumers,或者consumers按照需求從queues拉取信息
當發布一條消息時,publishers可以指定多個消息屬性(message meta-data)。部分的meta-data可能會被broker使用,但是剩下消息對broker是完全不透明的,僅僅會被接收消息的應用程序使用。
網絡不穩定,應用程序可能會無法處理消息,因此AMQP model有一個消息確認的概念:當一條消息被傳輸到consumer,consumer自動通知broker或者應用程序開發人員選擇這么做。當消息確認被使用時,broker只在接收到該消息(群消息)的通知時才會從queue中完全移除該消息。
在某些場合中,比如當一條消息不能被路由時,消息可能會被返回給publishers,或者被丟棄,或者如果broker實現了一個擴展將消息放在“dead letter queue”中。publishers通過使用某些參數發布消息選擇如何處理這些情況
queues,exchanges與bindings統稱為AMQP實體
AMQP是一個可編程的協議
AMQP 0-9-1是一個可編程的協議,AMQP實體與路由方案都在自身應用程序中被定義,而不是被broker管理員定義。因此,為申明queues,exchanges,定義它們之間的bindings,訂閱queues等協議操作進行了規定。
這給應用程序開發者很多自由空間但是也要求他們小心潛在的定義沖突。實際上定義沖突是很少見的並且通常表明是配置錯誤。
應用程序申明他們需要的AMQP實體,定義需要的路由方案,並且可能選擇刪除AMQP實體在他們不再需要使用時。
Exchanges與exchanges類型
當消息被發送時exchanges是AMQP實體。exchanges會傳遞信息並且將它路由到0個或者多個queue中,路由算法是取決於exchange類型與bingding規則。AMQP 0-9-1 broker提供4個exchange類型。
除exchange類型之外,exchange聲明了一系列的屬性,最重要的如下:
- Name
- Durability(broker重啟時exchanges是否survive)
- Auto-delete(當所有queue完成使用后exchange被刪除)
- Arguments(這些都是broker相關的)
Exchanges可以是持久的也可以是短暫的。持久的exchanges可以在broker重啟時存活而暫時性的exchanges則不可以(當broker重新在線時必須重新申明它們),並非所有的場景與使用情況都需要持久的exchanges。
Default exchange
default exchange是一個沒有名稱的(空字符串)被broker預先申明的direct exchange。它所擁有的一個特殊屬性使它對於簡單的應用程序很有作用:每個創建的queue會與它自動綁定,使用queue名稱作為routing key。
舉例說,當你申明一個名稱為“search-indexing-online”的queue時,AMQP broker使用“search-indexing-online”作為routing key將它綁定到default exchange。因此,一條被發布到default exchange並且routing key為"search-indexing-online"將被路由到名稱為"search-indexing-online"的queue。換句話說,default exchange使直接傳送消息到queue成為可能,即使從技術角度上而言,事實並不是這樣。
Direct exchange
direct exchange根據消息的routing key來傳送消息。direct exchange是單一傳播路由消息的最佳選擇(盡管他們也可以用於多路傳播路由),以下是它們的工作原理:
- 一個routing key為K的queue與exchange進行綁定
- 當一條新的routing key為R的消息到達direct exchange時,exchange 將它路由至該queue如果K=R
direct exchange經常用於在多個工作者(同一應用程序的多個實例)之間分配任務。當做這些的時候,重要的是明白在AMQP 0-9-1中,消息是在consumer間負載均衡,而不是在queue。direct exchange可以用圖形方式表示如下:
Fanout exchange
fanout exchange路由消息到所有的與其綁定的queue中,忽略routing key。如果N個queue被綁定到一個fanout exchange,當一條新消息被發布到exchange時,消息會被復制並且傳送到這N個queue。fanout exchange是廣播路由的最佳選擇。
因為一個fanout exchange傳送消息的副本到每一個與其綁定的queue,它的使用情況很相似:
- 大量的多用戶在線(multi-player online MMO)游戲使用它更新排行榜或者其他的全體事件
- 體育新聞網站使用fanout exchange向手機客戶端實時發送比分更新
- 分布式系統可以廣播各種狀態與配置更新
- 群聊可以使用fanout exchange讓消息在參與者之間傳輸(即使AMQP沒有內在的概念,所以XMPP也許是一個更好地選擇)
一個fanout exchange圖形化的表述如下
Topic exchange
Topic exchange路由消息到一個或者多個queue,基於消息的routing key和queue與exchange之間的綁定模式的匹配。Topic exchange經常被用於實現各種發布/訂閱模式的變化。Topic exchanges通常被用於多路廣播路由消息。
topic exchange有非常多的應用場景。當一個問題牽涉到多個consumer/應用程序,他們有選擇的選擇他們接收何種何種類型的消息。可以考慮使用topic exchange。
使用示例:
- 銷售與特定地理位置相關的數據,比如銷售點
- 由多個工作者完成的后台任務處理,每個都能夠負責處理指定的任務
- 庫存價格更新(更新其他的財務數據)
- 包含分類與標簽的新聞更新(例如只針對某一個特定的運動或團隊)
- 不同種類的雲服務編制
- 分布式結構/特定操作系統軟件的構建與包裝,每個處理者只能處理一個結構或者系統
Headers exchange
header exchange為在多個屬性進行路由而設計的,這些屬性更容易描述為消息頭,而不是routing key。headers exchanges忽略routing key屬性,相反用於路由的屬性是從headers屬性中獲取的。如果消息頭的值等於指定的綁定值,則認為消息是匹配的。
可以使用多個header匹配將一個queue綁定到header exchange。在這種情況下,broker需要從應用程序開發者那邊獲取多條信息,也就是說,是否應該考慮任何headers匹配的消息,還是所有headers都匹配的消息?這就是所謂的“x-match”綁定參數。當“x-match”參數的值被設為“any”,只要一個匹配的header值就足夠了。相反的,設置“x-match”的值為“all”需要所有的headers值匹配。
Headers exchanges被視為“direct exchanges on steroids”。因為他們依據headers值路由消息,他們可以被當做direct exchanges使用,routing key不必是一個字符串;舉例來說它可以是一個整數或者一個hash(dictionary)
Queues
AMQP模型中的Queues相似余其他massage-和task-queueing系統中的queues:它們存儲被應用程序消耗的消息。Queues與exchanges分享一些數據,但是也有一些其他的附加屬性:
- Name
- Durable(當broker重啟時,queue是否存在)
- Exclusive(只被一個connection使用並且在connection關閉時queue被刪除)
- Auto-delete(當最后一個consumer取消訂閱時queue被刪除)
- Arguments(一些broker使用它去實現message TTL之類的附加功能)
queue必須在使用前被申明。如果它不存在的話申明一個queue將會創建一個queue。如果queue已經存在並且屬性與申明的屬性相同的情況下,申明無效。當現有的queue屬性與什么的屬性不相同時,一個錯誤碼為406(先決條件失敗)的channel-level異常被拋出。
Queue Names
應用程序可以選擇queue名稱或者請求broker重新為其生成一個名稱。Queueu名稱最多可達255字節。為了讓AMQP broker為你生成一個唯一的queue名稱,傳遞一個空字符串作為queue的名稱參數。在相同的通道中,通過使用queue名稱預期的空字符串,可以在隨后的方法中獲取相同的名稱。這是有效的,因為channel記得最后的服務器生成的queue名稱。
以“amq.”開頭的queue是broker為內部使用而保留的。嘗試使用違反此規則的queue名稱將會導致錯誤碼403的channel-level異常。
Queue Durability
持久的queue被持久化於磁盤中因此broker重啟后仍存在。非持久的queue是暫時的。並非所有的場景與使用情況都要求queue是持久的。
queue的持久性不能讓路由到queue中的消息持久化。如果broker被關閉然后重新啟動,持久的queue將會在broker啟動期間被重新申明,但是僅僅持久的消息會恢復。
Bindings
bindings是exchanges用來路由消息到queues的規則。為了命令exchange E路由消息到 queue Q,Q必須綁定到E。Bindings可能有一個選擇性的routing key屬性,被某些類型的exchanges使用。routing key是為了選擇被發布到exchange的消息,路由到綁定的queue中。換句話說,routing key的功能就像一個過濾器。
畫一個類比:
- queue就像是你在紐約的目的地
- Exchanges就像是JFK機場
- Bindings是JFK到你目的地的路線。有0或者多條路線到達目的地
有了間接層,就可以通過直接發布到queue來實現不可能或者很難實現的場景,並且減少應用程序開發者必須要做的某些重復工作。
如果AMQP消息不能路由到任何queue(例如,因為沒有綁定到它被發布的exchange),它會被丟棄還是返回給publisher,取決於publisher設置的消息屬性。
Consumers
存儲消息到queue中是無用的,除非應用程序可以消耗它們。在AMQP模型中,應用程序有兩種方法這么做:
- 將消息傳送給它們(push API)
- 按照需要拉取消息(pull API)
使用“push API”,應用程序必須表現出對從特定queue中消耗消息的興趣,當它們這么做時,我們說它們注冊了一個消費者或者簡單的訂閱了一個queue。一個queue可能同時有多個consumer或者注冊一個獨占的消費者(當它在消耗消息時排除隊列中所有其他的consumer)
Message Acknowledgements
消費者應用程序,接收處理消息的應用程序可能偶爾在處理消息時失敗或者崩潰。這也可能是網絡問題所導致的。這引發了一個問題:AMQP何時從queue中移除消息?AMQP 0-9-1提供了兩個選擇:
- broker發送消息給應用程序之后(使用basic.deliver或者basic.get-ok AMQP方法)
- 應用程序發送確認之后(使用basci.ack AMQP方法)
前一種選擇被稱作自動確認模型,而后者被稱為顯示確認模型。使用顯示確認模型,應用程序可以選擇何時發送確認,可以在接收消息后,或者在處理之前將其持久化到數據存儲之后,或者在完全處理完成消息之后(例如成功拉取一個web頁面,處理並且將其存儲到持久化的數據存儲中)。
如果一個consumer沒有發送確認就死亡,AMQP broker將其重新發送到另外一個consumer,或者如果此時沒有任何consumer,broker在嘗試重新發送之前,會等待到至少有一個consumer注冊到相同的queue
Rejecting Messages
當一個consumer接收到一條消息,處理消息可能成功也可能失敗。一個應用程序通過拒絕消息可以向broker表明消息處理失敗(或者不能在那時候完成)。當拒絕一條消息時,應用程序可以請求broker丟棄或者重新將其加入queue中。當queue中僅有一個consumer時,確保你沒有重復的通過拒絕並且將消息重新加入queue中來創建無限的消息傳遞循環。
Negative Acknowledgements
使用AMQP的basic.reject方法拒絕消息。basci.reject方法有一個限制:就像你執行確認操作一樣不可能同時拒絕多條消息。然而,如果你使用RabbitMQ,有一個解決方法。RabbitMQ提供了被稱為是否定確認或者nacks的AMQP 0-9-1擴展。獲取更多信息,請參照the help page.
Prefetching Messages
對於多個consumer共享一個queue的情況,在發送確認之前能指定多少條消息可以同時發送給每個消費者是非常有用的。這可以被當做一個簡單的負載均衡技術使用或者提高生產量如果消息是批量發布的。例如,如果生產的應用程序每分鍾發送消息是因為它的工作性質
注意RabbitMQ僅支持channel-level的預取數值,而不是鏈接或者預取的次數。
Messages Attributes and Payload
AMQP模型中的消息擁有屬性,AMQP定義的一些屬性是很常見的,應用程序開發者不需要考慮准確的屬性名稱,一些示例如下:
- Content type
- Content encoding
- Routing key
- Delivery mode(持久或非持久)
- Message priority
- Message publishing timestamp
- Expiration period
- Publisher application id
一些屬性被AMQP broker使用,但是大多數被接收它們的應用程序所解譯。有些屬性是可選的被稱為headers。它們與HTTP中的X-Headers相似。當消息被發布時它們的屬性被設置。
AMQP消息也有有效負荷(它們所攜帶的數據),被AMQP brokers視為不透明字節數組。broker不會檢查有效負荷。消息只包含屬性沒有效負荷是可能的。使用序列化格式比如JSON是很常見的。緩沖協議與數據包序列化結構數據,以便於將數據作為有效負荷發布。AMQP通常使用“content-type”與“content-encoding”字段來傳達這些信息,但是僅按照規定進行。
消息可能以持久化的形式發布,這使得AMQP broker將它們持久化到磁盤中。如果服務被重啟系統確保它們所接收的持久化的消息不會丟失。簡單的將消息發布到一個持久的exchange或者它們被路由到的queue是持久的並不能使消息持久。消息的持久性完全取決於它本身。發布持久化的消息會影響性能(就像數據存儲,持久性在性能上有一定的成本)。
Message acknowledgements
因為網絡是不可靠的,應用程序也會失敗,所以需要某種類型的處理確認。有時僅僅需要確認消息已被接收的事實,有時確認表示消息被確認有效並且被consumer所處理,例如被確認為是強制性的數據且被持久化到數據存儲或索引中。
這種情況是非常常見的,所以AMQP 0-9-1有一個內置的被稱為消息確認的功能(有時被稱為acks),consumers用來確認消息傳送或者處理。如果應用程序崩潰(當連接關閉時AMQP broker發出通知)。如果對於消息的確認是預期的但是沒有被AMQP broker接收到,消息會被重新加入queue(可能理解被傳送到另外一個consumer,如果有其他consumer存在)。
協議中的消息確認幫助開發者開發出更強壯的軟件。
AMQP 0-9-1 方法
AMQP 0-9-1被構造成一系列的方法。方法即操作(像HTTP方法),而且與面向對象開發語言的方法不相同。AMQP方法被分組為類。類僅僅是AMQP方法的邏輯分組。AMQP 0-9-1 reference提供了所有關於AMQP方法的完整介紹。
讓我們看一下exchange類,一系列有關exchange操作的方法。它包含了一下的操作。
- exchange.declare
- exchange.declare-ok
- exchange.delete
- exchange.delete-ok
(請注意,RabbitMQ網站參考資料也包含了對exchange類特屬於RabbitMQ的擴展,這些我們不在這篇文章中討論)
以上的操作行成了邏輯隊:exchange.declare和exchange.declare-ok,exchange.delete和exchange.delete-ok。這些操作是“請求”(被客戶端發送)和“響應”(brokers在響應前面提到的“請求”時發送)。
例如,客戶端請求broker使用exchange.declare方法申明一個新的exchange:
如上圖所示,exchange.declare攜帶了幾個參數。它們使客戶端能夠指定exchange名稱,類型,持久性等待。
如果操作成功,broker使用exchange.declare-ok方法響應。
除了channel號碼exchange.declare-ok不攜帶任何參數(稍后將在這篇文章后面描述channel)
事件的順序與另外一組AMQP queue類中的方法很相似:queue.declare和queue.declare-ok:
並不是所有的AMQP方法都有與之相對應的方法。一些(basic.publish是使用最廣發的一個)沒有相對於的“響應”方法和其他一些(例如basic.get)有多個可能的“響應”。
Connections
AMQP連接通常是長期存活的。AMQP是一個應用級協議,它使用TCP保持穩定傳輸。AMQP連接使用身份認證並且可以使用TLS(SSL)來保護連接。當應用程序不再需要連接到AMQP broker時,它應該優雅的關閉AMQP連接而不是突然關閉底層的TCP連接。
Channels
一些應用程序需要多余AMQP broker進行多連接。然而在同一時間保持多個TCP連接是不可取的,因為這么做消耗系統資源並且讓配置防火牆變得更加困難。AMQP 0-9-1連接是與channel多路復用的,channel被認為是“共享單個TCP連接的輕量級連接”。
對於使用多線程或者多進程進行處理的應用程序,在每一個線程或者進程中打開一個新的channel而不是彼此間共享一個channel是非常常見的。
一個特定channel上的通信與另一個channel上的通信是完全分開的,所以每個AMQP方法也包含channel號碼,客戶端用該channel號碼來計算方法用於哪個通道(比如哪個事件處理程序需要被激活)
Virtual Hosts
為了讓單個broker能夠托管多個隔離的“環境”(成群的用戶,exchanges,queues等等),AMQP加入了virtual host的概念(vhosts)。它們與許多流行的web服務器使用的virtual hosts相似,並且為AMQP實體提供完全的隔離環境。AMQP客戶端指定了在AMQP連接回話期間需要用的vhosts。
AMQP 是可擴展的
AMQP 0-9-1有幾個擴展點:
- Custom exchange types允許開發者實現路由選擇策略,而exchange類型提供的開箱即用方式不能完全的覆蓋,例如geodata-based路由。
- 申明exchanges與queues可以包含broker可使用的附加屬性。例如在RabbitMQ中,per-queue message TTL就是按這種方式實現的。
- 對於協議中特定於broker的擴展,例如,extensions that RabbitMQ implements.
- New AMQP 0-9-1 method classes可以被引入。
- Broker可以使用additional plugins擴展代理,例如,RabbitMQ management前端和HTTP API是以插件的形式實現的。
這些特性讓AMQP 0-9-1模型更加靈活,適用於非常廣泛的問題。
AMQP 0-9-1客戶生態系統
許多流行的開發語言與平台都有很多RabbitMQ management。它們中的某些嚴格的遵循了AMQP術語並且僅僅提供AMQP方法的實現。其他的有附加的功能,方便的方法與抽象。一些客戶端是異步的(非阻塞的)。一些是同步的(阻塞的),一些支持兩種模型的。一些客戶端支持特定於供應商的擴展(比如特定於RabbitMQ的擴展)。
因為AMQP的主要目標之一是可互操作性,對於開發者而言,理解協議操作是一個非常好的主意,而不不是受特定客戶端術語的限制。用這種方式與使用不同的庫的開發者溝通將會很簡單。
文章來源:http://www.rabbitmq.com/tutorials/amqp-concepts.html