Python之RabbitMQ的使用


今天總結一下Python關於Rabbitmq的使用

  RabbitMQ官網說明,其實也是一種隊列,那和前面說的線程queue和進程queue有什么區別呢?

    線程queue只能在同一個進程下進行數據交互

    進程queue只能在父進程和子進程之間,或者同一父進程下的子進程之間做數據交互

    如果需要對不同進程(eg:微信和qq)兩個獨立的程序間通信

方法1就是直接把數據寫在硬盤(disk)上然后各自的進程讀取數據就可以,但是由於硬盤的讀寫速度太慢,效率太低

方法2自己寫個socket,直接做數據交互,問題是如果改變程序,或者再加一個程序,需要對寫好的socket進行修改,還要處理黏包什么的復雜的連接關系,維護成本太高。

方法3,利用已有的中間商(代理)。這個broker其實就是封裝好的socket,我們拿來直接用就好了。

   這里的broker,就有RabbitMQ,ZeroMQ,ActiveMQ等等等等。

一.安裝及環境配置

    windows的安裝和配置方法較為簡單,直接安裝就好了

    Rabbit支持多種語言: Java, .NET, PHP, Python, JavaScript, Ruby, Go這些常用語言都支持


如圖所示,python操作RabbitMQ需要的模塊有上述幾種選擇,我們用最簡單的pika,用pip直接安裝

pip install pika

二.RabbitMQ的使用

這里所有的用法都是基於RabbitMQ是工作在‘localhost’上,並且端口號為15672,能在瀏覽器里訪問http://localhost:15672這個地址。

1.消息分發(基礎版)

這就是RabbitMQ最簡單的工作模式,p為生產者(Producer),生產者發送message給queue,queue再把消息發送至消費者c(Customer)

先看看生產者至隊列(send)這個過程

import pika connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel()

我們先建立了一個鏈接,然后就需要定義一個隊列,隊列的名字就暫時定位‘hello'

channel.queue_declare(queue='hello')

在RabbitMQ里消息並不能直接發送給隊列,所有的信息發送都要通過一個exchange,但是這里我們先把這個exchange定義成一個空的字符串,后面在將他的具體用法

channel.basic_publish(exchange='', routing_key='hello', body='123')

在發送確認完成后,可以將連接關閉

connect.close()

這就是send端的代碼

import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='123')
print("[x] Sent 'hello world!'")
connect.close()
RabbitMQ_basic_producer

 運行了send代碼后我們可以在terminal里RabbitMQ安裝目錄下sbin文件夾里查看一下消息隊列

rabbitmqctl.bat list_queues 

如果是Linux命令為

sudo rabbitmqctl list_queues

這里就說明了隊列信息和消息狀態。

 

然后再看一下消費者這一端的代碼是什么樣的,同樣,先要建立連接並定義好隊列名

connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel() channel.queue_declare(queue='hello')

這里可能有個疑問:我們不是在生產者里已經定義了隊列名嗎?為什么在消費者里還要定義呢?

因為在實際工作中,我們並不能確定是生產者還是消費者先一步運行,如果隊列名沒有定義的話運行時候是會報錯的。下面就是對消息的處理

def callback(ch,method,properties,body): print("[x] Received %r"%body) channel.basic_consume(callback, queue='hello', no_ack=True)

當消息來臨時,消費者會執行回調函數callback。這里的callback就是直接打印消息內容(body)。

回調函數另外的幾個參數:chconne.channel的內存對象地址,

<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 62145, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>

method是包含了發送的信息

<Basic.Deliver(['consumer_tag=ctag1.9ae48c906b014a83a512413c0e6f9ef8', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])>

 properties我們以后再了解。

2.公平分發(workqueue)

  在這種結構里,我們要考慮到這樣一種情況:有多個消費者,消費者在得到消息時需要對消息進行處理,並且有可能處理消息所消耗的時間是不同的

。這里我們用的queue叫做workqueue

  為了模擬消費者對消息進行處理的過程,我們用time.sleep()做一個消耗時間的過程。消息的產生和接收是這樣的

message = ' '.join(sys.argv[1:]) or "Hello World!"
def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done")

這里插播一條''.join(sys.argv[1:])的作用:就是把在shell里輸入的指令后跟的代碼加在message里。消費者得到消息后,數消息里有幾個“.”,sleep相應的秒數。

 放出第一版的代碼

import pika,sys
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello')
message = ' '.join(sys.argv[1:]) or "Hello World."
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print('send %s'%message)
RabbitMQ生產者
import time,pika

connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello')

print(' [*] Waiting for messages. To exit press CTRL+C')



def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')
channel.start_consuming()
RabbbitMQ消費者

 

 這時候,我們可以多啟動幾個消費者,再用生產者發送消息,看看效果!

 

可以發現,消息是被公平的依次被分發給各個消費者的(Fair dispatch),這種分發的方式叫輪詢。

消息確認message acknowledgments  

現在考慮下這種情形:消費者在處理消息時需要較長的時間,在這時把這個消費者kill掉,正在處理的消息和已經接收但未被處理的消息就丟失了。這應該是不允許的,我們可不希望有數據丟失,就需要將這些任務重新發送給其他正常工作的消費者。

為了保證任務不丟失,RabbitMQ支持使用message acknowledgments,消費者在完成任務后會給RabbitMQ發送個消息,告訴他活已經干完了,RabbitMQ就會把這個任務給釋放掉。而當出現消費者宕機、掉線等情況時,RabbitMQ會重新把這個任務發送給其他的消費者。

往回看看上文說到的no_ack,這個值默認的是False,RabbitMQ是不主動銷毀消息的所以我們一看看在這里把值置為True。

channel.basic_consume(callback, queue='hello', no_ack=True)

這樣只要消費者接收到消息,RabbitMQ就直接銷毀掉這個消息,就成了手動確認。我們要想實現剛才說的消息不丟失,就要這樣定義

def callback(ch, method, properties, body): print(" [x] Received %r" % body) # time.sleep(body.count(b'.'))
    time.sleep(10) #修改了一下,在延時的10s把消費者斷掉 print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello')#這里的no_ack默認為False

這樣,當一個消費者宕機了,RabbitMQ就會直接把任務拍個下一個消費者。

 消息持久化

剛才通過了消息確認,我們保證了消費者在掉線的時候任務不丟失,可是還有一個問題,如果RabbitMQ如果斷掉(或者服務重啟)了,里面的任務(包括所有queue和exchange依舊會丟失)這時候我們可以用到——消息持久化Message durability

channel.queue_declare(queue='hello',durable=True)#將隊列持久化(只保存了隊列)
channel.basic_publish(exchange='', routing_key='hello', body=message, properties=pika.BasicProperties(delivery_mode=2))#保持消息持久化

必須同時將隊列和消息持久化,可以保證RabbitMQ服務在重啟后任務還存在。

注意幾點:

1.如果只持久化了消息,服務重啟后消息丟失

2.如果只持久化了隊列,服務重啟后隊列還在,但消息丟失

3.在持久化隊列的時候要保持生產者和消費者的一致性

最后一點,因為有可能每個消費者處理信息的能力不一樣,如果按公平分發的化有可能導致負載不平衡,旱的旱死、澇的澇死。為避免這種情況發生還有一個知識點

channel.basic_qos(prefetch_count=1)

用這個語句限制了消費者待處理信息的個數

workQueue的終極代碼

import pika,sys
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello',durable=True)#隊列持久化
message = ' '.join(sys.argv[1:]) or "Hello World2."
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message,
                      properties=pika.BasicProperties(delivery_mode=2)) #消息持久化
print('send %s'%message)
RabbitMQ_workqueue_procucer
import time,pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello',durable=True)#隊列持久化
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)  #消息回執
channel.basic_qos(prefetch_count=1)  #限制消費者待處理任務個數
channel.basic_consume(callback,
                      queue='hello'
                      )
channel.start_consuming()
RabbitMQ_workqueue_customer

 3.發布/訂閱(publish/subscribe)

我們在前面兩部分將的都是將消息由生產者到消費者之間通過queue傳遞,現在將引入一個新的成員:exchange。

其實生產者在發送的時候是不知道消息要發送給那個queue的,甚至他都不知道消息是由queue接收的。實際上生產者只是把message發送給了exchange。至於message后續的處理都是由exchange決定的。

就像圖上標示的,exchange在sender和queue之間起到了轉呈的作用。

按照工作方式,我們將exchange分成了fanoutdirecttopicheaders四種類型。

fanout:所有綁定到這個exchange的隊列都接收消息

direct:通過routingKey和exchange決定的那個唯一的queue可以接收消息

topic:所有符合routingKey(可以是表達式)的queue可以接收消息

  表達式說明:#表示一個或多個字符

                            *表示任何字符

        使用RoutingKey為#時相當於fanout

headers:通過headers來決定把消息發送給哪些queue。

在這個part我們來看fanout的作用。

channel.exchange_declare(exchange='logs', exchange_type='fanout') 

我們定義一個exchange,名字隨便起一個‘logs’,類型就聲明為fanout。

(在前面兩節我們還沒有引入exchange這個概念,就用了默認的exchange設置

channel.basic_publish(exchange='', routing_key='hello', body='123')

,exchange=''空的字符串表示了默認的exchange或名字是空的,那exchange就把消息發送給routing_key指定的queue里(前提是這個queue是存在的),在聲明了exchange以后,我們就可以用這個exchange發送消息了

channel.basic_publish(exchange='logs',                #使用的exchange名稱
                      routing_key='',                 #使用的隊列名稱
                      body='123')                     #消息內容

注意到了一點沒有?這里並沒有定義隊列的名稱?為什么?在廣播的時候是不用固定具體的哪個queue的,我們

result = channel.queue_declare() #生成隨機queue

我們在消費端聲明queue的時候可以生成一個隨機的queue,這里還要加個命令

result = channel.queue_declare(exclusive=True) 

這個exclusive表示在連接在關閉以后這個queue直接被銷毀掉。

然后把這個queue綁定在轉發器上。所有進入這個exchange的消息被發送給所有和他綁定的隊列里。

隨機的queue已經聲明了,現在就把他跟exchange綁定

channel.queue_bind(exchange='logs', queue=result.method.queue)#注意queue名的獲取方法

這就是最終的代碼:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')   #logs 是隨便起的名字,聲明了exchange
message = ' '.join(sys.argv[1:]) or 'info: Hello World!'
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
fanout_publish
import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.exchange_declare(exchange='logs',exchange_type='fanout')
result = channel.queue_declare(exclusive=True)   #exclusive 唯一的,為True時不指定queue名的化隨機生成一個queue,
                                                    # 在斷開連接后把queue刪除,相當於生成一個隨機queue
channel.queue_bind(exchange='logs',
                   queue=result.method.queue) #綁定的是exchange對應的queue
print('waiting for logs.')
def callback(ch,method,preproteries,body):
    print('get data:%r'%body)
channel.basic_consume(callback,
                      queue=result.method.queue,
                      no_ack=True)
channel.start_consuming()
fanout_customer

 總體看一下,發送端的代碼跟前面的差不太多,最重要的差別就是把routingKey給忽略掉了,但是明確了exchange的對象。

而接收方是在建立連接后要聲明exchange,並且要和隊列綁定。如果沒有隊列和exchange綁定,消息就被銷毀了。這就是整個發送的過程

還有一點,這個訂閱——發布的模型就像電台和收音機一樣,如果customer下線了是收不到信息的,消息也是在線發送的,並不會保存。

4.routing(exchange type:direct)

在這個過程中,我們大致了解了發布——訂閱模型。其實就是在發送端定義了一個exchange,在接收端定義了一個隊列,然后把這兩者綁定,就OK了。可是我們現在只想訂閱一部分有用的信息,比如只獲取錯誤信息寫到日志文件里,但同時又能將所有的信息都顯示在控制台(或者terminal上)。

上一節所講述的bind,也可以簡單的理解為這個queue對這個exchange的內容“感興趣”。

在binding的時候,還可以加一個routingKey的參數,這個參數就描述了queue對哪些exchange感興趣。

channel.queue_bind(exchange='logs',         #被綁定的exchange名
                   queue='queue_name',      #被綁定的queue名
                   routing_key='black')     #queue的‘興趣愛好’

對queue和exchange進行bind時,bind的參數主要取決於exchange的類型,比如在fanout模式下是不能有這個routingKey的,運行時候會報錯。

我們使用了fanout的發布訂閱模式,在這個模式下接收端不能對信息進行一定原則的過濾,一股腦的照單全收,已經不能滿足我們的要求了,現在就要用direct模式。

 

在上面的圖里,有兩個queue分別和exchange綁定,Q1的routingKey是orange,Q2則有兩個分別是black和green。在這個模型中,發布的消息關鍵字是orange則被分發到Q1內,而包含有black或green的則發給Q2.剩余的消息就被discard了。

而在上圖中,同樣的key同多個隊列進行綁定的方法也是合法的。所有包含關鍵字black的消息會被同時發送 給Q1和Q2。

了解了上面所說的方法,我們來按照本節一開始的目標來修改下代碼

首先要聲明exchange

channel.exchange_declare(exchange='logs', exchange_type='direct')#聲明exchange,注意接收端的代碼是一樣的

在發送的時候對消息進行分類

serverity = sys.argv[1] if len(sys.argv)>1 else 'info'

然后發送消息

channel.basic_publish(exchange='logs', routing_key=serverity,  #消息的分類
                      body=message)

在接收端,我們用一個循環把所有的routingKey和queue綁定(有可能出現多個關鍵字和一個queue同時綁定的情況)

servrities = sys.argv[1:]    #獲取所有的關鍵字
if not servrities: sys.stderr.write('Usage: %s [info] [warning] [error]\n'%sys.argv[0]) sys.exit(1)              #關鍵字不存在打印提示后退出
print('recived:%s'%servrities) for servrity in servrities:  #循環綁定
    channel.queue_bind(exchange='logs', queue=queue_name, routing_key=servrity)

整個方案就是這樣的

我們啟動兩個terminal,按這樣的方式啟動

python direct_consumer.py info error warning
python direct_consumer.py error

 在分別發送

python direct_publisher.py info 123 python direct_publisher.py error 456 python direct_publisher.py warning 789

看看是什么效果

是不是達到了訂閱的效果!

4.更加細致的消息過濾(topic模式)

在上一節我們利用了direct的模式實現了初步的消息過濾,在這一節里,我們要看看如何實現如何實現更加細致的消息過濾,比如我在獲取info的前提下還要知道哪些message是RabbitMQ發來的,哪些是Redis發來的,那怎么區分呢?

就想這個圖里的一樣,我們在定義RoutingKey的時候利用了表達式,就像模糊查詢一樣其中

*表示任意一個字符

#表示0個或多個字符

topic模式的代碼和上一節的基本一致,只是改變了exchange的模式

channel.exchange_declare(exchange='logs', exchange_type='topic')#聲明exchange

 啟動terminal,輸入指令

python topic_customer kern.*                 可以接收以kern.開頭的所有消息 kern.123 abc  接收到abc
python topic_customer.py *.kern.*            中間包含.kern.的消息 123.kern.345 abc  接收到abc

 同時綁定多個關鍵字

接收端
d:\python\week11>python topic_customer.py kern.* pip.* ['kern.*', 'pip.*'] [*] Waiting for logs. To exit press CTRL+C [x] 'pip.11':b'duziele' [x] 'kern.11':b'duziele'
發送端 d:\python\week11>python topic_publisher.py pip.11 duziele [x] Sent 'pip.11':'duziele' d:\python\week11>python topic_publisher.py kern.11 duziele [x] Sent 'kern.11':'duziele'

 還可以用#獲取所有消息

d:\python\week11>python topic_customer.py #

 ps:#的作用我一直不大明白,我試過了

d:\python\week11>python topic_customer.py kern.#

效果和kern.*是一樣的。

6.Remote procedure call(RPC)

       我們在前面的章節將到了在多個消費者之間分發耗時任務的方法,可是現在要實現這樣的功能:調用遠程的設備上的一個函數,然后等執行完畢返回結果。這樣的工作模式就叫遠程過程調用——Remote Procedure Call(RPC)。

  利用RabbitMQ也可以實現RPC的功能,為了能模擬這個過程,我們在server端設立一個fun:給定一個整數n,然后返回n對應的斐波那契數列。

callback queue

  通過RabbitMQ實現RPC的方法很簡單——客戶端發送請求,服務端對請求響應然后把消息發送至叫callback的queue,過程類似這樣

result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request)

Correlation id

  我們剛才為每個請求對應的響應都聲明了一個隊列,但是在這里等待着結果的返回效率是不是太低了?還好有另外的一種方法:為每個客戶端創建一個callback的隊列。然而又引發了一個新問題:在這個隊列里我不知道哪個響應是對應我這個請求的!這時候就到大神出馬了——Correlation ID。對每個請求都設置一個唯一的ID,在callback的隊列里通過查看屬性來判斷他對應哪個請求。如果出現沒有對應的ID,安全起見我們還是把他忽略掉。

 

總之我們的RPC的工作流程就是這樣的:

1.client啟動,聲明一個匿名的callback queue

2.建立RPC請求,請求里除了消息還包含兩個參數:a.replay_to(告訴server響應的結論callback的隊列里)

                       b.correlation_id:每個請求都被賦予一個獨一無二的值

3.請求被發送給RPC_queue

4.server等待queue里的消息,一旦出現請求,server響應請求並把結論發送給通過replay_to要求的queue里

5.client在callback_queue里等待數據,一旦消息出現,他將correlation進行比對,如果相同就獲取請求結果。

import pika

connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:return 1
    else:return fib(n-1)+fib(n-2)


def on_request(ch,method,props,body):
    n = int(body)
    print('[.]fib(%s)'%n)
    response = fib(n)
    print(response)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body = str(response))
    print('send over')
    ch.basic_ack(delivery_tag=method.delivery_tag)   #確認client接收到消息

channel.basic_qos(prefetch_count=1)   #限制消息處理個數
channel.basic_consume(on_request,queue='rpc_queue')

print('[x]Awaitiong RPC requests')
channel.start_consuming()
RPC_server
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue     #聲明請求響應回執的queue

        self.channel.basic_consume(consumer_callback=self.on_response,
                                   queue=self.callback_queue,no_ack=True) #監聽回執queue

    def on_response(self,ch,method,props,body):  #callback_queue的回調函數
        print(body)
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self,n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties   #發送回執消息的參數
                                   (reply_to=self.callback_queue,
                                    correlation_id=self.corr_id),
                                   body=str(n)
                                   )
        while self.response is None:
            self.connection.process_data_events()   #事件驅動,非阻塞版的start_consuming
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()
n = input('>>>')
print('[x] Requesting fib(%s)'%n)
response = fibonacci_rpc.call(n)
print(response)
RPC_client

 以上就是RabbitMQ的常規用法。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM