RabbitMQ介紹
一、RabbitMQ使用場景
RabbitMQ他是一個消息中間件,說道消息中間件【最主要的作用:信息的緩沖區】還是的從應用場景來看下:
1、系統集成與分布式系統的設計
各種子系統通過消息來對接,這種解決方案也逐步發展成一種架構風格,即“通過消息傳遞的架構”。
舉個例子:現在醫院有兩個科“看病科”和“住院科”在之前他們之間是沒有任何關系的,如果你在“看病課”看完病后注冊的信息和資料,到住院科后還得重新注冊一遍?那現在要改革,你看完病后可以直接去住院科那兩個系統之間需要打通怎么辦?這里就可以使用我們的消息中間件了。
2、異步任務處理結果回調的設計
舉個例子:記錄日志,假如需要記錄系統中所有的用戶行為日志,如果通過同步的方式記錄日志勢必會影響系統的響應速度,當我們將日志消息發送到消息隊列,記錄日志的子系統就會通過異步的方式去消費日志消息。這樣不需要同步的寫入日志了NICE
3、並發請求的壓力高可用性設計
舉個例子:比如電商的秒殺場景。當某一時刻應用服務器或數據庫服務器收到大量請求,將會出現系統宕機。如果能夠將請求轉發到消息隊列,再由服務器去消費這些消息將會使得請求變得平穩,提高系統的可用性。
二、RabbitMQ的介紹

從上面的目前常用的消息中間件來說,感覺Kafka更好些,沒錯Kafka是大數據時代誕生的消息中間件,但對於目前來說使用最廣的還是RabbitMQ 如果對Kafka感興趣的可以看下我的另一片博客:http://www.cnblogs.com/luotianshuai/p/5206662.html(非常基礎的想看深入的可以去官網)
**Advanced Message Queuing Protocol (高級消息隊列協議** The Advanced Message Queuing Protocol (AMQP):
是一個標准開放的應用層的消息中間件(Message Oriented Middleware)協議。AMQP定義了通過網絡發送的字節流的數據格式。因此兼容性非常好,任何實現AMQP協議的程序都可以和與AMQP協議兼容的其他程序交互,可以很容易做到跨語言,跨平台。
三、RabbitMQ和一般的消息傳輸模式:隊列模式&主題模式區別
1、隊列模式

一個發布者發布消息,下面的接收者按隊列順序接收,比如發布了10個消息,兩個接收者A,B那就是A,B總共會收到10條消息,不重復。
2、主題模式

對於Topic模式,一個發布者發布消息,有兩個接收者A,B來訂閱,那么發布了10條消息,A,B各收到10條消息。
3、RabbitMQ的模式

生產者生產消息后不直接直接發到隊列中,而是發到一個交換空間:Exchange,Exchange會根據Exchange類型和Routing Key來決定發到哪個隊列中,這個講到發布訂閱在詳細來看
RabbitMQ安裝配置
一、環境安裝
#安裝epel源[EPEL (Extra Packages for Enterprise Linux,企業版Linux的額外軟件包) 是Fedora小組維護的一個軟件倉庫項目為RHEL/CentOS提供他們默認不提供的軟件.] rpm -Uvh https://dl.fedoraproject.org/pub/epel/epel-release-latest-6.noarch.rpm # 擴展erlang源安裝最新的erlang rpm -Uvh http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm yum -y install erlang # 安裝RabbitMQ wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm rpm --import https://www.rabbitmq.com/rabbitmq-signing-key-public.asc yum install rabbitmq-server-3.6.1-1.noarch.rpm
各位看官如果想通過源碼安裝“搜狗一下就知道”這里不在過多的復述
二、用戶配置
# 啟動服務 /etc/init.d/rabbitmq-server start # 添加用戶 rabbitmqctl add_user admin admin # 添加管理員權限 rabbitmqctl set_user_tags admin administrator # 修改密碼 abbitmqctl add_user admin youpassword # 設置權限 rabbitmqctl set_permissions -p '/' admin '.' '.' '.'
三、啟用WEB管理
# 啟動Web插件 rabbitmq-plugins enable rabbitmq_management # 刪除guest用戶 rabbitmqctl delete_user guest # 添加Web訪問權限 """注意:rabbitmq從3.3.0開始禁止使用guest/guest權限通過除localhost外的訪問。如果想使用guest/guest通過遠程機器訪問,需要在rabbitmq配置文件中(/etc/rabbitmq/rabbitmq.config)中設置loopback_users為[],配置文件不存在創建即可。""" # 添加配置 [{rabbit, [{loopback_users, ["admin"]}]}].
RabbitMQ的消息玩法-Python
0、各位看官附帶視頻講解連接有需要的可以看下
1、最簡單的玩法-生產者消費者
send.py
# !/usr/bin/env python3.5 # -*- coding:utf-8 -*- # __author__ == 'LuoTianShuai' """ 生產者/發送消息方 """ import pika # 遠程主機的RabbitMQ Server設置的用戶名密碼 credentials = pika.PlainCredentials('admin', 'admin') connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.31.123', 5672, '/', credentials)) """ A virtual host holds a bundle of exchanges, queues and bindings. Why would you want multiple virtual hosts? Easy. A username in RabbitMQ grants you access to a virtual host…in its entirety. So the only way to keep group A from accessing group B’s exchanges/queues/bindings/etc. is to create a virtual host for A and one for B. Every RabbitMQ server has a default virtual host named “/”. If that’s all you need, you’re ready to roll. virtualHost is used as a namespace for AMQP resources (default is \"/\"),so different applications could use multiple virtual hosts on the same AMQP server [root@localhost ~]# rabbitmqctl list_permissions Listing permissions in vhost "/" ... admin . . . guest .* .* .* ...done. ConnectionParameters 中的參數:virtual_host 注: 相當於在rabbitmq層面又加了一層域名空間的限制,每個域名空間是獨立的有自己的Echange/queues等 舉個好玩的例子Redis中的db0/1/2類似 """ # 創建通道 channel = connection.channel() # 聲明隊列hello,RabbitMQ的消息隊列機制如果隊列不存在那么數據將會被丟掉,下面我們聲明一個隊列如果不存在創建 channel.queue_declare(queue='hello') # 給隊列中添加消息 channel.publish(exchange="", routing_key="hello", body="Hello World") print("向隊列hello添加數據結束") # 緩沖區已經flush而且消息已經確認發送到了RabbitMQ中,關閉通道 channel.close()
receive.py
# !/usr/bin/env python3.5 # -*- coding:utf-8 -*- # __author__ == 'LuoTianShuai' """ 消費者/接收消息方 """ import pika # 遠程主機的RabbitMQ Server設置的用戶名密碼 credentials = pika.PlainCredentials('admin', 'admin') connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.31.123', 5672, '/', credentials)) # 創建通道 channel = connection.channel() # 聲明隊列 channel.queue_declare(queue='hello') """ 你可能會問為什么我們還要聲明隊列呢? 我們在之前代碼里就有了,但是前提是我們已經知道了我們已經聲明了代碼,但是我們可能不太確定 誰先啟動~ So如果你們100%確定也可以不用聲明,但是在很多情況下生產者和消費者都是分離的.所以聲明沒有壞處 """ # 訂閱的回調函數這個訂閱回調函數是由pika庫來調用的 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 定義通道消費者參數 channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') # 開始接收信息,並進入阻塞狀態,隊列里有信息才會調用callback進行處理。按ctrl+c退出。 channel.start_consuming()
2、升級點玩法-工作隊列
為了好理解還是把咱們的玩的東西應用到實際生活場景中比如:寫日志同步慢、或者請求量寫日志機器扛不住怎么辦?異步、並且分擔日志記錄壓力到多台服務器上.

類似上面的圖的效果:
Work Queue背后的主要思想是避免立即執行資源密集型任務的時,需要等待其他任務完成。所以我們把任務安排的晚一些,我們封裝一個任務到消息中並把它發送到隊列,一個進程運行在后端發送並最終執行這個工作,當你運行多個消費者的時候這個任務將在他們之間共享。
send.py
# !/usr/bin/env python3.5 # -*- coding:utf-8 -*- # __author__ == 'LuoTianShuai' """ 生產者/發送方 """ import sys import pika # 遠程主機的RabbitMQ Server設置的用戶名密碼 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.31.123', 5672, '/', credentials)) # 創建通道 channel = connection.channel() # 聲明隊列task_queue,RabbitMQ的消息隊列機制如果隊列不存在那么數據將會被丟掉,下面我們聲明一個隊列如果不存在創建 channel.queue_declare(queue='task_queue') # 在隊列中添加消息 for i in range(100): message = '%s Meassage '% i or "Hello World!" # 發送消息 channel.basic_publish(exchange='', routing_key='task_queue', body=message, ) # 發送消息結束,並關閉通道 print(" [x] Sent %r" % message) channel.close()
receive1.py
# !/usr/bin/env python3.5 # -*- coding:utf-8 -*- # __author__ == 'LuoTianShuai' """ 消費者/接收方 """ import time import pika # 認證信息 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials)) # 建立通道 channel = connection.channel() # 創建隊列 channel.queue_declare("task_queue") # 訂閱的回調函數這個訂閱回調函數是由pika庫來調用的 def callback(ch, method, properties, body): print(" [x] Received %r" % body) print(body.count(b'.')) time.sleep(body.count(b'.')) print(" [x] Done") # 定義通道消費者參數 channel.basic_consume(callback, queue="task_queue", no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') # 開始接收信息,並進入阻塞狀態,隊列里有信息才會調用callback進行處理。按ctrl+c退出。 channel.start_consuming()
receive2.py
# !/usr/bin/env python3.5 # -*- coding:utf-8 -*- # __author__ == 'LuoTianShuai' """ 消費者/接收方 """ import time import pika # 認證信息 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials)) # 建立通道 channel = connection.channel() # 創建隊列 channel.queue_declare("task_queue") # 訂閱的回調函數這個訂閱回調函數是由pika庫來調用的 def callback(ch, method, properties, body): print(" [x] Received %r" % body) print(body.count(b'.')) time.sleep(body.count(b'.')) print(" [x] Done") # 定義通道消費者參數 channel.basic_consume(callback, queue="task_queue", no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') # 開始接收信息,並進入阻塞狀態,隊列里有信息才會調用callback進行處理。按ctrl+c退出。 channel.start_consuming()
默認RabbitMQ按照順序發送每一個消息,每個消費者會獲得相同的數量消息,這種分發消息的方式稱之為循環。
3、持久化和公平分發
1、消息持久化
在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其他意外)的情況,這種情況下就可能會導致消息丟失。為了避免這種情況發生,我們可以要求消費者在消費完消息后發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)后才將該消息從Queue中移除;如果RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ連接斷開,則RabbitMQ會將該消息發送給其他消費者(如果存在多個消費者)進行處理。這里不存在timeout概念,一個消費者處理消息時間再長也不會導致該消息被發送給其他消費者,除非它的RabbitMQ連接斷開。 這里會產生另外一個問題,如果我們的開發人員在處理完業務邏輯后,忘記發送回執給RabbitMQ,這將會導致嚴重的bug——Queue中堆積的消息會越來越多;消費者重啟后會重復消費這些消息並重復執行業務邏輯…千面是因為我們在消費者端標記了ACK=True關閉了它們,如果你沒有增加ACK=True或者沒有回執就會出現這個問題
生產者需要在發送消息的時候標注屬性為持久化
# 在隊列中添加消息 for i in range(100): message = '%s Meassage '% i or "Hello World!" # 發送消息 channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties(delivery_mode=2, )) # 標記屬性消息為持久化消息需要客戶端應答 # 發送消息結束,並關閉通道 print(" [x] Sent %r" % message)
消費者需要發送消息回執
# 訂閱回調函數,這個訂閱回調函數是由pika庫來調用 def callback(ch, method, properties, body): """ :param ch: 通道對象 :param method: 消息方法 :param properties: :param body: 消息內容 :return: None """ print(" [x] Received %r" % (body,)) time.sleep(2) print(" [x] Done") # 發送消息確認,確認交易標識符 ch.basic_ack(delivery_tag=method.delivery_tag)
我們可以通過命令查看那些消費者沒有回復ack確認
# Linux rabbitmqctl list_queues name messages_ready messages_unacknowledged # Windows rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
2、隊列持久化
如果我們希望即使在RabbitMQ服務重啟的情況下,也不會丟失消息,我們可以將Queue與Message都設置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丟失。但依然解決不了小概率丟失事件的發生(比如RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),如果我們需要對這種小概率事件也要管理起來,那么我們要用到事務。由於這里僅為RabbitMQ的簡單介紹,所以這里將不講解RabbitMQ相關的事務。 這里我們需要修改下生產者和消費者設置RabbitMQ消息的持久化**[生產者/消費者]都需要配置**
channel.queue_declare(queue='task_queue', durable=True) # 隊列持久化
3、公平分發
默認情況下RabitMQ會把隊列里面的消息立即發送到消費者,無論該消費者有多少消息沒有應答,也就是說即使發現消費者來不及處理,新的消費者加入進來也沒有辦法處理已經堆積的消息,因為那些消息已經被發送給老消費者了。類似下面的
在消費者中增加:`channel.basic_qos(prefetch_count=1)`
prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多於N個消息,即一旦有N個消息還沒有ack,則該consumer將block掉,直到有消息ack。 這樣做的好處是,如果系統處於高峰期,消費者來不及處理,消息會堆積在隊列中,新啟動的消費者可以馬上從隊列中取到消息開始工作。

工作過程如下:
1. 消費者1接收到消息后處理完畢發送了ack並接收新的消息並處理
2. 消費者2接收到消息后處理完畢發送了ack並接收新的消息並處理
3. 消費者3接收到消息后一直處於消息中並沒有發送ack不在接收消息一直等到消費者3處理完畢后發送ACK后再接收新消息
發布與訂閱-高級玩法
發布與訂閱這里同樣是拿兩個好玩的實際例子:我們來寫一個日志系統
在前面我們學了work Queue它主要是把每個任務分給一個worker[工作者]接下來我們要玩些不同的,把消息發多個消費者(不同的隊列中). 這個模式稱之為“發布訂閱”
舉個例子我們將創建一個簡單的日志系統,包含兩個程序第一個是用來發送日志,第二個是用來接收日志,接收日志的程序每一個副本都將收到消息,**這樣我們可以一個接收器用來寫入磁盤,一個接收器用來輸入到日志~ cool~**
Exchanges可用的類型很少:**direct, topic, headers, fanout**---4種,我們先看最后一種
1、fanout模式
模式特點:
- 可以理解他是一個廣播模式
- 不需要routing key它的消息發送時通過Exchange binding進行路由的~~在這個模式下routing key失去作用
- 這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定
- 如果接收到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。
send.py
# !/usr/bin/env python3.5 # -*- coding:utf-8 -*- # __author__ == 'LuoTianShuai' import sys import time import pika # 遠程主機的RabbitMQ Server設置的用戶名密碼 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials)) # 創建通道 channel = connection.channel() # 聲明Exchanges channel.exchange_declare(exchange="logs", exchange_type="fanout") """ 這里可以看到我們建立連接后,就聲明了Exchange,因為把消息發送到一個不存在的Exchange是不允許的, 如果沒有消費者綁定這個,Exchange消息將會被丟棄這是可以的因為沒有消費者 """ # 添加消息 message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange="logs", # 將消息發送至Exchange為logs綁定的隊列中 routing_key="", body=message,) print(" [x] Sent %r" % message) # 關閉通道 connection.close()
receive.py
# !/usr/bin/env python3.5 # -*- coding:utf-8 -*- # __author__ == 'LuoTianShuai' import pika # 連接RabbitMQ驗證信息 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials)) # 建立通道 channel = connection.channel() # Bindings Exchanges 接收消息 channel.exchange_declare(exchange='logs', exchange_type="fanout") # 使用隨機隊列/並標注消費者斷開連接后刪除隊列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # Queue 與 Exchanges綁定 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') # 定義回調函數這個回調函數將被pika庫調用 def callback(ch, method, properties, body): print(" [x] %r" % body) # 定義接收消息屬性 channel.basic_consume(callback, queue=queue_name, no_ack=True) # 開始接收消息 channel.start_consuming()
測試:那現在我就可以把信息分開來記錄了
現在如果你想把日志存入文件
python fanout_receive1.py > fanout_receive_log.txt
並且想再屏幕也輸出,在打開一個窗口
python fanout_receive1.py
~~ Cool~
概念解釋
binding
當我們創建了Exchanges和(QUEUE)隊列后,我們需要告訴Exchange發送到們的Queue隊列中,所需要需要把Exchange和隊列(Queue)進行綁定,
channel.queue_bind(exchange='logs', queue=result.method.queue)
2、Direct 模式
任何發送到Direct Exchange的消息都會被轉發到routing_key中指定的Queue
1. 一般情況可以使用rabbitMQ自帶的Exchange:”” (該Exchange的名字為空字符串), 也可以自定義Exchange
2. 這種模式下不需要將Exchange進行任何綁定(bind)操作。當然也可以進行綁定。可以將不同的routing_key與不同的queue進行綁定,不同的queue與不同exchange進行綁定
3. 消息傳遞時需要一個“routing_key”
4. 如果消息中不存在routing_key中綁定的隊列名,則該消息會被拋棄。
如果一個exchange 聲明為direct,並且bind中指定了routing_key,那么發送消息時需要同時指明該exchange和routing_key.
簡而言之就是:生產者生成消息發送給Exchange, Exchange根據Exchange類型和basic_publish中的routing_key進行消息發送 消費者:訂閱Exchange並根據Exchange類型和binding key(bindings 中的routing key) ,如果生產者和訂閱者的routing_key相同,Exchange就會路由到那個隊列。
老規矩還是通過實例來說:
在上面的文檔中我們創建了一個簡單的日志系統,我們把消息發給所有的訂閱者 在下面的內容中將把特定的消息發給特定的訂閱者,舉個例子來說,把error級別的報警寫如文件,並把所有的報警打印到屏幕中,進行了路由的規則類似下面的架構

這里也要注意一個routing key 是可以綁定多個隊列的

在上面我們已經創建過bindings了類似下面
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
消費者端:Bindings可以增加routing_key 這里不要和basic_publish中的參數弄混了,我們給它稱之為**binding key**
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
binding key的含義依賴於Exchange類型,fanout exchanges類型只是忽略了它
emit_log_direct.py
# !/usr/bin/env python3.5 # -*- coding:utf-8 -*- # __author__ == 'LuoTianShuai' """ 生產者/發送方 """ import pika import sys # 添加RabbitMQ Server端認證信息 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials)) # 創建通道 channel = connection.channel() # 聲明Exchange 且類型為 direct channel.exchange_declare(exchange='direct_logs', exchange_type='direct') # 從調用參數中或去類型 severity = sys.argv[1] if len(sys.argv) > 2 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' # 發送消息指定Exchange為direct_logs,routing_key 為調用參數獲取的值 channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) # 打印信息 print(" [x] Sent %r:%r" % (severity, message)) # 關閉通道 connection.close()
receive_logs_direct.py
# !/usr/bin/env python3.5 # -*- coding:utf-8 -*- # __author__ == 'LuoTianShuai' """ 消費者/接收方 """ import pika import sys # 添加RabbitMQ Server端認證信息 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials)) # 創建通道 channel = connection.channel() # binding Exchange 為direct_logs, 且Exchange類型為direct channel.exchange_declare(exchange='direct_logs', exchange_type='direct') # 生成隨機接收隊列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 接收監聽參數 severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) # 從循環參數中獲取routing_key,並綁定 for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') # 定義回調參數 def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) # 定義通道參數 channel.basic_consume(callback, queue=queue_name, no_ack=True) # 開始接收消息 channel.start_consuming()
測試接收error 寫入到文件
python receive_logs_direct.py error > logs_from_rabbit.log
測試接收所有級別的報警輸出值公屏
python receive_logs_direct.py info warning error
測試發送消息
python emit_log_direct.py error "Run. Run. Or it will explode."
3、topic類型
前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。
topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同,
它約定:
- routing key為一個句點號“. ”分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key與routing key一樣也是句點號“. ”分隔的字符串
- binding key中可以存在兩種特殊字符“*”與“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)

以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey。
RPC
MQ本身是基於異步的消息處理,前面的示例中所有的生產者(P)將消息發送到RabbitMQ后不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。
但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端將我的消息處理完成后再進行下一步處理。這相當於RPC(Remote Procedure Call,遠程過程調用)。在RabbitMQ中也支持RPC。

1、RabbitMQ實現RPC機制是
- 客戶端發送請求(消息)時,在消息的屬性(MessageProperties,在AMQP協議中定義了14種properties,這些屬性會隨着消息一起發送)中設置兩個值replyTo(一個Queue名稱,用於告訴服務器處理完成后將通知我的消息發送到這個Queue中)和correlationId(此次請求的標識號,服務器處理完成后需要將此屬性返還,客戶端將根據這個id了解哪條請求被成功執行了或執行失敗)
- 服務器端收到消息並處理
- 服務器端處理完消息后,將生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性
- 客戶端之前已訂閱replyTo指定的Queue,從中收到服務器的應答消息后,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行后續業務處理
雖然RPC在計算中是一個很常見的模式,但它經常受到批評。當程序員不知道函數調用是本地的還是慢RPC時,問題就出現了。這樣的混淆導致了不可預測的系統,並增加了調試的不必要的復雜性。
與簡化軟件不同,誤用的RPC可能導致無法維護的面代碼。 >
記住上面幾點問題,考慮下面幾個建議
- - 確保調用的函數是本地的還是遠程的
- - 記錄您的系統。明確組件之間的依賴關系
- - 處理錯誤案例。當RPC服務器關閉很長時間時,客戶端應該如何反應?
如果對RPC有很多疑問,如果可以的話最好使用異步管道 一般來說在RabbitMQ執行RPC是很簡單的,客戶端發送請求服務端響應消息,為了接收響應,客戶端需要發送一個“回調”隊列地址。
result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request)
AMQP 0-9-1 協議預先定義了14種屬性,除了以下幾種之外常用其他的不經常用
- - delivery_mode: 標記消息持久化,值為2的時候為持久化其他任何值都是瞬態的
- - content_type: 用來描述mime-type編碼,舉個例子來說經常使用JSON編碼的話將此屬性設置為:application/json.
- - reply_to:通常用於命名回調隊列
- - correlation_id:將RCP請求和響應進行關聯的id
Correlation id
上面的描述中我們建議為每個RPC創建一個隊列但是這個很低效,幸運的是我們有更好的辦法:為每個客戶端創建一個隊列 但是這就會觸發一個新的問題,我們不確定這個消息是哪個返回的這里就用到了 Correlation id 我們給每個請求設置一個唯一值,最后當我們收到消息在這個callback Queue中,我們查看這個屬性和請求的屬性(Correlation_id)進行匹配如果沒有匹配上,我們將拒絕這個消息它不是我們的~
您可能會問,為什么我們應該忽略回調隊列中的未知消息,而不是錯誤地失敗呢?這是由於服務器端可能出現競態條件。雖然不太可能,但是RPC服務器可能在發送了答案后才會死亡,但在發送請求消息之前。
如果發生這種情況,重新啟動的RPC服務器將再次處理此請求。這就是為什么在客戶端我們必須優雅地處理重復的響應,而RPC應該是冪等()的。
RPC冪等性:
- f(x)=f(f(x))
- 如果消息具有操作冪等性,也就是一個消息被應用多次與應用一次產生的效果是一樣的
2、工作機制
- 當客戶端啟動會創建一個匿名回調隊列
- 在RCP請求Client發送兩個屬性:reply_to 標記callback隊列,correlation_id 每個請求的唯一值,這個請求是被發送到rpc_queue 中
- 這個RPC worker (aka: server)等待請求在這個rpc_queue中,當請求出現它執行任務並將結果返回給客戶端,發送到那個隊列呢?就是我們reply_to標記的隊列,客戶端等待數據返回在這個callback隊列,當消息出現 檢查correlation_id 屬性是否是和請求的如果匹配進行相應
rpc_server.py
# !/usr/bin/env python3.5 # -*- coding:utf-8 -*- # __author__ == 'LuoTianShuai' """ RPC/Server端 """ import pika # 添加認證信息 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials)) # 添加一個通道 channel = connection.channel() # 添加一個隊列,這個隊列在Server就是我們監聽請求的隊列 channel.queue_declare(queue='rpc_queue') # 斐波那契計算 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) # 應答函數,它是我們接受到消息后如處理的函數替代原來的callback def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', # 返回的隊列,從屬性的reply_to取出來 routing_key=props.reply_to, # 添加correlation_id,和Client進行一致性匹配使用的 properties=pika.BasicProperties(correlation_id=props.correlation_id), # 相應消息/我們的處理結果 body=str(response)) # 增加消息回執 ch.basic_ack(delivery_tag=method.delivery_tag) # 每次預處理的請求個數/這個請求我沒有處理完不要給我發了 channel.basic_qos(prefetch_count=1) # 定義接收通道的屬性/定義了callback方法,接收的隊列,返回的隊列在哪里?on_request 的routing_key=props.reply_to channel.basic_consume(on_request, queue='rpc_queue') # 開始接收消息 print(" [x] Awaiting RPC requests") channel.start_consuming()
這個Server端的代碼是非常簡單的
- 往常一樣我們先創建連接並聲明隊列 - 我們聲明我們的斐波那契額函數,它只假設有效的正整數輸入。(不要期望這個能夠為大數據工作,它可能是最慢的遞歸實現)。
- 我們聲明了callback在basic_consume,它是RCP核心請求過來執行callback,它來工作並發送相應
- 我們可能希望運行多個服務器進程。為了在多個服務器上平均分配負載,我們需要設置prefetch_count設置
rpc_client.py
# !/usr/bin/env python3.5 # -*- coding:utf-8 -*- # __author__ == 'LuoTianShuai' """ RPC/Client端 """ import pika import uuid # 定義菲波那切數列RPC Client類調用RPC Server class FibonacciRpcClient(object): def __init__(self): # 添加認證信息 credentials = pika.PlainCredentials("admin", "admin") self.connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials)) # 添加一個通道 self.channel = self.connection.channel() # 生成一個隨機隊列-定義callback回調隊列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 定義回調的通道屬性 self.channel.basic_consume(self.on_response, # 回調結果執行完執行的Client端的callback方法 no_ack=True, queue=self.callback_queue) # 這里注意我們並沒有直接阻塞的開始接收消息了 # Client端 Callback方法 def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: # 定義了self.response = body self.response = body def call(self, n): # 定義了一個普通字段 self.response = None # 生成了一個uuid self.corr_id = str(uuid.uuid4()) # 發送一個消息 self.channel.basic_publish(exchange='', # 使用默認的Exchange,根據發送的routing_key來選擇隊列 routing_key='rpc_queue', # 消息發送到rpc_queue隊列中 # 定義屬性 properties=pika.BasicProperties( reply_to=self.callback_queue, # Client端定義了回調消息的callback隊列 correlation_id=self.corr_id, # 唯一值用來做什么的?request和callback 匹配用 ), body=str(n)) # 開始循環 我們剛才定義self.response=None當不為空的時候停止` while self.response is None: # 非阻塞的接受消息 self.connection.process_data_events(time_limit=3) return int(self.response) # 實例化對象 fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") # 發送請求計算菲波那切數列 30 response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
- 我們建立一個連接,通道,並聲明一個排他的“回調”隊列作為回復
- 我們訂閱這個callback隊列,所以我們可以接收RPC Server消息
- 每個相應回來之后執行非常簡單的on_response方法,每個相應來后檢查下correlation_id是否是匹配的,保存相應並退出接收循環
- 接下來,我們定義主調用方法——它執行實際的RPC請求。
- 我們定義了uuid,並在消息發送時添加了兩個屬性:reply_to、correlation_id
概念匯總
ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket鏈接,它封裝了socket協議相關部分邏輯。
ConnectionFactory為Connection的制造工廠。
Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。
exchange
實際上生產者不會直接把消息放到消息隊列里,實際的情況是,生產者將消息發送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個或多個Queue中(或者丟棄)
queue
Queue(隊列)是RabbitMQ的內部對象,用於存儲消息,用下圖表示。
RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息並最終投遞到Queue中,消費者(下圖中的C)可以從Queue中獲取消息並消費。
Message acknowledgment
在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其他意外)的情況,這種情況下就可能會導致消息丟失。為了避免這種情況發生,我們可以要求消費者在消費完消息后發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)后才將該消息從Queue中移除;如果RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ連接斷開,則RabbitMQ會將該消息發送給其他消費者(如果存在多個消費者)進行處理。 這里不存在timeout概念,一個消費者處理消息時間再長也不會導致該消息被發送給其他消費者,除非它的RabbitMQ連接斷開。
**這里會產生另外一個問題,如果我們的開發人員在處理完業務邏輯后,忘記發送回執給RabbitMQ,這將會導致嚴重的bug——Queue中堆積的消息會越來越多;消費者重啟后會重復消費這些消息並重復執行業務邏輯…**
Message durability
如果我們希望即使在RabbitMQ服務重啟的情況下,也不會丟失消息,我們可以將Queue與Message都設置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丟失。但依然解決不了小概率丟失事件的發生(比如RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),如果我們需要對這種小概率事件也要管理起來,那么我們要用到事務。由於這里僅為RabbitMQ的簡單介紹,所以這里將不講解RabbitMQ相關的事務。
Prefetch count
生產者在將消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效。 在Exchange Type與binding key固定的情況下(在正常使用時一般這些內容都是固定配置好的),我們的生產者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪里。 RabbitMQ為routing key設定的長度限制為255 bytes。
Binding
RabbitMQ中通過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。 在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key;生產者將消息發送給Exchange時,一般會指定一個routing key;當binding key與routing key相匹配時,消息將會被路由到對應的Queue中。這個將在Exchange Types章節會列舉實際的例子加以說明。
在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。 binding key 並不是在所有情況下都生效,它依賴於Exchange Type,比如fanout類型的Exchange就會無視binding key,而是將消息路由到所有綁定到該Exchange的Queue。
視頻講解
