寫測試代碼時用到的,借助了網上許多教程也踩了很多坑。
class MQTTClient(object):
def __init__(self, host, port):
# Initialize MQTT client.
self._client = mqtt.Client() #create MQTT client
self.host = host
self.port = port
#self.topic = topic
#self.obj_id = obj_id
self.on_message = None
self.msg_topic = None
self.msg_payload = None
self._client.on_connect = self._on_connect
#self._client.on_disconnect = self._mqtt_disconnect
self._client.on_message = self._on_message
self._connected = False
self._client.on_publish = self._on_publish
def connect(self, username, password):
self._client.username_pw_set(username, password)
self._client.connect(self.host, self.port, 60)
def _on_connect(self, client, userdata, flags, rc):
logging.info("Client on_connect called.")
logging.info("Connected with result code "+str(rc))
self._client.subscribe("compass/event/monitor/e100.zxy.car9")
#self._client.on_message = self._on_message
def is_connected(self):
return self._connected
def _on_message(self, client, userdata, msg):
#logging.info("Client on_message called.")
#print(msg.topic+" "+str(msg.payload))
self.msg_topic = msg.topic
self.msg_payload = str(msg.payload)
self.on_message(self, msg.topic, str(msg.payload))
def _on_publish(self, client, userdata, result):
#logging.info("data published")
pass
'''
def _on_subscribe(self, client, userdata, result):
#logging.info("data published")
pass
'''
def loop(self, timeout_sec=0.5):
#self._client.loop_forever()
self._client.loop(timeout=timeout_sec)
'''
def on_subscribe(self, topic, obj_id, qos):
self._client.subscribe("compass/event/monitor/e100.zxy.car9", qos)
print "@@@@@@@2"
self._client.on_message = self._on_message
print "######"
'''
def on_publish(self, topic, msg, qos):
self._client.publish(topic, payload=str(msg), qos=qos)
訂閱調用:
client = MQTTClient(MQTT_HOST, MQTT_PORT)
client.connect(MQTT_USER, MQTT_PASS)
start = time.time()
while (time.time() - start) < self.TIMEOUT_SEC:
client.loop()
if client.msg_topic != None and client.msg_payload != None:
if eval(client.msg_payload)["type"] == "dcu_connection":
break
發布調用:
client = MQTTClient(MQTT_HOST, MQTT_PORT)
client.connect(MQTT_USER, MQTT_PASS)
client.on_publish("/" + CLOUD_BACKEND + "/tracking", rsu_msg, 1)