python使用mqtt


(1) 安裝paho-mqtt包

(2) 導入mqttimport paho.mqtt.client as mqttimport threading

import json import paho.mqtt.publish as publish class Thread(threading.Thread): # 開啟一個線程  def __init__(self, dat): threading.Thread.__init__(self) self.Dat = dat def run(self): c = self.Dat c.client = mqtt.Client(c.client_id) c.client.on_connect = c.on_connect # 設置連接上服務器回調函數 c.client.on_message = c.on_message # 設置接收到服務器消息回調函數 c.client.on_subscribe = c.on_subscribe c.client.on_publish = c.on_publish c.client.on_unsubscribe = c.on_unsubscribe c.client.on_disconnect = c.on_disconnect nRet = False try: c.client.connect(c.host, c.port, c.keepalive) # 連接服務器,端口為1883,維持心跳為60秒 nRet = True except Exception as e: print('MQTT errorA', e) # 打錯誤 c.client.loop_forever(retry_first_connection=True) class ClientProtcal(object): def __init__(self): pass def Byte(self, op): data = json.dumps(op).encode('utf-8') return data def Parse(self, buf): op = None try: op = json.loads(buf.decode('utf-8')) except Exception as e: print('error', e) return op class MqttClient(object): protcal = ClientProtcal() logName = '' def __init__(self, obj, host, port=1883, keepalive=60, bind_address="", log=None): if log: self.l = log self.name = 'Mqtt' # 類名 self.obj = obj # 對象 self.q = obj.q self.topical = obj.topical # 主題 self.client_id = obj.client_id self.host = host self.port = port self.keepalive = keepalive self.bind_address = bind_address self.client = None self.thread = Thread(self) self.thread.start() def log(self, msg): if self.l: self.l.logInfo(msg) def setName(self, name): self.name = name def on_connect(self, client, userdata, flags, rc): # 連接成功 if self.client: for t in self.topical: # topical = '%s%s' % ('', t) self.client.subscribe(t) # 訂閱主題 self.log('訂閱主題:[%s]' % t) def subscribes(self, topicals): for topical in topicals: self.subscribe(topical) def subscribe(self, topical): if self.client: self.client.subscribe(topical) self.log('訂閱主題:[%s]' % topical) def unSubscribe(self, topical): if self.client: self.client.unsubscribe(topical) self.log('取消訂閱:[%s]' % topical) def on_message(self, client, userdata, msg): # 接收到消息 op = self.protcal.Parse(msg.payload) s = '收到[%s]推送:[%s]' % (msg.topic, str(op))
     print(s)
self.q.put(op) def on_socket_close(self, client, userdata, socket): self.obj.strNetStaut = 'OffLine' self.obj.netState = 0 pass def close(self): if self.client: self.client.disconnect() def Send(self, topical, op): # 發送消息 if self.client: payload = MqttClient.protcal.Byte(op) res = self.client.publish(topical, payload) @classmethod def single(cls, stopic, payload, host, port): d = MqttClient.protcal.Byte(payload) try: publish.single(stopic, payload=d, hostname=host, client_id="lora1", port=port, protocol=mqtt.MQTTv311) MqttClient.log(MqttClient.logName, '推送主題:[%s][%s]' % (stopic, str(payload))) except Exception as e: print("publish.single err:", e)

 

(3)在需要的用到的地方導入類

mq=MqttClient(obj=self.obj,host=mqttIp,port=mqttPort,keepalive=60)

(4) 發送消息:

mq.send("topicl","message")

(5) 接受消息

class CHMTestThread(threading.Thread):  # 線程獲取消息,做相應處理
    def __init__(self, dat):
        threading.Thread.__init__(self)
        self.Dat = dat
        self.RunFlag = True

    def run(self):
        obj= self.Dat 
        q = obj.q  
        while self.RunFlag:
            op = q.get()
            if op:
                obj.handle(op)

(6) 在主對象中提供一個處理函數。

class obj(object):
    def __init__():
       print("init")
    def handle(message):
       print(message)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM