RabbitMQ基本用法
- 進程queue用於同一父進程創建的子進程間的通信
- 而RabbitMQ可以在不同父進程間通信(例如在word和QQ間通信)
示例代碼
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#相當於建立一個socket,連接本地的RabbitMQ,默認端口:5672
channel = connection.channel()#聲明一個通信管道(信道)
#在管道里什么一個queue
channel.queue_declare(queue='hello')#聲明一個名稱為hello的queue
#通過管道發送消息
channel.basic_publish(exchange='',
routing_key='hello',#queue的名字
body='Hellow Word!')#消息主體
connection.close()#關閉連接
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
#不確定生產端或消費端誰先運行,因此為了避免錯誤,消費端也要申請一個queue
#消費端先運行,如果沒申請這個queue,生產端還沒建立這個queue,因此報錯
channel.queue_declare(queue='hello')
def callback(ch,method,properties,body):
#ch,管道(信道)channel的內存地址
#method,設置的一些基本信息
#properties,
#body,消息主體,二進制數據
print(ch,method,properties)
print('[x] Received %r'%body)
#聲明要收消息
channel.basic_consume(
callback,#如果收到消息就調用回調函數處理消息
queue='hello',#queue的名字
no_ack=True#不確認,是否處理完callback,給rabbitmq返回確認信息
)
#開始收消息
channel.start_consuming()#開啟后一直收消息,沒消息則卡住
消息分發
RabbitMQ消息分發(一對多)
- 一個生產者,多個消費者
- 多個消費者時,是輪詢機制,依次分發給消費者。(每個消費者按順序依次消費)
no_act
設置是否確認消息處理完
- 設置no_act = True,消費者不發送確認信息,RabbitMQ從發送消息隊列后,不管消費者是否處理完,刪除queue
- 設置no_act = False,RabbitMQ等待消費者的callback處理完,發送確認信息,如果此時消費者down了,則Rabbit把消息輪詢發送給下一個消費者,等待確認才會刪除queue
- 去掉no_act = True(默認為False),需要在回調函數中新增代碼,手動向RabbitMQ發送確認信息
ch.basic_ack(delivery_tag=method.delivery_tag)
消息持久化
rabbitmq目錄下啟動cmd,命令:rabbitmqctl.bat list_queues
查看當前queue列表
當我們需要消息不會丟失(RabbitMQ server宕機時),需要進行消息持久化
- 1、在申明隊列是加上參數使其持久化,生產者和消費者都需要申明
channel.queue_declare(queue='hello',durable=True)
隊列持久化
- 2、在生產端發送消息函數時加入參數使消息持久化
- 消息持久化
channel.basic_publish(
exchange='',
routing_key='hello',#queue的名字
body='Hellow Word!'
porperties=pika.BasicProperties(
delivery_mode=2#使隊列里的消息持久化
)
)#消息主體
廣播模式
消息公平分發
- 如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
- 在消費端channel.basic_consume()函數前新增一條代碼
channel.basic_qos(prefetch_count=1)
- 解釋:如果有2個消費者(a,b),a處理消息比較慢,b比較快;RabbitMQ是輪詢發送消息,依次給a一條,給b一條,再給a.....。當在消費者端設置以上代碼時,a還在處理,那么RabbitMQ不會給a發送,只會給b
廣播模式(消息是實時的,發送時沒有啟動接收端,消息丟失)
- 1、發送端將消息發送到RabbitMQ的消息轉發器(exchange)
- 2、轉發器(Exchange)遍歷所有綁定它的queue,將消息廣播給queue
- 3、接收端從queue里獲取接收消息
- 4、使用此queue的消費者斷開后,此queue刪除
- 設置exchange轉發器
- Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息
- fanout: 所有bind到此exchange的queue都可以接收消息
- direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
- topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
給所有bind此exchange的發送消息
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')#廣播模式,不用申明queue指定queue名
#設置exchange為fanout模式
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_name = result.method.queue#拿到這個隨機分配的queue名
channel.queue_bind(exchange='logs',#綁定發送端的exchange
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
有選擇的廣播(接受者過濾接收消息exchange type=direct)
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
細致的消息過濾()
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
- 消費者
- 參數為
#
,不過濾收所有
mysql.*
,收所有mysql開頭的消息
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()