本文是 OpenStack 中的 RabbitMQ 使用研究 兩部分中的第一部分,將介紹 RabbitMQ 的基本概念,即 RabbitMQ 是什么。第二部分將介紹其在 OpenStack 中的使用。
1 RabbitMQ 的基本概念
RabbitMQ 是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。
AMQP 是一個定義了在應用或者組織之間傳送消息的協議的開放標准 (an open standard for passing business messages between applications or organizations),它最新的版本是 1.0。AMQP 目標在於解決在兩個應用之間傳送消息存在的下列問題:
- 網絡是不可靠的 =>消息需要保存后再轉發並有出錯處理機制
- 與本地調用相比,網絡速度慢 =>得異步調用
- 應用之間是不同的(比如不同語言實現、不同操作系統等) =>得與應用無關
- 應用會經常變化 =>同上
AMQP 使用異步的、應用對應用的、二進制數據通信來解決這些問題。
RabbitMQ 是 AMQP 的一種實現,它包括Server (服務器端)、Client (客戶端) 和 Plugins (插件)。RabbitMQ 服務器是用 Erlang 語言編寫的,其最新版本是剛剛(2015/02/11)發布的 3.4.4,而 OpenStack Juno 中使用的 Server 是 2014年3月發布的 3.2.4 版本。現在 RabbitMQ 支持的 AMQP 版本依然是0.9.1。
1.1 RabbitMQ 的概念非常清晰、簡潔
其基本概念參見下圖:
RabbitMQ 官網 和其它網站上有很多文章來描述其基本概念。簡單說明如下:
- Message (消息):RabbitMQ 轉發的二進制對象,包括Headers(頭)、Properties (屬性)和 Data (數據),其中數據部分不是必要的。具體見 1.2 部分的描述。
- Producer(生產者): 消息的生產者,負責產生消息並把消息發到交換機 Exhange的應用。
- Consumer (消費者):使用隊列 Queue 從 Exchange 中獲取消息的應用。
- Exchange (交換機):負責接收生產者的消息並把它轉到到合適的隊列 Queue 。下面有 1.3 部分描述。
-
Queue (隊列):一個存儲Exchange 發來的消息的緩沖,並將消息主動發送給Consumer,或者 Consumer 主動來獲取消息。見 1.4 部分的描述。
- Binding (綁定):隊列 和 交換機 之間的關系。Exchange 根據消息的屬性和 Binding 的屬性來轉發消息。綁定的一個重要屬性是 binding_key。
- Connection (連接)和 Channel (通道):生產者和消費者需要和 RabbitMQ 建立 TCP 連接。一些應用需要多個connection,為了節省TCP 連接,可以使用 Channel,它可以被認為是一種輕型的共享 TCP 連接的連接。連接需要用戶認證,並且支持 TLS (SSL)。連接需要顯式關閉。
- Virtual Host (虛擬主機) :RabbitMQ 用來進行資源隔離的機制。一個虛機主機會隔離用戶、exchange、queue 等。默認的虛擬主機為 "/"。
1.2 關於消息 message
消息結構:
消息的幾個重要屬性:
- routing_key:Direct 和 Topic 類型的 exchange 會根據本屬性來轉發消息。
-
delivery_mode: 將其值設置為 2 將用於消息的持久化,持久化的消息會被保存到磁盤上來防止其丟失。下面章節 3 有描述。
- reply_to:一般用來表示RPC實現中客戶端的回調隊列的名字。下面章節 4 有描述。
- correlation_id:用於使用 RabbitMQ 來實現 RPC的情形。下面章節 4 有描述。
- content_type:表示消息data的編碼格式名稱。實際上RabbitMQ只負責原樣傳送消息因此不會使用該屬性,該屬性只被 Publisher 和 Consumer 使用。
消息的確認/刪除機制:
Consumer 處理消息可能會失敗,那么 RabbitMQ 怎么知道什么時候來刪除 queue 中的消息呢?它使用兩種機制:
- 當 RabbitMQ 主動將消息發給 Consumer 以后,它會刪除消息
- 當 Consumer 發回一個確認后,RabbitMQ 會刪除消息。
第二種情況下,如果 RabbitMQ 沒收到確認,它會把消息重新放進隊列(re-queued)並添加標識 'redelivered' 表明該消息之前已經發送過 ,如果沒有Consumer的話,消息將保持到有下一個 Consumer 為止。
Consumer 可以主動告訴 RabbitMQ 消息處理失敗了(拒絕消息),並告知RabbitMQ 是刪除消息還是重新放進隊列。
1.3 exchange 交換機
exchange 有幾個重要的屬性:
- Name 名字:交換機名字。空字符串名字的exchange為默認的exchange。
- Type 類型:Direct, Fanout, Topic, Headers。類型決定 exchange 的消息轉發能力。下面 章節2 有描述。
- durable:值為 True/False。值為 true 的 exchange 在 rabbitmq 重啟后會被自動創建。OpenStack 使用的exchange的該值都為false。
- auto_delete:值為 True/False。設置為 true 的話,當所有消費者的連接都關閉后,該 exchange 會被自動刪除。OpenStack 使用的exchange的該值都為false。
- exclusive:值為 True/False。設置為 true 的話,該 exchange 只允許被創建的connection使用,並且在該 connection 關閉后它會被自動刪除。
RabbitMQ 默認會為每一種類型生成一個或者兩個的默認的 exchange:
- Fanout 類型:名字為 amq.fanout
- Topic 類型: 名字為 amq.topic
- Headers 類型:名字為 amq.match 和 amq.headers
- Direct 類型:名字為空字符串的exchange 以及 amq.direct。其中名字為空的exchange比較特殊。在一個 Queue 被創建后,RabbitMQ 會自動建立它和該 exchange 之間的binding,並且設置其 binding_key 為該queue 的名字。這樣,該語句 channel.basic_publish(exchange='', routing_key='hello', body=message) 會讓該默認的 exchange 將該 message 轉發到名字為 'hello' 的隊列中。
1.4 隊列 Queue
隊列同樣有類似於 exchange 的 name、durable、auto_delete 和 exclusive 等屬性,並且含義相同。
Exchange 會將消息分發(copy)到符合要求的所有隊列中。
Consumer 可以主動獲取或者被動接受Queue里面的消息:
- 被動接收消息(訂閱消息 "push API"):使用 basic.consume(short reserved-1, queue-name queue, consumer-tag consumer-tag,no-local no-local, no-ack no-ack, bit exclusive, no-wait no-wait,table arguments)
方法。見 5.1 示例代碼。 - 主動獲取消息 ("pull API"): 使用 basic.get(short reserved-1, queue-name queue, no-ack no-ack) 方法。
一個 Queue 允許有多個 Consumer,比如利用 RabbitMQ 來實現一個簡單的 load balancer。這時候,消息會在這些 Consumer 之間根據 channel 的 prefetch level 做分發(請參見AQMP: QoS or message prefetching),如果該值一樣的話,消息會被平均分發給這些Consumer。
1.5 rabbitmqctl Cli
RabbitMQ 提供Cli rabbitmqctl [-n <node>] [-q] <command> [<command options>] 來進行管理和配置。常用到的命令有:
- stop/start_app
- add/delete/list_vhosts
- list_queues/exchanges/bindings/connections/channels
- trace_on/off
2 消息轉發機制
Exchange 根據它自身的類型 type、消息的屬性 routing_key 或者 headers,以及 Binding 的屬性 binding_key 來轉發消息。
Exchange 的類型 Type | 使用的消息屬性 | 使用的Binding 屬性 | 轉發模式 |
Fanout | - (忽略消息的轉發屬性) | - (忽略binding的轉發屬性) | Exchange 將消息轉發到所有與它有 binding 關系的隊列中。 這種方法轉發效率較高。OpenStack 大量使用這種類型的 exchange。 |
Direct | routing_key (任意的字符串,比如 "abc") | binding_key (任意的字符串,比如 "abc") | Exchange 只將消息轉到 binding 的 binding_key 等於消息的 routing_key 的隊列中。 |
Topic | routing_key (以 "." 分割的多單詞字符串,比如 abc.efg.hij) | binding_key (包含 "#" 和 "*" 的以 “.” 分割的多單詞字符串,比如 *.efg.*) | Exchange 只將消息轉到消息的 routing_key 和 binding 的 binding_key 匹配的隊列中。匹配規則如下: (1)兩者以"."分割的單詞數目相同 (2)"*"可代表一個單詞 (3)"#“可代表零個或多個單詞 |
Headers | headers (消息頭) | binding_key | Exchange 只將消息轉到消息的 headers 和 binding 的 binding_key 匹配的隊列中。匹配規則待研究。 OpenStack不使用該類型的exchange。 |
參考文檔:
https://www.rabbitmq.com/getstarted.html 這里有詳細的闡述和示例源代碼。
http://www.cnblogs.com/starof/p/4173413.html 這里有官網文檔的中文版。
3 持久化
消息的持久化意味着在 RabbitMQ 被重啟后,消息依然還在。要實現持久化,得實現幾個相關組件的持久化:
(1). 交換機的持久化,需要將其 durable 屬性設為 true。chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False,)
(2). 隊列的持久化,需要將其 durable 屬性設置為 true。chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)
4 RPC
可以使用 RabbitMQ 來實現 RPC 機制,這里說說其實現原理:
過程:
(1). 客戶端 Client 設置消息的 routing key 為 Service 的隊列 op_q;設置消息的 reply-to 屬性為返回的 response 的目標隊列 reponse_q,設置其 correlation_id 為以隨機UUID,然后將消息發到 exchange。比如 channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)
(2). Exchange 將消息轉發到 Service 的 op_q
(3). Service 收到該消息后進行處理,然后將response 發到 exchange,並設置消息的 routing_key 為原消息的 reply_to 屬性,以及設置其 correlation_id 為原消息的 correlation_id 。
ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))
(4). Exchange 將消息轉發到 reponse_q
(5). Client 逐一接受 response_q 中的消息,檢查消息的 correlation_id 是否為等於它發出的消息的correlation_id,是的話表明該消息為它需要的response。
這里有詳細的闡述。
5 Python AMQP SDK
常用的Python AMQP SDK包括:
- py-amqplib (支持 AMQP 0.8): http://barryp.org/software/py-amqplib/
- pika (支持 AMQP 0.9.1,Python 2.6 和 2.7):https://github.com/pika/pika
- txamqp: https://launchpad.net/txamqp
5.1 一個簡單的使用 py-amqplib 的 Consumer 實現
#創建Connection和Channel連接到 RabbitMQ 服務器 conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False) chan = conn.channel() #創建 queue result = chan.queue_declare(queue="debug", durable=True, exclusive=False, auto_delete=False) #創建 exchange result = chan.exchange_declare(exchange="sorting_room2", type="topic", durable=True, auto_delete=False,) #創建 binding result = chan.queue_bind(queue="debug", exchange="sorting_room2", routing_key="*.debug") #回調函數,當有 message 到達 queue 后,該函數會被調用 def recv_callback(msg): print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)
# lChannel.basic_ack(msg.delivery_tag) #如果no_ack=False的話,可以需要發回一個確認
#啟動一個 consumer,consumer_tag 是該 consumer 的一個唯一標識符
#no_ack = True 表示該 consumer 不會發回確認
chan.basic_consume(queue='debug', no_ack=True, callback=recv_callback, consumer_tag="debugtag")
#等待有消息發到 queue while True: chan.wait()
#終止該 consumer chan.basic_cancel("testtag") #關閉 connection 和 channel chan.close() conn.close()
5.2 一個簡單的使用 py-amqplib 的 Producer 實現代碼
from amqplib import client_0_8 as amqp import sys
#創建 connection 和 channel conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False) chan = conn.channel()
#創建 message msg = amqp.Message(sys.argv[1]) msg.properties["delivery_mode"] = 2
#發送 message chan.basic_publish(msg,exchange="sorting_room2",routing_key=(sys.argv[2]))
#關閉 connection 和 channel chan.close() conn.close()
5.3 使用 pika
5.3.1 安裝 pika
wget https://pypi.python.org/packages/source/p/pika/pika-0.9.14.tar.gz #md5=b99aad4b88961d3c7e4876b8327fc97c tar zxvf pika-0.9.14.tar.gz cd pika-0.9.14 python setup.py install
5.3.2 使用 pika 編程(來源)
#!/usr/bin/env python ''' rabbitmq trace scripts. require (rabbitmq_tracing): $ sudo rabbitmq-plugins enable rabbitmq_tracing usage: $ sudo rabbitmqctl trace_on $ ./rabbitmqtrace.py << output >> ''' import sys import time from optparse import OptionParser import pika __AUTHOR__ = 'smallfish' __VERSION__ = '0.0.1' def _out(args): print time.strftime('%Y-%m-%d %H:%M:%S'), args def _run(host, port, vhost, user, password): conn = pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port, virtual_host=vhost, credentials=pika.PlainCredentials(user, password))) chan = conn.channel() def _on_message(ch, method, properties, body): ret = {} ret['routing_key'] = method.routing_key ret['headers'] = properties.headers ret['body'] = body _out(ret) _out('start subscribe amq.rabbitmq.trace') ret = chan.queue_declare(exclusive=False, auto_delete=True) queue = ret.method.queue chan.queue_bind(exchange='amq.rabbitmq.trace', queue=queue, routing_key='#') chan.queue_bind(exchange='amq.rabbitmq.log', queue=queue, routing_key='#') chan.basic_consume(_on_message, queue=queue, no_ack=True) chan.start_consuming() def main(): parser = OptionParser('usage: %prog') parser.add_option('', '--host', metavar='host', default='localhost', help='rabbitmq host address, default: %default') parser.add_option('', '--port', metavar='port', default=5672, type='int', help='rabbitmq port, default: %default') parser.add_option('', '--vhost', metavar='vhost', default='/', help='rabbitmq vhost, default: %default') parser.add_option('', '--user', metavar='user', default='guest', help='rabbitmq user, default: %default') parser.add_option('', '--password', metavar='password', default='guest', help='rabbitmq password, default: %default') (options, args) = parser.parse_args() _run(options.host, options.port, options.vhost, options.user, options.password) if __name__ == '__main__': main()
6 插件和消息追蹤
RabbitMQ 支持使用插件來支持 Management, Federation, Shovel 和 STOMP。所有的插件都在這里。
6.1 rabbitmq-management 插件
它提供 HTTP-based API 和 browser-based UI 以及 CLI 來管理 RabbitMQ。它的GUI的訪問地址是 http://<rabbitmq server ip>:15672/#/traces。它的GUI上,提供了一個 overview,還可以通過它來管理connection、channel、exchange 和 queue,以及 virtual host,tracing 和 policy等。
6.2 RabbitMQ 的 firehose 機制
該機制提供了一個查看被轉發的消息的途徑。當打開 firehose 的時候,RabbitMQ 會自動建立 amq.rabbitmq.trace 和 amq.rabbitmq.log 兩個exchange。你可以編程創建queue 從這兩個 exchange 里面獲取 trace 和 log,從而觀察每一個被處理的消息。這里有一個開源代碼實現。
6.3 rabbitmq_tracing 插件
rabbitmq_tracing 插件在 management 插件增加了消息追蹤的方法,它是從 firehose 中獲取數據。在激活了 rabbitmq-management,firehose 和 rabbitmq_tracing,你可以在 management GUI 中追蹤消息:
自此,RabbitMQ 基本上算熟悉了,接下來可以開始分析 OpenStack 中是如何使用 RabbitMQ 了。