[源碼解析] 消息隊列 Kombu 之 基本架構
0x00 摘要
從本文開始,我們通過一個系列來介紹消息隊列 Kombu(為后續Celery分析打基礎)。
Kombu 的定位是一個兼容 AMQP 協議的消息隊列抽象,是一個把消息傳遞封裝成統一接口的庫。其特點是支持多種的符合APMQ協議的消息隊列系統。不僅支持原生的AMQP消息隊列如RabbitMQ、Qpid,還支持虛擬的消息隊列如redis、mongodb、beantalk、couchdb、in-memory等。
通過本系列,大家可以了解 Kombu 是如何實現 AMQP。本文先介紹相關概念和整體邏輯架構。
0x01 AMQP
介紹 AMQP 是因為 Kombu 的定位是一個兼容 AMQP 協議的消息隊列抽象。
AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)是一個進程間傳遞異步消息的網絡協議。
1.1 基本概念
AMQP的基本概念如下:
- 生產者和消費者:生產者創建消息,然后發布到代理服務器的隊列中,代理服務器會把消息發送給感興趣的接受方。消費者連接到代理服務器,並訂閱到隊列上,從而接收消息。
- 通道 channel:信道是 “真實的” TCP連接內的虛擬連接,AMQP的命令都是通過通道發送的。在一條TCP連接上可以創建多條信道。
- 有些應用需要與 AMQP 代理建立多個連接。同時開啟多個 TCP 連接不合適,因為會消耗掉過多的系統資源並且使得防火牆的配置更加困難。AMQP 0-9-1 提供了通道(channels)來處理多連接,可以把通道理解成共享一個 TCP 連接的多個輕量化連接。
- 在涉及多線程 / 進程的應用中,為每個線程 / 進程開啟一個通道(channel)是很常見的,並且這些通道不能被線程 / 進程共享。
- 一個特定通道上的通訊與其他通道上的通訊是完全隔離的,因此每個 AMQP 方法都需要攜帶一個通道號,這樣客戶端就可以指定此方法是為哪個通道准備的。
- 隊列:存放消息的地方,隊列通過路由鍵綁定到交換機,生產者通過交換機將消息發送到隊列中。我們可以說應用注冊了一個消費者,或者說訂閱了一個隊列。一個隊列可以注冊多個消費者,也可以注冊一個獨享的消費者(當獨享消費者存在時,其他消費者即被排除在外)。
- Exchange 和 綁定:生產者發布消息時,先將消息發送到Exchange,通過Exchange與隊列的綁定規則將消息發送到隊列。
- 交換機是用來發送消息的 AMQP 實體。交換機拿到一個消息之后將它路由給一個或零個隊列。它使用哪種路由算法是由交換機類型和綁定(Bindings)規則所決定的。
- 交換機根據路由規則將收到的消息分發給與該交換機綁定的隊列(Queue)。
- 常見的Exchange有topic、fanout、direct:
- direct Exchange:direct交換機是包含空白字符串的默認交換機,當聲明隊列時會主動綁定到默認交換機,並且以隊列名稱為路由鍵;
- fanout Exchange:這種交換機會將收到的消息廣播到綁定的隊列;
- topic Exchange:topic交換機可以通過路由鍵的正則表達式將消息發送到多個隊列;
1.2 工作過程
工作過程是:
- 發布者(Publisher)發布消息(Message),經由交換機(Exchange)。消息從來不直接發送給隊列,甚至 Producers 都可能不知道隊列的存在。 消息是發送給交換機,給交換機發送消息時,需要指定消息的 routing_key 屬性;
- 交換機根據路由規則將收到的消息分發給與該交換機綁定的隊列(Queue)。交換機收到消息后,根據 交換機的類型,或直接發送給隊列 (fanout), 或匹配消息的 routing_key 和 隊列與交換機之間的 banding_key。 如果匹配,則遞交消息給隊列;
- 最后 AMQP 代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。Consumers 從隊列取得消息;
基本如下圖:
+----------------------------------------------+
| AMQP Entity |
| |
| |
| |
+-----------+ | +------------+ binding +---------+ | +------------+
| | | | | | | | | |
| Publisher | +------> | Exchange | +---------> | Queue | +--------> | Consumer |
| | | | | | | | | |
+-----------+ | +------------+ +---------+ | +------------+
| |
| |
+----------------------------------------------+
0x02 Poll系列模型
Kombu 利用了 Poll 模型,所以我們有必要介紹下。這就是IO多路復用。
IO多路復用是指內核一旦發現進程指定的一個或者多個IO條件准備讀取,它就通知該進程。IO多路復用適用比如當客戶處理多個描述字時(一般是交互式輸入和網絡套接口)。
與多進程和多線程技術相比,I/O多路復用技術的最大優勢是系統開銷小,系統不必創建進程/線程,也不必維護這些進程/線程,從而大大減小了系統的開銷。
2.1 select
select 通過一個select()系統調用來監視多個文件描述符的數組(在linux中一切事物皆文件,塊設備,socket連接等)。
當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位(變成ready),使得進程可以獲得這些文件描述符從而進行后續的讀寫操作(select會不斷監視網絡接口的某個目錄下有多少文件描述符變成ready狀態【在網絡接口中,過來一個連接就會建立一個'文件'】,變成ready狀態后,select就可以操作這個文件描述符了)。
2.2 poll
poll 和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制。
poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增加而線性增大。
select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll() 的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(Level Triggered)。
2.3 epoll
epoll由內核直接支持,可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些。
epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表 就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,這樣便徹底省掉了 這些文件描述符在系統調用時復制的開銷。
另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描 述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會采用類似callback的回調 機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。
2.4 通俗理解
2.4.1 阻塞I/O模式
阻塞I/O模式下,內核對於I/O事件的處理是阻塞或者喚醒,一個線程只能處理一個流的I/O事件。如果想要同時處理多個流,要么多進程(fork),要么多線程(pthread_create),很不幸這兩種方法效率都不高。
2.4.2 非阻塞模式
非阻塞忙輪詢的I/O方式可以同時處理多個流。我們只要不停的把所有流從頭到尾問一遍,又從頭開始。這樣就可以處理多個流了,但這樣的做法顯然不好,因為如果所有的流都沒有數據,那么只會白白浪費CPU。
2.4.2.1 代理模式
非阻塞模式下可以把I/O事件交給其他對象(select以及epoll)處理甚至直接忽略。
為了避免CPU空轉,可以引進一個代理(一開始有一位叫做select的代理,后來又有一位叫做poll的代理,不過兩者的本質是一樣的)。這個代理比較厲害,可以同時觀察許多流的I/O事件,在空閑的時候,會把當前線程阻塞掉,當有一個或多個流有I/O事件時,就從阻塞態中醒來,於是我們的程序就會輪詢一遍所有的流(於是我們可以把“忙”字去掉了)。代碼長這樣:
while true {
select(streams[])
for i in streams[] {
if i has data
read until unavailable
}
}
於是,如果沒有I/O事件產生,我們的程序就會阻塞在select處。但是依然有個問題,我們從select那里僅僅知道了,有I/O事件發生了,但卻並不知道是那幾個流(可能有一個,多個,甚至全部),我們只能無差別輪詢所有流,找出能讀出數據,或者寫入數據的流,對他們進行操作。
2.4.2.2 epoll
epoll可以理解為event poll,不同於忙輪詢和無差別輪詢,epoll只會把哪個流發生了怎樣的I/O事件通知我們。此時我們對這些流的操作都是有意義的(復雜度降低到了O(1))。
epoll版服務器實現原理類似於select版服務器,都是通過某種方式對套接字進行檢驗其是否能收發數據等。但是epoll版的效率要更高,同時沒有上限。
在select、poll中的檢驗,是一種被動的輪詢檢驗,而epoll中的檢驗是一種主動地事件通知檢測,即:當有套接字符合檢驗的要求,便會主動通知,從而進行操作。這樣的機制自然效率會高一點。
同時在epoll中要用到文件描述符,所謂文件描述符實質上是數字。
epoll的主要用處在於:
epoll_list = epoll.epoll()
如果進程在處理while循環中的代碼時,一些套接字對應的客戶端如果發來了數據,那么操作系統底層會自動的把這些套接字對應的文件描述符寫入該列表中,當進程再次執行到epoll時,就會得到了這個列表,此時這個列表中的信息就表示着哪些套接字可以進行收發了。因為epoll沒有去依次的查看,而是直接拿走已經可以收發的fd,所以效率高!
0x03 Kombu 基本概念
Kombu的最初的實現叫做carrot,后來經過重構才成了Kombu。
3.1 用途
Kombu 主要用途如下:
- Celery是Python中最流行的異步消息隊列框架,支持RabbitMQ、Redis、ZoopKeeper等作為Broker,而對這些消息隊列的抽象,都是通過Kombu實現的:
- Celery一開始先支持的RabbitMQ,也就是使用 AMQP 協議。由於要支持越來越多的消息代理,但是這些消息代理是不支持 AMQP 協議的,需要一個東西把所有的消息代理的處理方式統一起來,甚至可以理解為把它們「偽裝成支持AMQ協議」。
- Kombu實現了對AMQP transport和non-AMQP transports(Redis、Amazon SQS、ZoopKeeper等)的兼容。
- OpenStack 默認 是使用kombu連接rabbitmq服務器。OpenStack使用kombu作為消息隊列使用的client庫而沒有用廣泛使用的pika庫有兩個原因:
- kombu除了支持純AMQP的實現還支持虛擬AMQP的實現作為消息隊列系統,如redis、mongodb、beantalk等。
- kombu可以通過配置設置AMQP連接的底層庫,比如librabbitmq或者pyamqp。前者是一個python嫁接C庫的實現,后者是一個純python的實現。如果用純python實現的AMQP庫,就可以應用eventlet的框架將設計網絡IO的部分變為協程,提高整體的網絡IO性能。如openstack內部使用的就是eventlet的框架。
3.2 術語
在 Kombu 中,存在多個概念(部分和AMQP類似),他們分別是:
-
Message:消息,發送和消費的主體,生產消費的基本單位,其實就是我們所謂的一條條消息;
-
Connection:對 MQ 連接的抽象,一個 Connection 就對應一個 MQ 的連接;Connection 是 AMQP 對 連接的封裝;
-
Channel:與AMQP中概念類似,可以理解成共享一個Connection的多個輕量化連接;Channel 是 AMQP 對 MQ 的操作的封裝;
-
Transport:kombu 支持將不同的消息中間件以插件的方式進行靈活配置,使用transport這個術語來表示一個具體的消息中間件,可以認為是對broker的抽象:
- 對 MQ 的操作必然離不開連接,但是,Kombu 並不直接讓 Channel 使用 Connection 來發送/接受請求,而是引入了一個新的抽象 Transport,Transport 負責具體的 MQ 的操作,也就是說 Channel 的操作都會落到 Transport 上執行。引入transport這個抽象概念可以使得后續添加對non-AMQP的transport非常簡單;
- Transport是真實的 MQ 連接,也是真正連接到 MQ(redis/rabbitmq) 的實例;
- 當前Kombu中build-in支持有Redis、Beanstalk、Amazon SQS、CouchDB,、MongoDB,、ZeroMQ,、ZooKeeper、SoftLayer MQ和Pyro;
-
Producers: 發送消息的抽象類;
-
Consumers:接受消息的抽象類。consumer需要聲明一個queue,並將queue與指定的exchange綁定,然后從queue里面接收消息;
-
Exchange:MQ 路由,這個和 RabbitMQ 差不多,支持 5 類型。消息發送者將消息發至Exchange,Exchange負責將消息分發至隊列。用於路由消息(消息發給exchange,exchange發給對應的queue)。路由就是比較routing-key(這個message提供)和binding-key(這個queue注冊到exchange的時候提供)。使用時,需要指定exchange的名稱和類型(direct,topic和fanout)。
交換機通過匹配消息的 routing_key 和 binding_key來轉發消息,binding_key 是consumer 聲明隊列時與交換機的綁定關系。
-
Queue:對應的 queue 抽象,存儲着即將被應用消費掉的消息,Exchange負責將消息分發Queue,消費者從Queue接收消息。
-
Routing keys: 每個消息在發送時都會聲明一個routing_key。routing_key的含義依賴於exchange的類型。一般說來,在AMQP標准里定義了四種默認的exchange類型,此外,vendor還可以自定義exchange的類型。最常用的三類exchange為:
- Direct exchange: 如果message的routing_key和某個consumer中的routing_key相同,就會把消息發送給這個consumer監聽的queue中。
- Fan-out exchange: 廣播模式。exchange將收到的message發送到所有與之綁定的queue中。
- Topic exchange: 該類型exchange會將message發送到與之routing_key類型相匹配的queue中。routing_key由一系列“.”隔開的word組成,“*”代表匹配任何word,“#”代表匹配0個或多個word,類似於正則表達式。
0x04 概念具體說明
4.1 概述
以 redis 為 broker,我們簡要說明:
- 發送消息的對象,稱為生產者Producer。
- connections建立 redis 連接,channel 是一次連接會話。
- Exchange 負責交換消息,消息通過channel發送到Exchange,由於Exchange綁定Queue和routing_key。消息會被轉發到 redis 中匹配routing_key的Queue中。
- 在Queue另一側的消費者Consumer 一直對Queue進行監聽,一旦Queue中存在數據,則調用callback方法處理消息。
4.2 Connection
Connection是對 MQ 連接的抽象,一個 Connection 就對應一個 MQ 的連接。現在就是對 'redis://localhost:6379'
連接進行抽象。
conn = Connection('redis://localhost:6379')
由之前論述可知,Connection是到broker的連接。從具體代碼可以看出,Connection更接近是一個邏輯概念,具體功能都委托給別人完成。
Connection主要成員變量是:
- _connection:kombu.transport.redis.Transport 類型,就是真正用來負責具體的 MQ 的操作,也就是說對 Channel 的操作都會落到 Transport 上執行。
- _transport:就是上面提到的對 broker 的抽象。
- cycle:與broker交互的調度策略。
- failover_strategy:在連接失效時,選取其他hosts的策略。
- heartbeat:用來實施心跳。
精簡版定義如下:
class Connection:
"""A connection to the broker"""
port = None
_connection = None
_default_channel = None
_transport = None
#: Iterator returning the next broker URL to try in the event
#: of connection failure (initialized by :attr:`failover_strategy`).
cycle = None
#: Additional transport specific options,
#: passed on to the transport instance.
transport_options = None
#: Strategy used to select new hosts when reconnecting after connection
#: failure. One of "round-robin", "shuffle" or any custom iterator
#: constantly yielding new URLs to try.
failover_strategy = 'round-robin'
#: Heartbeat value, currently only supported by the py-amqp transport.
heartbeat = None
failover_strategies = failover_strategies
4.3 Channel
Channel:與AMQP中概念類似,可以理解成共享一個Connection的多個輕量化連接。就是真正的連接。
- Connection 是 AMQP 對 連接的封裝;
- Channel 是 AMQP 對 MQ 的操作的封裝;
Channel 可以認為是 redis 操作和連接的封裝。每個 Channel 都可以與 redis 建立一個連接,在此連接之上對 redis 進行操作,每個連接都有一個 socket,每個 socket 都有一個 file,從這個 file 可以進行 poll。
4.3.1 定義
簡化版定義如下:
class Channel(virtual.Channel):
"""Redis Channel."""
QoS = QoS
_client = None
_subclient = None
keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX)
keyprefix_fanout = '/{db}.'
sep = '\x06\x16'
_fanout_queues = {}
unacked_key = '{p}unacked'.format(p=KEY_PREFIX)
unacked_index_key = '{p}unacked_index'.format(p=KEY_PREFIX)
unacked_mutex_key = '{p}unacked_mutex'.format(p=KEY_PREFIX)
unacked_mutex_expire = 300 # 5 minutes
unacked_restore_limit = None
visibility_timeout = 3600 # 1 hour
max_connections = 10
queue_order_strategy = 'round_robin'
_async_pool = None
_pool = None
from_transport_options = (
virtual.Channel.from_transport_options +
('sep',
'ack_emulation',
'unacked_key',
......
'max_connections',
'health_check_interval',
'retry_on_timeout',
'priority_steps') # <-- do not add comma here!
)
connection_class = redis.Connection if redis else None
self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}
4.3.2 redis消息回調函數
關於上面成員變量,這里需要說明的是
handlers = {dict: 2}
{
'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>,
'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>
}
這是redis有消息時的回調函數,即:
- BPROP 有消息時候,調用 Channel._brpop_read;
- LISTEN 有消息時候,調用 Channel._receive;
大約如下:
+---------------------------------------------------------------------------------------------------------------------------------------+
| +--------------+ 6 parse_response |
| +--> | Linux Kernel | +---+ |
| | +--------------+ | |
| | | |
| | | event |
| | 1 | |
| | | 2 |
| | | |
+-------+---+ socket + | |
| redis | <------------> port +--> fd +--->+ v |
| | | +------+--------+ |
| | socket | | Hub | |
| | <------------> port +--> fd +--->----------> | | |
| port=6379 | | | | |
| | socket | | readers +-----> Transport.on_readable |
| | <------------> port +--> fd +--->+ | | + |
+-----------+ +---------------+ | |
| |
3 | |
+----------------------------------------------------------------------------------------+ |
| v
| _receive_callback
| 5 +-------------+ +-----------+
+------------+------+ +-------------------------+ 'BRPOP' = Channel._brpop_read +-----> | Channel | +------------------> | Consumer |
| Transport | | MultiChannelPoller | +------> channel . handlers 'LISTEN' = Channel._receive +-------------+ +---+-------+
| | | | | 8 |
| | on_readable(fileno) | | | ^ |
| cycle +---------------------> | _fd_to_chan +----------------> channel . handlers 'BRPOP' = Channel._brpop_read | |
| | 4 | | | 'LISTEN' = Channel._receive | |
| _callbacks[queue]| | | | | on_m | 9
| + | +-------------------------+ +------> channel . handlers 'BRPOP' = Channel._brpop_read | |
+-------------------+ 'LISTEN' = Channel._receive | |
| | v
| 7 _callback |
+-----------------------------------------------------------------------------------------------------------------------------------------+ User Function
手機如圖:
4.4 Transport
Transport:真實的 MQ 連接,也是真正連接到 MQ(redis/rabbitmq) 的實例。就是存儲和發送消息的實體,用來區分底層消息隊列是用amqp、Redis還是其它實現的。
我們順着上文理一下:
- Connection 是 AMQP 對 連接的封裝;
- Channel 是 AMQP 對 MQ 的操作的封裝;
- 那么兩者的關系就是對 MQ 的操作必然離不開連接,但是 Kombu 並不直接讓 Channel 使用 Connection 來發送/接受請求,而是引入了一個新的抽象 Transport,Transport 負責具體的 MQ 的操作,也就是說 Channel 的操作都會落到 Transport 上執行;
在Kombu 體系中,用 transport 對所有的 broker 進行了抽象,為不同的 broker 提供了一致的解決方案。通過Kombu,開發者可以根據實際需求靈活的選擇或更換broker。
Transport負責具體操作,但是 很多操作移交給 loop 與 MultiChannelPoller 進行。
其主要成員變量為:
- 本transport的驅動類型,名字;
- 對應的 Channel;
- cycle:MultiChannelPoller,具體下文會提到;
其中重點是MultiChannelPoller。一個Connection有一個Transport, 一個Transport有一個MultiChannelPoller,對poll操作都是由MultiChannelPoller完成,redis操作由channel完成。
定義如下:
class Transport(virtual.Transport):
"""Redis Transport."""
Channel = Channel
polling_interval = None # disable sleep between unsuccessful polls.
default_port = DEFAULT_PORT
driver_type = 'redis'
driver_name = 'redis'
implements = virtual.Transport.implements.extend(
asynchronous=True,
exchange_type=frozenset(['direct', 'topic', 'fanout'])
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# All channels share the same poller.
self.cycle = MultiChannelPoller()
4.5 MultiChannelPoller
MultiChannelPoller 定義如下,可以理解為 執行 engine,主要作用是:
- 收集 channel;
- 建立 socks fd 到 channel 的映射;
- 建立 channel 到 socks fd 的映射;
- 使用 poll;
或者從邏輯上這么理解,MultiChannelPoller 就是:
- 把 Channel 對應的 socket 同 poll 聯系起來,一個 socket 在 linux 系統中就是一個file,就可以進行 poll 操作;
- 把 poll 對應的 fd 添加到 MultiChannelPoller 這里,這樣 MultiChannelPoller 就可以 打通
Channel ---> socket ---> poll ---> fd ---> 讀取 redis
這條通路了,就是如果 redis 有數據來了,MultiChannelPoller 就馬上通過 poll 得到通知,就去 redis 讀取;
具體定義如下:
class MultiChannelPoller:
"""Async I/O poller for Redis transport."""
eventflags = READ | ERR
def __init__(self):
# active channels
self._channels = set()
# file descriptor -> channel map.
self._fd_to_chan = {}
# channel -> socket map
self._chan_to_sock = {}
# poll implementation (epoll/kqueue/select)
self.poller = poll()
# one-shot callbacks called after reading from socket.
self.after_read = set()
4.6 Consumer
Consumer 是消息接收者。Consumer & 相關組件 的作用主要如下:
- Exchange:MQ 路由,消息發送者將消息發至 Exchange,Exchange 負責將消息分發至隊列。
- Queue:對應的隊列抽象,存儲着即將被應用消費掉的消息,Exchange 負責將消息分發 Queue,消費者從Queue 接收消息;
- Consumers 是接受消息的抽象類,consumer 需要聲明一個 queue,並將 queue 與指定的 exchange 綁定,然后從 queue 里面接收消息。就是說,從用戶角度,知道了一個 exchange 就可以從中讀取消息,而具體這個消息就是從 queue 中讀取的。
在具體 Consumer 的實現中,它把 queue 與 channel 聯系起來。queue 里面有一個 channel,用來訪問redis,也有 Exchange,知道訪問具體 redis 哪個key(就是queue對應的那個key)。
Consumer 消費消息是通過 Queue 來消費,然后 Queue 又轉嫁給 Channel。
所以服務端的邏輯大致為:
- 建立連接;
- 創建Exchange ;
- 創建Queue,並將Exchange與Queue綁定,Queue的名稱為routing_key ;
- 創建Consumer對Queue監聽;
Consumer 定義如下:
class Consumer:
"""Message consumer.
Arguments:
channel (kombu.Connection, ChannelT): see :attr:`channel`.
queues (Sequence[kombu.Queue]): see :attr:`queues`.
no_ack (bool): see :attr:`no_ack`.
auto_declare (bool): see :attr:`auto_declare`
callbacks (Sequence[Callable]): see :attr:`callbacks`.
on_message (Callable): See :attr:`on_message`
on_decode_error (Callable): see :attr:`on_decode_error`.
prefetch_count (int): see :attr:`prefetch_count`.
"""
ContentDisallowed = ContentDisallowed
#: The connection/channel to use for this consumer.
channel = None
#: A single :class:`~kombu.Queue`, or a list of queues to
#: consume from.
queues = None
#: Flag for automatic message acknowledgment.
no_ack = None
#: By default all entities will be declared at instantiation, if you
#: want to handle this manually you can set this to :const:`False`.
auto_declare = True
#: List of callbacks called in order when a message is received.
callbacks = None
#: Optional function called whenever a message is received.
on_message = None
#: Callback called when a message can't be decoded.
on_decode_error = None
#: List of accepted content-types.
accept = None
#: Initial prefetch count
prefetch_count = None
#: Mapping of queues we consume from.
_queues = None
_tags = count(1) # global
此時總體邏輯如下圖:
+----------------------+ +-------------------+
| Consumer | | Channel |
| | | | +-----------------------------------------------------------+
| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
| channel +--------------------> | | +-----------------------------------------------------------+
| | | pool |
| | +---------> | | <------------------------------------------------------------+
| queues | | | | |
| | | +----> | connection +---------------+ |
| | | | | | | | |
+----------------------+ | | +-------------------+ | |
| | | v |
| | | +-------------------+ +---+-----------------+ +--------------------+ |
| | | | Connection | | redis.Transport | | MultiChannelPoller | |
| | | | | | | | | |
| | | | | | | | _channels +--------+
| | | | | | cycle +------------> | _fd_to_chan |
| | | | transport +---------> | | | _chan_to_sock |
| +-------->+ | | | | | +------+ poller |
| | | +-------------------+ +---------------------+ | | after_read |
| | | | | |
| | | | +--------------------+
| | | +------------------+ +---------------+
| | | | Hub | |
| | | | | v
| | | | | +------+------+
| | | | poller +---------------> | _poll |
| | | | | | | +-------+
| | | | | | _poller+---------> | poll |
v | | +------------------+ | | +-------+
| | +-------------+
+-------------------+ | +----------------+
| Queue | | | | Exchange |
| _chann+l | +----+ | |
| | | |
| exchange +----------------> | channel |
| | | |
| | | |
+-------------------+ +----------------+
手機如下:
現在我們知道:
4.7 Producer
Producer 是消息發送者。Producer中,主要變量是:
- _channel :就是channel;
- exchange :exchange;
class Producer:
"""Message Producer.
Arguments:
channel (kombu.Connection, ChannelT): Connection or channel.
exchange (kombu.entity.Exchange, str): Optional default exchange.
routing_key (str): Optional default routing key.
"""
#: Default exchange
exchange = None
#: Default routing key.
routing_key = ''
#: Default serializer to use. Default is JSON.
serializer = None
#: Default compression method. Disabled by default.
compression = None
#: By default, if a defualt exchange is set,
#: that exchange will be declare when publishing a message.
auto_declare = True
#: Basic return callback.
on_return = None
#: Set if channel argument was a Connection instance (using
#: default_channel).
__connection__ = None
邏輯如圖:
+----------------------+ +-------------------+
| Producer | | Channel |
| | | | +-----------------------------------------------------------+
| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
| channel +------------------> | | +-----------------------------------------------------------+
| | | pool |
| exchange | +---------> | | <------------------------------------------------------------+
| | | | | |
| connection | | +----> | connection +---------------+ |
| + | | | | | | |
+--+-------------------+ | | +-------------------+ | |
| | | | v |
| | | | +-------------------+ +---+-----------------+ +--------------------+ |
| | | | | Connection | | redis.Transport | | MultiChannelPoller | |
| +----------------------> | | | | | | |
| | | | | | | | _channels +--------+
| | | | | | cycle +------------> | _fd_to_chan |
| | | | transport +---------> | | | _chan_to_sock |
| +-------->+ | | | | | +------+ poller |
| | | +-------------------+ +---------------------+ | | after_read |
| | | | | |
| | | | +--------------------+
| | | +------------------+ +---------------+
| | | | Hub | |
| | | | | v
| | | | | +------+------+
| | | | poller +---------------> | _poll |
| publish | | | | | | +-------+
+--------------------------------+ | | | _poller+---------> | poll |
| | | +------------------+ | | +-------+
| | | +-------------+
+-------------------+ | +-----> +----------------+
| Queue | | | | Exchange |
| _channel | +---------+ | |
| | | |
| exchange +--------------------> | channel |
| | | |
| | | |
+-------------------+ +----------------+
手機如圖:
4.8 Hub
用戶可以通過同步方式自行讀取消息,如果不想自行讀取,也可以通過Hub(本身構建了一個異步消息引擎)讀取。
4.8.1 自己的poller
Hub 是一個eventloop,擁有自己的 poller。
前面在 MultiChannelPoller 中間提到了,MultiChannelPoller 會建立了自己內部的 poller。但是實際上在注冊時候,Transport 會使用 hub 的 poller,而非 MultiChannelPoller 內部的 poller。
4.8.2 Connection
Connection注冊到Hub,一個Connection對應一個Hub。
hub = Hub()
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)
4.8.3 聯系
在注冊過程中,Hub 把自己內部的 poller 配置在 Transport 之中。這樣就通過 transport 內部的 MultiChannelPoller 可以把 Hub . poller 和 Channel 對應的 socket 同poll聯系起來,一個 socket 在 linux 系統中就是一個file,就可以進行 poll 操作;
因而,如前面所述,這樣 MultiChannelPoller 就可以 打通 Channel ---> socket ---> poll ---> fd ---> 讀取 redis
這條通路了,就是如果 redis 有數據來了,MultiChannelPoller 就馬上通過 poll 得到通知,就去 redis 讀取。
def register_with_event_loop(self, loop):
self.transport.register_with_event_loop(self.connection, loop)
4.8.4 定義
Hub定義如下:
class Hub:
"""Event loop object.
"""
def __init__(self, timer=None):
self.timer = timer if timer is not None else Timer()
self.readers = {}
self.writers = {}
self.on_tick = set()
self.on_close = set()
self._ready = set()
self._create_poller()
@property
def poller(self):
if not self._poller:
self._create_poller()
return self._poller
def _create_poller(self):
self._poller = poll()
self._register_fd = self._poller.register
self._unregister_fd = self._poller.unregister
def add(self, fd, callback, flags, args=(), consolidate=False):
fd = fileno(fd)
try:
self.poller.register(fd, flags)
except ValueError:
self._remove_from_loop(fd)
raise
else:
dest = self.readers if flags & READ else self.writers
if consolidate:
self.consolidate.add(fd)
dest[fd] = None
else:
dest[fd] = callback, args
def run_forever(self):
self._running = True
try:
while 1:
try:
self.run_once()
except Stop:
break
finally:
self._running = False
def run_once(self):
try:
next(self.loop)
except StopIteration:
self._loop = None
def create_loop(self, ...):
readers, writers = self.readers, self.writers
poll = self.poller.poll
while 1:
for fd, event in events or ():
cb, cbargs = readers[fd]
if isinstance(cb, generator):
next(cb)
cb(*cbargs)
else:
# no sockets yet, startup is probably not done.
sleep(min(poll_timeout, 0.1))
yield
0x05 總結
我們通過文字和圖例來總結下本文。
5.1 邏輯
-
Message:消息,發送和消費的主體,其實就是我們所謂的一條條消息;
-
Connection是AMQP對消息隊列連接的封裝抽象,那么兩者的關系就是:對MQ的操作必然離不開連接。
-
Channel是AMQP對MQ的操作的封裝,可以理解成共享一個Connection的多個輕量化連接。
Channel
將Consumer
標簽,Consumer
要消費的隊列,以及標簽與隊列的映射關系都記錄下來,等待循環調用。- 還通過
Transport
將隊列與回調函數列表的映射關系記錄下來。 - Kombu對所有需要監聽的隊列
_active_queues
都查詢一遍,直到查詢完畢或者遇到一個可以使用的Queue,然后就獲取消息,回調此隊列對應的callback。 - Channel初始化的過程就是連接的過程。
-
Kombu並不直接讓Channel使用Connection來發送/接受請求,而是引入了一個新的抽象Transport,Transport負責具體的MQ的操作,也就是說Channel的操作都會落到Transport上執行。是以Transport為中心,把Channel代表的真實redis與Hub其中的poll聯系起來。
-
Queue:消息隊列,消息內容的載體,存儲着即將被應用消費掉的消息。Exchange 負責將消息分發 Queue,消費者從 Queue 接收消息;
-
Exchange:交換機,消息發送者將消息發至 Exchange,Exchange 負責將消息分發至 Queue;
- 消息發送是交給 Exchange 來做的,但Exchange只是將發送的
routing_key
轉化為queue
的名字,這樣發送就知道應該發給哪個queue;實際發送還是得 channel 來干活, - 即從 exchange 得到 routing_key ---> queue 的規則,然后再依據 routing_key 得到 queue。就知道 Consumer 和 Producer 需要依據哪個 queue 交換消息。
- 每個不同的 Transport 都有對應的 Channel;生產者將消息發送到Exchange,Exchange通過匹配BindingKey和消息中的RouteKey來將消息路由到隊列,最后隊列將消息投遞給消費者。
- 消息發送是交給 Exchange 來做的,但Exchange只是將發送的
-
Producers: 發送消息的抽象類,Producer 包含了很多東西,有 Exchange、routing_key 和 channel 等等;
-
Consumers:接受消息的抽象類,consumer需要聲明一個queue,並將queue與指定的exchange綁定,然后從queue里面接收消息;
- Consumer綁定了消息的處理函數,每一個Consumer初始化的時候都是和Channel綁定的,也就是說我們Consumer包含了Queue也就和Connection關聯起來了。
- Consumer消費消息是通過Queue來消費,然后Queue又轉嫁給Channel,再轉給connection。
-
用戶可以通過同步方式自行讀取消息,如果不想自行讀取,也可以通過Hub(本身構建了一個異步消息引擎)讀取。
-
Hub是一個eventloop,Connection注冊到Hub,一個Connection對應一個Hub。Hub 把自己內部的 poller 配置在 Transport 之中。這樣就通過 transport 內部的 MultiChannelPoller 可以把 Hub . poller 和 Channel 對應的 socket 同poll聯系起來,一個 socket 在 linux 系統中就是一個file,就可以進行 poll 操作;
-
MultiChannelPoller
是Connection 和 Hub的樞紐,它負責找出哪個 Channel 是可用的,但是這些 Channel 都是來自同一個 Connection。
5.2 示例圖
具體如圖,可以看到,
- 目前是以Transport為中心,把Channel代表的真實redis與Hub其中的poll聯系起來,但是具體如何使用則尚未得知。
- 用戶是通過Connection來作為API入口,connection可以得到Transport。
+-------------------+
| Channel |
| | +-----------------------------------------------------------+
| client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
| | +-----------------------------------------------------------+
| |
| | +---------------------------------------------------+-+
| pool +--------------> |ConnectionPool<Connection<host=localhost,port=6379 > |
| | +---------------------------------------------------+-+
| |
| | <------------------------------------------------------------+
| | |
| connection +---------------+ |
| | | |
+-------------------+ | |
v |
+-------------------+ +---+-----------------+ +--------------------+ |
| Connection | | redis.Transport | | MultiChannelPoller | |
| | | | | | |
| | | | | _channels +--------+
| | | cycle +------------> | _fd_to_chan |
| transport +---------> | | | _chan_to_sock |
| | | | +<----+ poller |
+-------------------+ +---------------------+ | | after_read |
| | |
+------------------+ +--------------+ +--------------------+
| Hub | |
| | v
| | +-------+-----+
| poller +---------------> | _poll |
| | | | +-------+
| | | _poller+---------> | poll |
+------------------+ | | +-------+
+-------------+
+----------------+ +-------------------+
| Exchange | | Queue |
| | | |
| | | |
| channel | <------------+ exchange |
| | | |
| | | |
+----------------+ +-------------------+
我們下文用實例來介紹Kombu的啟動過程。
因為本文是一個綜述,所以大家會發現,一些概念講解文字會同時出現在后續文章和綜述之中。