(1) 安裝paho-mqtt包
(2) 導入mqttimport paho.mqtt.client as mqttimport threading
import json import paho.mqtt.publish as publish class Thread(threading.Thread): # 開啟一個線程 def __init__(self, dat): threading.Thread.__init__(self) self.Dat = dat def run(self): c = self.Dat c.client = mqtt.Client(c.client_id) c.client.on_connect = c.on_connect # 設置連接上服務器回調函數 c.client.on_message = c.on_message # 設置接收到服務器消息回調函數 c.client.on_subscribe = c.on_subscribe c.client.on_publish = c.on_publish c.client.on_unsubscribe = c.on_unsubscribe c.client.on_disconnect = c.on_disconnect nRet = False try: c.client.connect(c.host, c.port, c.keepalive) # 連接服務器,端口為1883,維持心跳為60秒 nRet = True except Exception as e: print('MQTT errorA', e) # 打錯誤 c.client.loop_forever(retry_first_connection=True) class ClientProtcal(object): def __init__(self): pass def Byte(self, op): data = json.dumps(op).encode('utf-8') return data def Parse(self, buf): op = None try: op = json.loads(buf.decode('utf-8')) except Exception as e: print('error', e) return op class MqttClient(object): protcal = ClientProtcal() logName = '' def __init__(self, obj, host, port=1883, keepalive=60, bind_address="", log=None): if log: self.l = log self.name = 'Mqtt' # 類名 self.obj = obj # 對象 self.q = obj.q self.topical = obj.topical # 主題 self.client_id = obj.client_id self.host = host self.port = port self.keepalive = keepalive self.bind_address = bind_address self.client = None self.thread = Thread(self) self.thread.start() def log(self, msg): if self.l: self.l.logInfo(msg) def setName(self, name): self.name = name def on_connect(self, client, userdata, flags, rc): # 連接成功 if self.client: for t in self.topical: # topical = '%s%s' % ('', t) self.client.subscribe(t) # 訂閱主題 self.log('訂閱主題:[%s]' % t) def subscribes(self, topicals): for topical in topicals: self.subscribe(topical) def subscribe(self, topical): if self.client: self.client.subscribe(topical) self.log('訂閱主題:[%s]' % topical) def unSubscribe(self, topical): if self.client: self.client.unsubscribe(topical) self.log('取消訂閱:[%s]' % topical) def on_message(self, client, userdata, msg): # 接收到消息 op = self.protcal.Parse(msg.payload) s = '收到[%s]推送:[%s]' % (msg.topic, str(op))
print(s)
self.q.put(op) def on_socket_close(self, client, userdata, socket): self.obj.strNetStaut = 'OffLine' self.obj.netState = 0 pass def close(self): if self.client: self.client.disconnect() def Send(self, topical, op): # 發送消息 if self.client: payload = MqttClient.protcal.Byte(op) res = self.client.publish(topical, payload) @classmethod def single(cls, stopic, payload, host, port): d = MqttClient.protcal.Byte(payload) try: publish.single(stopic, payload=d, hostname=host, client_id="lora1", port=port, protocol=mqtt.MQTTv311) MqttClient.log(MqttClient.logName, '推送主題:[%s][%s]' % (stopic, str(payload))) except Exception as e: print("publish.single err:", e)
(3)在需要的用到的地方導入類
mq=MqttClient(obj=self.obj,host=mqttIp,port=mqttPort,keepalive=60)
(4) 發送消息:
mq.send("topicl","message")
(5) 接受消息
class CHMTestThread(threading.Thread): # 線程獲取消息,做相應處理 def __init__(self, dat): threading.Thread.__init__(self) self.Dat = dat self.RunFlag = True def run(self): obj= self.Dat q = obj.q while self.RunFlag: op = q.get() if op: obj.handle(op)
(6) 在主對象中提供一個處理函數。
class obj(object): def __init__(): print("init") def handle(message): print(message)