rocketmq安裝部署 ,運行避坑(python3)


按照rocketmq官網的快速入門,進行linux上的安裝:

  快速入門 - Apache RocketMQ

避坑:

1、rocketmq可執行命令的路徑:

  xx/xx/rocketmq/rocketmq-all-4.9.2/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2/bin/

  /xx/xx為下載rocketmq時的存放路徑

  否則會報錯:

    apache.xxx class noload 依賴的啟動類找不到

2、后台啟動namesrv和broker

  nohup sh bin/mqnamesrv &

  nohup sh bin/mqbroker &

  使用默認的配置文件啟動broker,會導致mq服務只能在服務器本地連接,外部無法連接

  producer發消息報錯:

    rocketmq.exceptions.ProducerSendSyncFailed: No route info of this topic: xxx_topic,error:-1,in file rocketmq-client-cpp/src/producer/DefaultMQProducer.cpp line:379

  解決方法:

    1、指定配置文件/xx/xx/xx/xx/.../broker_me.conf, 在配置文件中添加以下配置:

      namesrvAddr=本地外網IP:9876

      brokerIP1=本地外網IP

      使用該命令啟動:nohup sh bin/mqbroker -c /xx/xx/xx/xx/.../broker_me.conf &

    2、服務器開放默認的namesrv端口9876、broker配置文件中的listenPort值(broker端口,默認為10911)

3、日志查看:

  namesrv:tail -f ~/logs/rocketmqlogs/namesrv.log

  broker:tail -f ~/logs/rocketmqlogs/broker.log

 

 

python3使用rocketmq庫進行生產和消費rocketmq:

  producer:

 1 # -*-coding:utf-8-*-
 2 from rocketmq.client import Producer, Message
 3 
 4 producer = Producer("producer_id")
 5 producer.set_namesrv_addr("namesrvIP:9876")
 6 producer.set_group("producer_group")
 7 print("start")
 8 producer.start()
 9 
10 def success(sendres):
11     print("success")
12     print(sendres)
13 
14 def error(error_obj):
15     print("error")
16     print(error_obj.error)
17 
18 msg = Message("xxx_topic")
19 msg.set_body("body_str")
20 # 異步發送
21 producer.send_async(msg, success, error)
22 # 同步發送
23 # ret = producer.send_sync(msg)
24 # print(ret)
25 
26 producer.shutdown()
27 print("end...")

 

  consumer:

 1 # -*-coding:utf-8-*-
 2 import time
 3 from rocketmq.client import PushConsumer
 4 
 5 
 6 def handle_message(msg):
 7     print(msg.id)
 8     print(msg.body)
 9 
10 
11 consumer = PushConsumer("push_consumer_id")
12 consumer.set_namesrv_addr("namesrvIp:9876")
13 consumer.set_group("push_consumer_group")
14 consumer.subscribe("xxx_topic", handle_message)
15 print("start")
16 consumer.start()
17 
18 while True:
19     print("wait 5s")
20     time.sleep(5)
21 
22 #consumer.shutdown()
23 #print("end.")

 

    

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM