Python MQTT 的訂閱與發布


以下是寫測試代碼時用到的 MQTT 客戶端封裝,借助了網上許多教程,也踩了很多坑。

class MQTTClient(object):
    """Small wrapper around an MQTT client used for subscribe/publish tests.

    The wrapper creates the underlying client with ``mqtt.Client()``
    (presumably ``paho.mqtt.client`` imported as ``mqtt`` -- confirm against
    the file's imports), subscribes to one topic once the broker accepts the
    connection, and remembers the most recent message in ``msg_topic`` /
    ``msg_payload``.

    Attributes:
        host: Broker host passed to ``connect``.
        port: Broker port passed to ``connect``.
        topic: Topic subscribed to after a successful connection.
        on_message: Optional callable invoked as
            ``on_message(wrapper, topic, payload_text)`` for each message.
        msg_topic: Topic of the last received message, or None.
        msg_payload: Payload of the last received message as text, or None.
    """

    # Topic used when the caller does not pass one (the original hard-coded
    # value, kept as the default for backward compatibility).
    DEFAULT_TOPIC = "compass/event/monitor/e100.zxy.car9"

    def __init__(self, host, port, topic=None):
        """Create the underlying client and register the callbacks.

        Args:
            host: Broker host name or address.
            port: Broker port.
            topic: Topic to subscribe to on connect; defaults to
                ``DEFAULT_TOPIC``.
        """
        self._client = mqtt.Client()
        self.host = host
        self.port = port
        self.topic = self.DEFAULT_TOPIC if topic is None else topic
        self.on_message = None
        self.msg_topic = None
        self.msg_payload = None
        self._connected = False
        self._client.on_connect = self._on_connect
        self._client.on_disconnect = self._on_disconnect
        self._client.on_message = self._on_message
        self._client.on_publish = self._on_publish

    def connect(self, username, password):
        """Set the credentials and open the connection (keepalive 60 s)."""
        self._client.username_pw_set(username, password)
        self._client.connect(self.host, self.port, 60)

    def _on_connect(self, client, userdata, flags, rc):
        """Record the connection state and subscribe on success.

        ``rc == 0`` is treated as "connection accepted"; any other result
        code leaves the wrapper disconnected and skips the subscription.
        """
        logging.info("Client on_connect called.")
        logging.info("Connected with result code %s", rc)
        self._connected = (rc == 0)
        if not self._connected:
            logging.warning("Connection not accepted, result code %s", rc)
            return
        self._client.subscribe(self.topic)

    def _on_disconnect(self, client, userdata, rc):
        """Mark the wrapper as disconnected."""
        self._connected = False

    def is_connected(self):
        """Return True once the broker has accepted the connection."""
        return self._connected

    def _on_message(self, client, userdata, msg):
        """Store the latest message and forward it to ``on_message`` if set."""
        payload = msg.payload
        # On Python 3 the payload arrives as bytes and str() would produce
        # "b'...'"; decode it instead. On Python 2 bytes is str, so the
        # original str() conversion is kept.
        if isinstance(payload, bytes) and not isinstance(payload, str):
            payload = payload.decode("utf-8", "replace")
        else:
            payload = str(payload)
        self.msg_topic = msg.topic
        self.msg_payload = payload
        # The user callback is optional; it defaults to None.
        if self.on_message is not None:
            self.on_message(self, msg.topic, payload)

    def _on_publish(self, client, userdata, result):
        """Publish-complete callback; intentionally does nothing."""
        pass

    def loop(self, timeout_sec=0.5):
        """Run one iteration of the client's network loop."""
        self._client.loop(timeout=timeout_sec)

    def on_publish(self, topic, msg, qos):
        """Publish ``str(msg)`` to ``topic`` with the given QoS."""
        self._client.publish(topic, payload=str(msg), qos=qos)

訂閱調用:
# Subscribe example: poll the client until a "dcu_connection" event arrives
# or the timeout expires. NOTE: this fragment references self.TIMEOUT_SEC,
# so it is meant to run inside a method of a class that defines it.
import ast

client = MQTTClient(MQTT_HOST, MQTT_PORT)
client.connect(MQTT_USER, MQTT_PASS)
start = time.time()
while (time.time() - start) < self.TIMEOUT_SEC:
    client.loop()
    if client.msg_topic is None or client.msg_payload is None:
        continue
    # SECURITY: the original used eval() on a payload received from the
    # network, which executes arbitrary code. ast.literal_eval accepts only
    # Python literals (the dict text that str(msg) produces on publish).
    try:
        event = ast.literal_eval(client.msg_payload)
    except (ValueError, SyntaxError):
        # Not a literal we understand; keep waiting for the next message.
        continue
    if isinstance(event, dict) and event.get("type") == "dcu_connection":
        break
發布調用:
# Publish example: connect with the configured credentials, then send
# rsu_msg to the backend's tracking topic at QoS 1.
client = MQTTClient(MQTT_HOST, MQTT_PORT)
client.connect(MQTT_USER, MQTT_PASS)
client.on_publish(
    "/" + CLOUD_BACKEND + "/tracking",
    rsu_msg,
    1,
)




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱 yoyou2525@163.com 刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM