python3 訪問 rabbitmq 示例


關於 rabbitmq

之前用過 kafka,要是拿這兩者做對比的話,大概有以下異同:

  1. 兩者都是一個分布式架構
  2. kafka 具有較高的吞吐量,rabbimq 吞吐量較小
  3. rabbitmq 的可靠性更好,確認機制(生產者和 exchange,消費者和隊列),支持事務,但會造成阻塞,委托(添加回調來處理發送失敗的消息)和備份交換器(將發送失敗的消息存下來后面再處理)機制
  4. kafka 常用於日志收集業務,rabbitmq 常用於搶購,支付業務

 

rabbitmq demo

producer

# coding: utf-8
import json
import pika

credentials = pika.PlainCredentials('sm', 'sm')
connection = pika.BlockingConnection(pika.ConnectionParameters('32.86.5.93', 5672, '/', credentials))
channel = connection.channel()

# 聲明queue,需要注意這里的配置,消費者聲明 queue 時需要與生產者保持一致
channel.queue_declare(queue='viosm', arguments={"x-max-length": 10000})


body = json.dumps({"test": "test"})
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
# 推送到交換機,因為隊列是一進一出的,如果推送到隊列,只能有一個消費者來獲取(來自毛絨絨胖蟲子)
ret = channel.basic_publish(exchange="smai",
                            routing_key="",
                            body=body)
print(ret)  # 返回 None
connection.close()

 

consumer

# coding: utf-8
import json
import pika

credentials = pika.PlainCredentials('sm', 'sm')
connection = pika.BlockingConnection(pika.ConnectionParameters('32.86.5.93', 5672, '/', credentials))
channel = connection.channel()

# You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='viosm', arguments={"x-max-length": 10000})  # 這里需要與生產者的聲明保持一致


def msg_consumer(ch, method, properties, data_bytes):
    data_json = data_bytes.decode()
    data_dict = json.loads(data_json)
    print("{}".format(data_dict))
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手動提交偏移量


channel.basic_consume('viosm',       # 隊列名
                      msg_consumer,  # 回調函數
                      consumer_tag="seemmo_consumer",
                      # auto_ack=True,  # 自動提交偏移量
                      )

channel.start_consuming()
# forever

 

 

ending ~ 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM