python淺學【網絡服務中間件】之RabbitMQ


一、關於AMQP:

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議。

AMQP使符合要求的客戶端應用程序能夠與符合要求的消息傳遞中間件代理進行通信。

AMQP是一種新的消息傳遞中間件開放標准。它解決了如何跨LANS和WANS連接應用程序的問題。在AMQP之前,沒有相互連接應用程序的標准,這是大型軟件體系結構中比較昂貴的部分之一。AMQP是一個線級協議,外加一個用於路由和消息排隊的模型。它包括非常高性能的發布-訂閱(通過單個代理的速度可達每秒150k條消息)和高可靠性的消息傳遞(無論如何都保證交付)。

AMQP是一種可編程的協議:

定義了三種實體(對象)
  queues, exchanges, bindings
  queues :隊列,存儲信息,用於消費
  exchanges : 消息中轉站,包含多種類型
  bindings :消息轉發規則,定義了route,規定怎么把消息發送到隊列

 

 

 

 

 

二、關於RabbitMQ:

RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,而群集和故障轉移是構建在開放電信平台框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。

消息代理的應用:

  (1)將消息路由到一個或多個目的地
  (2)將消息轉換為替代表示
  (3)執行消息聚合,將消息分解為多個消息並將其發送到目的地,然后將響應重新組合成一條消息以返回給用戶
  (4)與外部的存儲庫交互以擴充消息或存儲消息
  (5)調用web服務以檢索數據
  (6)回應事件或錯誤
  (7)使用發布訂閱模式提供內容和基於主題的消息路由

 

三、RabbitMQ交換機exchanges的四種類型:

exchanges 的屬性:
  name 名字
  durability 持久化
  auto-delete 自動刪除(所有隊列都解除綁定的時候)

 

exchanges 類型: direct exchange,fanout exchange,topic exchange,headers exchange

1、direct exchange:
  queue創建時,綁定一個同名的routing key
  用途:把任務分配給多個workers, 每個work做特定工作,比如寫日志

 

 

 2、fanout exchange:
  傳遞消息到每一個queue,忽略routeing key
  用途:多處記錄日志,統一發布通知,球賽更新比分, 分布式系統更新配置信息

 

 

3、topic exchange:
  根據規則匹配相應的queue, 實現訂閱發布
  用途:根據不同標簽更新新聞,根據位置信息提供商品

 

 

4、Headers Exchange
  根據多個屬性當做消息頭,忽略routing key, 需要開發者定義更多內容
  用途:當direct 和routeing key 不是字符串時,可使用這個自定義屬性匹配

 

 

 

四、python簡單操作RabbitMQ:

client端:消費信息

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# msg_worker.py
# Author:Riy


import time
import pika


# 連接服務
config = {
    "username": "root",
    "password": "root",
    "host": "127.0.0.1",
    "port": ""
}
creds = pika.PlainCredentials(config["username"], config["password"])
params = pika.ConnectionParameters(config["host"], credentials=creds)
connection = pika.BlockingConnection(params)

# 創建隊列
channel = connection.channel()
channel.queue_declare(queue="msg_queue", durable=True)

# 接收,消費消息
def callback(ch, method, properties, body):
    print(f"收到消息:{body}")
    time.sleep(body.count(b"-"))
    print('ok')
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 均衡任務
channel.basic_qos(prefetch_count=1)
channel.basic_consume("msg_queue", callback)

channel.start_consuming()

server端:發送信息

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# msg_pub.py
# Author:Riy


import sys
import pika


# 連接服務
config = {
    "username": "root",
    "password": "root",
    "host": "127.0.0.1",
    "port": ""
}
creds = pika.PlainCredentials(config["username"], config["password"])
params = pika.ConnectionParameters(config["host"], credentials=creds)
connection = pika.BlockingConnection(params)

# 創建隊列
channel = connection.channel()
channel.queue_declare(queue="msg_queue", durable=True)

# 發送消息
msg = " ".join(sys.argv[1:]) or "new msg from Riy"
channel.basic_publish(exchange="", 
                    routing_key="msg_queue",
                    body=msg,
                    properties=pika.BasicProperties(
                        delivery_mode=2 # TODO
                    ))

print(f"發送消息:{msg}")
connection.close()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM