rocketmq-python 是一個基於 rocketmq-client-cpp 封裝的 RocketMQ Python 客戶端。
一、Producer
#coding:utf-8
import json from rocketmq.client import Producer, Message producer = Producer('PID-001') # 實例化Producer對象,指定group-id(可任意取名) producer.set_namesrv_addr('xxxxxx:xx') # rocketmq隊列接口地址(服務器ip:port) producer.start() # 開啟 # 實例化消息對象,需要指定應用名:topic_name msg = Message('your_topic_name') # 實例化消息對象時,傳入topic名稱,一個應用盡可能用一個Topic # 指定消息的keys msg.set_keys('your_keys') # 業務層面的唯一標識碼,方便將來定位消息丟失問題。服務器會為每個消息創建索引(哈希索引),應用可以通過topic,key來查詢這條消息內容,以及消息被誰消費。 # 指定消息tags msg.set_tags('your_tags') # 消息子類型用tags來標識,tags可以由應用自由設置。 #指定消息體(內容) msg_body = {'name':'laowang','age':28} body = json.dumps(msg_body).encode('utf-8') msg.set_body(body) # 傳入消息體(json字節串) # 向隊列發送消息 ret = producer.send_sync(msg) print(f'status:{ret.status}') # 0表示OK print(f'msg_id:{ret.msg_id}') # 消息id,同消費者獲取到的消息id print(f'offset:{ret.offset}') # 偏移量,默認從0開始,1,2。。。 producer.shutdown() # 關閉
二、PullConsumer
# coding:utf-8 from rocketmq.client import PullConsumer consumer = PullConsumer('CID-001') # 指定group-id consumer.set_namesrv_addr('xxxxxx:xx') # rocketmq隊列接口地址(服務器ip:port) consumer.start() # 開啟 # 可重復性消費 # 指定topic-name for msg in consumer.pull('your_topic_name'): print(f'id:{msg.id}') # 消息id print(f'topic:{msg.topic}') # 消息topic_name print(f'tags:{msg.tags}') # 消息tags print(f'keys:{msg.keys}') # 消息Keys print(f'body:{msg.body}') # 消息體 print('-'*25+'分隔符'+'-'*25) consumer.shutdown() # 關閉
三、PushConsumer
# coding:utf-8 import time from rocketmq.client import PushConsumer # 回調函數,參數是消息對象 def callback(msg): print(msg.id, msg.body) consumer = PushConsumer('CID_XXX') # 指定group-id consumer.set_namesrv_addr('127.0.0.1:9887') # rocketmq隊列接口地址(服務器ip:port) consumer.subscribe('Your_topic', callback) # 訂閱 consumer.start() # 開啟 while True: time.sleep(3600) consumer.shutdown() # 關閉
