RabbitMQ 消息隊列
成熟的中間件RabbitMQ、ZeroMQ、ActiveMQ等等
RabbitMQ使用erlang語言開發,使用RabbitMQ前要安裝erlang語言
RabbitMQ允許不同應用、程序間交互數據
python中的Threading queue只能允許單進程內多線程交互的
python中的MultiProcessing queue只能允許父進程與子進程或同父進程的多個子進程交互
RabbitMQ啟動:
1.windows中默認安裝成功,在服務列表中會顯示自動啟動
2.Linux中使用命令rabbitmq-server start
RabbitMQ支持不同的語言,對於不同語言有相應的模塊,這些模式支持使用開發語言連接RabbitMQ
Python連接RabbitMQ模塊有:
1.pika主流模塊
2.Celery分布式消息隊列
3.Haigha提供了一個簡單的使用客戶端庫來與AMQP代理進行交互的方法
使用RabbitMQ前,首先閱讀開始文檔: http://www.rabbitmq.com/getstarted.html
簡單的發送接收實例
默認情況下,使用同一隊列的進程,接收消息方使用輪詢的方式,依次獲取消息
對於一條消息的接收來說,只有當接收方收到消息,並處理完消息,給RabbitMQ發送ack,隊列中的消息才會刪除
如果在處理的過程中socket斷開,那么消息自動轉接到下一個接收方
producer.py
__author__ = 'Cq' import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) #聲明一個管道 channel = connection.channel() #聲明queue,這個隊列在RabbitMQ中生成,發送方和接收方使用同一個隊列 channel.queue_declare(queue='hello2', durable=True) #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='hello2',#隊列名稱 body='Hello World!', properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ) )#body消息內容 print(" [x] Sent 'Hello World!'") connection.close()
consumer.py
__author__ = 'Cq' import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. #發送方和接收方不知道誰首先連接到RabbitMQ,雙方連接上來都先聲明一個隊列 channel.queue_declare(queue='hello2', durable=True) def callback(ch, method, properties, body): print("recived message...") # time.sleep(30) print(" [x] Received %r" % body) #處理完成消息后,主動要向RabbitMQ發送ack ch.basic_ack(delivery_tag=method.delivery_tag) #ch --> 管道內存對象的地址 #method --> 指定各種參數 #properties --> #python3 socket等發送網絡包都是byte格式 #如果隊列里還有1條消息未處理完,將不能接收新的消息 channel.basic_qos(prefetch_count=1) #聲明接收收消息變量 channel.basic_consume(callback,#收到消息后執行的回調函數 queue='hello2',) #no_ack=True)#執行完callback函數后,默認會發送ack給RabbitMQ print(' [*] Waiting for messages. To exit press CTRL+C') #開始接收消息,不停循環接收,沒有消息掛起等待 channel.start_consuming()
在RabbitMQ中查看當前隊列數
1.windows中查看隊列
在RabbitMQ安裝目錄下,sbin下有個管理工具rabbitmqctl.bat可以查看隊列和隊列中的消息數
E:\RabbitMQ Server\rabbitmq_server-3.6.14\sbin>rabbitmqctl.bat list_queues
Listing queues
hello 1
消息持久化
如果當RabbitMQ服務器宕機了,不允許為處理的消息丟失時
1.需要在聲明隊列時,聲明為持久隊列,只是隊列持久化,消息未能持久化
channel.queue_declare(queue='hello',durable=True)
2.需要在發送端發送消息時聲明
channel.basic_publish(exchange='',
routing_key='hello', #隊列名稱
body='Hello World!', #body消息內容
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
#..這里可以添加附帶參數,客戶的通過回調函數的位置參數prop.參數名獲取
))
消息處理配置
對於不同性能的機器,處理消息量大小不同
判斷接收方消息隊列里是否有未處理的消息,如果隊列里還有1條消息未處理完,將不能接收新的消息
channel.basic_qos(prefetch_count=1)
發送廣播消息
使用exchange,exchange的類型決定如果發送廣播消息,它就是一個轉發器
類型:
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
headers: 通過headers 來決定把消息發給哪些queue
fanout純廣播,只要bind到exchange的queue都能收到廣播消息
☆發送的消息只廣播發送一次
channel.exchange_declare(exchange='log', type='fanout')
channel.basic_publish(exchange='log',
routing_key='',
body=message)
實例:
fanout_producer.py
__author__ = 'Cq' import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
fanout_consumer.py
__author__ = 'Cq' import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除 #此queue名唯一,且只接收廣播消息,當不需要接收時,能自動銷毀 result = channel.queue_declare(exclusive=True) #不需要queue名,只要綁定到轉發器就能接收消息 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
topic過濾內容廣播,隊列只接收關心的消息
實例:
topic_producer.py
__author__ = 'Cq' import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs1', exchange_type='topic') #默認發送的消息格式為xxx.info severity = sys.argv[1] if len(sys.argv) > 1 else 'test_message.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs1', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message))
topic_consumer.py
__author__ = 'Cq' import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs1', exchange_type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='topic_logs1', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
過濾條件設置
To receive all the logs run: python receive_logs_topic.py "#" To receive all logs from the facility "kern": python receive_logs_topic.py "kern.*" Or if you want to hear only about "critical" logs: python receive_logs_topic.py "*.critical" You can create multiple bindings: python receive_logs_topic.py "kern.*" "*.critical" And to emit a log with a routing key "kern.critical" type: python emit_log_topic.py "kern.critical" "A critical kernel error"
發送端 python topic_producer.py xxx.info messagexxxx python topic_producer.py xxx.warngin messagexxxx python topic_producer.py xxx.error messagexxxx 接收端 python topic_consumer.py *.info python topic_consumer.py *.warngin python topic_consumer.py *.error python topic_consumer.py *.*
參考博客:http://www.cnblogs.com/alex3714/articles/5248247.html
