Python與RabbitMQ交互


RabbitMQ 消息隊列

  成熟的中間件RabbitMQ、ZeroMQ、ActiveMQ等等

  RabbitMQ使用erlang語言開發,使用RabbitMQ前要安裝erlang語言

  RabbitMQ允許不同應用、程序間交互數據

  python中的Threading queue只能允許單進程內多線程交互的

  python中的MultiProcessing queue只能允許父進程與子進程或同父進程的多個子進程交互


RabbitMQ啟動:
  1.windows中默認安裝成功,在服務列表中會顯示自動啟動
  2.Linux中使用命令rabbitmq-server start

RabbitMQ支持不同的語言,對於不同語言有相應的模塊,這些模式支持使用開發語言連接RabbitMQ
Python連接RabbitMQ模塊有:
  1.pika主流模塊
  2.Celery分布式消息隊列
  3.Haigha提供了一個簡單的使用客戶端庫來與AMQP代理進行交互的方法


使用RabbitMQ前,首先閱讀開始文檔: http://www.rabbitmq.com/getstarted.html


簡單的發送接收實例
  默認情況下,使用同一隊列的進程,接收消息方使用輪詢的方式,依次獲取消息
  對於一條消息的接收來說,只有當接收方收到消息,並處理完消息,給RabbitMQ發送ack,隊列中的消息才會刪除
  如果在處理的過程中socket斷開,那么消息自動轉接到下一個接收方

 producer.py

__author__ = 'Cq'

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))

#聲明一個管道
channel = connection.channel()

#聲明queue,這個隊列在RabbitMQ中生成,發送方和接收方使用同一個隊列
channel.queue_declare(queue='hello2', durable=True)

#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello2',#隊列名稱
                      body='Hello World!',
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      )
                    )#body消息內容
print(" [x] Sent 'Hello World!'")
connection.close()
View Code

consumer.py

__author__ = 'Cq'

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()


#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
#發送方和接收方不知道誰首先連接到RabbitMQ,雙方連接上來都先聲明一個隊列
channel.queue_declare(queue='hello2', durable=True)

def callback(ch, method, properties, body):
    print("recived message...")
    # time.sleep(30)
    print(" [x] Received %r" % body)
    #處理完成消息后,主動要向RabbitMQ發送ack
    ch.basic_ack(delivery_tag=method.delivery_tag)
    #ch -->  管道內存對象的地址
    #method --> 指定各種參數
    #properties -->
    #python3 socket等發送網絡包都是byte格式

#如果隊列里還有1條消息未處理完,將不能接收新的消息
channel.basic_qos(prefetch_count=1)

#聲明接收收消息變量
channel.basic_consume(callback,#收到消息后執行的回調函數
                      queue='hello2',)
                     #no_ack=True)#執行完callback函數后,默認會發送ack給RabbitMQ

print(' [*] Waiting for messages. To exit press CTRL+C')
#開始接收消息,不停循環接收,沒有消息掛起等待
channel.start_consuming()
View Code

 

在RabbitMQ中查看當前隊列數
  1.windows中查看隊列
  在RabbitMQ安裝目錄下,sbin下有個管理工具rabbitmqctl.bat可以查看隊列和隊列中的消息數
  E:\RabbitMQ Server\rabbitmq_server-3.6.14\sbin>rabbitmqctl.bat list_queues
  Listing queues
  hello 1

 

 

消息持久化
如果當RabbitMQ服務器宕機了,不允許為處理的消息丟失時
  1.需要在聲明隊列時,聲明為持久隊列,只是隊列持久化,消息未能持久化
    channel.queue_declare(queue='hello',durable=True)

  2.需要在發送端發送消息時聲明
    channel.basic_publish(exchange='',
    routing_key='hello', #隊列名稱
    body='Hello World!', #body消息內容
    properties=pika.BasicProperties(
    delivery_mode = 2, # make message persistent
    #..這里可以添加附帶參數,客戶的通過回調函數的位置參數prop.參數名獲取
    ))

消息處理配置
  對於不同性能的機器,處理消息量大小不同
  判斷接收方消息隊列里是否有未處理的消息,如果隊列里還有1條消息未處理完,將不能接收新的消息
  channel.basic_qos(prefetch_count=1)

 

發送廣播消息
  使用exchange,exchange的類型決定如果發送廣播消息,它就是一個轉發器
    類型:
      fanout: 所有bind到此exchange的queue都可以接收消息
      direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
      topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
      headers: 通過headers 來決定把消息發給哪些queue


  fanout純廣播,只要bind到exchange的queue都能收到廣播消息
    ☆發送的消息只廣播發送一次
    channel.exchange_declare(exchange='log', type='fanout')
    channel.basic_publish(exchange='log',
    routing_key='',
    body=message)

  實例:

  fanout_producer.py

__author__ = 'Cq'
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
View Code

  fanout_consumer.py

__author__ = 'Cq'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

#不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
#此queue名唯一,且只接收廣播消息,當不需要接收時,能自動銷毀
result = channel.queue_declare(exclusive=True)
#不需要queue名,只要綁定到轉發器就能接收消息

queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
View Code

 

  topic過濾內容廣播,隊列只接收關心的消息

  實例:

  topic_producer.py

__author__ = 'Cq'

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs1',
                         exchange_type='topic')

#默認發送的消息格式為xxx.info
severity = sys.argv[1] if len(sys.argv) > 1 else 'test_message.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs1',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
View Code

  topic_consumer.py

__author__ = 'Cq'

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs1',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='topic_logs1',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
View Code

       過濾條件設置

To receive all the logs run:
python receive_logs_topic.py "#"

To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*"

Or if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical"

You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"

And to emit a log with a routing key "kern.critical" type:
python emit_log_topic.py "kern.critical" "A critical kernel error"
View Code
發送端
    python topic_producer.py xxx.info         messagexxxx
    python topic_producer.py xxx.warngin   messagexxxx
    python topic_producer.py xxx.error       messagexxxx

接收端
    python topic_consumer.py *.info
    python topic_consumer.py *.warngin
    python topic_consumer.py *.error
    python topic_consumer.py *.*
View Code

 

參考博客:http://www.cnblogs.com/alex3714/articles/5248247.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM