AMQP(Advanced Message Queuing Protocol, 高級消息隊列協議)是一個提供統一消息服務的應用層標准高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不同產品,不同的開發語言等條件的限制。
RabbitMQ是一個實現了AMQP協議標准的開源消息代理和隊列服務器。
1、基本概念
在服務器中,三個主要功能模塊連接成一個處理鏈完成預期的功能:
1)“exchange”接收發布應用程序發送的消息,並根據一定的規則將這些消息路由到“消息隊列”。
2)“message queue”存儲消息,直到這些消息被消費者安全處理完為止。
3)“binding”定義了exchange和message queue之間的關聯,提供路由規則。
使用這個模型我們可以很容易的模擬出存儲轉發隊列和主題訂閱這些典型的消息中間件概念。
上圖中各個模塊的說明如下:
-
Broker: 接收和分發消息的應用,RabbitMQ Server就是Message Broker。
-
Virtual host: 出於多租戶和安全因素設計的,把AMQP的基本組件划分到一個虛擬的分組中,類似於網絡中的namespace概念。當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以划分出多個vhost,每個用戶在自己的vhost創建exchange、queue等。
-
Connection: publisher、consumer和broker之間的TCP連接。斷開連接的操作只會在client端進行,Broker不會斷開連接,除非出現網絡故障或broker服務出現問題。
-
Channel: 如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的channel進行通訊,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了操作系統建立TCP connection的開銷。
-
Exchange: message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
-
Queue: 消息最終被送到這里等待consumer取走。一個message可以被同時拷貝到多個queue中。
-
Binding: exchange和queue之間的虛擬連接,binding中可以包含routing key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。
2、核心概念
1)Exchange和Binding
交換機Exchange拿到一個消息之后會將它路由給隊列。Exchange使用哪種方式路由是由Binding規則決定的。
a)直連交換機
根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列。直連交換機用來處理消息的單播路由。
Message中的“routing key”如果和Binding中的“binding key”一致, Direct exchange則將message發到對應的queue中。
b)主題交換機
通過對消息的路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。主題交換機用來實現消息的多播路由。
c)扇形交換機
將消息路由給綁定到它身上的所有隊列,且不理會路由鍵。扇形交換機用來處理消息的廣播路由。
2)ACK - 消息確認
默認情況下,如果Message 已經被某個Consumer正確的接收到了,那么該Message就會被從queue中移除。當然也可以讓同一個Message發送到很多的Consumer。
如果一個queue沒被任何的Consumer Subscribe(訂閱),那么,如果這個queue有數據到達,那么這個數據會被cache,不會被丟棄。當有Consumer時,這個數據會被立即發送到這個Consumer,這個數據被Consumer正確收到時,這個數據就被從queue中刪除。
那么什么是正確收到呢?通過ack。每個Message都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數據沒有被ack,那么:
RabbitMQ Server會把這個信息發送到下一個Consumer。
如果這個app有bug,忘記了ack,那么RabbitMQ Server不會再發送數據給它,因為Server認為這個Consumer處理能力有限。
而且ack的機制可以起到限流的作用(Benefitto throttling):在Consumer處理完成數據后發送ack,甚至在額外的延時后發送ack,將有效的balance Consumer的load。
當然對於實際的例子,比如我們可能會對某些數據進行merge,比如merge 4s內的數據,然后sleep 4s后再獲取數據。特別是在監聽系統的state,我們不希望所有的state實時的傳遞上去,而是希望有一定的延時。這樣可以減少某些IO,而且終端用戶也不會感覺到。
3)創建隊列
Consumer和Procuder都可以通過 queue.declare 創建queue。對於某個Channel來說,Consumer不能declare一個queue,卻訂閱其他的queue。當然也可以創建私有的queue。這樣只有app本身才可以使用這個queue。queue也可以自動刪除,被標為auto-delete的queue在最后一個Consumer unsubscribe后就會被自動刪除。那么如果是創建一個已經存在的queue呢?那么不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創建如果參數和第一次不一樣,那么該操作雖然成功,但是queue的屬性並不會被修改。
那么誰應該負責創建這個queue呢?是Consumer,還是Producer?
如果queue不存在,當然Consumer不會得到任何的Message。但是如果queue不存在,那么Producer Publish的Message會被丟棄。所以,還是為了數據不丟失,Consumer和Producer都try to create the queue!反正不管怎么樣,這個接口都不會出問題。
queue對load balance的處理是完美的。對於多個Consumer來說,RabbitMQ 使用循環的方式(round-robin)的方式均衡的發送給不同的Consumer。
3、RabbitMQ
send.py
# -*- coding:utf-8 -*- import pika from constant import rabbit_config as config from constant import app_name # 權限驗證 credentials = pika.PlainCredentials( config.get('username'), config.get('password') ) # 鏈接參數 # virtual_host, 在多租戶系統中隔離exchange, queue params = pika.ConnectionParameters( host=config.get('host'), port=config.get('port'), virtual_host=app_name, credentials=credentials ) # 建立鏈接 connection = pika.BlockingConnection(parameters=params) # 從鏈接中獲得信道 channel = connection.channel() # 聲明交換機 channel.exchange_declare( exchange='exchangeA', exchange_type='direct', passive=False, durable=True, auto_delete=False ) # consumer創建隊列, 如果沒有就創建 # 隊列一旦被創建, 再進行的重復創建會簡單的失效, 所以建議在producer和consumer同時創建隊列, 避免隊列創建失敗 # 創建隊列回調函數, callback. # auto_delete=True, 如果queue失去了最后一個subscriber會自動刪除, 隊列中的message也會失效. # 默認auto_delete=False, 沒有subscriber的隊列會cache message, subscriber出現后將緩存的message發送. channel.queue_declare(queue='standard', auto_delete=True) # delivery_mode=2表示讓消息持久化, 重啟RabbitMQ也不丟失. # 考慮成本, 開啟此功能, 建議把消息存儲到SSD上. props = pika.BasicProperties(content_type='text/plain', delivery_mode=2) # 發布消息到exchange channel.basic_publish( exchange='exchangeA', routing_key='a_routing_key', body='Hello World!', properties=props ) print(" [x] Sent 'Hello World!'") # 關閉鏈接 connection.close()
receive.py
# -*- coding:utf-8 -*- import pika from constant import rabbit_config as config from constant import app_name # 權限驗證 credentials = pika.PlainCredentials( config.get('username'), config.get('password') ) # 鏈接參數 # virtual_host, 在多租戶系統中隔離exchange, queue params = pika.ConnectionParameters( host=config.get('host'), port=config.get('port'), virtual_host=app_name, credentials=credentials ) # 建立鏈接 connection = pika.BlockingConnection(parameters=params) # 從鏈接中獲得信道 channel = connection.channel() # 聲明交換機, 直連方式, 后面將會創建binding將exchange和queue綁定在一起 channel.exchange_declare( exchange='exchangeA', exchange_type='direct', passive=False, durable=True, auto_delete=False, ) # consumer創建隊列, 如果沒有就創建 # 隊列一旦被創建, 再進行的重復創建會簡單的失效, 所以建議在producer和consumer同時創建隊列, 避免隊列創建失敗 # 創建隊列回調函數, callback. # auto_delete=True, 如果queue失去了最后一個subscriber會自動刪除, 隊列中的message也會失效. # 默認auto_delete=False, 沒有subscriber的隊列會cache message, subscriber出現后將緩存的message發送. channel.queue_declare(queue='standard', auto_delete=True) # 通過binding將隊列queue和交換機exchange綁定 channel.queue_bind( queue='standard', exchange='exchangeA', routing_key='a_routing_key' ) # 處理接收到的消息的回調函數 # method_frame攜帶了投遞標記, header_frame表示AMQP信息頭的對象 def callback(channel, method_frame, header_frame, body): channel.basic_ack(delivery_tag=method_frame.delivery_tag) print(" [x] Received %r" % body) # 訂閱隊列, 我們設置了不進行ACK, 而把ACK交給了回調函數來完成 channel.basic_consume( callback, queue='standard', no_ack=True, ) try: print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() # 關閉鏈接 connection.close()