python操作RabbitMQ


python操作RabbitMQ

 

一、rabbitmq

 

RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。

 

MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。

 

1.1 安裝rabbitmq

 

RabbitMQ安裝

 

安裝配置epel源
   $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 
安裝erlang
   $ yum -y install erlang
 
安裝RabbitMQ
   $ yum -y install rabbitmq-server

 

注意:service rabbitmq-server start/stop

安裝API

pip install pika
or
easy_install pika
or
源碼
or
pycharm

https://pypi.python.org/pypi/pika

  

1.3 python操作rabbitmq

1.3.1 基於Queue實現生產者消費者模型

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading

message = Queue.Queue(10)

def producer(i):
    while True:
        message.put(i)

def consumer(i):
    while True:
        msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
基於Queue實現生產者消費者模型

1.3.2 rabbitmq實現消息隊列

對於RabbitMQ來說,生產和消費不再針對內存里的一個Queue對象,而是某台服務器上的RabbitMQ Server實現的消息隊列。

先運行消費者腳本,讓它監聽隊列消息,然后運行生產者腳本,生產者往隊列里發消息。然后消費者往隊列里取消息。

import pika

# ########################### 消費者 ###########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='192.168.137.208'))
channel = connection.channel()

channel.queue_declare(queue='abc')  # 如果隊列沒有創建,就創建這個隊列

def callback(ch, method, propertities,body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='abc',  # 隊列名
                      no_ack=True)  # 不通知已經收到,如果連接中斷可能消息丟失

print(' [*] Waiting for message. To exit press CTRL+C')
channel.start_consuming()

 

import pika
# ############################## 生產者 ##############################
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.137.208'
))
channel = connection.channel()
channel.queue_declare(queue='abc')  # 如果隊列沒有創建,就創建這個隊列
channel.basic_publish(exchange='',
                      routing_key='abc',   # 指定隊列的關鍵字為,這里是隊列的名字
                      body='Hello World!')  # 往隊列里發的消息內容
print(" [x] Sent 'Hello World!'")
connection.close()

先運行消費者,然后再運行生產者:

'''
打印:
生產者:
 [x] Sent 'Hello World!'
消費者:
 [*] Waiting for message. To exit press CTRL+C
 [x] Received b'Hello World!'
'''
打印

 

1.4 no-ackFalserabbitmq消費者連接斷了 消息不丟失

rabbitmq支持一種方式:應答。比如我從消息里拿一條消息,如果全處理完,你就不要幫我記着了。如果沒處理完,突然斷開了,再連接上的時候,消息隊列就會重新發消息。

總結: 

  • Basic.Ack 發回給 RabbitMQ 以告知,可以將相應 message 從 RabbitMQ 的消息緩存中移除。
  • Basic.Ack 未被 consumer 發回給 RabbitMQ 前出現了異常,RabbitMQ 發現與該 consumer 對應的連接被斷開,之后將該 message 以輪詢方式發送給其他 consumer (假設存在多個 consumer 訂閱同一個 queue)。
  • 在 no_ack=true 的情況下,RabbitMQ 認為 message 一旦被 deliver 出去了,就已被確認了,所以會立即將緩存中的 message 刪除。所以在 consumer 異常時會導致消息丟失。
  • 來自 consumer 側的 Basic.Ack 與 發送給 Producer 側的 Basic.Ack 沒有直接關系

注意:

1)只有在Consumer(消費者)斷開連接時,RabbitMQ才會重新發送未經確認的消息。

2)超時的情況並未考慮:無論Consumer需要處理多長時間,RabbitMQ都不會重發消息。

 

消息不丟失的關鍵代碼:

1)在接收端的callback最后:

channel.basic_ack(delivery_tag=method.delivery_tag)

ack即acknowledge(承認,告知已收到)
也就是消費者每次收到消息,要通知一聲:已經收到,如果消費者連接斷了,rabbitmq會重新把消息放到隊列里,下次消費者可以連接的時候,就能重新收到丟失消息。
A message MUST not be acknowledged morethan once. The receiving peer MUST validate that a non-zero delivery-tag refersto a delivered message, 
and raise a channel exception if this is not the case.

2)除了callback函數,還要在之前設置接收消息時指定no_ack(默認False):

channel.basic_consume(callback, queue='hello', no_ack=False)  

消費者:

import pika

  # ########################### 消費者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.211.55.4'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消費者斷掉連接,再次連接,消息還會收到。

 

1.5 durablerabbitmq服務端宕機 消息不丟失

 

發數據的時候,就說了:我這條數據要持久化保存。

 

如果rabbitmq服務端機器如果掛掉了,會給這台機器做持久化。如果啟動機器后,消息隊列還在。

 

生產者.py

 

import pika

# ############################## 生產者 ##############################


connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello', durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
 )) print(" [x] Sent 'Hello World!'")
connection.close()

 

消費者.py

 

import pika

# ########################### 消費者 ###########################


connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

測試:

1)把生產者.py執行三次。

2)然后在linux上停掉rabbitmq服務,然后再開啟rabbitmq服務

 

[root@localhost ~]# /etc/init.d/rabbitmq-server stop
Stopping rabbitmq-server: rabbitmq-server.

[root@localhost ~]# /etc/init.d/rabbitmq-server start
Starting rabbitmq-server: SUCCESS
rabbitmq-server.

 

3)運行:消費者.py:三條消息都打印了:

 

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'Hello World!'
ok
 [x] Received b'Hello World!'
ok
 [x] Received b'Hello World!'
ok

 

1.6 消息獲取順序

 

默認消息隊列里的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。

因為默認是跳着取得。第一個消費者取得很快,已經執行到20了,但是第二個消費者只取到13,可能消息執行的順序就有問題了。

 

如果多個消費者,如果不想跳着取,就按消息的順序取,而不是按着自己的間隔了。

channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列

 

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__author__ = 'WangQiaomei'
import pika

# ########################### 消費者 ###########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.137.208'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello1')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue='hello1',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

1.7發布訂閱

 

發布訂閱原理:

1)發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。

2)所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。

3exchange 可以幫你發消息到多個隊列!type設為什么值,就把消息發給哪些隊列。

 

發布訂閱應用到監控上:

模板就是寫上一段腳本,放在服務器上,

客戶端每5分鍾,從服務端拿到監控模板,根據模板來取數據,

然后把數據結果發步到服務端的redis頻道里。

服務端收到數據,1)處理歷史記錄 2)報警 3dashboard顯示監控信息

服務端有三處一直來訂閱服務端頻道(一直來收取客戶端監控數據)

 

 

1.7.1  發布給所有綁定隊列

exchange type = fanout

exchange 可以幫你發消息到多個隊列,type = fanout表示:跟exchange綁定的所有隊列,都會收到消息。

 

發布者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'WangQiaomei
import pika
import sys
# ########################### 發布者 ###########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

 

訂閱者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'WangQiaomei'

import pika
# ########################### 訂閱者 ###########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')
# 隨機創建隊列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 綁定
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()


'''
多次執行這個文件,就會隨機生成多個隊列。並且exchange都綁定這些隊列。
然后發布者只需要給exchange發送消息,然后exchange綁定的多個隊列都有這個消息了。訂閱者就收到這個消息了。
'''

 

 

1.7.2關鍵字發送

一個隊列還可以綁定多個關鍵字

 

對一個隨機隊列,綁定三個關鍵字

再次執行,對另一個隨機隊列,只綁定一個關鍵字。

 

消費者:每執行一次可以生成一個隊列。通過使用命令行傳參的方式,來傳入隊列的關鍵字。

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:] if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

容易測試的版本:

消費者1

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'WangQiaomei'

import pika
import sys

# ########################### 消費者1 ###########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)    # 隨機生成隊列
queue_name = result.method.queue

severities = ["info", "warning", "error"] if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 

消費者2

 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'WangQiaomei'

import pika
import sys

# ########################### 消費者2 ###########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)    # 隨機生成隊列
queue_name = result.method.queue

severities = ["error"] for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 

 

生產者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'WangQiaomei'

import pika
import sys

# ############################## 生產者 ##############################

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = 'info'
message = 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

'''
同時運行消費者1,消費者2,然后修改生產者的關鍵字,運行生產者。
當生產者:severity = 'info',則消費者1收到消息,消費者2沒收到消息
當生產者:severity = 'error',則消費者1、消費者2 都收到消息
'''

 

1.7.2  模糊匹配

 

exchange type = topic

topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchangeexchange將傳入路由值關鍵字進行匹配,匹配成功,則將數據發送到指定隊列。

  • # 表示可以匹配 0 個 或 多個 字符
  • *  表示只能匹配 一個 任意字符

 

發送者路由值              隊列中
old.boy.python          old.*  -- 不匹配
old.boy.python          old.#  -- 匹配

消費者:

 

#!/usr/bin/env python
import pika
import sys
# ############################## 消費者 ##############################
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = "*.orange.*"


for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 

生產者:

 

#!/usr/bin/env python
import pika
import sys
# ############################## 生產者 ##############################
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

# routing_key = 'abc.new.qiaomei.old'
routing_key = 'neworangeold'
message = 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

'''
#.orange.#  匹配:new.orange.old  neworangeold
*.orange.*  匹配:neworangeold,不匹配:new.orange.old
'''

 

1.8 saltstack原理實現

 

saltstackzeromq:放到內存里的,會更快,會基於這個做rcp

openstack:大量使用:rabbitmq

saltstack上有master,有三個隊列。,讓三個客戶端每個人取一個隊列的任務

saltstack的原理:

1)發一條命令ifconfig,想讓所有nginx主機組的機器,都執行。

2)在master我們可以發命令給exchangenginx總共有10台服務器,創建10個帶有nginx關鍵字的10個隊列,

3master隨機生成隊列,md5是一個隊列的名字,exchange把命令和md5這個消息推送到nginx關鍵字的隊列里。

4nginx10台服務器從隊列中取出消息,執行命令,並且把主機名和執行的結果返回給這個隊列里。

5master變為消費者,取出隊列里的主機名和執行結果,並打印到終端上。

 

服務器1

 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'WangQiaomei'

import pika
import sys

# ########################### 消費者1 ###########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)  # 隨機生成隊列
queue_name = result.method.queue

severities = ["nginx", "gfs", "redis"] if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
    queue_md5=body.decode().split(",")[1]
    hostname = 'nginx1'
    channel.queue_declare(queue=queue_md5)  # 如果隊列沒有創建,就創建這個隊列
    channel.basic_publish(exchange='',
                          routing_key=queue_md5,   # 指定隊列的關鍵字為,這里是隊列的名字
                          body='%s|cmd_result1' %hostname)  # 往隊列里發的消息內容

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 

 

服務器2:

 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'WangQiaomei'

import pika
import sys

# ########################### 消費者2 ###########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)  # 隨機生成隊列
queue_name = result.method.queue

severities = ["nginx"]


for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
    queue_md5=body.decode().split(",")[1]
    hostname = 'nginx2'
    channel.queue_declare(queue=queue_md5)  # 如果隊列沒有創建,就創建這個隊列
    channel.basic_publish(exchange='',
                          routing_key=queue_md5,   # 指定隊列的關鍵字為,這里是隊列的名字
                          body='%s|cmd_result2' %hostname)  # 往隊列里發的消息內容

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 

master:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'WangQiaomei'

import pika
import sys
import hashlib

# ############################## 生產者 ##############################

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = 'nginx'
m2 = hashlib.md5()
m2.update(severity.encode('utf-8'))
md5_security=m2.hexdigest()
print('md5_security:',md5_security)
message = 'cmd,%s' % md5_security

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

#################################3
connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='192.168.137.208'))
channel = connection.channel()

channel.queue_declare(queue=md5_security)  # 如果隊列沒有創建,就創建這個隊列

def callback(ch, method, propertities,body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue=md5_security,  # 隊列名
                      no_ack=True)  # 不通知已經收到,如果連接中斷消息就丟失

print(' [*] Waiting for message. To exit press CTRL+C')
channel.start_consuming()

 

打印:

'''
服務器1,和服務器2都打印:
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'nginx':b'cmd,ee434023cf89d7dfb21f63d64f0f9d74'

master打印:
md5_security: ee434023cf89d7dfb21f63d64f0f9d74
 [x] Sent 'nginx':'cmd,ee434023cf89d7dfb21f63d64f0f9d74'
 [*] Waiting for message. To exit press CTRL+C
 [x] Received b'nginx2|cmd_result2'
 [x] Received b'nginx1|cmd_result1'
'''

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM