消息隊列rabbitmq


為什么用消息隊列

舉例

比如在一個企業里,技術老大接到boss的任務,技術老大把這個任務拆分成多個小任務,完成所有的小任務就算搞定整個任務了。
那么在執行這些小任務的時候,可能有一個環節很費時間,並且優先級很低,推遲完成也不影響整個任務運轉,那么技術老大就會將這個很費時間,且不重要的任務,丟給他的小弟去解決,自己繼續完成其他任務。

轉化為計算機思想

那個技術老大就是一個 程序系統,那個小弟就是消息隊列。
當程序系統發現某些任務耗費時間且優先級較低,遲點完成也不影響整個任務,就把這個任務丟給消息隊列。

場景

在程序系統中,例如外賣系統,訂單系統,庫存系統,優先級較高
發紅包,發郵件,發短信,app消息推送等任務優先級很低,很適合交給消息隊列去處理,以便於程序系統更快的處理其他請求。

消息隊列工作流程

消息隊列一般有三個角色:
隊列服務端
隊列生產者
隊列消費者
消息隊列工作流程就如同一個流水線,有產品加工,一個輸送帶,一個打包產品
輸送帶就是 不停運轉的消息隊列服務端
加工產品的就是 隊列生產者
在傳輸帶結尾打包產品的 就是隊列消費者

隊列產品

RabbitMQ Erlang編寫的消息隊列產品,企業級消息隊列軟件,支持消息負載均衡,數據持久化等。 ZeroMQ saltstack軟件使用此消息,速度最快。 Redis key-value的系統,也支持隊列數據結構,輕量級消息隊列 Kafka 由Scala編寫,目標是為處理實時數據提供一個統一、高通量、低等待的平台

一個app系統消息隊列工作流程

消費者,一個后台進程,不斷的去檢測消息隊列中是否有消息,有消息就取走,開啟新線程去處理業務,如果沒有一會再來

消息隊列作用

1)程序解耦

允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

2)冗余:

消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。

許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。

3)峰值處理能力:

(大白話,就是本來公司業務只需要5台機器,但是臨時的秒殺活動,5台機器肯定受不了這個壓力,我們又不可能將整體服務器架構提升到10台,那在秒殺活動后,機器不就浪費了嗎?因此引入消息隊列)

在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。

如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。

使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

4)可恢復性:

系統的一部分組件失效時,不會影響到整個系統。

消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。

5)順序保證:

在大多使用場景下,數據處理的順序都很重要。

大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性)

6)緩沖:

有助於控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。

7)異步通信:

很多時候,用戶不想也不需要立即處理消息。比如發紅包,發短信等流程。

消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

 

rabbitMQ

1. 你了解的消息隊列

生活里的消息隊列,如同郵局的郵箱,
如果沒郵箱的話,
郵件必須找到郵件那個人,遞給他,才玩完成,那這個任務會處理的很麻煩,很慢,效率很低

但是如果有了郵箱,
郵件直接丟給郵箱,用戶只需要去郵箱里面去找,有沒有郵件,有就拿走,沒有就下次再來,這樣可以極大的提升郵件收發效率!

 

rabbitmq是一個消息代理,它接收和轉發消息,可以理解為是生活的郵局。
你可以將郵件放在郵箱里,你可以確定有郵遞員會發送郵件給收件人。
概括:
rabbitmq是接收,存儲,轉發數據的。
官方教程:http://www.rabbitmq.com/tutorials/tutorial-one-python.html

消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。

消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。

2. 公司在什么情況下會用消息隊列?

1.電商訂單

想必同學們都點過外賣,點擊下單后的業務邏輯可能包括:檢查庫存、生成單據、發紅包、短信通知等,如果這些業務同步執行,完成下單率會非常低,如發紅包,短信通知等不必要的流程,異步執行即可

此時使用MQ,可以在核心流程(扣減庫存、生成訂單記錄)等完成后發送消息到MQ,快速結束本次流程。消費者拉取MQ消息時,發現紅包、短信等消息時,再進行處理

場景:雙11是購物狂節,用戶下單后,訂單系統需要通知庫存系統,傳統的做法就是訂單系統調用庫存系統的接口

 

這種做法有一個缺點:

  • 當庫存系統出現故障時,訂單就會失敗。(這樣馬雲將少賺好多好多錢錢。。。。)

  • 訂單系統和庫存系統高耦合.

引入消息隊列

 

  • 訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。

  • 庫存系統:訂閱下單的消息,獲取下單消息,進行庫操作 就算庫存系統出現故障,消息隊列也能保證消息的可靠投遞,不會導致消息丟失(馬雲這下高興了,鈔票快快的來呀~~).

2.秒殺活動

流量削峰一般在秒殺活動中應用廣泛 場景:秒殺活動,一般會因為流量過大,導致應用掛掉,為了解決這個問題,一般在應用前端加入消息隊列。 作用: 1.可以控制活動人數,超過此一定閥值的訂單直接丟棄(怪不得我一次秒殺都沒搶到過。。。。。wtf???)

2.可以緩解短時間的高流量壓垮應用(應用程序按自己的最大處理能力獲取訂單)

 

 

3.用戶的請求,服務器接收到之后,寫入消息隊列,超過定義的閾值就直接丟棄請求,或者跳轉錯誤頁面

4.業務系統取出隊列中的消息,再做后續處理。

 

3. rabbitMQ安裝

rabbitmq-server服務端

1.下載centos源
wget -O /etc/yum.repos.d/CentOS-Base.repo   http://mirrors.cloud.tencent.com/repo/centos7_base.repo
2.下載epel源
wget -O /etc/yum.repos.d/epel.repo http://mirrors.cloud.tencent.com/repo/epel-7.repo
3.清空yum緩存並且生成新的yum緩存
yum clean all
yum makecache
4.安裝erlang
   $ yum -y install erlang
5.安裝RabbitMQ
   $ yum -y install rabbitmq-server
6.啟動(無用戶名密碼):
    systemctl start/stop/restart/status rabbitmq-server
​
設置rabbitmq賬號密碼,以及角色權限設置

# 設置新用戶yugo 密碼123
sudo rabbitmqctl add_user yugo 123
​
# 設置用戶為administrator角色
sudo rabbitmqctl set_user_tags yugo administrator
​
# 設置權限,允許對所有的隊列都有權限
對何種資源具有配置、寫、讀的權限通過正則表達式來匹配,具體命令如下:
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
sudo rabbitmqctl set_permissions
-p "/" yugo ".*" ".*" ".*" ​ #重啟服務生效設置 service rabbitmq-server start/stop/restart rabbitmq相關命令 // 新建用戶 rabbitmqctl add_user {用戶名} {密碼} ​ // 設置權限 rabbitmqctl set_user_tags {用戶名} {權限} ​ // 查看用戶列表 rabbitmqctl list_users ​ // 為用戶授權 添加 Virtual Hosts : rabbitmqctl add_vhost <vhost>// 刪除用戶 rabbitmqctl delete_user Username ​ // 修改用戶的密碼 rabbitmqctl change_password Username Newpassword // 刪除 Virtual Hosts : rabbitmqctl delete_vhost <vhost> // 添加 Users : rabbitmqctl add_user <username> <password> rabbitmqctl set_user_tags <username> <tag> ... rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read> // 刪除 Users : delete_user <username>// 使用戶user1具有vhost1這個virtual host中所有資源的配置、寫、讀權限以便管理其中的資源 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*'// 查看權限 rabbitmqctl list_user_permissions user1 ​ rabbitmqctl list_permissions -p vhost1 ​ // 清除權限 rabbitmqctl clear_permissions [-p VHostPath] User ​ //清空隊列步驟 rabbitmqctl reset 需要提前關閉應用rabbitmqctl stop_app , 然后再清空隊列,啟動應用 rabbitmqctl start_app 此時查看隊列rabbitmqctl list_queues ​ 查看所有的exchange: rabbitmqctl list_exchanges 查看所有的queue: rabbitmqctl list_queues 查看所有的用戶: rabbitmqctl list_users 查看所有的綁定(exchange和queue的綁定信息): rabbitmqctl list_bindings 查看消息確認信息: rabbitmqctl list_queues name messages_ready messages_unacknowledged 查看RabbitMQ狀態,包括版本號等信息:rabbitmqctl status

#開啟web界面rabbitmq
rabbitmq-plugins enable rabbitmq_management

#訪問web界面
http://server-name:15672/

RabbitMQ組件解釋

AMQP

AMQP協議是一個高級抽象層消息通信協議,RabbitMQ是AMQP協議的實現。它主要包括以下組件:

1.Server(broker): 接受客戶端連接,實現AMQP消息隊列和路由功能的進程。

 
         

2.Virtual Host:其實是一個虛擬概念,類似於權限控制組,一個Virtual Host里面可以有若干個Exchange和Queue,但是權限控制的最小粒度是Virtual Host

 
         

3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行為,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不同類型的Exchange路由的行為是不一樣的。

 
         

4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。

 
         

5.Message: 由Header和Body組成,Header是由生產者添加的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先級是多少等。而Body是真正需要傳輸的APP數據。

 
         

6.Binding:Binding聯系了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding后會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,兩者的匹配方式由Exchange Type決定。 

 
         

7.Connection:連接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP連接。

 
         

8.Channel:信道,僅僅創建了客戶端到Broker之間的連接后,客戶端還是不能發送消息的。需要為每一個Connection創建Channel,AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連接的建立和釋放都是十分昂貴的,如果一個客戶端每一個線程都需要與Broker交互,如果每一個線程都建立一個TCP連接,暫且不考慮TCP連接是否浪費,就算操作系統也無法承受每秒建立如此多的TCP連接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,但是建議盡量共用Connection。

 
         

9.Command:AMQP的命令,客戶端通過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端可以通過publish命令發送消息,txSelect開啟一個事務,txCommit提交一個事務。

python客戶端

// rabbitmq官方推薦的python客戶端pika模塊
pip3 install pika

應用場景1:單發送單接收

生產-消費者模型

P 是生產者
C 是消費者
中間hello是消息隊列
可以有多個P、多個C

P發送消息給hello隊列,C消費者從隊列中獲取消息,默認輪詢方式

 

 

生產者send.py

我們的第一個程序send.py將向隊列發送一條消息。我們需要做的第一件事是建立與RabbitMQ服務器的連接。
#!/usr/bin/env python
import pika
# 創建憑證,使用rabbitmq用戶密碼登錄
# 去郵局取郵件,必須得驗證身份
credentials = pika.PlainCredentials("s14","123")
# 新建連接,這里localhost可以更換為服務器ip
# 找到這個郵局,等於連接上服務器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
# 創建頻道
# 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個連接
channel = connection.channel()
# 聲明一個隊列,用於接收消息,隊列名字叫“水許傳”
channel.queue_declare(queue='水許傳')
# 注意在rabbitmq中,消息想要發送給隊列,必須經過交換(exchange),初學可以使用空字符串交換(exchange=''),它允許我們精確的指定發送給哪個隊列(routing_key=''),參數body值發送的數據
channel.basic_publish(exchange='',
                      routing_key='水許傳',
                      body='武松又去打老虎啦2')
print("已經發送了消息")
# 程序退出前,確保刷新網絡緩沖以及消息發送給rabbitmq,需要關閉本次連接
connection.close()

可以同時存在多個接受者,等待接收隊列的消息,默認是輪訓方式分配消息

接受者receive.py,可以運行多次,運行多個消費者

import pika
# 建立與rabbitmq的連接
credentials = pika.PlainCredentials("s14","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="水許傳")

def callbak(ch,method,properties,body):
    print("消費者接收到了任務:%r"%body.decode("utf8"))
# 有消息來臨,立即執行callbak,沒有消息則夯住,等待消息
# 老百姓開始去郵箱取郵件啦,隊列名字是水許傳
channel.basic_consume(callbak,queue="水許傳",no_ack=True)
# 開始消費,接收消息
channel.start_consuming()

  

練習:

分別啟動生產者、兩個消費者,往隊列發送數據,查看消費者的結果

應用場景2:單發送多接收

使用場景:一個發送端,多個接收端,如分布式的任務派發。為了保證消息發送的可靠性,不丟失消息,使消息持久化了。同時為了防止接收端在處理消息時down掉,只有在消息處理完成后才發送ack消息。

rabbitmq消息確認之ack

官網資料:http://www.rabbitmq.com/tutorials/tutorial-two-python.html

默認情況下,生產者發送數據給隊列,消費者取出消息后,數據將被清除。

特殊情況,如果消費者處理過程中,出現錯誤,數據處理沒有完成,那么這段數據將從隊列丟失

no-ack機制

不確認機制也就是說每次消費者接收到數據后,不管是否處理完畢,rabbitmq-server都會把這個消息標記完成,從隊列中刪除

ACK機制

ACK機制用於保證消費者如果拿了隊列的消息,客戶端處理時出錯了,那么隊列中仍然還存在這個消息,提供下一位消費者繼續取

流程

1.生產者無須變動,發送消息
2.消費者如果no_ack=True啊,數據消費后如果出錯就會丟失
反之no_ack=False,數據消費如果出錯,數據也不會丟失

3.ack機制在消費者代碼中演示

生產者.py 只負責發送數據即可,無須變動

#!/usr/bin/env python
import pika
# 創建憑證,使用rabbitmq用戶密碼登錄
# 去郵局取郵件,必須得驗證身份
credentials = pika.PlainCredentials("s14","123")
# 新建連接,這里localhost可以更換為服務器ip
# 找到這個郵局,等於連接上服務器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
# 創建頻道
# 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個連接
channel = connection.channel()
# 新建一個hello隊列,用於接收消息
# 這個郵箱可以收發各個班級的郵件,通過
channel.queue_declare(queue='金品沒')
# 注意在rabbitmq中,消息想要發送給隊列,必須經過交換(exchange),初學可以使用空字符串交換(exchange=''),它允許我們精確的指定發送給哪個隊列(routing_key=''),參數body值發送的數據
channel.basic_publish(exchange='',
                      routing_key='金品沒',
                      body='潘金蓮又出去。。。')
print("已經發送了消息")
# 程序退出前,確保刷新網絡緩沖以及消息發送給rabbitmq,需要關閉本次連接
connection.close()

消費者.py給與ack回復

拿到消息必須給rabbitmq服務端回復ack信息,否則消息不會被刪除,防止客戶端出錯,數據丟失

import pika

credentials = pika.PlainCredentials("s14","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
channel = connection.channel()

# 聲明一個隊列(創建一個隊列)
channel.queue_declare(queue='金品沒')

def callback(ch, method, properties, body):
    print("消費者接受到了任務: %r" % body.decode("utf-8"))
    # int('asdfasdf')
    # 我告訴rabbitmq服務端,我已經取走了消息
    # 回復方式在這
    ch.basic_ack(delivery_tag=method.delivery_tag)
# 關閉no_ack,代表給與服務端ack回復,確認給與回復
channel.basic_consume(callback,queue='金品沒',no_ack=False)

channel.start_consuming()

消息持久化

演示
1
.執行生產者,向隊列寫入數據,產生一個新隊列queue 2.重啟服務端,隊列丟失

3.開啟生產者數據持久化后,重啟rabbitmq,隊列不丟失
4.依舊可以讀取數據

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保證消息可靠性的呢——消息持久化。 為了保證RabbitMQ在退出或者crash等異常情況下數據沒有丟失,需要將queue,exchange和Message都持久化。

生產者.py

import pika
# 無密碼
# connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61'))
# 有密碼
credentials = pika.PlainCredentials("s14","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
channel = connection.channel()
# 聲明一個隊列(創建一個隊列)
# 默認此隊列不支持持久化,如果服務掛掉,數據丟失
# durable=True 開啟持久化,必須新開啟一個隊列,原本的隊列已經不支持持久化了
'''
實現rabbitmq持久化條件
 delivery_mode=2
使用durable=True聲明queue是持久化
 
'''
channel.queue_declare(queue='LOL',durable=True)
channel.basic_publish(exchange='',
                      routing_key='LOL', # 消息隊列名稱
                      body='德瑪西亞萬歲',
                      # 支持數據持久化
                      properties=pika.BasicProperties(
                          delivery_mode=2,#代表消息是持久的  2
                      )
                      )
connection.close()

消費者.py

import pika
credentials = pika.PlainCredentials("s14","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
channel = connection.channel()
# 確保隊列持久化
channel.queue_declare(queue='LOL',durable=True)

'''
必須確保給與服務端消息回復,代表我已經消費了數據,否則數據一直持久化,不會消失
'''
def callback(ch, method, properties, body):
    print("消費者接受到了任務: %r" % body.decode("utf-8"))
    # 模擬代碼報錯
    # int('asdfasdf')    # 此處報錯,沒有給予回復,保證客戶端掛掉,數據不丟失
   
    # 告訴服務端,我已經取走了數據,否則數據一直存在
    ch.basic_ack(delivery_tag=method.delivery_tag)
# 關閉no_ack,代表給與回復確認
channel.basic_consume(callback,queue='LOL',no_ack=False)
channel.start_consuming()

 

Exchange模型

rabbitmq發送消息首先是發給exchange,然后再通過exchange發送消息給隊列(queue)

exchange有四種模式

fanout

exchange將消息發送給和該exchange連接的所有queue;也就是所謂的廣播模式;此模式下忽略routing_key;

direct

路由模式,通過routing_key將消息發送給對應的queue; 如下面這句即可設置exchange為direct模式,只有routing_key為“black”時才將其發送到隊列queue_name; channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')

 

在上圖中,Q1和Q2可以綁定同一個key,如綁定routing_key=‘KeySame’,那么收到routing_key為KeySame的消息時將會同時發送給Q1和Q2,退化為廣播模式;

top

topic模式類似於direct模式,只是其中的routing_key變成了一個有“.”分隔的字符串,“.”將字符串分割成幾個單詞,每個單詞代表一個條件;

headers

headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。

官方教程:http://www.rabbitmq.com/tutorials/tutorial-three-python.html

 

發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。

# fanout所有的隊列放一份/給某些隊列發
# 傳送消息的模式
# 與exchange有關的模式都發
exchange_type = fanout

消費者_訂閱.py

可以運行多次,運行多個消費者,等待消息

import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘書)的名稱
# exchange_type='fanout' , 秘書工作方式將消息發送給所有的隊列
channel.exchange_declare(exchange='m1',exchange_type='fanout')

# 隨機生成一個隊列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 讓exchange和queque進行綁定.
channel.queue_bind(exchange='m1',queue=queue_name)


def callback(ch, method, properties, body):
  print("消費者接受到了任務: %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

生產者_發布者.py

# -*- coding: utf-8 -*-
# __author__ = "yugo"


import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# 指定exchange
channel.exchange_declare(exchange='m1',exchange_type='fanout')

channel.basic_publish(exchange='m1',
                    routing_key='',# 這里不再指定隊列,由exchange分配,如果是fanout模式,每一個隊列放一份
                    body='haohaio')

connection.close()

實例

1.可以運行多個消費者,相當於有多個滴滴司機,等待着Exchange同一個電台發消息
2.運行發布者,發送消息給Exchange,查看是否給所有的隊列(滴滴司機)發送了消息

關鍵字發布Exchange

之前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

 

消費者1.py

路由關鍵字是sb,alex

# -*- coding: utf-8 -*-
# __author__ = "maple"
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘書)的名稱
# exchange_type='fanout' , 秘書工作方式將消息發送給所有的隊列
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 隨機生成一個隊列,隊列退出時,刪除這個隊列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 讓exchange和queque進行綁定,只要
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='alex')
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='sb')


def callback(ch, method, properties, body):
  print("消費者接受到了任務: %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()


消費者2.py

路由關鍵字sb

# -*- coding: utf-8 -*-
# __author__ = "maple"
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘書)的名稱
# exchange_type='fanout' , 秘書工作方式將消息發送給所有的隊列
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 隨機生成一個隊列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 讓exchange和queque進行綁定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='sb')


def callback(ch, method, properties, body):
  print("消費者接受到了任務: %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

生產者.py

發送消息給匹配的路由,sb或者alex

# -*- coding: utf-8 -*-
# __author__ = "yugo"


import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# 路由模式的交換機會發送給綁定的key和routing_key匹配的隊列
channel.exchange_declare(exchange='m2',exchange_type='direct')
# 發送消息,給有關sb的路由關鍵字
channel.basic_publish(exchange='m2',
                    routing_key='sb',
                    body='aaaalexlaolelaodi')

connection.close()

RPC之遠程過程調用

將一個函數運行在遠程計算機上並且等待獲取那里的結果,這個稱作遠程過程調用(Remote Procedure Call)或者 RPC。

RPC是一個計算機通信協議。

比喻

將計算機服務運行理解為廚師做飯,廚師想做一個小蔥拌豆腐,廚師需要洗小蔥、切豆腐、調汁、涼拌。他一個人完成所有的事,如同古老的集中式應用,一台計算機做所有的事。

制作小蔥拌豆腐{
  廚師>洗小蔥>切豆腐>涼拌
}

rpc應用場景

而如今,飯店做大了,有錢了,專職分工來干活,不再是廚師單打獨斗,備菜師傅准備小蔥、豆腐,切菜師傅切小蔥、豆腐,廚師只負責調味,完成食品。

制作小蔥拌豆腐{
  備菜師>洗菜
  切菜師>切菜
  廚師>調味
}

此時一件事好多人在做,廚師就得和其他人溝通,通知備菜、洗菜師傅的這個動作就是遠程過程調用(RPC)。

這個過程在計算機系統中,一個電商的下單過程,涉及物流、支付、庫存、紅包等多個系統,多個系統又在多個服務器上,由不同的技術團隊負責,整個下單過程,需要所有團隊進行遠程調用。

下單{
  庫存>減少庫存
  支付>扣款
  紅包>減免紅包
  物流>生成訂單
}

到底什么是rpc

rpc指的是在計算機A上的進程,調用另外一台計算機B的進程,A上的進程被掛起,B上的被調用進程開始執行后,產生返回值給A,A繼續執行。
調用方可以通過參數將信息傳遞給被調用方,而后通過返回結果得到信息,這個過程對於開發人員來說是透明的。

如同廚師一樣,服務員把菜單給后廚,廚師告訴洗菜人,備菜人,開始工作,完成工作后,整個過程對於服務員是透明的,他完全不用管后廚是怎么把菜做好的。

由於服務在不同的機器上,遠程調用必經網絡通信,調用服務必須寫一坨網絡通信代碼,很容易出錯且很復雜,因此就出現了RPC框架。

阿里巴巴的 Dubbo  java
新浪的 Motan java
谷歌的 gRPC 多語言
Apache thrift 多語言

rpc封裝了數據的序列化,反序列化,以及傳輸協議

 

python實現RPC

利用RabbitMQ構建一個RPC系統,包含了客戶端和RPC服務器,依舊使用pika模塊

Callback queue 回調隊列

一個客戶端向服務器發送請求,服務器端處理請求后,將其處理結果保存在一個存儲體中。而客戶端為了獲得處理結果,那么客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to

Correlation id 關聯標識

一個客戶端可能會發送多個請求給服務器,當服務器處理完后,客戶端無法辨別在回調隊列中的響應具體和那個請求時對應的。為了處理這種情況,客戶端在發送每個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回調隊列中根據correlation_id字段的值就可以分辨此響應屬於哪個請求。

客戶端發送請求:某個應用將請求信息交給客戶端,然后客戶端發送RPC請求,在發送RPC請求到RPC請求隊列時,客戶端至少發送帶有reply_to以及correlation_id兩個屬性的信息

服務器端工作流: 等待接受客戶端發來RPC請求,當請求出現的時候,服務器從RPC請求隊列中取出請求,然后處理后,將響應發送到reply_to指定的回調隊列中

客戶端接受處理結果: 客戶端等待回調隊列中出現響應,當響應出現時,它會根據響應中correlation_id字段的值,將其返回給對應的應用

過程
1.啟動rpc客戶端,等待接收數據到來,來了之后就進行處理,再將結果丟進隊列
2.啟動rpc服務端,發起請求

 

 

rpc_server.py

 
         
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
# 客戶端啟動時,創建回調隊列,會開啟會話用於發送RPC請求以及接受響應
# 建立連接,指定服務器的ip地址
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.119.10'))
# 建立一個會話,每個channel代表一個會話任務
self.channel = self.connection.channel()

# 聲明回調隊列,再次聲明的原因是,服務器和客戶端可能先后開啟,該聲明是冪等的,多次聲明,但只生效一次
#exclusive=True 參數是指只對首次聲明它的連接可見
#exclusive=True 會在連接斷開的時候,自動刪除
result = self.channel.queue_declare(exclusive=True)
# 將次隊列指定為當前客戶端的回調隊列
self.callback_queue = result.method.queue
# 客戶端訂閱回調隊列,當回調隊列中有響應時,調用`on_response`方法對響應進行處理;
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)


# 對回調隊列中的響應進行處理的函數
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

# 發出RPC請求
# 例如這里服務端就是一個切菜師傅,菜切好了,需要傳遞給洗菜師傅,這個過程是發送rpc請求
def call(self, n):
# 初始化 response
self.response = None
# 生成correlation_id 關聯標識,通過python的uuid庫,生成全局唯一標識ID,保證時間空間唯一性
self.corr_id = str(uuid.uuid4())
# 發送RPC請求內容到RPC請求隊列`s14rpc`,同時發送的還有`reply_to`和`correlation_id`
self.channel.basic_publish(exchange='',
routing_key='s14rpc',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)

# 建立客戶端
fibonacci_rpc = FibonacciRpcClient()

# 發送RPC請求,丟進rpc隊列,等待客戶端處理完畢,給與響應
print("發送了請求sum(99)")
response = fibonacci_rpc.call(99)

print("得到遠程結果響應%r" % response)
 

 




rpc_client.py

 
         
import pika
# 建立連接,服務器地址為localhost,可指定ip地址
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.119.10'))
# 建立會話
channel = connection.channel()
# 聲明RPC請求隊列
channel.queue_declare(queue='s14rpc')

# 模擬一個進程,例如切菜師傅,等着洗菜師傅傳遞數據
def sum(n):
n+=100
return n
# 對RPC請求隊列中的請求進行處理


def on_request(ch, method, props, body):
print(body,type(body))
n = int(body)
print(" 正在處理sum(%s)" % n)
# 調用數據處理方法
response = sum(n)
# 將處理結果(響應)發送到回調隊列
ch.basic_publish(exchange='',
# reply_to代表回復目標
routing_key=props.reply_to,
# correlation_id(關聯標識):用來將RPC的響應和請求關聯起來。
properties=pika.BasicProperties(correlation_id= \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)

# 負載均衡,同一時刻發送給該服務器的請求不超過一個
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='s14rpc')
print("等待接收rpc請求")


#開始消費
channel.start_consuming()
 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM