1 消息隊列Rabbitmq介紹
Rabbitmq的整體架構圖
(1)Rabbitmq Server:中間那部分就是Rabbitmq Server,也叫broken server,主要是負責消息的傳遞,保證client A、B發送的消息Cleint 1、2、3能夠正確的接收到。
(2)Client A、B:在消息隊列里我們稱之為生產者-Producer,發送消息的客戶端。
(3)Client 1、2、3:在消息隊列里我們稱之為消費者-Consume,接收消息的客戶端。
(4)Exchange:我們可以稱之為消息隊列的路由,根據發送的消息的routing key來轉發到對應的隊列上。有四種類型的Exchange,對應四種不同的轉發策略:
direct Exchange:完全匹配,比如routing key是abc,就對應binding key為abc對應的queue。
topic Exchange:正則匹配,比如routing key是ab*,可以用來匹配binding key為abc或abd等的queue。
fanout Exchange:廣播策略,忽略掉routing key,轉發給所有綁定在這個Exchange的queue。
headers Exchange:不依賴於routing key,會根據發送的消息的內容的headers屬性來進行匹配。
(5)Queue:隊列,消息存放的地方。
(6)Connection (連接)和 Channel (通道):生產者和消費者需要和 RabbitMQ 建立 TCP 連接。一些應用需要多個connection,為了節省TCP 連接,可以使用 Channel,它可以被認為是一種輕型的共享 TCP 連接的連接。連接需要用戶認證,並且支持 TLS (SSL)。連接需要顯式關閉。
(7)vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離
(8)Message(消息):在通道上傳輸的二進制對象,結構為Headers(頭)、Properties (屬性)和 Data (數據)。
以下是消息的幾個重要屬性:
routing key:Exchange根據該key來轉發消息到對應的隊列中
delivery_mode:消息模式,有持久模式和非持久模式,持久模式則是將消息保存到磁盤中,非持久模式則是消息保存在內存中
reply_to:RPC調用時處理結果返回需傳送到的隊列名,稱為回調隊列
correlation_id:RPC調用返回時需要用到的參數,一個請求id
content_type:這個編碼類型是給生產者和消費者使用的,rabbitmq只是按原樣傳輸的
對應到OpenStack的平台則是:
Client端的生產者可以是nova-api,nova-conductor等,以虛擬機開機為例,則nova-api是生產者,nova-api收到一個http請求,產生一個開機消息,exchange是’nova’,發送的隊列名compute.hostname,routing key為隊列名,然后發送到Rabbitmq Server上去,消息隊列服務保存到對應的隊列上,然后將消息派發給消費者。因為消費者跟rabbitmq服務是建立了一條channel連接的,所以派發消息就相當於是通過這條channel傳送數據。
消費者則對應是nova-compute,nova-compute接收到消息后進行解析,然后調用對應的函數進行處理,然后將處理結果返回。
2 Rabbitmq集群模式
Rabbitmq集群工作原理圖:
Rabbitmq是用Erlang語言寫的,該語言天生有分布式特性,本身支持原生的HA模式。
普通的消息隊列集群會始終同步四種類型的內部元數據:
(1)隊列元數據:隊列名稱和它的屬性
(2)交換器元數據:交換器名稱、類型和屬性
(3)綁定(binding)元數據:一張簡單的表格展示如何將消息路由到對應的隊列
(4)Vhost元數據:為vhost內的隊列、交換器和綁定提供命名空間和安全屬性
但普通模式並沒有對消息隊列的消息進行同步,需要設置成鏡像模式,才會對消息進行同步。
2.1 同步原理
通過鏡像模式,Rabbitmq會將鏡像隊列放置於多個消息隊列服務節點上,消息的生產和消費都會在節點間進行同步,鏡像隊列包含一個master和多個slave,當master退出時,時間最長的則升為master。
每個消息隊列進程會創建一個gm(guaranteed multicast)進程,鏡像隊列中的所有gm進程會組成一個gm進程組用於廣播和接收消息。Gm組將集群中的節點組成一個環,主節點收到或處理完一個消息都會發起消息同步,消息沿着環形鏈走,當主節點接收到自己發的消息后則表示消息已經同步到所有的節點。
消息的發布和消費都是通過 master 隊列完成,master 隊列對消息進行處理同時將消息的處理動作通過 gm 廣播給所有 slave 隊列。
2.2 消息走向路徑
以開機一個虛擬機為例,環境狀況:244的nova-api對245上的虛擬機進行關機操作,245的compute.hostname的主隊列在242上
nova-api會將消息發到給主隊列242上的消息隊列服務器保存,242進行gm廣播,242通過channel將消息傳送到245上,245接收到消息進行處理
2.3 消息確認機制
程序中是在拿到消息后和開始處理前期間進行了message.acknowledge()的調用,調用后即是告訴消息隊列服務,該消息已經被處理完了,可以進行刪除了。
從實踐來看確實是acknowledge調用了后才刪除的,但程序是實際調用后才開始執行消息處理函數,期間如果有異常報錯沒有處理成功則也不會重新處理了。
Openstack平台沒有對no_ack進行設置,查看kombu的代碼默認no_ack是false的,也就是需要進行確認才會刪除消息。
也可以發送nack的方式表示消息處理有問題,這時如果隊列的requeue設置為true,則會重新進入隊列交由其它消費者進行處理,默認是為false。
dead lettering機制:當調用了reject或nack且requeue是false時或者消息過期時,該機制會將失敗的消息放入到dead-lettered隊列中。
3 OpenStack RPC框架
3.1 接收消息(以nova-compute服務為例)
雲平台消息隊列RPC處理框架圖:
1.這里以nova-compute服務啟動為例進行講解,nova-compute服務啟動,會通過配置文件解析獲取一個Transport類對象,Transport對象里引用了RabbitDriver類對象
(1)Transport類作用:通過配置文件獲取對應的_driver,使用_driver來發送消息
(2)RabbitDriver類作用:用於發送消息和創建監聽類
2.nova-compute通過調用get_server函數獲取RPCServer類對象,類對象聚合了Transport類對象和RPCDispatcher類對象
(1)RPCServer類作用:初始化rpc監聽服務,創建隊列
(2)RPCDispatcher類作用:收到消息后進行解析找到相對應的函數進行調用
3.調用RPCServer類對象的start方法,里面調用 _create_listener方法創建監聽者AMQPListener類對象,用於作為綁定為消費者的回調對象,該類對象引用了一個Connection類對象conn。使用conn定義隊列,最后返回一個PollStyleListenerAdapter類對象
(1)AMQPListener類作用:作為消費者綁定的回調對象,同時poll方法用於獲取消息
(2)PollStyleListenerAdapter類作用:創建線程不斷獲取消息
(3)Connection類作用:獲取了kombu的connection對象,用於進行消費者、隊列定義和重連接等邏輯相關操作,使用Consumer類來管理消費者
(4)Consumer類作用:一個Consumer類對象代表一個消費者,里面保存了消費者信息和定義消費者的方法
(5)AMQPIncomingMessage類作用:消息進行解析后初始化為該類對象,代表一個消息的結構,里面有reply方法用於返回消息處理結果給發布消息者
4.PollStyleListenerAdapter類調用start方法開啟一個線程while循環專門調用AMQPListener類對象的poll方法進行消息獲取
5.poll函數會讀取incomings隊列里有沒有消息,如果有則表示拿到一個未處理消息發給Dispatcher類去處理這個消息,如果該隊列空了,則調用drain_events方法去獲取各channel上傳過來的消息並將它們存到inconings隊列中。
6.drain_events方法機理:從strace工具看到該nova-compute服務有大量的epoll_wait方法調用,可知采用了事件觸發的方式。
3.2 發送消息
由於是發送消息,所以只要看右邊的RPCClient端那部分就可以了:
1.跟接收消息一樣,會根據target生成一個Transport類對象,該對象根據配置文件會獲取一個driver對象,我們的是RabbitDriver對象,繼承於AMQPDriverBase類;
2.獲取一個_CallContext類對象,引用了Transport類對象
(1)_CallContext類作用:用來發送消息,對消息進行序列化並調用Transport類對象的driver來進行消息發送
3.獲取Connection類對象進行消息發送
4.Connection類中通過kombu中的Producer類的發送方法進行消息發出
3.3 重連機制
每個消費者都是建立在一個channel上的,channel是建立在Tcp連接上的,如果連接的rabbitmq服務節點關閉了,則連接會斷開,因此需要重新在其它未關閉節點上建立連接,重新建立channel和消費者。
重連機制的代碼存在於impl_driver.py中的Connection類的consume函數中
這里有兩個地方是可以檢測到連接斷開了,需要重連的,一個是在讀取socket時發現,一個是在心跳檢測機制里發現。
讀socket時拋異常觸發的重連:
1. 由上面分析我們知道程序會不斷調用Connection的consume函數進行獲取消息,該函數會調用到kombu的Connection類的autoretry函數,同時傳入了_consume函數作為參數
2. autoretry又調用到ensure函數,該函數主要作用是調用傳進來的_consume函數,如果有異常拋出,則進入異常重連處理,調用on_error函數,再調用ensure_connection函數確保重新建立好一條新的連接,然后在連接上建立新的channel,最后將channel進行更新。
心跳檢測機制觸發的重連:
1. 在服務啟動后就有一個專門的線程定時發包檢測連接是否正常,超時60秒則觸發異常
2. 觸發異常后調用ensure_connection函數將當前channel置為None,從而觸發重建channel
消費者的重新建立:
1.在_consume函數中每次都會去判斷self._new_tags集合是不是不為空,如果不為空則會重新建立這些tag的消費者,執行建立函數后就會把它remove掉,關鍵代碼邏輯:
2.而_new_tags的獲取則是根據異常拋出,檢測異常類型來重新賦予之前消費者的tags,以此重新建立消費者,關鍵代碼邏輯:
4 代碼流程解析
4.1 nova-compute啟動流程
Openstack的服務啟動都是先從cmd目錄下的main函數開始執行的,比如nova-compute服務的啟動則是nova/cmd/compute.py文件中的main函數開始執行:
File:nova/cmd/compute.py
def main(): # 調用nova/service.py文件的Service類的create類方法實例化一個service類 server = service.Service.create(binary='nova-compute', topic=CONF.compute_topic) service.serve(server) service.wait()
得到server后調用server函數進行服務運行:
File:nova/service.py
def serve(server, workers=None): global _launcher if _launcher: raise RuntimeError(_('serve() can only be called once')) # 這里的service是指oslo_service包導入的service了 # 調用到oslo_service/service.py的launch方法 _launcher = service.launch(CONF, server, workers=workers)
launch函數調用oslo_service包的service.py的launch方法初始化一個ServiceLauncher實例,並調用launch_service函數:
File:oslo_service/service.py
def launch(conf, service, workers=1, restart_method='reload'): if workers is not None and workers <= 0: raise ValueError(_("Number of workers should be positive!")) # 默認傳入的是None if workers is None or workers == 1: # 這里是初始化一個繼承了Launcher類的ServiceLauncher類實例 launcher = ServiceLauncher(conf, restart_method=restart_method) else: launcher = ProcessLauncher(conf, restart_method=restart_method) # 調用Launcher類里的launch_service方法,launch_service方法運行給定的service launcher.launch_service(service, workers=workers) return launcher
Launch_service函數調用了父類的實現:
File:oslo_service/service.py Launch:launch_service
def launch_service(self, service, workers=1): if workers is not None and workers != 1: raise ValueError(_("Launcher asked to start multiple workers")) _check_service_base(service) service.backdoor_port = self.backdoor_port # 調用Services類的add方法來運行給定service # 其實最后也就是開辟了個綠色線程池和獲取一個綠色線程運行service的start方法 self.services.add(service)
我們可以直接看service的start方法:
File:nova/service.py Service:start
def start(self): ...... # 初始化oslo_messaging/target.py的Target類 target = messaging.Target(topic=self.topic, server=self.host) endpoints = [ self.manager, baserpc.BaseRPCAPI(self.manager.service_name, self.backdoor_port) ] endpoints.extend(self.manager.additional_endpoints) # 獲取nova/objects/base.py中的NovaObjectSerializer類實例 # 用來序列化nova服務中的對象 serializer = objects_base.NovaObjectSerializer() # 獲取一個oslo_messaging/rpc/RPCServer類實例 self.rpcserver = rpc.get_server(target, endpoints, serializer) # 調用到oslo_messaging/server.py的MessageHandlingServer類的start方法 self.rpcserver.start() ......
看下get_server實現:
File:nova/rpc.py
def get_server(target, endpoints, serializer=None): # TRANSPORT一個transport類對象,里面包含發消息的driver實現對象,如果是rabbit則對應到實現rabbit的driver類 # 獲取TRANSPORT對象:<class 'oslo_messaging.transport.Transport'> # 更重要的是transport對象里的driver對象: oslo_messaging._drivers.impl_rabbit.RabbitDriver assert TRANSPORT is not None if profiler: serializer = ProfilerRequestContextSerializer(serializer) else: serializer = RequestContextSerializer(serializer) # get_rpc_server在oslo_messaging/rpc/server.py文件中 # 獲取一個RPCServer實例 return messaging.get_rpc_server(TRANSPORT, target, endpoints, executor='eventlet', serializer=serializer)
查看get_rpc_server實現:
File:oslo_messaging/rpc/server.py
def get_rpc_server(transport, target, endpoints, executor='blocking', serializer=None, access_policy=None): # 獲取一個消息調度員,它能識別收到的消息的結構 # 用於接收到消息后進行消息分發處理 A message dispatcher which understands RPC messages # oslo_messaging/rpc/dispatcher.py類的RPCDispatcher類 # 解析消息然后調用相對應的方法進行處理 dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer, access_policy) # 該類有個關鍵函數_process_incoming是在接收到消息時進行回調的 return RPCServer(transport, target, dispatcher, executor)
獲取到RPCServer實例后,調用start方法,因為RPCServer繼承於MessageHandlingServer但沒有實現,所以是調用父類的start方法:
File:oslo_messaging/server.py MessageHandlingServer:start
def start(self, override_pool_size=None): ...... try: # 這里程序調用到的是oslo_messaging/rpc/server.py的RPCServer類的_create_listener方法 self.listener = self._create_listener() except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex) ...... self.listener.start(self._on_incoming)
File:oslo_messaging/rpc/server.py RPCServer:_create_listener
def _create_listener(self): # oslo_messaging/transport/Transport類_listen方法 return self.transport._listen(self._target, 1, None)
File:oslo_messaging/transport.py Transport:_listen
def _listen(self, target, batch_size, batch_timeout): if not (target.topic and target.server): raise exceptions.InvalidTarget('A server\'s target must have ' 'topic and server names specified', target) # 這個_driver對應的是oslo_messaging._drivers.impl_rabbit.RabbitDriver return self._driver.listen(target, batch_size, batch_timeout)
File:oslo_messaging/_drivers/amqpdriver.py AMQPDriverBase:listen
def listen(self, target, batch_size, batch_timeout): # 這里是從連接池里獲取一個連接對象 # conn是oslo_messaging/_drivers/impl_rabbit.py的類Connection實例 conn = self._get_connection(rpc_common.PURPOSE_LISTEN) # 這個listen很關鍵,它被綁定為消費者的回調對象,也就是收到消息時是調用該對象,該對象實現了__call__方法 # 所以可直接調用 listener = AMQPListener(self, conn) conn.declare_topic_consumer(exchange_name=self._get_exchange(target), topic=target.topic, callback=listener) conn.declare_topic_consumer(exchange_name=self._get_exchange(target), topic='%s.%s' % (target.topic, target.server), callback=listener) conn.declare_fanout_consumer(target.topic, listener) # 返回一個實現poll模式監聽消息到來的類 return base.PollStyleListenerAdapter(listener, batch_size, batch_timeout)
創建好監聽類后,調用start方法:
File:oslo_messaging/_drivers/base.py PollStyleListenerAdapter:start
def start(self, on_incoming_callback): super(PollStyleListenerAdapter, self).start(on_incoming_callback) self._started = True # _listen_thread在__init__方法中定義了,如下行所示 # self._listen_thread = threading.Thread(target=self._runner) # 所以是開啟一個線程運行_runner函數 self._listen_thread.start()
再來看下_runner函數:
File:oslo_messaging/_drivers/base.py PollStyleListenerAdapter:_runner
def _runner(self): while self._started: # 這里poll調用oslo_messaging/_drivers/amqpdriver.py的AMQPListener類poll函數 incoming = self._poll_style_listener.poll( batch_size=self.batch_size, batch_timeout=self.batch_timeout) # 讀到有消息,調用回調函數進行處理 if incoming: # 該on_incoming_callback是oslo_messaging/server.py中self.listener.start(self._on_incoming)中傳進來的 # 是該文件中MessageHandlingServer類的_on_incoming函數 # _on_incoming函數又調用到了RPCServer中的_process_incoming函數 self.on_incoming_callback(incoming)
這里先來看下poll函數實現:
File:oslo_messaging/_drivers/amqpdriver.py AMQPListener:poll
def poll(self, timeout=None): stopwatch = timeutils.StopWatch(duration=timeout).start() while not self._shutdown.is_set(): self._message_operations_handler.process() if self.incoming: # 從incoming列表中獲取第一個消息返回 return self.incoming.pop(0) left = stopwatch.leftover(return_none=True) if left is None: left = self._current_timeout if left <= 0: return None try: # 獲取所有隊列的消息 # oslo_messaging/dr_drivers/impl_rabbit.py的Connection類的consume函數 # 將獲取到的消息經過解析存到incoming列表中 self.conn.consume(timeout=min(self._current_timeout, left)) except rpc_common.Timeout: self._current_timeout = max(self._current_timeout * 2, ACK_REQUEUE_EVERY_SECONDS_MAX) else: self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN # NOTE(sileht): listener is stopped, just processes remaining messages # and operations self._message_operations_handler.process() if self.incoming: return self.incoming.pop(0) self._shutoff.set()
這里關鍵函數是調用consume進行消息獲取,調用到了Connection類的consume函數,以下是該函數的關鍵語句:
# 調用kombu/connection.py的Connection類的drain_events方法,等待來自服務器的單個事件,所以這是事件觸發型的
# 其中里面的supports_librabbitmq()=False(因為環境支持’eventlet’,所以未采用’default’,所以返回False
# 最后是調用到kombu/transport/pyamqp.py的drain_events方法
# 再調用到amqp包的Connection類的drain_events(amqp/connection.py)
File:oslo_messaging/_drivers/impl_rabbit.py Connection:consume
self.connection.drain_events(timeout=poll_timeout)
amqp包的drain_events實現
File:amqp/connection.py Connection: drain_events
def drain_events(self, timeout=None): """Wait for an event on a channel.""" # 等待事件通知 chanmap = self.channels chanid, method_sig, args, content = self._wait_multiple( chanmap, None, timeout=timeout, ) channel = chanmap[chanid] if (content and channel.auto_decode and hasattr(content, 'content_encoding')): try: content.body = content.body.decode(content.content_encoding) except Exception: pass amqp_method = (self._method_override.get(method_sig) or channel._METHOD_MAP.get(method_sig, None)) if amqp_method is None: raise AMQPNotImplementedError( 'Unknown AMQP method {0!r}'.format(method_sig)) if content is None: return amqp_method(channel, args) else: return amqp_method(channel, args, content)
到amqp包我們就不深究下去了
4.1 收到消息時行為
接着看下有消息到來時執行的回調函數,從前面我們可知在創建消費者時我們綁定了個listen對象作為callback,如下:
File:oslo_messaging/_drivers/amqpdriver.py AMQPDriverBase:listen
listener = AMQPListener(self, conn) conn.declare_topic_consumer(exchange_name=self._get_exchange(target), topic=target.topic, callback=listener)
所以當收到消息時會調用AMQPListener類的__call__函數:
def __call__(self, message): # 收到消息后解析消息結構體並構建成AMQPIncomingMessage結構 # type(message)對應類:<class 'oslo_messaging._drivers.impl_rabbit.RabbitMessage'> ctxt = rpc_amqp.unpack_context(message) unique_id = self.msg_id_cache.check_duplicate_message(message) if ctxt.msg_id: LOG.debug("received message msg_id: %(msg_id)s reply to " "%(queue)s", {'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id}) else: LOG.debug("received message with unique_id: %s", unique_id) self.incoming.append(AMQPIncomingMessage( self, ctxt.to_dict(), message, unique_id, ctxt.msg_id, ctxt.reply_q, self._obsolete_reply_queues, self._message_operations_handler))
可以看到收到message后,解析message並構建為AMQPIncomingMessage實例append到incoming隊列中。
消息到隊列中之后,當我們取到一個消息后做的事情,也就是回到_runner函數中的邏輯:
File:oslo_messaging/_drivers/base.py PollStyleListenerAdapter:_runner
def _runner(self): while self._started: # 這里poll調用oslo_messaging/_drivers/amqpdriver.py的AMQPListener類poll函數 incoming = self._poll_style_listener.poll( batch_size=self.batch_size, batch_timeout=self.batch_timeout) # 讀到有消息,調用回調函數進行處理 if incoming: # 該on_incoming_callback是oslo_messaging/server.py中self.listener.start(self._on_incoming)中傳進來的 # 是該文件中MessageHandlingServer類的_on_incoming函數 # _on_incoming函數又調用到了RPCServer中的_process_incoming函數 self.on_incoming_callback(incoming)
可知是調用了_process_incoming函數來處理消息:
# 這里poll調用oslo_messaging/_drivers/amqpdriver.py的AMQPListener類poll函數
# 處理消息
File:oslo_messaging/rpc/server.py RPCServer:__process_incoming
def _process_incoming(self, incoming): message = incoming[0] try: # 這里進行了消息確認發送 # 會調用到kombu/message.py的Message類的ack函數 # 表示該消息已經進行消費了,隊列中可以刪除該消息了 message.acknowledge() except Exception: LOG.exception(_LE("Can not acknowledge message. Skip processing")) return failure = None try: # 調用oslo_messaging/rpc/dispatcher.py的RPCDispatcher類來處理消息 # 該類的職責是找到消息對應的方法並執行 res = self.dispatcher.dispatch(message) except rpc_dispatcher.ExpectedException as e: failure = e.exc_info LOG.debug(u'Expected exception during message handling (%s)', e) except Exception: # current sys.exc_info() content can be overridden # by another exception raised by a log handler during # LOG.exception(). So keep a copy and delete it later. failure = sys.exc_info() LOG.exception(_LE('Exception during message handling')) try: # 將執行結果返回 if failure is None: message.reply(res) else: message.reply(failure=failure) except Exception: LOG.exception(_LE("Can not send reply for message")) finally: # NOTE(dhellmann): Remove circular object reference # between the current stack frame and the traceback in # exc_info. del failure
核心函數是dispatch函數:
File:oslo_messaging/rpc/dispatcher.py RPCDispatcherr:dispatch
def dispatch(self, incoming): """Dispatch an RPC message to the appropriate endpoint method. :param incoming: incoming message :type incoming: IncomingMessage :raises: NoSuchMethod, UnsupportedVersion """ message = incoming.message ctxt = incoming.ctxt method = message.get('method') args = message.get('args', {}) namespace = message.get('namespace') version = message.get('version', '1.0') found_compatible = False # endpoints值是兩個類 # [<nova.compute.manager.ComputeManager object at 0x7f6eef157dd0>, <nova.baserpc.BaseRPCAPI object at 0x7f6ee4724c90>] # 從類中查找出方法進行調用 for endpoint in self.endpoints: target = getattr(endpoint, 'target', None) if not target: target = self._default_target if not (self._is_namespace(target, namespace) and self._is_compatible(target, version)): continue if hasattr(endpoint, method): if self.access_policy.is_allowed(endpoint, method): return self._do_dispatch(endpoint, method, ctxt, args) found_compatible = True if found_compatible: raise NoSuchMethod(method) else: raise UnsupportedVersion(version, method=method)
File:oslo_messaging/rpc/dispatcher.py RPCDispatcherr:dispatch
def _do_dispatch(self, endpoint, method, ctxt, args): ctxt = self.serializer.deserialize_context(ctxt) new_args = dict() for argname, arg in args.items(): new_args[argname] = self.serializer.deserialize_entity(ctxt, arg) func = getattr(endpoint, method) # 調用方法 result = func(ctxt, **new_args) return self.serializer.serialize_entity(ctxt, result)
4.3 發送消息流程
這里看一個發送開機指令到宿主機執行的流程。
首先由novaclient發送http請求到nova-api服務,對應調用到_start_server函數:
File:nova/api/openstack/compute/server.py ServersController:_start_server
def _start_server(self, req, id, body): ..... try: # nova/compute/api.py self.compute_api.start(context, instance) except (exception.InstanceNotReady, exception.InstanceIsLocked) as e: .....
查看start方法實現:
File:nova/compute/api.py API:start
def start(self, context, instance): ..... instance.task_state = task_states.POWERING_ON instance.save(expected_task_state=[None]) self._record_action_start(context, instance, instance_actions.START) self.compute_rpcapi.start_instance(context, instance) .....
可以看到是先將task_state狀態改為了POWERING_ON后再發送消息,查看start_instance方法的實現:
File:nova/compute/rpcapi.py ComputeAPI:start_instance
def start_instance(self, ctxt, instance): version = '4.0' # self.router.by_instance(ctxt, instance)獲取oslo_messaging/rpc/client.py的RPCClient類實例 # prepare方法是用於設置一些屬性並生成一個oslo_messaging/rpc/client.py的_CallContext類實例 cctxt = self.router.by_instance(ctxt, instance).prepare( server=_compute_host(None, instance), version=version) # 該call方法是調用到oslo_messaging/rpc/client.py的_BaseCallContext(_CallContext的父類)類的call方法 return cctxt.call(ctxt, 'start_instance', instance=instance)
查看call方法實現:
File:oslo_messaging/rpc/client.py _BaseCallContext:call
def call(self, ctxt, method, **kwargs): """Invoke a method and wait for a reply. See RPCClient.call().""" if self.target.fanout: raise exceptions.InvalidTarget('A call cannot be used with fanout', self.target) # 生成一個msg msg = self._make_message(ctxt, method, kwargs) # 序列化ctxt msg_ctxt = self.serializer.serialize_context(ctxt) timeout = self.timeout if self.timeout is None: timeout = self.conf.rpc_response_timeout self._check_version_cap(msg.get('version')) try: # 調用transport中的_send方法 # _send方法中又是使用_driver對象調用send方法 # _driver對象是oslo_messaging._drivers.impl_rabbit.RabbitDriver # 所以是調用RabbitDriver類里的send方法,實際是調用它父類AMQPDriverBase的send方法 result = self.transport._send(self.target, msg_ctxt, msg, wait_for_reply=True, timeout=timeout, retry=self.retry) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) return self.serializer.deserialize_entity(ctxt, result)
查看send方法實現:
File:oslo_messaging/_drivers/amqpdriver.py AMQPDriverBase:_send
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, envelope=True, notify=False, retry=None): msg = message if wait_for_reply: msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) # _get_reply_q方法獲取一個ReplyWaiter對象,開始poll模式等待獲取返回消息 # 同時使用waiters(ReplyWaiter實例對象)管理返回的消息 msg.update({'_reply_q': self._get_reply_q()}) # 獲取一個唯一uuid添加到msg字典中 rpc_amqp._add_unique_id(msg) unique_id = msg[rpc_amqp.UNIQUE_ID] # 把ctxt字典值也更新到msg中 rpc_amqp.pack_context(msg, ctxt) if envelope: msg = rpc_common.serialize_msg(msg) if wait_for_reply: # 把該消息加到waiters里去監控管理 self._waiter.listen(msg_id) log_msg = "CALL msg_id: %s " % msg_id else: log_msg = "CAST unique_id: %s " % unique_id try: # 根據target保存的發送模式發送消息 with self._get_connection(rpc_common.PURPOSE_SEND) as conn: if notify: exchange = self._get_exchange(target) log_msg += "NOTIFY exchange '%(exchange)s'" \ " topic '%(topic)s'" % { 'exchange': exchange, 'topic': target.topic} LOG.debug(log_msg) conn.notify_send(exchange, target.topic, msg, retry=retry) elif target.fanout: log_msg += "FANOUT topic '%(topic)s'" % { 'topic': target.topic} LOG.debug(log_msg) conn.fanout_send(target.topic, msg, retry=retry) else: topic = target.topic exchange = self._get_exchange(target) if target.server: topic = '%s.%s' % (target.topic, target.server) log_msg += "exchange '%(exchange)s'" \ " topic '%(topic)s'" % { 'exchange': exchange, 'topic': topic} LOG.debug(log_msg) # 發送topic模式的隊列 # 調用oslo_messaging/_drivers/impl_rabbit.py中Connection類實例的topic_send方法 conn.topic_send(exchange_name=exchange, topic=topic, msg=msg, timeout=timeout, retry=retry) if wait_for_reply: # 等待返回或消息超時返回 # 輪詢方式檢測消息有沒有被返回並放到對應的字典中 result = self._waiter.wait(msg_id, timeout) if isinstance(result, Exception): raise result return result finally: if wait_for_reply: self._waiter.unlisten(msg_id)
當消息返回或消息超時時就返回結果給call調用,這個請求就完成了
4.4 重連機制
當比如當前連接着的rabbitmq服務斷開了時,則連接會斷開,則需要重新建立連接,建立新的channel,然后重新建立消費者。
這個邏輯在Connection的consume函數中,是由監聽類的poll函數進行不斷調用的,我們看下該函數實現:
File:oslo_messaging/_drivers/impl_rabbit.py Connection:consume
def consume(self, timeout=None): """Consume from all queues/consumers.""" timer = rpc_common.DecayingTimer(duration=timeout) timer.start() def _raise_timeout(exc): LOG.debug('Timed out waiting for RPC response: %s', exc) raise rpc_common.Timeout() def _recoverable_error_callback(exc): # 判斷異常類型是不是非Timeout類型,因為Timeout類型是由drain_events函數獲取 # 消息等待超時導致的,屬於正常的異常,除了這種異常,其它異常則都會被視作需要重建消費者 if not isinstance(exc, rpc_common.Timeout): self._new_tags = set(self._consumers.values()) timer.check_return(_raise_timeout, exc) def _error_callback(exc): # 將異常交給_recoverable_error_callback函數進行處理 _recoverable_error_callback(exc) LOG.error(_LE('Failed to consume message from queue: %s'), exc) def _consume(): # NOTE(sileht): in case the acknowledgment or requeue of a # message fail, the kombu transport can be disconnected # In this case, we must redeclare our consumers, so raise # a recoverable error to trigger the reconnection code. # 這里是判斷了連接是否還正常,如果不正常,我們需要重新獲取連接並且重新定義consumer if not self.connection.connected: # 這里拋錯以進入重連機制 raise self.connection.recoverable_connection_errors[0] while self._new_tags: for consumer, tag in self._consumers.items(): if tag in self._new_tags: # 如果是新標簽則消費者也需建立 # 在重建channel時這里就是重建消費者了 # 這里consumer是該文件的Consumer類實例,該consume函數會調用到kombu中的consume函數定義消費者 consumer.consume(self, tag=tag) self._new_tags.remove(tag) poll_timeout = (self._poll_timeout if timeout is None else min(timeout, self._poll_timeout)) while True: if self._consume_loop_stopped: return if self._heartbeat_supported_and_enabled(): # 心跳檢查,如果連不通則拋錯 # 拋錯則會在kombu中進行重連機制 self._heartbeat_check() try: # 調用kombu/connection.py的Connection類的drain_events方法,等待來自服務器的單個事件,所以這是事件觸發型的 # 其中里面的supports_librabbitmq()=False(因為環境支持’eventlet’,所以未采用’default’,所以返回False # 最后是調用到kombu/transport/pyamqp.py的drain_events方法 # 再調用到amqp包的Connection類的drain_events(amqp/connection.py) self.connection.drain_events(timeout=poll_timeout) return except socket.timeout as exc: # 超時會進入這個邏輯,check_return會raise一個Exception,從而導致ensure中拋異常被捕獲調用了error_callback函數 # error_callback函數又調用了recoverable_error_callback函數 # 從而導致日志中經常可以看到_recoverable_error_callback poll_timeout = timer.check_return( _raise_timeout, exc, maximum=self._poll_timeout) with self._connection_lock: self.ensure(_consume, recoverable_error_callback=_recoverable_error_callback, error_callback=_error_callback)
這里很多內嵌函數都會通過傳參的方式傳入到其它方法中處理,然后由其它方法在檢測到異常時執行。我們可以看到最終是執行了ensure函數,我們看下ensure函數的實現:
File:oslo_messaging/_drivers/impl_rabbit.py Connection:ensure
def ensure(self, method, retry=None, recoverable_error_callback=None, error_callback=None, timeout_is_error=True): ..... # 在kombu中如果進入了異常重連處理機制會回調該函數 def on_error(exc, interval): LOG.debug("[%s] Received recoverable error from kombu:" % self.connection_id, exc_info=True) # 執行_recoverable_error_callback函數處理異常 recoverable_error_callback and recoverable_error_callback(exc) interval = (self.kombu_reconnect_delay + interval if self.kombu_reconnect_delay > 0 else interval) info = {'err_str': exc, 'sleep_time': interval} info.update(self._get_connection_info()) if 'Socket closed' in six.text_type(exc): LOG.error(_LE('[%(connection_id)s] AMQP server' ' %(hostname)s:%(port)s closed' ' the connection. Check login credentials:' ' %(err_str)s'), info) else: LOG.error(_LE('[%(connection_id)s] AMQP server on ' '%(hostname)s:%(port)s is unreachable: ' '%(err_str)s. Trying again in ' '%(sleep_time)d seconds. Client port: ' '%(client_port)s'), info) ...... # 當在kombu中執行autoretry時拋出異常了並在異常處理時重新連接了其它節點 # 則會回調該函數 def on_reconnection(new_channel): # 更新channel self._set_current_channel(new_channel) self.set_transport_socket_timeout() def execute_method(channel): # 更新channel self._set_current_channel(channel) # 這個method指的就是_consume函數 # 注意我這里指的是consume調入的時候該method就是_consume函數 # 因為該ensure函數是很多函數都會調用的,每個函數都會傳入它的method函數進行回調 # 我這里是為了方便理解就這樣指明了,文中還有很多地方也是如此指明,就不一一解釋了 method() # NOTE(sileht): Some dummy driver like the in-memory one doesn't # have notion of recoverable connection, so we must raise the original # exception like kombu does in this case. has_modern_errors = hasattr( self.connection.transport, 'recoverable_connection_errors', ) if has_modern_errors: recoverable_errors = ( self.connection.recoverable_channel_errors + self.connection.recoverable_connection_errors) else: recoverable_errors = () try: autoretry_method = self.connection.autoretry( execute_method, channel=self.channel, max_retries=retry, errback=on_error, interval_start=self.interval_start or 1, interval_step=self.interval_stepping, interval_max=self.interval_max, on_revive=on_reconnection) ret, channel = autoretry_method() self._set_current_channel(channel) return ret except recoverable_errors as exc: LOG.debug("Received recoverable error from kombu:", exc_info=True) # 在kombu重建立連接失敗時會跑入該邏輯,調用error_callback # 如果是consume函數調入該函數的話,則該函數是_error_callback函數 error_callback and error_callback(exc) self._set_current_channel(None) # NOTE(sileht): number of retry exceeded and the connection # is still broken info = {'err_str': exc, 'retry': retry} info.update(self.connection.info()) msg = _('Unable to connect to AMQP server on ' '%(hostname)s:%(port)s after %(retry)s ' 'tries: %(err_str)s') % info LOG.error(msg) raise exceptions.MessageDeliveryFailure(msg) except rpc_amqp.AMQPDestinationNotFound: # NOTE(sileht): we must reraise this without # trigger error_callback raise except Exception as exc: error_callback and error_callback(exc) Raise
這個函數也是如此,定義了很多內嵌函數,然后作為參數傳遞到kombu中的autoretry函數中進行處理,方便有異常時就行異常處理且回調相對應的函數。查看autoretry函數實現:
File:kombu/connection.py Connection:autoretry
def autoretry(self, fun, channel=None, **ensure_options): channels = [channel] create_channel = self.channel class Revival(object): __name__ = getattr(fun, '__name__', None) __module__ = getattr(fun, '__module__', None) __doc__ = getattr(fun, '__doc__', None) def revive(self, channel): channels[0] = channel def __call__(self, *args, **kwargs): if channels[0] is None: self.revive(create_channel()) # 執行oslo_messaging中impl_rabbit的ensure中的execute_method函數 # execute_method最終又是回調到_consume函數 return fun(*args, channel=channels[0], **kwargs), channels[0] revive = Revival() # 返回了一個_ensure閉包函數 # 但oslo_messaging的ensure函數中的下一行便是執行該閉包函數 return self.ensure(revive, revive, **ensure_options)
這里最后又調用了ensure函數,該ensure函數是關鍵,它里面進行了連接重連機制。查看ensure實現:
File:kombu/connection.py Connection:ensure
def ensure(self, obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1, on_revive=None): def _ensured(*args, **kwargs): got_connection = 0 conn_errors = self.recoverable_connection_errors chan_errors = self.recoverable_channel_errors has_modern_errors = hasattr( self.transport, 'recoverable_connection_errors', ) for retries in count(0): # for infinity try: # 調用了Revival類的__call__函數 # 在進行的一系列調用中如果有異常拋出則進入下面的重連機制 return fun(*args, **kwargs) except conn_errors as exc: if got_connection and not has_modern_errors: raise if max_retries is not None and retries > max_retries: raise self._debug('ensure connection error: %r', exc, exc_info=1) self._connection = None self._do_close_self() errback and errback(exc, 0) remaining_retries = None if max_retries is not None: remaining_retries = max(max_retries - retries, 1) # 嘗試重新建立連接,確保有連接建立成功 self.ensure_connection(errback, remaining_retries, interval_start, interval_step, interval_max) # 在連接上獲取新的channel new_channel = self.channel() self.revive(new_channel) obj.revive(new_channel) if on_revive: # 調用oslo_messaging中的on_reconnection函數 # 將獲得的新channel賦給Connection類的channel on_revive(new_channel) got_connection += 1 except chan_errors as exc: if max_retries is not None and retries > max_retries: raise self._debug('ensure channel error: %r', exc, exc_info=1) errback and errback(exc, 0) _ensured.__name__ = "%s(ensured)" % fun.__name__ _ensured.__doc__ = fun.__doc__ _ensured.__module__ = fun.__module__ return _ensured
可以看到如果是觸發了異常則進入下面的異常處理,進行重連和回調函數調用等操作,所以如果進行了重連,就會觸發到oslo_messaging那邊定義的很多內嵌函數來協助處理重連邏輯。