【rocketmq-client-python】學習筆記


rocketmq-python 是一個基於 rocketmq-client-cpp 封裝的 RocketMQ Python 客戶端。

rocketmq-client-python安裝
目前rocketmq庫只支持linux和mac。

rocketmq-client-python 的安裝:

pip install rocketmq
安裝太慢?國內源安裝:

pip install rocketmq -i https://pypi.tuna.tsinghua.edu.cn/simple
示例代碼:

Producer

from rocketmq.client import Producer, Message
 
producer = Producer('PID-XXX')
producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')#rocketmq隊列接口地址(服務器ip:port)
# For ip and port name server address, use `set_namesrv_addr` method, for example:
# producer.set_namesrv_addr('127.0.0.1:9887')
producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')#可以不使用
producer.start()
 
msg_body = {"id":"test_id","name":"test_name","message":"test_message"}
ss = json.dumps(msg_body).encode('utf-8')
 
msg = Message('YOUR-TOPIC') #topic名稱
msg.set_keys('XXX')#每個消息在業務層面的唯一標識碼,要設置到keys字段,方便將來定位消息丟失問題。服務器會為每個消息創建索引(哈希索引),應用可以通過topic,key來查詢這條消息內容,以及消息被誰消費。由於是哈希索引,請務必保證key盡可能唯一,這樣可以避免潛在的哈希沖突。
msg.set_tags('XXX')#一個應用盡可能用一個Topic,消息子類型用tags來標識,tags可以由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才可以利用tags在broker做消息過濾。
msg.set_body(ss)
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()

其中:

設置ip:port的位置:producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')
當只有單一服務器時,格式是上面這個;

當有多個服務器地址(集群模式)時,可以使用:producer.set_namesrv_addr("xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx")

如果使用pandas數據,pandas數據可以直接轉換
df.to_json(orient='records').encode('utf-8'),然后放入body中發送。

不同應用的多個Topic使用同一個namesrv_addr時數據傳輸會發生沖突
解決方案:每一個Topic對應一個 “PID-XXX”


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM