python操作RabbitMQ
一、rabbitmq
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
1.1 安裝rabbitmq
RabbitMQ安裝
安裝配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安裝erlang $ yum -y install erlang 安裝RabbitMQ $ yum -y install rabbitmq-server
注意:service rabbitmq-server start/stop
安裝API:
pip install pika or easy_install pika or 源碼 or pycharm https://pypi.python.org/pypi/pika
1.3 用python操作rabbitmq
1.3.1 基於Queue實現生產者消費者模型

#!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
1.3.2 rabbitmq實現消息隊列
對於RabbitMQ來說,生產和消費不再針對內存里的一個Queue對象,而是某台服務器上的RabbitMQ Server實現的消息隊列。
先運行消費者腳本,讓它監聽隊列消息,然后運行生產者腳本,生產者往隊列里發消息。然后消費者往隊列里取消息。
import pika # ########################### 消費者 ########################### connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.queue_declare(queue='abc') # 如果隊列沒有創建,就創建這個隊列 def callback(ch, method, propertities,body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='abc', # 隊列名 no_ack=True) # 不通知已經收到,如果連接中斷可能消息丟失 print(' [*] Waiting for message. To exit press CTRL+C') channel.start_consuming()
import pika # ############################## 生產者 ############################## connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208' )) channel = connection.channel() channel.queue_declare(queue='abc') # 如果隊列沒有創建,就創建這個隊列 channel.basic_publish(exchange='', routing_key='abc', # 指定隊列的關鍵字為,這里是隊列的名字 body='Hello World!') # 往隊列里發的消息內容 print(" [x] Sent 'Hello World!'") connection.close()
先運行消費者,然后再運行生產者:

''' 打印: 生產者: [x] Sent 'Hello World!' 消費者: [*] Waiting for message. To exit press CTRL+C [x] Received b'Hello World!' '''
1.4 no-ack=False:rabbitmq消費者連接斷了 消息不丟失
rabbitmq支持一種方式:應答。比如我從消息里拿一條消息,如果全處理完,你就不要幫我記着了。如果沒處理完,突然斷開了,再連接上的時候,消息隊列就會重新發消息。
總結:
- Basic.Ack 發回給 RabbitMQ 以告知,可以將相應 message 從 RabbitMQ 的消息緩存中移除。
- Basic.Ack 未被 consumer 發回給 RabbitMQ 前出現了異常,RabbitMQ 發現與該 consumer 對應的連接被斷開,之后將該 message 以輪詢方式發送給其他 consumer (假設存在多個 consumer 訂閱同一個 queue)。
- 在 no_ack=true 的情況下,RabbitMQ 認為 message 一旦被 deliver 出去了,就已被確認了,所以會立即將緩存中的 message 刪除。所以在 consumer 異常時會導致消息丟失。
- 來自 consumer 側的 Basic.Ack 與 發送給 Producer 側的 Basic.Ack 沒有直接關系
注意:
1)只有在Consumer(消費者)斷開連接時,RabbitMQ才會重新發送未經確認的消息。
2)超時的情況並未考慮:無論Consumer需要處理多長時間,RabbitMQ都不會重發消息。
消息不丟失的關鍵代碼:
1)在接收端的callback最后:
channel.basic_ack(delivery_tag=method.delivery_tag)
ack即acknowledge(承認,告知已收到) 也就是消費者每次收到消息,要通知一聲:已經收到,如果消費者連接斷了,rabbitmq會重新把消息放到隊列里,下次消費者可以連接的時候,就能重新收到丟失消息。 A message MUST not be acknowledged morethan once. The receiving peer MUST validate that a non-zero delivery-tag refersto a delivered message,
and raise a channel exception if this is not the case.
2)除了callback函數,還要在之前設置接收消息時指定no_ack(默認False):
channel.basic_consume(callback, queue='hello', no_ack=False)
消費者:
import pika
# ########################### 消費者 ##########################
connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.211.55.4')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print('ok') ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
消費者斷掉連接,再次連接,消息還會收到。
1.5 durable:rabbitmq服務端宕機 消息不丟失
發數據的時候,就說了:我這條數據要持久化保存。
如果rabbitmq服務端機器如果掛掉了,會給這台機器做持久化。如果啟動機器后,消息隊列還在。
生產者.py:
import pika
# ############################## 生產者 ##############################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello', durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!'") connection.close()
消費者.py:
import pika
# ########################### 消費者 ###########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print('ok') ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
測試:
1)把生產者.py執行三次。
2)然后在linux上停掉rabbitmq服務,然后再開啟rabbitmq服務
[root@localhost ~]# /etc/init.d/rabbitmq-server stop Stopping rabbitmq-server: rabbitmq-server. [root@localhost ~]# /etc/init.d/rabbitmq-server start Starting rabbitmq-server: SUCCESS rabbitmq-server.
3)運行:消費者.py:三條消息都打印了:
[*] Waiting for messages. To exit press CTRL+C [x] Received b'Hello World!' ok [x] Received b'Hello World!' ok [x] Received b'Hello World!' ok
1.6 消息獲取順序
默認消息隊列里的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。
因為默認是跳着取得。第一個消費者取得很快,已經執行到20了,但是第二個消費者只取到13,可能消息執行的順序就有問題了。
如果多個消費者,如果不想跳着取,就按消息的順序取,而不是按着自己的間隔了。
channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列
#!/usr/bin/env python # -*- coding:utf-8 -*- __author__ = 'WangQiaomei' import pika # ########################### 消費者 ########################### connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.137.208')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello1') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print('ok') ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello1', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
1.7發布訂閱
發布訂閱原理:
1)發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。
2)所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。
3)exchange 可以幫你發消息到多個隊列!type設為什么值,就把消息發給哪些隊列。
發布訂閱應用到監控上:
模板就是寫上一段腳本,放在服務器上,
客戶端每5分鍾,從服務端拿到監控模板,根據模板來取數據,
然后把數據結果發步到服務端的redis頻道里。
服務端收到數據,1)處理歷史記錄 2)報警 3)dashboard顯示監控信息
服務端有三處一直來訂閱服務端頻道(一直來收取客戶端監控數據)
1.7.1 發布給所有綁定隊列
exchange type = fanout
exchange 可以幫你發消息到多個隊列,type = fanout表示:跟exchange綁定的所有隊列,都會收到消息。
發布者:
#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = 'WangQiaomei import pika import sys # ########################### 發布者 ########################### connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
訂閱者:
#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = 'WangQiaomei' import pika # ########################### 訂閱者 ########################### connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') # 隨機創建隊列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 綁定 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() ''' 多次執行這個文件,就會隨機生成多個隊列。並且exchange都綁定這些隊列。 然后發布者只需要給exchange發送消息,然后exchange綁定的多個隊列都有這個消息了。訂閱者就收到這個消息了。 '''
1.7.2關鍵字發送
一個隊列還可以綁定多個關鍵字
對一個隨機隊列,綁定三個關鍵字
再次執行,對另一個隨機隊列,只綁定一個關鍵字。
消費者:每執行一次可以生成一個隊列。通過使用命令行傳參的方式,來傳入隊列的關鍵字。
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
容易測試的版本:
消費者1:
#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = 'WangQiaomei' import pika import sys # ########################### 消費者1 ########################### connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) # 隨機生成隊列 queue_name = result.method.queue severities = ["info", "warning", "error"] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
消費者2:
#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = 'WangQiaomei' import pika import sys # ########################### 消費者2 ########################### connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) # 隨機生成隊列 queue_name = result.method.queue severities = ["error"] for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
生產者:
#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = 'WangQiaomei' import pika import sys # ############################## 生產者 ############################## connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = 'info' message = 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close() ''' 同時運行消費者1,消費者2,然后修改生產者的關鍵字,運行生產者。 當生產者:severity = 'info',則消費者1收到消息,消費者2沒收到消息 當生產者:severity = 'error',則消費者1、消費者2 都收到消息 '''
1.7.2 模糊匹配
exchange type = topic
在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。
- # 表示可以匹配 0 個 或 多個 字符
- * 表示只能匹配 一個 任意字符
發送者路由值 隊列中 old.boy.python old.* -- 不匹配 old.boy.python old.# -- 匹配
消費者:
#!/usr/bin/env python import pika import sys # ############################## 消費者 ############################## connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = "*.orange.*" for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
生產者:
#!/usr/bin/env python import pika import sys # ############################## 生產者 ############################## connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') # routing_key = 'abc.new.qiaomei.old' routing_key = 'neworangeold' message = 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close() ''' #.orange.# 匹配:new.orange.old neworangeold *.orange.* 匹配:neworangeold,不匹配:new.orange.old '''
1.8 saltstack原理實現
saltstack:zeromq:放到內存里的,會更快,會基於這個做rcp
openstack:大量使用:rabbitmq
saltstack上有master,有三個隊列。,讓三個客戶端每個人取一個隊列的任務
saltstack的原理:
1)發一條命令ifconfig,想讓所有nginx主機組的機器,都執行。
2)在master我們可以發命令給exchange,nginx總共有10台服務器,創建10個帶有nginx關鍵字的10個隊列,
3)master隨機生成隊列,md5是一個隊列的名字,exchange把命令和md5這個消息推送到nginx關鍵字的隊列里。
4)nginx10台服務器從隊列中取出消息,執行命令,並且把主機名和執行的結果返回給這個隊列里。
5)master變為消費者,取出隊列里的主機名和執行結果,並打印到終端上。
服務器1:
#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = 'WangQiaomei' import pika import sys # ########################### 消費者1 ########################### connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) # 隨機生成隊列 queue_name = result.method.queue severities = ["nginx", "gfs", "redis"] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) queue_md5=body.decode().split(",")[1] hostname = 'nginx1' channel.queue_declare(queue=queue_md5) # 如果隊列沒有創建,就創建這個隊列 channel.basic_publish(exchange='', routing_key=queue_md5, # 指定隊列的關鍵字為,這里是隊列的名字 body='%s|cmd_result1' %hostname) # 往隊列里發的消息內容 channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
服務器2:
#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = 'WangQiaomei' import pika import sys # ########################### 消費者2 ########################### connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) # 隨機生成隊列 queue_name = result.method.queue severities = ["nginx"] for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) queue_md5=body.decode().split(",")[1] hostname = 'nginx2' channel.queue_declare(queue=queue_md5) # 如果隊列沒有創建,就創建這個隊列 channel.basic_publish(exchange='', routing_key=queue_md5, # 指定隊列的關鍵字為,這里是隊列的名字 body='%s|cmd_result2' %hostname) # 往隊列里發的消息內容 channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
master:
#!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = 'WangQiaomei' import pika import sys import hashlib # ############################## 生產者 ############################## connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = 'nginx' m2 = hashlib.md5() m2.update(severity.encode('utf-8')) md5_security=m2.hexdigest() print('md5_security:',md5_security) message = 'cmd,%s' % md5_security channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close() #################################3 connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.137.208')) channel = connection.channel() channel.queue_declare(queue=md5_security) # 如果隊列沒有創建,就創建這個隊列 def callback(ch, method, propertities,body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue=md5_security, # 隊列名 no_ack=True) # 不通知已經收到,如果連接中斷消息就丟失 print(' [*] Waiting for message. To exit press CTRL+C') channel.start_consuming()
打印:
''' 服務器1,和服務器2都打印: [*] Waiting for logs. To exit press CTRL+C [x] 'nginx':b'cmd,ee434023cf89d7dfb21f63d64f0f9d74' master打印: md5_security: ee434023cf89d7dfb21f63d64f0f9d74 [x] Sent 'nginx':'cmd,ee434023cf89d7dfb21f63d64f0f9d74' [*] Waiting for message. To exit press CTRL+C [x] Received b'nginx2|cmd_result2' [x] Received b'nginx1|cmd_result1' '''