python實現的消息隊列RocketMQ客戶端使用


rocketmq-python 是一個基於 rocketmq-client-cpp 封裝的 RocketMQ Python 客戶端。

一、Producer

#coding:utf-8
import
json from rocketmq.client import Producer, Message producer = Producer('PID-001') # 實例化Producer對象,指定group-id(可任意取名) producer.set_namesrv_addr('xxxxxx:xx') # rocketmq隊列接口地址(服務器ip:port) producer.start() # 開啟 # 實例化消息對象,需要指定應用名:topic_name msg = Message('your_topic_name') # 實例化消息對象時,傳入topic名稱,一個應用盡可能用一個Topic # 指定消息的keys msg.set_keys('your_keys') # 業務層面的唯一標識碼,方便將來定位消息丟失問題。服務器會為每個消息創建索引(哈希索引),應用可以通過topic,key來查詢這條消息內容,以及消息被誰消費。 # 指定消息tags msg.set_tags('your_tags') # 消息子類型用tags來標識,tags可以由應用自由設置。 #指定消息體(內容) msg_body = {'name':'laowang','age':28} body = json.dumps(msg_body).encode('utf-8') msg.set_body(body) # 傳入消息體(json字節串) # 向隊列發送消息 ret = producer.send_sync(msg) print(f'status:{ret.status}') # 0表示OK print(f'msg_id:{ret.msg_id}') # 消息id,同消費者獲取到的消息id print(f'offset:{ret.offset}') # 偏移量,默認從0開始,1,2。。。 producer.shutdown() # 關閉

二、PullConsumer

# coding:utf-8

from rocketmq.client import  PullConsumer

consumer = PullConsumer('CID-001') # 指定group-id
consumer.set_namesrv_addr('xxxxxx:xx') # rocketmq隊列接口地址(服務器ip:port)
consumer.start() # 開啟

# 可重復性消費
# 指定topic-name
for msg in consumer.pull('your_topic_name'):
    print(f'id:{msg.id}') # 消息id
    print(f'topic:{msg.topic}') # 消息topic_name
    print(f'tags:{msg.tags}') # 消息tags
    print(f'keys:{msg.keys}') # 消息Keys
    print(f'body:{msg.body}') # 消息體
    print('-'*25+'分隔符'+'-'*25)
    
consumer.shutdown() # 關閉

三、PushConsumer

# coding:utf-8
import time

from rocketmq.client import PushConsumer

# 回調函數,參數是消息對象
def callback(msg):
    print(msg.id, msg.body)


consumer = PushConsumer('CID_XXX') # 指定group-id
consumer.set_namesrv_addr('127.0.0.1:9887') # rocketmq隊列接口地址(服務器ip:port)
consumer.subscribe('Your_topic', callback) # 訂閱
consumer.start() # 開啟

while True:
    time.sleep(3600)

consumer.shutdown() # 關閉

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM