一、消息公平分發 |
如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
channel.basic_qos(prefetch_count=1)
注:在消費之前加
二、fanout廣播模式 |
之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了,先來說說exchange的官方說明:
An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息
- fanout:所有bind到此exchange的queue都可以接收消息(純廣播的,所有消費者都能收到消息)
- direct:通過routingKey和exchange決定的那個唯一的queue可以接收消息
- topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
- headers:通過headers 來決定把消息發給哪些queue(這個很少用,一般情況下,我們用不到)
2.1、fanout廣播模式
說明:fanout這種模式是所有綁定exchange的queue都可以接收到消息。exchange=>轉換器
1、生產者(fanout_publiser)
說明:跟之前寫的不同,生產者這邊並沒有聲明queue,因為生產者是以廣播的形式,所以這邊不需要聲明queue
import pika #創建socket連接 connection = pika.BlockingConnection(pika.ConnectionParameters (host='localhost')) #創建管道 channel = connection.channel() #聲明exchange,且exchange的名字是logs,exchange的類型為fanout channel.exchange_declare(exchange='logs',exchange_type="fanout") #發送的消息 message = "info:hello world" #生產一個消息 channel.basic_publish( exchange="logs", routing_key='', body=message ) print("[X] Send {0}".format(message)) #關閉連接 connection.close()
注:這邊的exchange的名字logs是隨便起的
2、消費者(fanout_consumer)
說明:消費者這邊要聲明一個唯一的queue_name的對象,並且從對象中獲取queue名
import pika #創建一個socket connection = pika.BlockingConnection(pika.ConnectionParameters( host="localhost")) #創建一個管道 channel = connection.channel() #聲明exchange,exchange的名字logs,類型是fanout廣播模式 channel.exchange_declare(exchange="logs", exchange_type="fanout") #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除,result是queue的對象 result = channel.queue_declare(exclusive=True) #exclusive=>排他的,唯一的 #獲取queue名 queue_name = result.method.queue #綁定exchange channel.queue_bind(exchange="logs", queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') #聲明回調函數 def callback(ch,method,properties,body): "回調函數" print("[X] {0}".format(body)) #消費者消費 channel.basic_consume(callback, queue=queue_name, no_ack=True) #啟動消費模式 channel.start_consuming()
3、代碼邏輯圖
3、web消息圖
①服務端沒有聲明queue,為什么客戶端要聲明一個queue?
答:生產者發消息到exchange上,exchange就會遍歷一遍,所有綁定它的哪些queue,然后把消息發到queue里面,它發了queue就不管了,消費者從queue里面去收,所以就收到廣播了,而不是說exchange直接就把消息發給消費者,消費者只會從queue里去讀消息,且拿着queue去綁定exchange。
②為什么queue要自動生成,而不是自己手動去寫?
答:我這個queue只是為了收廣播的,所以如果我消費者不收了,這個queue就不需要了,所以就讓它自動生成了,不需要的了,就自動銷毀
2.2、廣播實時性
廣播是實時的,你不在的時候,就是你消費者沒有開啟的時候,發消息的時候,就沒有收到,這個時候就沒有了。如果消費者開啟了,生產者發消息時,消費者是收的到的,這個又叫訂閱發布,收音機模式。