上一章簡單介紹了RabbitMQ的安裝和一些參數及簡單的傳送信息,今天我們介紹一些其他的參數。
當創建了隊列和發送的消息,如果沒有被消費者消費的時候,重啟了RabbitMQ服務,隊列和消息都會丟失了。
pika版本1.1.0
一、RabbitMQ持久化
MQ默認建立的是臨時 queue 和 exchange,如果不聲明持久化,一旦 rabbitmq 掛掉,queue、exchange 將會全部丟失。所以我們一般在創建 queue 或者 exchange 的時候會聲明 持久化。
1.queue 聲明持久化(******)
# 聲明消息隊列,消息將在這個隊列傳遞,如不存在,則創建。durable = True 代表消息隊列持久化存儲,False 非持久化存儲 result = channel.queue_declare(queue = 'python-test',durable = True)
2.exchange 聲明持久化
# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創建.durable = True 代表exchange持久化存儲,False 非持久化存儲 channel.exchange_declare(exchange = 'python-test', durable = True)
注意:如果已存在一個非持久化的 queue 或 exchange ,執行上述代碼會報錯,因為當前狀態不能更改 queue 或 exchange 存儲屬性,需要刪除重建。如果 queue 和 exchange 中一個聲明了持久化,另一個沒有聲明持久化,則不允許綁定。隊列必須在第一次聲明的時候,就必須要持久化。
3.消息持久化(******)
雖然exchange和queue都聲明了持久化,但是消息只存在內存里,rabbitmq重啟后,內存里面的東西還是會丟失。所以必須聲明消息也是持久化,從內存轉存到硬盤。
# 向隊列插入數值 routing_key是隊列名。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))
4.Message acknowledgment消息不丟失(******)
消費者(consumer)調用callback函數時,會存在處理消息失敗的風險,如果處理失敗,則消息丟失。但是也可以選擇消費者處理失敗時,將消息回退給 rabbitmq ,重新再被消費者消費,這個時候需要設置確認標識。
channel.basic_consume('python-test',callback # no_ack 設置成 False,在調用callback函數時,未收到確認標識,消息會重回隊列。True,無論調用callback成功與否,消息都被消費掉 no_ack = False)
注意:消息持久化也不能說完全保證數據不會丟失。以上只是告訴了RabbitMq要把消息存到硬盤,但從RabbitMq收到消息到保存之間有一個很小的時間間隔。如果你一定要保證持久化,你需要改寫你的代碼來支持事務。
二、消息的能者多勞
服務器的性能大小不一,有的服務器處理的快,有的服務器處理的慢,因此默認的輪詢方式不能夠滿足我們的需求,我們要的是 能者多勞,
最大限度的發揮我們機器的性能. 為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,queue='task_queue')
三、RabbitMQ發布與訂閱
我們看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器),由Exchange將消息路由到一個或多個Queue中(或者丟棄)。所以rabbitmq的發布與訂閱要借助交換機(Exchange)的原理實現。
在此之前我們需要先了解一些參數:
routing key(生產者發給Exchange)
生產者在將消息發送給Exchange的時候,一般會指定一個routing key(指定連接隊列名),來指定這個消息的路由規則,而這個routing key需要與Exchange Type及
binding key聯合使用才能最終生效。在Exchange Type與binding key固定的情況下(在正常使用時一般這些內容都是固定配置好的),我們的生產者就可以在發送消息給Exchange時,
通過指定routing key來決定消息流向哪里。RabbitMQ為routing key設定的長度限制為255 bytes
Binding
RabbitMQ中通過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。
Binding key(Exchange與Queue之間)
在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key;消費者將消息發送給Exchange時,一般會指定一個routing key; routing_key指定隊列名字
當binding key與routing key相匹配時,消息將會被路由到對應的Queue中。
在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。
binding key 並不是在所有情況下都生效,它依賴於Exchange Type,比如fanout類型的Exchange就會無視queue_bind中的routing_key,而是將消息路由到所有綁定到該Exchange的Queue。
Exchange Types一共有四種工作模式:fanout,direct,topic,headers 現在我們一般都不用headers,所以我們只分析前三種
模式一:fanout(扇形交換機)
這種模式下,傳遞到exchange的消息將會轉發到所有與其綁定的queue上。fanout類型轉發消息是最快的。
注意以下幾點:
1.不需要指定 routing_key ,即使指定了也是無效。 2.需要提前將 exchange 和 queue 綁定,一個 exchange 可以綁定多個 queue,一個queue可以綁定多個exchange。 3.需要先啟動 訂閱者,此模式下的隊列是 consumer 隨機生成的,發布者 僅僅發布消息到 exchange ,由 exchange 轉發消息至 queue
發布者代碼:
import pika import json credentials = pika.PlainCredentials('guest', 'guest') # mq用戶名和密碼 connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost',port = 5672,credentials = credentials))
#創建鏈接 channel=connection.channel() # 聲明exchange,指定exchange名字,如不存在,則創建。durable = True 代表exchange持久化存儲,False 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 向隊列插入數值 exchange指定隊列名。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置 channel.basic_publish(exchange = 'python-test',routing_key = '',body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message) connection.close()
訂閱者代碼:
import pika credentials = pika.PlainCredentials('guest', 'guest') connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost',port = 5672,credentials = credentials)) channel = connection.channel() # 創建臨時隊列,隊列名傳空字符,consumer關閉后,隊列自動刪除 result = channel.queue_declare('',exclusive=True) # 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創建。durable = True 代表exchange持久化存儲,False 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout') # 綁定exchange和隊列 exchange 使我們能夠確切地指定消息應該到哪個隊列去,queue臨時隊列,使用result.method.queue獲取隊列 channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId') #這個routing_key和發布者中basic_publish的routing_key不一樣 # 定義一個回調函數來處理消息隊列中的消息,這里是打印出來 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) channel.basic_consume(result.method.queue,callback,# 設置成 False,在調用callback函數時,未收到確認標識,消息會重回隊列。True,無論調用callback成功與否,消息都被消費掉 auto_ack = False) #消息響應是默認開啟的,no_ack=True是指不返回響應 channel.start_consuming()
模式二:direct(直連交換機)
這種工作模式的原理是 消息發送至 exchange,exchange 根據 路由鍵(routing_key)轉發到相對應的 queue 上。
- 可以使用默認 exchange =' ' ,也可以自定義 exchange
- 這種模式下不需要將 exchange 和 任何進行綁定,當然綁定也是可以的。可以將 exchange 和 queue ,routing_key 和 queue 進行綁定
- 傳遞或接受消息時 需要 指定 routing_key
- 需要先啟動 訂閱者,此模式下的隊列是 consumer 隨機生成的,發布者 僅僅發布消息到 exchange ,由 exchange 轉發消息至 queue。
發布者代碼:
import pika import json credentials = pika.PlainCredentials('guest', 'guest') # mq用戶名和密碼 connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost',port = 5672,credentials = credentials)) channel=connection.channel() # 聲明exchange,指定exchange名字,如不存在,則創建。durable = True 代表exchange持久化存儲,False 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 指定 routing_key。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化 channel.basic_publish(exchange = 'python-test',routing_key = '',body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message) connection.close()
訂閱者代碼:
import pika credentials = pika.PlainCredentials('guest', 'guest') connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost',port = 5672,credentials = credentials)) channel = connection.channel() # 創建臨時隊列,隊列名傳空字符,consumer關閉后,隊列自動刪除 result = channel.queue_declare('',exclusive=True) # 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創建。durable = True 代表exchange持久化存儲,False 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct') # 綁定exchange和隊列 exchange 使我們能夠確切地指定消息應該到哪個隊列去 channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId') # 定義一個回調函數來處理消息隊列中的消息,這里是打印出來 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) #channel.basic_qos(prefetch_count=1) # 告訴rabbitmq,用callback來接受消息 channel.basic_consume(result.method.queue,callback, # 設置成 False,在調用callback函數時,未收到確認標識,消息會重回隊列。True,無論調用callback成功與否,消息都被消費掉 auto_ack = False) channel.start_consuming()
模式三:topicd(主題交換機)
這種模式和第二種模式差不多,exchange 也是通過 路由鍵 routing_key 來轉發消息到指定的 queue 。 不同點是 routing_key 使用正則表達式支持模糊匹配,但匹配規則又與常規的正則表達式不同,比如‘’#‘’是匹配全部,“*”是匹配一個詞。
舉例:routing_key =“#orderid#”,意思是將消息轉發至所有 routing_key 包含 “orderid” 字符的隊列中。代碼和模式二 類似,就不貼出來了。
發布者代碼:
import pika import sys credentials = pika.PlainCredentials('用戶名', '密碼') parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列連接通道 channel.exchange_declare(exchange='mytopic',type='topic') log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info' message = ' '.join(sys.argv[1:]) or "all.info: Hello World!" channel.basic_publish(exchange='topic_log', routing_key=log_level, body=message) print(" [x] Sent %r" % message) connection.close()
訂閱者代碼:
import pika,sys credentials = pika.PlainCredentials('用戶名', '密碼') parameters = pika.ConnectionParameters(host='localhost',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除 queue_name = queue_obj.method.queue log_levels = sys.argv[1:] # info warning errr if not log_levels: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for level in log_levels: channel.queue_bind(exchange='topic_log', queue=queue_name, routing_key=level) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name, auto_ack=True) channel.start_consuming()
RabbitMQ服務器的管理
./sbin/rabbitmq-server -detached # 后台啟動 ./sbin/rabbitmqctl status # 查看狀態 ./sbin/rabbitmqctl stop # 關閉 ./sbin/rabbitmqctl list_queues # 查看queue