---恢復內容開始---
python RabbitMQ隊列使用
關於python的queue介紹
關於python的隊列,內置的有兩種,一種是線程queue,另一種是進程queue,但是這兩種queue都是只能在同一個進程下的線程間或者父進程與子進程之間進行隊列通訊,並不能進行程序與程序之間的信息交換,這時候我們就需要一個中間件,來實現程序之間的通訊。
RabbitMQ
MQ並不是python內置的模塊,而是一個需要你額外安裝(ubunto可直接apt-get其余請自行百度。)的程序,安裝完畢后可通過python中內置的pika模塊來調用MQ發送或接收隊列請求。接下來我們就看幾種python調用MQ的模式(作者自定義中文形象的模式名稱)與方法。
RabbitMQ設置遠程鏈接賬號密碼
啟動rabbitmq web服務:
2.遠程訪問rabbitmq:自己增加一個用戶,步驟如下:
l1. 創建一個admin用戶:sudo rabbitmqctl add_user admin 123123
l2. 設置該用戶為administrator角色:sudo rabbitmqctl set_user_tags admin administrator
l3. 設置權限:sudo rabbitmqctl set_permissions -p '/' admin '.' '.' '.'
l4. 重啟rabbitmq服務:sudo service rabbitmq-server restart
之后就能用admin用戶遠程連接rabbitmq server了。
輪詢消費模式
此模式下,發送隊列的一方把消息存入mq的指定隊列后,若有消費者端聯入相應隊列,即會獲取到消息,並且隊列中的消息會被消費掉。
若有多個消費端同時連接着隊列,則會已輪詢的方式將隊列中的消息消費掉。
接下來是代碼實例:
producer生產者
# !/usr/bin/env python import pika credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() # 聲明queue channel.queue_declare(queue='balance') # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='balance', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
發送過隊列后,可在MQ服務器中查看隊列狀態
[root@localhost ~]# rabbitmqctl list_queues Listing queues ... hello 1
consumer消費者
# _*_coding:utf-8_*_ __author__ = 'Alex Li' import pika credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() # You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program # was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='balance') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='balance', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
接收隊列后,查看一下隊列狀態
[root@localhost ~]# rabbitmqctl list_queues Listing queues ... hello 0
隊列持久化
當rabbitMQ意外宕機時,可能會有持久化保存隊列的需求(隊列中的消息不消失)。
producer
# Cheng # !/usr/bin/env python import pika credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() # 聲明queue channel.queue_declare(queue='durable',durable=True) # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='durable', body='Hello cheng!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) ) print(" [x] Sent 'Hello cheng!'") connection.close()
執行后查看隊列,記下隊列名字與隊列中所含消息的數量
[root@localhost ~]# rabbitmqctl list_queues Listing queues ... durable 1
#重啟rabbitmq
[root@localhost ~]# systemctl restart rabbitmq-server
#重啟完畢后再次查看 [root@localhost ~]# rabbitmqctl list_queues Listing queues ... durable #隊列以及消息並未消失
執行消費者代碼
cunsumer
# Cheng # _*_coding:utf-8_*_ __author__ = 'Alex Li' import pika credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() # You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program # was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='durable',durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='durable', #no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
可正確接收到信息。
再次查看隊列的情況。
[root@localhost ~]# rabbitmqctl list_queues Listing queues ... durable 0
廣播模式
當producer發送消息到隊列后,所有的consumer都會收到消息,需要注意的是,此模式下producer與concerned之間的關系類似與廣播電台與收音機,如果廣播后收音機沒有接受到,那么消息就會丟失。
建議先執行concerned
concerned
# _*_coding:utf-8_*_ __author__ = 'Alex Li' import pika credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() channel.exchange_declare(exchange='Clogs', type='fanout') result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除 queue_name = result.method.queue channel.queue_bind(exchange='Clogs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
producer
import pika import sys credentials = pika.PlainCredentials('admin','123456') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.56.19',5672,'/',credentials)) channel = connection.channel() channel.exchange_declare(exchange='Clogs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='Clogs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()