rabbitMQ 生產及消費代碼示例


https://www.jianshu.com/p/95a1b33e0555
https://www.cnblogs.com/zijiyanxi/p/7648062.html

import pika

import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

# durable:server掛了隊列仍然存在

channel.queue_declare(queue='task_queue', durable=True)

# 使用默認的交換機發送消息。exchange為空就使用默認的。delivery_mode=2:使消息持久化。和隊列名稱綁定routing_key

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',

routing_key='task_queue',

body=message,

properties=pika.BasicProperties(

delivery_mode=2,

))

print(" [x] Sent %r" % message)

connection.close()

消費端:

import pika

import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):

	print(" [x] Received %r" % body)

	time.sleep(body.count(b'.'))

	print(" [x] Done")

# 手動對消息進行確認

ch.basic_ack(delivery_tag=method.delivery_tag)

# basic_consume:這個函數有no_ack參數。該參數默認為false。表示:需要對message進行確認。怎么理解:no設置成false,表示要確認

channel.basic_consume(callback, queue='task_queue')

channel.start_consuming()

說明:

個人覺得rabbitMQ比較好的一點是能夠對消費的信息進行反饋,如果消費端的程序運行失敗了,還可以重復消費.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM