Kombu Source Code Analysis (Part 1): Overview


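Celery is the most popular asynchronous message queue framework in Python. It supports RabbitMQ, Redis, ZooKeeper and others as brokers, and the abstraction over all of these message queues is implemented by Kombu. Kombu provides compatibility with both the AMQP transport and non-AMQP transports (Redis, Amazon SQS, ZooKeeper, etc.).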

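Each AMQP concept (Message, Producer, Exchange, Queue, Consumer, Connection, Channel) has a counterpart in Kombu. Kombu also implements a Transport, the entity that actually stores and sends messages. The Transport determines whether the underlying message queue is AMQP, Redis or some other implementation.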

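  • Message: the message, the unit that is sent and consumed
  • Producer: sends messages
  • Consumer: receives messages
  • Exchange: producers send messages to an Exchange, which routes them to queues
  • Queue: the message queue, holding messages waiting to be consumed by the application; the Exchange routes messages into the Queue, and consumers receive messages from it
  • Connection: an abstraction of the connection to the message queue
  • Channel: similar to the AMQP concept; think of it as one of several lightweight connections that share a single Connection
  • Transport: the actual MQ connection, which distinguishes between the underlying message queue implementations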

Support for the different transports:

Code Example

Let's start with the example code from the official site:

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    message.ack()

# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

# Consume from several queues on the same channel:
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
image_queue = Queue('image', exchange=media_exchange, routing_key='image')

with Connection('amqp://guest:guest@localhost//') as connection:
    with connection.Consumer([video_queue, image_queue],
                             callbacks=[process_media]) as consumer:
        while True:
            connection.drain_events()

That covers essentially every role. Using any of them starts with establishing a Connection.

Connection

Getting a connection is simple:

>>> from kombu import Connection
>>> connection = Connection('amqp://guest:guest@localhost:5672//')

At this point the connection has not actually been established. It is only opened when it is first needed, and it is then cached:

@property
def connection(self):
    """The underlying connection object.
    Warning:
        This instance is transport specific, so do not
        depend on the interface of this object.
    """
    if not self._closed:
        if not self.connected:
            self.declared_entities.clear()
            self._default_channel = None
            self._connection = self._establish_connection()
            self._closed = False
        return self._connection
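The "connect on first use, then cache" behaviour can be modelled without a broker. The sketch below uses a simplified, hypothetical `LazyConnection`, not Kombu's class. Its `_establish_connection` just returns a fresh `object()` and counts how often it was called. The clearing of `declared_entities` and `_default_channel` is left out.

```python
class LazyConnection:
    """Hypothetical model of Connection's lazy `connection` property."""

    def __init__(self):
        self._connection = None
        self._closed = False
        self.establish_calls = 0  # how many times a real connect would happen

    @property
    def connected(self):
        return self._connection is not None

    def _establish_connection(self):
        # Stand-in for transport.establish_connection(); a real transport
        # would open a socket here.
        self.establish_calls += 1
        return object()

    @property
    def connection(self):
        if not self._closed:
            if not self.connected:
                self._connection = self._establish_connection()
            return self._connection
        return None

    def connect(self):
        self._closed = False
        return self.connection

    def release(self):
        self._connection = None
        self._closed = True


conn = LazyConnection()
print(conn.establish_calls)                    # → 0
first = conn.connection
second = conn.connection
print(conn.establish_calls, first is second)   # → 1 True
```

Only the first access pays for the connection; later accesses return the cached object until `release()` is called.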

You can also connect explicitly:

>>> connection.connect()

def connect(self):
    """Establish connection to server immediately."""
    self._closed = False
    return self.connection

Of course, the underlying connection is established by whichever Transport is in use:

conn = self.transport.establish_connection()

The connection must be closed explicitly:

>>> connection.release()

Connection implements the context manager protocol:

def __enter__(self):
    return self

def __exit__(self, *args):
    self.release()

so you can use a with statement and never forget to close the connection:

with Connection() as connection:
    # work with connection
    pass
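The `with` form is the safer choice because `__exit__` runs even when the body raises. The hypothetical `FakeConnection` below copies the `__enter__`/`__exit__` pair shown above and records whether `release()` ran. Because `__exit__` returns `None`, the exception still propagates, but only after the connection has been released.

```python
class FakeConnection:
    """Hypothetical stand-in mirroring Connection.__enter__/__exit__."""

    def __init__(self):
        self.released = False

    def release(self):
        self.released = True

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Returning None (falsy) lets any exception propagate after cleanup.
        self.release()


conn = FakeConnection()
try:
    with conn:
        raise RuntimeError("work failed")
except RuntimeError:
    pass
print(conn.released)  # → True
```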

You can create a Producer or a Consumer directly from a Connection. These methods simply call the respective classes:

def Producer(self, channel=None, *args, **kwargs):
    """Create new :class:`kombu.Producer` instance."""
    from .messaging import Producer
    return Producer(channel or self, *args, **kwargs)

def Consumer(self, queues=None, channel=None, *args, **kwargs):
    """Create new :class:`kombu.Consumer` instance."""
    from .messaging import Consumer
    return Consumer(channel or self, queues, *args, **kwargs)

Producer

Once the connection exists, it can be used to create a Producer:

producer = conn.Producer(serializer='json')

A Producer can also be created directly from a Channel:

with connection.channel() as channel:
    producer = Producer(channel, ...)

When a Producer instance is initialized, it checks its first argument, channel:

self.revive(self.channel)
channel = self.channel = maybe_channel(channel)

This checks whether channel is a Connection instance. If it is, it is replaced by that Connection's default_channel attribute:

def maybe_channel(channel):
    """Get channel from object.
    Return the default channel if argument is a connection instance,
    otherwise just return the channel given.
    """
    if is_connection(channel):
        return channel.default_channel
    return channel

So a Producer is always ultimately bound to a Channel.
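The dispatch in `maybe_channel` can be modelled with two toy classes. In Kombu, `is_connection` checks whether the object is a Connection. Here a plain `isinstance` check against a hypothetical `ToyConnection` plays that role, and `ToyChannel` is likewise made up for illustration.

```python
class ToyChannel:
    """Hypothetical lightweight channel."""


class ToyConnection:
    """Hypothetical connection that lazily creates one default channel."""

    def __init__(self):
        self._default_channel = None

    @property
    def default_channel(self):
        if self._default_channel is None:
            self._default_channel = ToyChannel()
        return self._default_channel


def is_connection(obj):
    return isinstance(obj, ToyConnection)


def maybe_channel(channel):
    # A connection is swapped for its default channel; anything else passes through.
    if is_connection(channel):
        return channel.default_channel
    return channel


conn = ToyConnection()
explicit = ToyChannel()
print(maybe_channel(conn) is conn.default_channel)  # → True
print(maybe_channel(explicit) is explicit)          # → True
```

In both cases the Producer ends up holding a channel. That is why `conn.Producer(...)` and `Producer(channel, ...)` lead to the same place.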

The Producer sends a message:

producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                  exchange=media_exchange, routing_key='video',
                  declare=[video_queue])

Most of the work in publish (via _publish) is done by the Channel:

def _publish(self, body, priority, content_type, content_encoding,
             headers, properties, routing_key, mandatory,
             immediate, exchange, declare):
    channel = self.channel
    message = channel.prepare_message(
        body, priority, content_type,
        content_encoding, headers, properties,
    )
    if declare:
        maybe_declare = self.maybe_declare
        [maybe_declare(entity) for entity in declare]

    # handle autogenerated queue names for reply_to
    reply_to = properties.get('reply_to')
    if isinstance(reply_to, Queue):
        properties['reply_to'] = reply_to.name
    return channel.basic_publish(
        message,
        exchange=exchange, routing_key=routing_key,
        mandatory=mandatory, immediate=immediate,
    )

The Channel assembles the message with prepare_message and sends it with basic_publish.

The Channel, in turn, is created by the Transport:

chan = self.transport.create_channel(self.connection)

Transport

When you create a Connection, you pass in a hostname such as:

amqp://guest:guest@localhost:5672//

Kombu then extracts the scheme of the hostname (for example, redis):

transport = transport or urlparse(hostname).scheme

The scheme determines which type of Transport is created.
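The scheme extraction is plain standard-library URL parsing. `transport_name` below is a hypothetical helper that wraps the same expression Connection uses. An explicit transport argument wins; otherwise the URL scheme names the transport.

```python
from urllib.parse import urlparse


def transport_name(hostname, transport=None):
    # Explicit transport wins; otherwise fall back to the URL scheme.
    return transport or urlparse(hostname).scheme


for url in ('amqp://guest:guest@localhost:5672//',
            'redis://localhost:6379/0'):
    print(transport_name(url))
# → amqp
# → redis
```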

The creation process:

self.transport_cls = transport

transport_cls = get_transport_cls(transport_cls)

def get_transport_cls(transport=None):
    """Get transport class by name.

    The transport string is the full path to a transport class, e.g.::

        "kombu.transport.pyamqp:Transport"

    If the name does not include `"."` (is not fully qualified),
    the alias table will be consulted.
    """
    if transport not in _transport_cache:
        _transport_cache[transport] = resolve_transport(transport)
    return _transport_cache[transport]

transport = TRANSPORT_ALIASES[transport]

TRANSPORT_ALIASES = {
    ...

    'redis': 'kombu.transport.redis:Transport',
    
    ...
}

Redis為例,Transport類在/kombu/transport/redis.py文件,繼承自/kombu/transport/virtual/base.py中的Transport類。

Creating a Channel:

channel = self.Channel(connection)

From there, the Channel assembles the message with prepare_message and sends it with basic_publish.

Channel

A Channel instance has several attributes that link it to Consumers, Queues and so on (from virtual.Channel):

class Channel(AbstractChannel, base.StdChannel):
    def __init__(self, connection, **kwargs):
        self.connection = connection
        self._consumers = set()
        self._cycle = None 
        self._tag_to_queue = {} 
        self._active_queues = [] 
        ... 

Here, _consumers is the set of associated consumer tags, _active_queues is the list of associated Queues, and _tag_to_queue maps consumer tags to Queues:

self._tag_to_queue[consumer_tag] = queue
self._consumers.add(consumer_tag)
self._active_queues.append(queue)

Channel also has a separate implementation for each underlying message queue. For Redis:

class Channel(virtual.Channel):
    """Redis Channel."""

It inherits from virtual.Channel.

The message-assembly function, prepare_message:

def prepare_message(self, body, priority=None, content_type=None,
                    content_encoding=None, headers=None, properties=None):
    """Prepare message data."""
    properties = properties or {}
    properties.setdefault('delivery_info', {})
    properties.setdefault('priority', priority or self.default_priority)

    return {'body': body,
            'content-encoding': content_encoding,
            'content-type': content_type,
            'headers': headers or {},
            'properties': properties or {}}

It essentially wraps the body in a dict and attaches the various message properties.
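Calling a standalone copy of this function shows the resulting dict. The copy below is lifted out of the class. `DEFAULT_PRIORITY = 0` is an assumption standing in for `self.default_priority`.

```python
DEFAULT_PRIORITY = 0  # assumption: stands in for self.default_priority


def prepare_message(body, priority=None, content_type=None,
                    content_encoding=None, headers=None, properties=None):
    """Standalone copy of virtual.Channel.prepare_message for illustration."""
    properties = properties or {}
    properties.setdefault('delivery_info', {})
    properties.setdefault('priority', priority or DEFAULT_PRIORITY)
    return {'body': body,
            'content-encoding': content_encoding,
            'content-type': content_type,
            'headers': headers or {},
            'properties': properties or {}}


msg = prepare_message('{"name": "/tmp/lolcat1.avi"}',
                      content_type='application/json',
                      content_encoding='utf-8')
print(msg['properties'])  # → {'delivery_info': {}, 'priority': 0}
```

Note that `priority or DEFAULT_PRIORITY` means an explicit priority of 0 is indistinguishable from "not given".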

basic_publish sends the message by calling the _put method:

def _put(self, queue, message, **kwargs):
    """Deliver message."""
    pri = self._get_message_priority(message, reverse=False)

    with self.conn_or_acquire() as client:
        client.lpush(self._q_for_pri(queue, pri), dumps(message))

client is a redis.StrictRedis connection:

def _create_client(self, asynchronous=False):
    if asynchronous:
        return self.Client(connection_pool=self.async_pool)
    return self.Client(connection_pool=self.pool)

self.Client = self._get_client()

def _get_client(self):
    if redis.VERSION < (3, 2, 0):
        raise VersionMismatch(
            'Redis transport requires redis-py versions 3.2.0 or later. '
            'You have {0.__version__}'.format(redis))
    return redis.StrictRedis

So on Redis, the message is pushed onto a list with lpush. A different connection_pool is chosen depending on whether the asynchronous option is set.
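The producer does `LPUSH`, and Kombu's Redis channel reads from the other end of the list (`RPOP`/`BRPOP`), so each list behaves as a FIFO queue. The sketch below models that with an in-memory dict of `deque`s instead of a real Redis server. `FakeRedis` is hypothetical. `queue_name` is a hypothetical, simplified stand-in for `_q_for_pri`, and its `':'` separator is made up.

```python
import json
from collections import defaultdict, deque


class FakeRedis:
    """Hypothetical in-memory stand-in for the two list commands involved."""

    def __init__(self):
        self.lists = defaultdict(deque)

    def lpush(self, key, value):
        self.lists[key].appendleft(value)   # LPUSH: insert at the head

    def rpop(self, key):
        lst = self.lists[key]
        return lst.pop() if lst else None   # RPOP: take from the tail


def queue_name(queue, pri, sep=':'):
    # Simplified stand-in for _q_for_pri: priority 0 keeps the plain name.
    return '{0}{1}{2}'.format(queue, sep, pri) if pri else queue


client = FakeRedis()
for size in (1, 2, 3):
    client.lpush(queue_name('video', 0), json.dumps({'size': size}))

print([json.loads(client.rpop('video'))['size'] for _ in range(3)])
# → [1, 2, 3]
```

Messages come out in the order they were pushed, which is exactly the FIFO order a consumer expects.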

Consumer

The message is now sitting in the queue. How does it get consumed?

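To initialize a Consumer, you supply a Channel (a Connection also works, as in this example), the list of queues to consume from, and a list of callbacks that process the messages: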

with Consumer(connection, queues, callbacks=[process_media], accept=['json']):
    connection.drain_events(timeout=1)

When a Consumer instance is used as a context manager, its consume method is called:

def __enter__(self):
    self.consume()
    return self

The consume method:

def consume(self, no_ack=None):
    """Start consuming messages.

    Can be called multiple times, but note that while it
    will consume from new queues added since the last call,
    it will not cancel consuming from removed queues (
    use :meth:`cancel_by_queue`).

    Arguments:
        no_ack (bool): See :attr:`no_ack`.
    """
    queues = list(values(self._queues))
    if queues:
        no_ack = self.no_ack if no_ack is None else no_ack

        H, T = queues[:-1], queues[-1]
        for queue in H:
            self._basic_consume(queue, no_ack=no_ack, nowait=True)
        self._basic_consume(T, no_ack=no_ack, nowait=False)

_basic_consume is called for each queue in the list. Only the last Queue is registered with nowait=False.
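With the head/tail split, every queue except the last is registered with `nowait=True`, and only the final call waits for the broker's reply. Presumably this pays the round trip once: a channel processes its commands in order, so the last confirmation arrives after the earlier registrations. The sketch below only records the calls. `consume_all` and the `calls` list are hypothetical.

```python
def consume_all(queues, basic_consume):
    """Mirror of Consumer.consume's head/tail loop (hypothetical helper)."""
    if queues:
        head, tail = queues[:-1], queues[-1]
        for queue in head:
            basic_consume(queue, nowait=True)
        basic_consume(tail, nowait=False)   # only the last call waits


calls = []
consume_all(['video', 'image', 'audio'],
            lambda queue, nowait: calls.append((queue, nowait)))
print(calls)
# → [('video', True), ('image', True), ('audio', False)]
```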

The _basic_consume method:

def _basic_consume(self, queue, consumer_tag=None,
                   no_ack=no_ack, nowait=True):
    tag = self._active_tags.get(queue.name)
    if tag is None:
        tag = self._add_tag(queue, consumer_tag)
        queue.consume(tag, self._receive_callback,
                      no_ack=no_ack, nowait=nowait)
    return tag

It passes the consumer tag and the callback to the Queue's consume method.

The Queue's consume method:

def consume(self, consumer_tag='', callback=None, 
            no_ack=None, nowait=False):     
    """Start a queue consumer.      

    Consumers last as long as the channel they were created on, or
    until the client cancels them.

    Arguments:             
        consumer_tag (str): Unique identifier for the consumer.
            The consumer tag is local to a connection, so two clients
            can use the same consumer tags. If this field is empty
            the server will generate a unique tag.

        no_ack (bool): If enabled the broker will automatically
            ack messages.

        nowait (bool): Do not wait for a reply.

        callback (Callable): callback called for each delivered message.
    """
    if no_ack is None:
        no_ack = self.no_ack            
    return self.channel.basic_consume(
        queue=self.name,
        no_ack=no_ack,
        consumer_tag=consumer_tag or '',
        callback=callback,
        nowait=nowait,
        arguments=self.consumer_arguments)

This brings us back to the Channel. Here is Channel.basic_consume:

def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
    """Consume from `queue`."""     
    self._tag_to_queue[consumer_tag] = queue
    self._active_queues.append(queue)

    def _callback(raw_message):         
        message = self.Message(raw_message, channel=self)
        if not no_ack:
            self.qos.append(message, message.delivery_tag)
        return callback(message)

    self.connection._callbacks[queue] = _callback
    self._consumers.add(consumer_tag)

    self._reset_cycle()   

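The Channel records the consumer tag, the queue the Consumer will consume from, and the mapping between the two, ready for the polling loop. It also records a mapping from each queue to its callback on the Transport (the channel's connection attribute), so that the callback can be run once a message is taken off that queue.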
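The bookkeeping can be modelled with two small classes. `ToyTransport` and `ToyChannel` are hypothetical. As in `virtual.Channel`, the channel's `connection` attribute is the transport, which is where the per-queue callback closure lands. Message wrapping and QoS are left out.

```python
class ToyTransport:
    """Hypothetical transport: holds queue -> callback closures."""

    def __init__(self):
        self._callbacks = {}


class ToyChannel:
    """Hypothetical channel keeping the same three records as virtual.Channel."""

    def __init__(self, transport):
        self.connection = transport
        self._consumers = set()
        self._tag_to_queue = {}
        self._active_queues = []

    def basic_consume(self, queue, callback, consumer_tag):
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)

        def _callback(raw_message):
            # Kombu wraps raw_message in self.Message(...) and handles QoS here.
            return callback(raw_message)

        self.connection._callbacks[queue] = _callback
        self._consumers.add(consumer_tag)


received = []
transport = ToyTransport()
channel = ToyChannel(transport)
channel.basic_consume('video', received.append, consumer_tag='tag-1')
transport._callbacks['video']({'body': 'hello'})
print(channel._tag_to_queue, received)
# → {'tag-1': 'video'} [{'body': 'hello'}]
```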

The actual consumption is triggered by this line:

connection.drain_events(timeout=1)

This leads to the Transport's drain_events method:

def drain_events(self, connection, timeout=None):
    time_start = monotonic()
    get = self.cycle.get
    polling_interval = self.polling_interval
    if timeout and polling_interval and polling_interval > timeout:
        polling_interval = timeout
    while 1:
        try: 
            get(self._deliver, timeout=timeout)
        except Empty:
            if timeout is not None and monotonic() - time_start >= timeout:
                raise socket.timeout()
            if polling_interval is not None:
                sleep(polling_interval)
        else:
            break

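At first glance this calls get(self._deliver, timeout=timeout) forever. In fact the loop breaks as soon as one get succeeds. After each Empty it sleeps for polling_interval, and once timeout has elapsed it raises socket.timeout.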
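The retry loop can be exercised on its own. `drain` below is a hypothetical standalone copy of the loop above, with `time.monotonic` and the standard library's `queue.Empty` standing in for Kombu's imports. A fake `get` reports `Empty` twice before delivering.

```python
import socket
import time
from queue import Empty


def drain(get, deliver, timeout=None, polling_interval=0.01):
    """Standalone copy of virtual.Transport.drain_events' retry loop."""
    time_start = time.monotonic()
    if timeout and polling_interval and polling_interval > timeout:
        polling_interval = timeout
    while 1:
        try:
            get(deliver, timeout=timeout)
        except Empty:
            if timeout is not None and time.monotonic() - time_start >= timeout:
                raise socket.timeout()
            if polling_interval is not None:
                time.sleep(polling_interval)
        else:
            break


attempts = []


def flaky_get(callback, timeout=None):
    attempts.append(1)
    if len(attempts) < 3:
        raise Empty()          # nothing ready yet
    callback('message')


delivered = []
drain(flaky_get, delivered.append, timeout=1)
print(len(attempts), delivered)  # → 3 ['message']
```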

get is a method of self.cycle, which is a FairCycle instance:

self.cycle = self.Cycle(self._drain_channel, self.channels, Empty)

@python_2_unicode_compatible
class FairCycle(object):
    """Cycle between resources.

    Consume from a set of resources, where each resource gets
    an equal chance to be consumed from.

    Arguments: 
        fun (Callable): Callback to call.
        resources (Sequence[Any]): List of resources.
        predicate (type): Exception predicate.
    """

    def __init__(self, fun, resources, predicate=Exception):
        self.fun = fun
        self.resources = resources
        self.predicate = predicate
        self.pos = 0

    def _next(self):
        while 1:
            try:
                resource = self.resources[self.pos]
                self.pos += 1
                return resource
            except IndexError:
                self.pos = 0
                if not self.resources:
                    raise self.predicate()

    def get(self, callback, **kwargs):
        """Get from next resource."""
        for tried in count(0):  # for infinity
            resource = self._next()
            try:
                return self.fun(resource, callback, **kwargs)
            except self.predicate:
                # reraise when retries exhausted.
                if tried >= len(self.resources) - 1:
                    raise

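FairCycle takes three arguments:

  • fun: the function to call.
  • resources: a sequence. Each get hands the next item to fun in round-robin order, continuing from where the previous call stopped.
  • predicate: the exception type (here Empty) that signals a resource had nothing to offer.

get re-raises that exception only after every resource has been tried once.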
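The round-robin behaviour is easiest to see with a compact re-implementation. `RoundRobin` below is a hypothetical model of `FairCycle.get`, not Kombu's code; it wraps the position with `%` instead of catching `IndexError`. The three "queues" are plain lists, and `take` plays the role of `_get_and_deliver`.

```python
from queue import Empty


class RoundRobin:
    """Hypothetical compact model of FairCycle.get(), not Kombu's code."""

    def __init__(self, fun, resources, predicate=Exception):
        self.fun = fun
        self.resources = resources
        self.predicate = predicate
        self.pos = 0

    def get(self, callback):
        if not self.resources:
            raise self.predicate()
        n = len(self.resources)
        for tried in range(n):
            resource = self.resources[self.pos % n]
            self.pos = (self.pos + 1) % n   # next call resumes after this one
            try:
                return self.fun(resource, callback)
            except self.predicate:
                if tried == n - 1:
                    raise                   # every resource was empty this pass


queues = {'a': [1, 2], 'b': [], 'c': [3]}


def take(name, callback):
    # Raise Empty when the queue has nothing, like _get_and_deliver.
    if not queues[name]:
        raise Empty()
    return callback(name, queues[name].pop(0))


cycle = RoundRobin(take, ['a', 'b', 'c'], Empty)
print([cycle.get(lambda q, m: (q, m)) for _ in range(3)])
# → [('a', 1), ('c', 3), ('a', 2)]
```

The empty queue `b` is skipped without stalling the others. Each call resumes after the resource used last time instead of restarting the scan at the front, which is the fairness the class name refers to.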

Here, fun is _drain_channel and resources is channels:

def _drain_channel(self, channel, callback, timeout=None):
    return channel.drain_events(callback=callback, timeout=timeout)

So drain_events is called on each channel associated with the Transport.

Channel.drain_events:

def drain_events(self, timeout=None, callback=None):
    callback = callback or self.connection._deliver
    if self._consumers and self.qos.can_consume():
        if hasattr(self, '_get_many'):
            return self._get_many(self._active_queues, timeout=timeout)
        return self._poll(self.cycle, callback, timeout=timeout)
    raise Empty()

_poll:

def _poll(self, cycle, callback, timeout=None):
    """Poll a list of queues for available messages."""
    return cycle.get(callback)

And we are back at FairCycle. Here is the Channel's FairCycle instance:

def _reset_cycle(self):
    self._cycle = FairCycle(
        self._get_and_deliver, self._active_queues, Empty)

_get_and_deliver takes a message off the queue and then calls the _deliver method passed down from the Transport:

def _get_and_deliver(self, queue, callback):
    message = self._get(queue)
    callback(message, queue)

_deliver:

def _deliver(self, message, queue):
    if not queue:
        raise KeyError(
            'Received message without destination queue: {0}'.format(
                message))
    try:
        callback = self._callbacks[queue]
    except KeyError:
        logger.warning(W_NO_CONSUMERS, queue)
        self._reject_inbound_message(message)
    else:
        callback(message)

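It looks up the callback registered for the message's queue and calls it with the message. That callback ultimately hands the message to every function in the Consumer's callbacks list. If no callback is registered for the queue, a warning is logged and the message is rejected.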
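The dispatch and its fallback branch can be sketched standalone. The real method logs `W_NO_CONSUMERS` and calls `_reject_inbound_message`. Here a plain, hypothetical `rejected` list records messages that arrive for a queue with no registered callback.

```python
callbacks = {}   # queue name -> callback, as filled in by basic_consume
rejected = []    # hypothetical stand-in for _reject_inbound_message


def deliver(message, queue):
    if not queue:
        raise KeyError(
            'Received message without destination queue: {0}'.format(message))
    try:
        callback = callbacks[queue]
    except KeyError:
        rejected.append((queue, message))   # no consumer registered
    else:
        callback(message)


seen = []
callbacks['video'] = seen.append
deliver({'body': 'clip'}, 'video')
deliver({'body': 'pic'}, 'image')
print(seen, rejected)
# → [{'body': 'clip'}] [('image', {'body': 'pic'})]
```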

Recap

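Channel and Transport are the two central pieces of Kombu. The Channel records the list of queues, the set of consumers, and the mapping between them. The Transport records the mapping from queues to callbacks. Kombu polls every queue it is listening on (_active_queues) in turn, until it has tried them all or finds a Queue with a message available. It then takes the message and invokes the callback for that queue.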

