python+rabbitMQ實現生產者和消費者模式


(一)安裝一個消息中間件,如:rabbitMQ

(二)生產者

sendmq.py

import pika
import sys
import time

# 遠程rabbitmq服務的配置信息
username = 'admin'  # 指定遠程rabbitmq的用戶名密碼
pwd = 'admin'
ip_addr = '10.1.7.7'
port_num = 5672

# 消息隊列服務的連接和隊列的創建
credentials = pika.PlainCredentials(username, pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(ip_addr, port_num, '/', credentials))
channel = connection.channel()
# 創建一個名為balance的隊列,對queue進行durable持久化設為True(持久化第一步)
channel.queue_declare(queue='balance', durable=True)

message_str = 'Hello World!'
for i in range(100000000):
    # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    channel.basic_publish(
        exchange='',
        routing_key='balance',  # 寫明將消息發送給隊列balance
        body=message_str,  # 要發送的消息
        properties=pika.BasicProperties(delivery_mode=2, )  # 設置消息持久化(持久化第二步),將要發送的消息的屬性標記為2,表示該消息要持久化
    )  # 向消息隊列發送一條消息
    print(" [%s] Sent 'Hello World!'" % i)
    # time.sleep(0.2)
connection.close()  # 關閉消息隊列服務的連接

  

運行sendmq.py文件,可以從以下方法查看隊列中的消息數量。

一是,rabbitmq的管理界面,如下圖所示:

二是,從服務器端命令查看

rabbitmqctl list_queues

(三)消費者

receivemq.py

import pika
import sys
import time

# 遠程rabbitmq服務的配置信息
username = 'admin'  # 指定遠程rabbitmq的用戶名密碼
pwd = 'admin'
ip_addr = '10.1.7.7'
port_num = 5672

credentials = pika.PlainCredentials(username, pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(ip_addr, port_num, '/', credentials))
channel = connection.channel()

# 消費成功的回調函數
def callback(ch, method, properties, body):
    print(" [%s] Received %r" % (time.time(), body))
    # time.sleep(0.2)


# 開始依次消費balance隊列中的消息
channel.basic_consume(queue='balance', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 啟動消費

  

運行receivemq.py文件,可以從以下方法查看隊列中的消息數量。

或者

rabbitmqctl  list_queues

 

延伸:

systemctl status rabbitmq-server.service # 狀態
systemctl restart rabbitmq-server.service # 重啟

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM