OpenStack RPC框架解析


1  消息隊列Rabbitmq介紹

Rabbitmq的整體架構圖

1)Rabbitmq Server:中間那部分就是Rabbitmq Server,也叫broken server,主要是負責消息的傳遞,保證client A、B發送的消息Cleint 1、2、3能夠正確的接收到。

2)Client A、B:在消息隊列里我們稱之為生產者-Producer,發送消息的客戶端。

3)Client 1、2、3:在消息隊列里我們稱之為消費者-Consume,接收消息的客戶端。

4)Exchange:我們可以稱之為消息隊列的路由,根據發送的消息的routing key來轉發到對應的隊列上。有四種類型的Exchange,對應四種不同的轉發策略:

direct Exchange:完全匹配,比如routing key是abc,就對應binding key為abc對應的queue。

topic Exchange:正則匹配,比如routing key是ab*,可以用來匹配binding key為abc或abd等的queue。

fanout Exchange:廣播策略,忽略掉routing key,轉發給所有綁定在這個Exchange的queue。

headers Exchange:不依賴於routing key,會根據發送的消息的內容的headers屬性來進行匹配。

5)Queue:隊列,消息存放的地方。

6)Connection (連接)和 Channel (通道):生產者和消費者需要和 RabbitMQ 建立 TCP 連接。一些應用需要多個connection,為了節省TCP 連接,可以使用 Channel,它可以被認為是一種輕型的共享 TCP 連接的連接。連接需要用戶認證,並且支持 TLS (SSL)。連接需要顯式關閉。

7)vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離

8)Message(消息):在通道上傳輸的二進制對象,結構為Headers(頭)、Properties (屬性)和 Data (數據)。

以下是消息的幾個重要屬性:

routing key:Exchange根據該key來轉發消息到對應的隊列中

delivery_mode:消息模式,有持久模式和非持久模式,持久模式則是將消息保存到磁盤中,非持久模式則是消息保存在內存中

reply_to:RPC調用時處理結果返回需傳送到的隊列名,稱為回調隊列

correlation_id:RPC調用返回時需要用到的參數,一個請求id

content_type這個編碼類型是給生產者和消費者使用的,rabbitmq只是按原樣傳輸的

 

對應到OpenStack的平台則是:

Client端的生產者可以是nova-api,nova-conductor等,以虛擬機開機為例,則nova-api是生產者,nova-api收到一個http請求,產生一個開機消息,exchange是’nova’,發送的隊列名compute.hostname,routing key為隊列名,然后發送到Rabbitmq Server上去,消息隊列服務保存到對應的隊列上,然后將消息派發給消費者。因為消費者跟rabbitmq服務是建立了一條channel連接的,所以派發消息就相當於是通過這條channel傳送數據。

消費者則對應是nova-compute,nova-compute接收到消息后進行解析,然后調用對應的函數進行處理,然后將處理結果返回。

 

2  Rabbitmq集群模式

Rabbitmq集群工作原理圖:

Rabbitmq是用Erlang語言寫的,該語言天生有分布式特性,本身支持原生的HA模式。

普通的消息隊列集群會始終同步四種類型的內部元數據:

(1)隊列元數據:隊列名稱和它的屬性

(2)交換器元數據:交換器名稱、類型和屬性

(3)綁定(binding)元數據:一張簡單的表格展示如何將消息路由到對應的隊列

(4)Vhost元數據:為vhost內的隊列、交換器和綁定提供命名空間和安全屬性

 

但普通模式並沒有對消息隊列的消息進行同步,需要設置成鏡像模式,才會對消息進行同步。

 

2.1  同步原理

通過鏡像模式,Rabbitmq會將鏡像隊列放置於多個消息隊列服務節點上,消息的生產和消費都會在節點間進行同步,鏡像隊列包含一個master和多個slave,當master退出時,時間最長的則升為master。

 

每個消息隊列進程會創建一個gm(guaranteed multicast)進程,鏡像隊列中的所有gm進程會組成一個gm進程組用於廣播和接收消息。Gm組將集群中的節點組成一個環,主節點收到或處理完一個消息都會發起消息同步,消息沿着環形鏈走,當主節點接收到自己發的消息后則表示消息已經同步到所有的節點。

 

消息的發布和消費都是通過 master 隊列完成,master 隊列對消息進行處理同時將消息的處理動作通過 gm 廣播給所有 slave 隊列。

 

2.2  消息走向路徑

以開機一個虛擬機為例,環境狀況:244的nova-api245上的虛擬機進行關機操作,245compute.hostname的主隊列在242

nova-api會將消息發到給主隊列242上的消息隊列服務器保存,242進行gm廣播,242通過channel將消息傳送到245上,245接收到消息進行處理

 

2.3  消息確認機制

程序中是在拿到消息后和開始處理前期間進行了message.acknowledge()的調用,調用后即是告訴消息隊列服務,該消息已經被處理完了,可以進行刪除了。

從實踐來看確實是acknowledge調用了后才刪除的,但程序是實際調用后才開始執行消息處理函數,期間如果有異常報錯沒有處理成功則也不會重新處理了。

Openstack平台沒有對no_ack進行設置,查看kombu的代碼默認no_ack是false的,也就是需要進行確認才會刪除消息。

也可以發送nack的方式表示消息處理有問題,這時如果隊列的requeue設置為true,則會重新進入隊列交由其它消費者進行處理,默認是為false。

 

dead lettering機制:當調用了reject或nack且requeue是false時或者消息過期時,該機制會將失敗的消息放入到dead-lettered隊列中。

 

3  OpenStack RPC框架

3.1  接收消息(以nova-compute服務為例)

雲平台消息隊列RPC處理框架圖:

1.這里以nova-compute服務啟動為例進行講解,nova-compute服務啟動,會通過配置文件解析獲取一個Transport類對象,Transport對象里引用了RabbitDriver類對象

(1)Transport類作用:通過配置文件獲取對應的_driver,使用_driver來發送消息

(2)RabbitDriver類作用:用於發送消息和創建監聽類

 

2.nova-compute通過調用get_server函數獲取RPCServer類對象,類對象聚合了Transport類對象和RPCDispatcher類對象

(1)RPCServer類作用:初始化rpc監聽服務,創建隊列

(2)RPCDispatcher類作用:收到消息后進行解析找到相對應的函數進行調用

 

3.調用RPCServer類對象的start方法,里面調用 _create_listener方法創建監聽者AMQPListener類對象,用於作為綁定為消費者的回調對象,該類對象引用了一個Connection類對象conn。使用conn定義隊列,最后返回一個PollStyleListenerAdapter類對象

(1)AMQPListener類作用:作為消費者綁定的回調對象,同時poll方法用於獲取消息

(2)PollStyleListenerAdapter類作用:創建線程不斷獲取消息

(3)Connection類作用:獲取了kombuconnection對象,用於進行消費者、隊列定義和重連接等邏輯相關操作,使用Consumer類來管理消費者

(4)Consumer類作用:一個Consumer類對象代表一個消費者,里面保存了消費者信息和定義消費者的方法

(5)AMQPIncomingMessage類作用:消息進行解析后初始化為該類對象,代表一個消息的結構,里面有reply方法用於返回消息處理結果給發布消息者

 

4.PollStyleListenerAdapter類調用start方法開啟一個線程while循環專門調用AMQPListener類對象的poll方法進行消息獲取

 

5.poll函數會讀取incomings隊列里有沒有消息,如果有則表示拿到一個未處理消息發給Dispatcher類去處理這個消息,如果該隊列空了,則調用drain_events方法去獲取各channel上傳過來的消息並將它們存到inconings隊列中。

 

6.drain_events方法機理:從strace工具看到該nova-compute服務有大量的epoll_wait方法調用,可知采用了事件觸發的方式。

 

3.2  發送消息

由於是發送消息,所以只要看右邊的RPCClient端那部分就可以了:

1.跟接收消息一樣,會根據target生成一個Transport類對象,該對象根據配置文件會獲取一個driver對象,我們的是RabbitDriver對象,繼承於AMQPDriverBase類;

2.獲取一個_CallContext類對象,引用了Transport類對象

(1)_CallContext類作用:用來發送消息,對消息進行序列化並調用Transport類對象的driver來進行消息發送

3.獲取Connection類對象進行消息發送

4.Connection類中通過kombu中的Producer類的發送方法進行消息發出

 

3.3  重連機制

每個消費者都是建立在一個channel上的,channel是建立在Tcp連接上的,如果連接的rabbitmq服務節點關閉了,則連接會斷開,因此需要重新在其它未關閉節點上建立連接,重新建立channel和消費者。

重連機制的代碼存在於impl_driver.py中的Connection類的consume函數中

這里有兩個地方是可以檢測到連接斷開了,需要重連的,一個是在讀取socket時發現,一個是在心跳檢測機制里發現。

 

讀socket時拋異常觸發的重連:

1.  由上面分析我們知道程序會不斷調用Connection的consume函數進行獲取消息,該函數會調用到kombuConnection類的autoretry函數,同時傳入了_consume函數作為參數

2.  autoretry又調用到ensure函數,該函數主要作用是調用傳進來的_consume函數,如果有異常拋出,則進入異常重連處理,調用on_error函數,再調用ensure_connection函數確保重新建立好一條新的連接,然后在連接上建立新的channel,最后將channel進行更新。

 

心跳檢測機制觸發的重連:

1.  在服務啟動后就有一個專門的線程定時發包檢測連接是否正常,超時60秒則觸發異常

2.  觸發異常后調用ensure_connection函數將當前channel置為None,從而觸發重建channel

 

消費者的重新建立:

1.在_consume函數中每次都會去判斷self._new_tags集合是不是不為空,如果不為空則會重新建立這些tag的消費者,執行建立函數后就會把它remove掉,關鍵代碼邏輯:

2.而_new_tags的獲取則是根據異常拋出,檢測異常類型來重新賦予之前消費者的tags,以此重新建立消費者,關鍵代碼邏輯:

 

4  代碼流程解析

4.1  nova-compute啟動流程

Openstack的服務啟動都是先從cmd目錄下的main函數開始執行的,比如nova-compute服務的啟動則是nova/cmd/compute.py文件中的main函數開始執行:

File:nova/cmd/compute.py

def main():
    # 調用nova/service.py文件的Service類的create類方法實例化一個service類
    server = service.Service.create(binary='nova-compute',
                                    topic=CONF.compute_topic)
    service.serve(server)
service.wait()

 

得到server后調用server函數進行服務運行:

File:nova/service.py

def serve(server, workers=None):
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))

    # 這里的service是指oslo_service包導入的service了
    # 調用到oslo_service/service.py的launch方法
    _launcher = service.launch(CONF, server, workers=workers)

 

launch函數調用oslo_service包的service.py的launch方法初始化一個ServiceLauncher實例,並調用launch_service函數:

File:oslo_service/service.py

def launch(conf, service, workers=1, restart_method='reload'):

    if workers is not None and workers <= 0:
        raise ValueError(_("Number of workers should be positive!"))

    # 默認傳入的是None
    if workers is None or workers == 1:
        # 這里是初始化一個繼承了Launcher類的ServiceLauncher類實例
        launcher = ServiceLauncher(conf, restart_method=restart_method)
    else:
        launcher = ProcessLauncher(conf, restart_method=restart_method)
    # 調用Launcher類里的launch_service方法,launch_service方法運行給定的service
    launcher.launch_service(service, workers=workers)

    return launcher

 

Launch_service函數調用了父類的實現:

File:oslo_service/service.py  Launch:launch_service

def launch_service(self, service, workers=1):
    if workers is not None and workers != 1:
        raise ValueError(_("Launcher asked to start multiple workers"))
    _check_service_base(service)
    service.backdoor_port = self.backdoor_port
    # 調用Services類的add方法來運行給定service
    # 其實最后也就是開辟了個綠色線程池和獲取一個綠色線程運行service的start方法
    self.services.add(service)

 

我們可以直接看service的start方法:

File:nova/service.py  Service:start

def start(self):
    ......

    # 初始化oslo_messaging/target.py的Target類
    target = messaging.Target(topic=self.topic, server=self.host)

    endpoints = [
        self.manager,
        baserpc.BaseRPCAPI(self.manager.service_name,         
                self.backdoor_port)
    ]
    endpoints.extend(self.manager.additional_endpoints)

    # 獲取nova/objects/base.py中的NovaObjectSerializer類實例
    # 用來序列化nova服務中的對象
    serializer = objects_base.NovaObjectSerializer()

    # 獲取一個oslo_messaging/rpc/RPCServer類實例
    self.rpcserver = rpc.get_server(target, endpoints, serializer)
    # 調用到oslo_messaging/server.py的MessageHandlingServer類的start方法
    self.rpcserver.start()

    ......

 

看下get_server實現:

File:nova/rpc.py

def get_server(target, endpoints, serializer=None):

    # TRANSPORT一個transport類對象,里面包含發消息的driver實現對象,如果是rabbit則對應到實現rabbit的driver類
    # 獲取TRANSPORT對象:<class 'oslo_messaging.transport.Transport'>
    # 更重要的是transport對象里的driver對象:   oslo_messaging._drivers.impl_rabbit.RabbitDriver
    assert TRANSPORT is not None

    if profiler:
        serializer = ProfilerRequestContextSerializer(serializer)
    else:
        serializer = RequestContextSerializer(serializer)

    # get_rpc_server在oslo_messaging/rpc/server.py文件中
    # 獲取一個RPCServer實例
    return messaging.get_rpc_server(TRANSPORT,
                                    target,
                                    endpoints,
                                    executor='eventlet',
                                    serializer=serializer)

 

查看get_rpc_server實現:

File:oslo_messaging/rpc/server.py

def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None, access_policy=None):
    # 獲取一個消息調度員,它能識別收到的消息的結構
    # 用於接收到消息后進行消息分發處理 A message dispatcher which understands RPC messages
    # oslo_messaging/rpc/dispatcher.py類的RPCDispatcher類
    # 解析消息然后調用相對應的方法進行處理
    dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer,
                                              access_policy)
    # 該類有個關鍵函數_process_incoming是在接收到消息時進行回調的
    return RPCServer(transport, target, dispatcher, executor)

 

獲取到RPCServer實例后,調用start方法,因為RPCServer繼承於MessageHandlingServer但沒有實現,所以是調用父類的start方法:

File:oslo_messaging/server.py  MessageHandlingServer:start

def start(self, override_pool_size=None):
    ......

    try:
        # 這里程序調用到的是oslo_messaging/rpc/server.py的RPCServer類的_create_listener方法
        self.listener = self._create_listener()
    except driver_base.TransportDriverError as ex:
        raise ServerListenError(self.target, ex)
    ......

    self.listener.start(self._on_incoming)

 

File:oslo_messaging/rpc/server.py  RPCServer:_create_listener

def _create_listener(self):
    # oslo_messaging/transport/Transport類_listen方法
    return self.transport._listen(self._target, 1, None)

 

File:oslo_messaging/transport.py  Transport:_listen

def _listen(self, target, batch_size, batch_timeout):
    if not (target.topic and target.server):
        raise exceptions.InvalidTarget('A server\'s target must have '
                                       'topic and server names specified',
                                       target)
    # 這個_driver對應的是oslo_messaging._drivers.impl_rabbit.RabbitDriver
    return self._driver.listen(target, batch_size,
                               batch_timeout)

 

File:oslo_messaging/_drivers/amqpdriver.py  AMQPDriverBase:listen

def listen(self, target, batch_size, batch_timeout):
    # 這里是從連接池里獲取一個連接對象
    # conn是oslo_messaging/_drivers/impl_rabbit.py的類Connection實例
    conn = self._get_connection(rpc_common.PURPOSE_LISTEN)

    # 這個listen很關鍵,它被綁定為消費者的回調對象,也就是收到消息時是調用該對象,該對象實現了__call__方法
    # 所以可直接調用
    listener = AMQPListener(self, conn)

    conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                topic=target.topic,
                                callback=listener)
    conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                topic='%s.%s' % (target.topic,
                                                 target.server),
                                callback=listener)
    conn.declare_fanout_consumer(target.topic, listener)

    # 返回一個實現poll模式監聽消息到來的類
    return base.PollStyleListenerAdapter(listener, batch_size,
                                         batch_timeout)

 

創建好監聽類后,調用start方法:

File:oslo_messaging/_drivers/base.py  PollStyleListenerAdapter:start

def start(self, on_incoming_callback):
    super(PollStyleListenerAdapter, self).start(on_incoming_callback)
    self._started = True
    # _listen_thread在__init__方法中定義了,如下行所示
    # self._listen_thread = threading.Thread(target=self._runner)
    # 所以是開啟一個線程運行_runner函數
    self._listen_thread.start()

 

再來看下_runner函數:

File:oslo_messaging/_drivers/base.py  PollStyleListenerAdapter:_runner

def _runner(self):
    while self._started:
        # 這里poll調用oslo_messaging/_drivers/amqpdriver.py的AMQPListener類poll函數
        incoming = self._poll_style_listener.poll(
            batch_size=self.batch_size, batch_timeout=self.batch_timeout)

        # 讀到有消息,調用回調函數進行處理
        if incoming:
            # 該on_incoming_callback是oslo_messaging/server.py中self.listener.start(self._on_incoming)中傳進來的
            # 是該文件中MessageHandlingServer類的_on_incoming函數
            # _on_incoming函數又調用到了RPCServer中的_process_incoming函數
            self.on_incoming_callback(incoming)

 

這里先來看下poll函數實現:

File:oslo_messaging/_drivers/amqpdriver.py  AMQPListener:poll

def poll(self, timeout=None):
    stopwatch = timeutils.StopWatch(duration=timeout).start()

    while not self._shutdown.is_set():
        self._message_operations_handler.process()

        if self.incoming:
            # 從incoming列表中獲取第一個消息返回
            return self.incoming.pop(0)

        left = stopwatch.leftover(return_none=True)
        if left is None:
            left = self._current_timeout
        if left <= 0:
            return None

        try:
            # 獲取所有隊列的消息
            # oslo_messaging/dr_drivers/impl_rabbit.py的Connection類的consume函數
            # 將獲取到的消息經過解析存到incoming列表中
            self.conn.consume(timeout=min(self._current_timeout, left))
        except rpc_common.Timeout:
            self._current_timeout = max(self._current_timeout * 2,
                                        ACK_REQUEUE_EVERY_SECONDS_MAX)
        else:
            self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN

    # NOTE(sileht): listener is stopped, just processes remaining messages
    # and operations
    self._message_operations_handler.process()
    if self.incoming:
        return self.incoming.pop(0)

    self._shutoff.set()

 

這里關鍵函數是調用consume進行消息獲取,調用到了Connection類的consume函數,以下是該函數的關鍵語句:

# 調用kombu/connection.pyConnection類的drain_events方法,等待來自服務器的單個事件,所以這是事件觸發型的

# 其中里面的supports_librabbitmq()=False(因為環境支持’eventlet’,所以未采用’default’,所以返回False

# 最后是調用到kombu/transport/pyamqp.pydrain_events方法

# 再調用到amqp包的Connection類的drain_events(amqp/connection.py)

File:oslo_messaging/_drivers/impl_rabbit.py  Connection:consume

self.connection.drain_events(timeout=poll_timeout)

 

amqp包的drain_events實現

File:amqp/connection.py  Connection: drain_events

def drain_events(self, timeout=None):
    """Wait for an event on a channel."""
    # 等待事件通知
    chanmap = self.channels
    chanid, method_sig, args, content = self._wait_multiple(
        chanmap, None, timeout=timeout,
    )

    channel = chanmap[chanid]

    if (content and
            channel.auto_decode and
            hasattr(content, 'content_encoding')):
        try:
            content.body = content.body.decode(content.content_encoding)
        except Exception:
            pass

    amqp_method = (self._method_override.get(method_sig) or
                   channel._METHOD_MAP.get(method_sig, None))

    if amqp_method is None:
        raise AMQPNotImplementedError(
            'Unknown AMQP method {0!r}'.format(method_sig))

    if content is None:
        return amqp_method(channel, args)
    else:
        return amqp_method(channel, args, content)

到amqp包我們就不深究下去了

 

4.1  收到消息時行為

接着看下有消息到來時執行的回調函數,從前面我們可知在創建消費者時我們綁定了個listen對象作為callback,如下:

File:oslo_messaging/_drivers/amqpdriver.py  AMQPDriverBase:listen

listener = AMQPListener(self, conn)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                            topic=target.topic,
                            callback=listener)

 

所以當收到消息時會調用AMQPListener類的__call__函數:

def __call__(self, message):
    # 收到消息后解析消息結構體並構建成AMQPIncomingMessage結構
    # type(message)對應類:<class 'oslo_messaging._drivers.impl_rabbit.RabbitMessage'>
    ctxt = rpc_amqp.unpack_context(message)
    unique_id = self.msg_id_cache.check_duplicate_message(message)
    if ctxt.msg_id:
        LOG.debug("received message msg_id: %(msg_id)s reply to "
                  "%(queue)s", {'queue': ctxt.reply_q,
                                'msg_id': ctxt.msg_id})
    else:
        LOG.debug("received message with unique_id: %s", unique_id)

    self.incoming.append(AMQPIncomingMessage(
        self,
        ctxt.to_dict(),
        message,
        unique_id,
        ctxt.msg_id,
        ctxt.reply_q,
        self._obsolete_reply_queues,
        self._message_operations_handler))

 

可以看到收到message后,解析message並構建為AMQPIncomingMessage實例appendincoming隊列中。

消息到隊列中之后,當我們取到一個消息后做的事情,也就是回到_runner函數中的邏輯:

File:oslo_messaging/_drivers/base.py  PollStyleListenerAdapter:_runner

def _runner(self):
    while self._started:
        # 這里poll調用oslo_messaging/_drivers/amqpdriver.py的AMQPListener類poll函數
        incoming = self._poll_style_listener.poll(
            batch_size=self.batch_size, batch_timeout=self.batch_timeout)

        # 讀到有消息,調用回調函數進行處理
        if incoming:
            # 該on_incoming_callback是oslo_messaging/server.py中self.listener.start(self._on_incoming)中傳進來的
            # 是該文件中MessageHandlingServer類的_on_incoming函數
            # _on_incoming函數又調用到了RPCServer中的_process_incoming函數
            self.on_incoming_callback(incoming)

 

可知是調用了_process_incoming函數來處理消息:

# 這里poll調用oslo_messaging/_drivers/amqpdriver.pyAMQPListenerpoll函數

# 處理消息

File:oslo_messaging/rpc/server.py  RPCServer:__process_incoming

def _process_incoming(self, incoming):
    message = incoming[0]
    try:
        # 這里進行了消息確認發送
        # 會調用到kombu/message.py的Message類的ack函數
        # 表示該消息已經進行消費了,隊列中可以刪除該消息了
        message.acknowledge()
    except Exception:
        LOG.exception(_LE("Can not acknowledge message. Skip processing"))
        return

    failure = None
    try:
        # 調用oslo_messaging/rpc/dispatcher.py的RPCDispatcher類來處理消息
        # 該類的職責是找到消息對應的方法並執行
        res = self.dispatcher.dispatch(message)
    except rpc_dispatcher.ExpectedException as e:
        failure = e.exc_info
        LOG.debug(u'Expected exception during message handling (%s)', e)
    except Exception:
        # current sys.exc_info() content can be overridden
        # by another exception raised by a log handler during
        # LOG.exception(). So keep a copy and delete it later.
        failure = sys.exc_info()
        LOG.exception(_LE('Exception during message handling'))

    try:
        # 將執行結果返回
        if failure is None:
            message.reply(res)
        else:
            message.reply(failure=failure)
    except Exception:
        LOG.exception(_LE("Can not send reply for message"))
    finally:
            # NOTE(dhellmann): Remove circular object reference
            # between the current stack frame and the traceback in
            # exc_info.
            del failure

 

核心函數是dispatch函數:

File:oslo_messaging/rpc/dispatcher.py  RPCDispatcherr:dispatch

def dispatch(self, incoming):
    """Dispatch an RPC message to the appropriate endpoint method.

    :param incoming: incoming message
    :type incoming: IncomingMessage
    :raises: NoSuchMethod, UnsupportedVersion
    """
    message = incoming.message
    ctxt = incoming.ctxt

    method = message.get('method')
    args = message.get('args', {})
    namespace = message.get('namespace')
    version = message.get('version', '1.0')

    found_compatible = False
    # endpoints值是兩個類
    # [<nova.compute.manager.ComputeManager object at 0x7f6eef157dd0>, <nova.baserpc.BaseRPCAPI object at 0x7f6ee4724c90>]
    # 從類中查找出方法進行調用
    for endpoint in self.endpoints:
        target = getattr(endpoint, 'target', None)
        if not target:
            target = self._default_target

        if not (self._is_namespace(target, namespace) and
                self._is_compatible(target, version)):
            continue

        if hasattr(endpoint, method):
            if self.access_policy.is_allowed(endpoint, method):
                return self._do_dispatch(endpoint, method, ctxt, args)

        found_compatible = True

    if found_compatible:
        raise NoSuchMethod(method)
    else:
        raise UnsupportedVersion(version, method=method)

 

File:oslo_messaging/rpc/dispatcher.py  RPCDispatcherr:dispatch

def _do_dispatch(self, endpoint, method, ctxt, args):
    ctxt = self.serializer.deserialize_context(ctxt)
    new_args = dict()
    for argname, arg in args.items():
        new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)
    func = getattr(endpoint, method)
    # 調用方法
    result = func(ctxt, **new_args)
    return self.serializer.serialize_entity(ctxt, result)

 

4.3  發送消息流程

這里看一個發送開機指令到宿主機執行的流程。

首先由novaclient發送http請求到nova-api服務,對應調用到_start_server函數:

File:nova/api/openstack/compute/server.py  ServersController:_start_server

def _start_server(self, req, id, body):
    .....
    try:
        # nova/compute/api.py
        self.compute_api.start(context, instance)
    except (exception.InstanceNotReady, exception.InstanceIsLocked) as e:
        .....

 

查看start方法實現:

File:nova/compute/api.py  API:start

def start(self, context, instance):
    .....
    instance.task_state = task_states.POWERING_ON
    instance.save(expected_task_state=[None])
    self._record_action_start(context, instance, instance_actions.START)
    self.compute_rpcapi.start_instance(context, instance)
    .....

 

可以看到是先將task_state狀態改為了POWERING_ON后再發送消息,查看start_instance方法的實現:

File:nova/compute/rpcapi.py  ComputeAPI:start_instance

def start_instance(self, ctxt, instance):
    version = '4.0'
    # self.router.by_instance(ctxt, instance)獲取oslo_messaging/rpc/client.py的RPCClient類實例
    # prepare方法是用於設置一些屬性並生成一個oslo_messaging/rpc/client.py的_CallContext類實例
    cctxt = self.router.by_instance(ctxt, instance).prepare(
            server=_compute_host(None, instance), version=version)
    # 該call方法是調用到oslo_messaging/rpc/client.py的_BaseCallContext(_CallContext的父類)類的call方法
    return cctxt.call(ctxt, 'start_instance', instance=instance)

 

查看call方法實現:

File:oslo_messaging/rpc/client.py  _BaseCallContext:call

def call(self, ctxt, method, **kwargs):
    """Invoke a method and wait for a reply. See RPCClient.call()."""
    if self.target.fanout:
        raise exceptions.InvalidTarget('A call cannot be used with fanout',
                                       self.target)

    # 生成一個msg
    msg = self._make_message(ctxt, method, kwargs)
    # 序列化ctxt
    msg_ctxt = self.serializer.serialize_context(ctxt)

    timeout = self.timeout
    if self.timeout is None:
        timeout = self.conf.rpc_response_timeout

    self._check_version_cap(msg.get('version'))

    try:
        # 調用transport中的_send方法
        # _send方法中又是使用_driver對象調用send方法
        # _driver對象是oslo_messaging._drivers.impl_rabbit.RabbitDriver
        # 所以是調用RabbitDriver類里的send方法,實際是調用它父類AMQPDriverBase的send方法
        result = self.transport._send(self.target, msg_ctxt, msg,
                                      wait_for_reply=True, timeout=timeout,
                                      retry=self.retry)
    except driver_base.TransportDriverError as ex:
        raise ClientSendError(self.target, ex)

    return self.serializer.deserialize_entity(ctxt, result)

 

查看send方法實現:

File:oslo_messaging/_drivers/amqpdriver.py  AMQPDriverBase:_send

def _send(self, target, ctxt, message,
          wait_for_reply=None, timeout=None,
          envelope=True, notify=False, retry=None):

    msg = message

    if wait_for_reply:
        msg_id = uuid.uuid4().hex
        msg.update({'_msg_id': msg_id})
        # _get_reply_q方法獲取一個ReplyWaiter對象,開始poll模式等待獲取返回消息
        # 同時使用waiters(ReplyWaiter實例對象)管理返回的消息
        msg.update({'_reply_q': self._get_reply_q()})

    # 獲取一個唯一uuid添加到msg字典中
    rpc_amqp._add_unique_id(msg)
    unique_id = msg[rpc_amqp.UNIQUE_ID]

    # 把ctxt字典值也更新到msg中
    rpc_amqp.pack_context(msg, ctxt)

    if envelope:
        msg = rpc_common.serialize_msg(msg)

    if wait_for_reply:
        # 把該消息加到waiters里去監控管理
        self._waiter.listen(msg_id)
        log_msg = "CALL msg_id: %s " % msg_id
    else:
        log_msg = "CAST unique_id: %s " % unique_id

    try:
        # 根據target保存的發送模式發送消息
        with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
            if notify:
                exchange = self._get_exchange(target)
                log_msg += "NOTIFY exchange '%(exchange)s'" \
                           " topic '%(topic)s'" % {
                               'exchange': exchange,
                               'topic': target.topic}
                LOG.debug(log_msg)
                conn.notify_send(exchange, target.topic, msg, retry=retry)
            elif target.fanout:
                log_msg += "FANOUT topic '%(topic)s'" % {
                    'topic': target.topic}
                LOG.debug(log_msg)
                conn.fanout_send(target.topic, msg, retry=retry)
            else:
                topic = target.topic
                exchange = self._get_exchange(target)
                if target.server:
                    topic = '%s.%s' % (target.topic, target.server)
                log_msg += "exchange '%(exchange)s'" \
                           " topic '%(topic)s'" % {
                               'exchange': exchange,
                               'topic': topic}
                LOG.debug(log_msg)
                # 發送topic模式的隊列
                # 調用oslo_messaging/_drivers/impl_rabbit.py中Connection類實例的topic_send方法
                conn.topic_send(exchange_name=exchange, topic=topic,
                                msg=msg, timeout=timeout, retry=retry)

        if wait_for_reply:
            # 等待返回或消息超時返回
            # 輪詢方式檢測消息有沒有被返回並放到對應的字典中
            result = self._waiter.wait(msg_id, timeout)
            if isinstance(result, Exception):
                raise result
            return result
    finally:
        if wait_for_reply:
            self._waiter.unlisten(msg_id) 

當消息返回或消息超時時就返回結果給call調用,這個請求就完成了 

 

4.4  重連機制

當比如當前連接着的rabbitmq服務斷開了時,則連接會斷開,則需要重新建立連接,建立新的channel,然后重新建立消費者。

這個邏輯在Connection的consume函數中,是由監聽類的poll函數進行不斷調用的,我們看下該函數實現:

File:oslo_messaging/_drivers/impl_rabbit.py  Connection:consume

def consume(self, timeout=None):
    """Consume from all queues/consumers."""

    timer = rpc_common.DecayingTimer(duration=timeout)
    timer.start()

    def _raise_timeout(exc):
        LOG.debug('Timed out waiting for RPC response: %s', exc)
        raise rpc_common.Timeout()

    def _recoverable_error_callback(exc):
        # 判斷異常類型是不是非Timeout類型,因為Timeout類型是由drain_events函數獲取
        # 消息等待超時導致的,屬於正常的異常,除了這種異常,其它異常則都會被視作需要重建消費者
        if not isinstance(exc, rpc_common.Timeout):
            self._new_tags = set(self._consumers.values())
        timer.check_return(_raise_timeout, exc)

    def _error_callback(exc):
        # 將異常交給_recoverable_error_callback函數進行處理
        _recoverable_error_callback(exc)
        LOG.error(_LE('Failed to consume message from queue: %s'),
                  exc)

    def _consume():
        # NOTE(sileht): in case the acknowledgment or requeue of a
        # message fail, the kombu transport can be disconnected
        # In this case, we must redeclare our consumers, so raise
        # a recoverable error to trigger the reconnection code.
        # 這里是判斷了連接是否還正常,如果不正常,我們需要重新獲取連接並且重新定義consumer
        if not self.connection.connected:
            # 這里拋錯以進入重連機制
            raise self.connection.recoverable_connection_errors[0]

        while self._new_tags:
            for consumer, tag in self._consumers.items():
                if tag in self._new_tags:
                    # 如果是新標簽則消費者也需建立
                    # 在重建channel時這里就是重建消費者了
                    # 這里consumer是該文件的Consumer類實例,該consume函數會調用到kombu中的consume函數定義消費者
                    consumer.consume(self, tag=tag)
                    self._new_tags.remove(tag)

        poll_timeout = (self._poll_timeout if timeout is None
                        else min(timeout, self._poll_timeout))
        while True:
            if self._consume_loop_stopped:
                return

            if self._heartbeat_supported_and_enabled():
                # 心跳檢查,如果連不通則拋錯
                # 拋錯則會在kombu中進行重連機制
                self._heartbeat_check()

            try:
                # 調用kombu/connection.py的Connection類的drain_events方法,等待來自服務器的單個事件,所以這是事件觸發型的
                # 其中里面的supports_librabbitmq()=False(因為環境支持’eventlet’,所以未采用’default’,所以返回False
                # 最后是調用到kombu/transport/pyamqp.py的drain_events方法
                # 再調用到amqp包的Connection類的drain_events(amqp/connection.py)
                self.connection.drain_events(timeout=poll_timeout)
                return
            except socket.timeout as exc:
                # 超時會進入這個邏輯,check_return會raise一個Exception,從而導致ensure中拋異常被捕獲調用了error_callback函數
                # error_callback函數又調用了recoverable_error_callback函數
                # 從而導致日志中經常可以看到_recoverable_error_callback
                poll_timeout = timer.check_return(
                    _raise_timeout, exc, maximum=self._poll_timeout)

    with self._connection_lock:
        self.ensure(_consume,
                    recoverable_error_callback=_recoverable_error_callback,
                    error_callback=_error_callback)

 

這里很多內嵌函數都會通過傳參的方式傳入到其它方法中處理,然后由其它方法在檢測到異常時執行。我們可以看到最終是執行了ensure函數,我們看下ensure函數的實現:

File:oslo_messaging/_drivers/impl_rabbit.py  Connection:ensure

def ensure(self, method, retry=None,
           recoverable_error_callback=None, error_callback=None,
           timeout_is_error=True):
    .....

    # 在kombu中如果進入了異常重連處理機制會回調該函數
    def on_error(exc, interval):
        LOG.debug("[%s] Received recoverable error from kombu:"
                  % self.connection_id,
                  exc_info=True)

        # 執行_recoverable_error_callback函數處理異常
        recoverable_error_callback and recoverable_error_callback(exc)

        interval = (self.kombu_reconnect_delay + interval
                    if self.kombu_reconnect_delay > 0
                    else interval)

        info = {'err_str': exc, 'sleep_time': interval}
        info.update(self._get_connection_info())

        if 'Socket closed' in six.text_type(exc):
            LOG.error(_LE('[%(connection_id)s] AMQP server'
                          ' %(hostname)s:%(port)s closed'
                          ' the connection. Check login credentials:'
                          ' %(err_str)s'), info)
        else:
            LOG.error(_LE('[%(connection_id)s] AMQP server on '
                          '%(hostname)s:%(port)s is unreachable: '
                          '%(err_str)s. Trying again in '
                          '%(sleep_time)d seconds. Client port: '
                          '%(client_port)s'), info)

    ......

    # 當在kombu中執行autoretry時拋出異常了並在異常處理時重新連接了其它節點
    # 則會回調該函數
    def on_reconnection(new_channel):
        # 更新channel
        self._set_current_channel(new_channel)
        self.set_transport_socket_timeout()

    def execute_method(channel):
        # 更新channel
        self._set_current_channel(channel)
        # 這個method指的就是_consume函數
        # 注意我這里指的是consume調入的時候該method就是_consume函數
        # 因為該ensure函數是很多函數都會調用的,每個函數都會傳入它的method函數進行回調
        # 我這里是為了方便理解就這樣指明了,文中還有很多地方也是如此指明,就不一一解釋了
        method()

    # NOTE(sileht): Some dummy driver like the in-memory one doesn't
    # have notion of recoverable connection, so we must raise the original
    # exception like kombu does in this case.
    has_modern_errors = hasattr(
        self.connection.transport, 'recoverable_connection_errors',
    )
    if has_modern_errors:
        recoverable_errors = (
            self.connection.recoverable_channel_errors +
            self.connection.recoverable_connection_errors)
    else:
        recoverable_errors = ()

    try:
        autoretry_method = self.connection.autoretry(
            execute_method, channel=self.channel,
            max_retries=retry,
            errback=on_error,
            interval_start=self.interval_start or 1,
            interval_step=self.interval_stepping,
            interval_max=self.interval_max,
            on_revive=on_reconnection)
        ret, channel = autoretry_method()
        self._set_current_channel(channel)
        return ret
    except recoverable_errors as exc:
        LOG.debug("Received recoverable error from kombu:",
                  exc_info=True)
        # 在kombu重建立連接失敗時會跑入該邏輯,調用error_callback
        # 如果是consume函數調入該函數的話,則該函數是_error_callback函數
        error_callback and error_callback(exc)
        self._set_current_channel(None)
        # NOTE(sileht): number of retry exceeded and the connection
        # is still broken
        info = {'err_str': exc, 'retry': retry}
        info.update(self.connection.info())
        msg = _('Unable to connect to AMQP server on '
                '%(hostname)s:%(port)s after %(retry)s '
                'tries: %(err_str)s') % info
        LOG.error(msg)
        raise exceptions.MessageDeliveryFailure(msg)
    except rpc_amqp.AMQPDestinationNotFound:
        # NOTE(sileht): we must reraise this without
        # trigger error_callback
        raise
    except Exception as exc:
        error_callback and error_callback(exc)
        Raise

 

這個函數也是如此,定義了很多內嵌函數,然后作為參數傳遞到kombu中的autoretry函數中進行處理,方便有異常時就行異常處理且回調相對應的函數。查看autoretry函數實現:

File:kombu/connection.py  Connection:autoretry

def autoretry(self, fun, channel=None, **ensure_options):
    channels = [channel]
    create_channel = self.channel

    class Revival(object):
        __name__ = getattr(fun, '__name__', None)
        __module__ = getattr(fun, '__module__', None)
        __doc__ = getattr(fun, '__doc__', None)

        def revive(self, channel):
            channels[0] = channel

        def __call__(self, *args, **kwargs):
            if channels[0] is None:
                self.revive(create_channel())
            # 執行oslo_messaging中impl_rabbit的ensure中的execute_method函數
            # execute_method最終又是回調到_consume函數
            return fun(*args, channel=channels[0], **kwargs), channels[0]

    revive = Revival()
    # 返回了一個_ensure閉包函數
    # 但oslo_messaging的ensure函數中的下一行便是執行該閉包函數
    return self.ensure(revive, revive, **ensure_options)

 

這里最后又調用了ensure函數,該ensure函數是關鍵,它里面進行了連接重連機制。查看ensure實現:

File:kombu/connection.py  Connection:ensure

def ensure(self, obj, fun, errback=None, max_retries=None,
           interval_start=1, interval_step=1, interval_max=1,
           on_revive=None):
           
    def _ensured(*args, **kwargs):
        got_connection = 0
        conn_errors = self.recoverable_connection_errors
        chan_errors = self.recoverable_channel_errors
        has_modern_errors = hasattr(
            self.transport, 'recoverable_connection_errors',
        )
        for retries in count(0):  # for infinity
            try:
                # 調用了Revival類的__call__函數
                # 在進行的一系列調用中如果有異常拋出則進入下面的重連機制
                return fun(*args, **kwargs)
            except conn_errors as exc:
                if got_connection and not has_modern_errors:
                    raise
                if max_retries is not None and retries > max_retries:
                    raise
                self._debug('ensure connection error: %r', exc, exc_info=1)
                self._connection = None
                self._do_close_self()
                errback and errback(exc, 0)
                remaining_retries = None
                if max_retries is not None:
                    remaining_retries = max(max_retries - retries, 1)
                # 嘗試重新建立連接,確保有連接建立成功
                self.ensure_connection(errback,
                                       remaining_retries,
                                       interval_start,
                                       interval_step,
                                       interval_max)
                # 在連接上獲取新的channel
                new_channel = self.channel()
                self.revive(new_channel)
                obj.revive(new_channel)
                if on_revive:
                    # 調用oslo_messaging中的on_reconnection函數
                    # 將獲得的新channel賦給Connection類的channel
                    on_revive(new_channel)
                got_connection += 1
            except chan_errors as exc:
                if max_retries is not None and retries > max_retries:
                    raise
                self._debug('ensure channel error: %r', exc, exc_info=1)
                errback and errback(exc, 0)
    _ensured.__name__ = "%s(ensured)" % fun.__name__
    _ensured.__doc__ = fun.__doc__
    _ensured.__module__ = fun.__module__
    return _ensured        

可以看到如果是觸發了異常則進入下面的異常處理,進行重連和回調函數調用等操作,所以如果進行了重連,就會觸發到oslo_messaging那邊定義的很多內嵌函數來協助處理重連邏輯。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM