RPC通信原理(未完,先睡覺)


一 背景

    OpenStack 各組件之間是通過 REST 接口進行相互通信,比如Nova、Cinder、Neutron、Glance直間的通信都是通過keystone獲取目標的endpoint,即api(至於到底什么是restful風格的api請點擊紅色鏈接)

    而各組件內部則采用了基於 AMQP 模型的 RPC 通信。

    為了讓大家有更為直觀的認識,我們單拿出cinder的架構來舉例,至於cinder內部包含的具體組件,大家不必過於糾結,我們在后續的章節會詳細介紹,這里只需要有一個直觀的體會:rest和rpc分別在openstack的組件間和組件內通信所處位置。

 

 

1.Cinder-api 是 cinder 服務的 endpoint,提供 rest 接口,負責處理 client 請求,並將 RPC 請求發送至 cinder-scheduler 組件。


2.  Cinder-scheduler 負責 cinder 請求調度,其核心部分就是 scheduler_driver, 作為 scheduler manager 的 driver,負責 cinder-volume 具體的調度處理,發送 cinder RPC 請求到選擇的 cinder-volume。


3. Cinder-volume 負責具體的 volume 請求處理,由不同后端存儲提供 volume 存儲空間。


目前各大存儲廠商已經積極地將存儲產品的 driver 貢獻到 cinder 社區。目前支持的后端存儲系統,可參見:https://wiki.Openstack.org/wiki/CinderSupportMatrix
cinder內部各組件功能:選看

(注意:glance的內部組件的通信是直接調用的自己的api,除此之外,cinder,nova,neutron的內部組件通信都是基於rpc機制)

二 為何

 二 OpenStack RPC通信

Openstack 組件內部的 RPC(Remote Producer Call)機制的實現是基於 AMQP(Advanced Message Queuing Protocol)作為通訊模型,從而滿足組件內部的松耦合性。AMQP 是用於異步消息通訊的消息中間件協議,AMQP 模型有四個重要的角色:

  • Exchange:根據 Routing key 轉發消息到對應的 Message Queue 中
  • Routing key:用於 Exchange 判斷哪些消息需要發送對應的 Message Queue
  • Publisher:消息發送者,將消息發送的 Exchange 並指明 Routing Key,以便 Message Queue 可以正確的收到消息 
  • Consumer:消息接受者,從 Message Queue 獲取消息 (收郵件的人)

消息發布者 Publisher 將 Message 發送給 Exchange 並且說明 Routing Key。Exchange 負責根據 Message 的 Routing Key 進行路由,將 Message 正確地轉發給相應的 Message Queue。監聽在 Message Queue 上的 Consumer 將會從 Queue 中讀取消息。

Routing Key 是 Exchange 轉發信息的依據,因此每個消息都有一個 Routing Key 表明可以接受消息的目的地址,而每個 Message Queue 都可以通過將自己想要接收的 Routing Key 告訴 Exchange 進行 binding,這樣 Exchange 就可以將消息正確地轉發給相應的 Message Queue。

圖  AMQP 消息模型

圖 2. AMQP 消息模型

 

Publisher(發郵件的人

Exchange(相當於郵局

Message Queue(相當於自己家門口的郵筒

Routing Key(自己的家門口郵筒都需要在郵局注冊/binding

Consumer(收郵件的人

 

AMQP 定義了三種類型的 Exchange,不同類型 Exchange 實現不同的 routing 算法:

  • Direct Exchange:Point-to-Point 消息模式,消息點對點的通信模式,Direct Exchange 根據 Routing Key 進行精確匹配,只有對應的 Message Queue 會接受到消息
  • Topic Exchange:Publish-Subscribe(Pub-sub)消息模式,Topic Exchange 根據 Routing Key 進行模式匹配,只要符合模式匹配的 Message Queue 都會收到消息
  • Fanout Exchange:廣播消息模式,Fanout Exchange 將消息轉發到所有綁定的 Message Queue

OpenStack 目前支持的基於 AMQP 模型的 RPC backend 有 RabbitMQ、QPid、ZeroMQ,對應的具體實現模塊在 cinder 項目下 Openstack/common/RPC/目錄下,impl_*.py 分別為對應的不同 backend 的實現。其中RabbitMQ是最常用的一個。

三 RabbitMQ詳解

RabbitMQ是一個消息代理。它的主要原理相當簡單:接收並且轉發消息。

你可以把它想象成一個郵局:當你把你的郵箱放到油筒里時,肯定有郵遞員幫你把它分發給你的接收者。RabiitMQ同時是:一個郵筒、一個郵局,一個郵遞員。

RabbitMQ和郵局的主要不同是RabbitMQ處理的不是紙質,它接收、存儲並且分發二進制數據(即消息)

3.1 RabbitMQ的常用術語如下

producer(生產者):即一個生產消息的程序,只負責生產(sending),簡稱"P"

queue(隊列):即一個隊列就是一個郵筒的名字,它集成在RabbitMQ內部,雖然消息流通過RabbitMQ和你的應用程序,但其實消息可以只存儲在一個隊列里。隊列不受任何限制,它可以如你所願存放很多消息,它本質上就是一個無限的緩沖區。許多生產者producer可以發消息到一個隊列,許多消費者consumer可以嘗試從一個隊列queue里接收數據。

consumer(消費者):即一個接等待接收消息的程序,簡稱“C”

注意:producer,consumer,以及broker(即消息代理,指的就是RabbitMQ)不是必須要在同一台機器上,實際上,大多數情況下,三者是分布在不同的機器上

3.2 最簡單的用法:'Hello World'

RabbitMQ libraries

RabbitMQ遵循AMQP 0.9.1,AMQP是開源的,通用的消息協議,RabbitMQ支持很多不同語言的客戶端,作為python用戶,我們使用Pika模塊,可以使用pip工具安裝之,目前pika最新版本為0.10.0。

 

這里舉一個很簡單的例子,發送一個消息,接收它並且打印到屏幕。我們需要做的是:寫兩個程序,一個負責發消息,另外一個負責接收消息並且打印,如下圖

=============發送端=============

我們的第一個程序名send.py,用來發送一個消息到隊列queue中

第一步:第一件我們需要做的事情就是與RabbitMQ服務建立鏈接

#!/usr/bin/env python
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/', #虛擬主機,起到一個命名空間的作用
    credentials=pika.PlainCredentials('admin','admin') #用戶名,密碼
))

channel=connection.channel()

現在我們可以連接RabbitMQ服務了(broker),只不過此時連接的是本機的,如果RabbitMQ位於遠程主機,那么host="遠程主機ip"

第二步:接下來,我們需要確定接收消息的隊列是存在的,如果我們發送一條消息給一個不存在的隊列,RabbitMQ將把這條消息當做垃圾處理。因此就讓我們為將要分發的消息創建一個隊列,我們可以將該隊列命名為hello

channel.queue_declare(queue='hello')

第三步:此時,我們就已經為發送消息做好准備了,我們的第一個消息就只包含一個字符串“Hello world”吧,並且我們將這條消息發送到hello隊列。

需要強調的一點是:在RabbitMQ中,任何消息都不可能直接發送到隊列queue,消息必須先發送給exchange,后面我們會詳細介紹exchane,此處不必細究,我們只需要知道如何使用默認的exchange就可以了,即定義參數exchange='',空代表使用默認的。exchange允許我們明確地標識消息應該發往哪個隊列,對列名需要通過routing_key這個參數被定義。

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

第四步:在退出這個程序之前,我們需要確定網絡緩存被刷新並且我們的消息真的被傳給了RabbitMQ,因此我們需要優雅地退出這個連接(這種退出方式可以確定緩存刷新和消息傳到了RabbitMQ)

connection.close()

注意: 如果執行send.py后沒有看到'Sent'這條打印內容,可能的原因是RabbitMQ的啟動盤沒有足夠的剩余空間(默認情況,安裝RabbitMQ的磁盤最小需要1Gb的剩余空間),沒有足夠的剩余空間就會拒絕接收消息。我們可以堅持RabbitMQ的日志文件來減少這種空間的限制,點擊來查看如何設置disk_free_limit

 

=============接收端=============

我們的第二個程序receive.py將從隊列中接收消息並且打印

第一步:同樣的,首先我們需要做的也是連接RabbitMQ,負責連接RabbitMQ的代碼和send.py中的一樣。

第二步:這一步需要做的也要確定隊列的存在,我們可以執行多次queue_declare,但是無論執行多少次,將只創建一次隊列

channel.queue_declare(queue='hello') #這個操作是冪等的

 

你肯定會問:為什么我們要再次聲明隊列呢,我們已經在send.py中聲明過一次了啊,沒錯,如果你能確定隊列已經存在了,完全沒有必要重新再定義一次,比如send.py先運行了,那么隊列肯定是存在的。

但問題的所在就在於,我們根本無法確定,到底是send.py先運行還是receive.py先運行,因此最保險的做法就是在兩個程序中都定義上這一條,反正是如果隊列已經有了就不會重新創建。

查看RabbitMQ有多少條消息,(授權用戶)可以使用rabbitmqctl tool:

$ sudo rabbitmqctl list_queues
Listing queues ...
hello    0
...done.

第三步:比起發消息來說,從隊列中收消息是更加復雜一點,它訂閱一個callback函數到一個隊列,一旦收到消息,這個callback函數就會被(Pika庫)調用。此處我們就寫一個簡單的callback函數(只完成打印功能)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

 

第四步:我們需要告訴RabbitMQ,這個特殊的callback函數應該從我們的隊列接收消息。

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

 

要想讓上面這條命令正確執行,我們必須保證隊列我們訂閱的隊列是存在的,幸運的是,我們很有信心,因為我們已經在上面創建了一個隊列‒使用queue_declare。 

no_ack參數將在后面描述

第五步:最后,我們進入一個無休止的循環,等待數據,並且在必要時運行回調callback

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

=============完整版=============

send.py

#python AMQP SDK
import pika

#獲得連接對象
connection=pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/', #虛擬主機,起到一個命名空間的作用
    credentials=pika.PlainCredentials('admin','admin') #用戶名,密碼
))

#連接rabbitmq
channel=connection.channel()
channel.queue_declare(queue='hello') #創建隊列hello

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World')

print(" [x] Sent 'Hello World!'")
connection.close()

receive.py

import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/', #虛擬主機,起到一個命名空間的作用
    credentials=pika.PlainCredentials('admin','admin') #用戶名,密碼
))
channel=connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                     queue='hello',
                     no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

=============測試=============

運行send.py發消息

 $ python send.py
 [x] Sent 'Hello World!'

 

producer程序,即send.py每次運行完畢后都會停止,我們可以接收信息,即運行receive.py

$ python receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

 

此時reveive.py不會退出,我們可以在其他終端開啟send.py來發消息,然后觀察receive.py收消息

3.3 Work Queues

    使用pika 0.10.0 python客戶端

    在3.2小節我們編寫了程序:從一個被命名的隊列中send和receive消息。本節,我們將創建一個Work隊列,它將被用來在多個wokers中分發耗費時間的任務。

Work隊列(又稱為任務隊列)背后的主要思想是為了避免立即執行一個資源密集型任務(耗時),並等待它完成。相反,我們不會等它的完成,我們會調度它之后要完成的任務。我們將任務封裝為消息並將其發送給隊列。后台運行的worker進程將彈出任務並最終執行任務。當您運行許多worker矜持的時候,這些任務將在它們之間共享。

    這個概念在web應用領域非常有用,比如web應用不能在一個短HTTP請求窗口期間處理一個復雜的任務

=============發送端=============

    在3.2小節,我們發送一個消息“Hello World”,現在我們將發送代表復雜任務的字符串(用來模擬耗時的任務,即將揭曉)。我們沒有一個真實的任務,如圖像進行調整或PDF文件被渲染,所以讓我們通過time.sleep()函數偽造一個耗時的任務,比如,一個偽造的任務被描述成Hello...,該任務將花費三秒鍾(幾個.就花費幾秒鍾)。

    我們稍微修改下3.2小節中send.py的代碼,來允許任意的數據能通過命令行被發送,這個程序就將調度任務給work隊列,程序的文件名new_task.py

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)

 =============接收端=============

    同樣我們在3.2小節定義的receive.py也需要做一些修改:消息體中包含幾個點,它就需要偽造執行幾秒鍾。它將從隊列中彈出消息並且執行,文件名worker.py

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

 

 =============round-robin調度=============

new_task.py

#_*_coding:utf-8_*_
#!/usr/bin/env python
import pika
import sys

#獲得連接對象
connection=pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.31.106',
    port=5672,
    virtual_host='/', #虛擬主機,起到一個命名空間的作用
    credentials=pika.PlainCredentials('admin','admin') #用戶名,密碼
))

#連接rabbitmq
channel=connection.channel()
channel.queue_declare(queue='task_queue') #創建隊列hello

message=''.join(sys.argv[1:]) or 'Hello World'
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2, #make message persistent
                      ))

print(" [x] Sent %r" % message)
connection.close()

 

worker.py(建立多個這種文件)

#_*_coding:utf-8_*_
#!/usr/bin/env python
import pika
import time
connection=pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.31.106',
    port=5672,
    virtual_host='/', #虛擬主機,起到一個命名空間的作用
    credentials=pika.PlainCredentials('admin','admin') #用戶名,密碼
))
channel=connection.channel()
channel.queue_declare(queue='task_queue')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

channel.basic_consume(callback,
                     queue='task_queue',
                     no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

同時啟動多個worker.py

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

 

多次運行new_tasks.py

shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....

 

查看workers,即worker.py的執行結果

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

 

結論:默認,rabbitmq將發送每個消息到下一個consumer(即worker.py),按順序一個個的來。平均每個consumer將得到相同數量的消息。這種分發消息的方式叫做round-robin。

 =============Message acknowledgment消息確認 =============

    處理一個任務可能需要花費幾秒鍾,你肯定會好奇,如果consumers中的一個(即woker.py),在開始一個耗時很長的任務但是還沒來得及完成任務的情況下就死掉了,那應該怎么辦。就我們當前的代碼而言,RabbitMQ一旦發消息分發給consumer了,它就會立即從內存中移除這條消息。這種情況下,如果你殺死了一個woker(比如worker.py,我們將丟失這條正在處理的消息),我們也將丟失發送給該特定woker但尚未處理的所有信息。

    很明顯,我們並不想丟失任何消息/任務,如果一個woker死了,我們希望把這個任務分發給另外一個woker。

    為了確定消息永不丟失,RabbitMQ支持消息確認( message acknowledgments

    consumer返回一個ack給RabbitMQ,告知RabbitMQ一條特定的消息已經被接收了、執行完了、並且RabbitMQ可以自由刪除該任務/消息了

    如果一個consumer死了(它的channel是掛壁了,連接是關閉了,或者TCP連接丟失)並且沒有發送ack,RabbitMQ將會知道一個消息沒有被完全執行完畢,並且將該消息重新放入隊列中。這種方式你可以確認沒有消息會丟失 。

    這里沒有任何消息超時;當consumer死掉以后,RabbitMQ將重新分發這個消息。即使執行一個消息需要花很長很長的時間,這種方式仍然是好的處理方式。

    消息確認(Message acknowledgments)默認是被打開的,在前面的例子里,我們明確地將其關閉掉了,通過no_ack=True.移除這條配置那么就默認開啟了。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag) #發送ack

channel.basic_consume(callback,
                      queue='hello') #去掉no_ack=True,默認就是False

 

    使用這種代碼,我們可以確定是否你殺死了一個worker(當它正在處理一個消息的時候通過CTRL+C殺死它),沒有消息會丟失,沒返回ack就死掉的worker,不久后消息就會被重新分發

    注意:一個經常性的錯誤是,callback函數中缺少back_ack,它是一個簡單的工作,但是結果是五花八門的,當你的客戶端退出后,消息將會重新分發,rabbitmq將占用越來越多的內存由於它不能夠釋任何沒有ack回應(unacked)的消息

    為了調試這種錯誤,你可以使用rabbitmqctl來打印messages_unacknowledged

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

 

 

 

 

=============Message durability消息持久化 =============

     我們已經學習了如何確定該

 =============Fair dispatch合理調度 =============

3.2小節我們介紹了

3.4

3.5

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

四 參考文檔

http://www.rabbitmq.com/tutorials/tutorial-one-python.html

https://www.ibm.com/developerworks/cn/cloud/library/1403_renmm_opestackrpc/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM