rocketmq可以與kafka等一起使用,用於實時消息處理。
安裝rocketmq:
pip install rocketmq [-i https://pypi.tuna.tsinghua.edu.cn/simple]
生產消息producer:
from rocketmq.client import Producer, Message import json producer = Producer('PID-test') producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx') #rocketmq隊列接口地址(服務器ip:port) producer.start() msg_body = {"id":"001","name":"test_mq","message":"abcdefg"} ss = json.dumps(msg_body).encode('utf-8') msg = Message('topic_name') #topic名稱 msg.set_keys('xxxxxx') msg.set_tags('xxxxxx') msg.set_body(ss) #message body retmq = producer.send_sync(msg) print(retmq.status, retmq.msg_id, retmq.offset) producer.shutdown()
其中:
- 設置ip:port的位置:producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')
當只有單一服務器時,格式是上面這個;
當有多個服務器地址(集群模式)時,可以使用:producer.set_namesrv_addr("xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx")
不過以下這種方式本人測試不通過:producer.set_namesrv_addr(["xxx.xxx.xxx.xxx:xxxxx","xxx.xxx.xxx.xxx:xxxxx","xxx.xxx.xxx.xxx:xxxxx"])
- 如果使用pandas數據,pandas數據可以直接轉換
some_df.to_json(orient='records').encode('utf-8'),然后放入body中發送。
消費消息consumer:
可以使用 PushConsumer 和 PullConsumer,同樣來自 rocketmq.client。
# 使用PullConsumer時 from rocketmq.client import PullConsumer consumer = PullConsumer('CID_test') consumer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx') consumer.start() for msg in consumer.pull('topic_name'): print(msg.id, msg.body) consumer.shutdown() # PushConsumer與此類似 from rocketmq.client import PushConsumer
注:目前rocketmq庫只支持linux和mac。
#
參考:
https://www.oschina.net/p/rocketmq-python
https://github.com/apache/rocketmq-client-python