python使用mqtt


(1) 安装paho-mqtt包

(2) 导入mqtt

import paho.mqtt.client as mqtt
import threading

import json

import paho.mqtt.publish as publish


class Thread(threading.Thread):
    """Background thread that creates the paho client and runs its network loop.

    ``dat`` is the owning object (``MqttClient`` in this file).  ``run`` reads
    ``client_id``, ``host``, ``port`` and ``keepalive`` from it, and stores the
    created paho client back on it as ``dat.client``.
    """

    # Names of the owner's callback methods that are copied onto the paho
    # client.  Only the ones the owner actually defines are wired.
    _CALLBACKS = (
        'on_connect',       # called once connected to the server
        'on_message',       # called when a message arrives from the server
        'on_subscribe',
        'on_publish',
        'on_unsubscribe',
        'on_disconnect',
    )

    def __init__(self, dat):
        threading.Thread.__init__(self)
        self.Dat = dat

    def run(self):
        """Create the client, wire callbacks, connect, then loop forever."""
        c = self.Dat
        # NOTE(review): paho-mqtt 1.x constructor style; 2.x adds a
        # ``callback_api_version`` argument - confirm against the installed
        # version.
        c.client = mqtt.Client(client_id=c.client_id)

        # The owner may implement only some callbacks; a missing one must not
        # kill the thread with AttributeError before it ever connects.
        for name in self._CALLBACKS:
            handler = getattr(c, name, None)
            if handler is not None:
                setattr(c.client, name, handler)

        try:
            # Connect to the server (owner defaults: port 1883, 60 s keepalive).
            c.client.connect(c.host, c.port, c.keepalive,
                             getattr(c, 'bind_address', ''))
        except Exception as e:
            # Best effort: report the error and let the loop below retry.
            print('MQTT errorA', e)
        c.client.loop_forever(retry_first_connection=True)


class ClientProtcal(object):
    """JSON/UTF-8 codec used for message payloads."""

    def Byte(self, op):
        """Serialize ``op`` to JSON and return it as UTF-8 bytes."""
        return json.dumps(op).encode('utf-8')

    def Parse(self, buf):
        """Decode UTF-8 JSON bytes; print the error and return None on failure."""
        try:
            return json.loads(buf.decode('utf-8'))
        except (AttributeError, UnicodeDecodeError, ValueError) as e:
            # AttributeError: ``buf`` has no ``decode`` (e.g. None or str).
            # ValueError: includes json.JSONDecodeError.
            print('error', e)
            return None


class MqttClient(object):
    """MQTT client wrapper that pushes received messages onto ``obj.q``.

    Args:
        obj: owner object; ``q``, ``topical`` and ``client_id`` are read
            from it.  ``on_socket_close`` also writes ``strNetStaut`` and
            ``netState`` on it.
        host: server host name or address.
        port: server port.
        keepalive: keepalive interval in seconds.
        bind_address: local address passed through to ``connect``.
        log: optional logger exposing ``logInfo(msg)``.

    Constructing an instance immediately starts the background ``Thread``.
    """

    protcal = ClientProtcal()
    logName = ''
    # Class-level default so that ``log`` and ``single`` work when no logger
    # was supplied; an instance logger shadows it.
    l = None

    def __init__(self, obj, host, port=1883, keepalive=60, bind_address="", log=None):
        if log:
            self.l = log
        self.name = 'Mqtt'  # class name
        self.obj = obj  # owner object
        self.q = obj.q
        self.topical = obj.topical  # topics subscribed on connect
        self.client_id = obj.client_id
        self.host = host
        self.port = port
        self.keepalive = keepalive
        self.bind_address = bind_address
        self.client = None  # set by the background thread
        self.thread = Thread(self)
        self.thread.start()

    def log(self, msg):
        """Forward ``msg`` to the logger, if one is configured."""
        if self.l:
            self.l.logInfo(msg)

    def setName(self, name):
        self.name = name

    def on_connect(self, client, userdata, flags, rc):
        """Connected: subscribe to every topic in ``self.topical``."""
        for t in self.topical:
            self.subscribe(t)

    def subscribes(self, topicals):
        """Subscribe to each topic in ``topicals``."""
        for topical in topicals:
            self.subscribe(topical)

    def subscribe(self, topical):
        """Subscribe to one topic and log it; no-op until the client exists."""
        if self.client:
            self.client.subscribe(topical)
            self.log('订阅主题:[%s]' % topical)

    def unSubscribe(self, topical):
        """Unsubscribe from one topic and log it; no-op until the client exists."""
        if self.client:
            self.client.unsubscribe(topical)
            self.log('取消订阅:[%s]' % topical)

    def on_message(self, client, userdata, msg):
        """Message received: decode the payload, print it, and queue it."""
        op = self.protcal.Parse(msg.payload)
        s = '收到[%s]推送:[%s]' % (msg.topic, str(op))
        print(s)
        self.q.put(op)

    def on_socket_close(self, client, userdata, socket):
        """Mark the owner object as offline.

        NOTE(review): Thread.run does not register this callback on the
        paho client - confirm whether it is meant to be wired.
        """
        self.obj.strNetStaut = 'OffLine'
        self.obj.netState = 0

    def close(self):
        """Disconnect from the server if the client exists."""
        if self.client:
            self.client.disconnect()

    def Send(self, topical, op):
        """Publish ``op`` (JSON-encoded) to ``topical``.

        Returns the value of the paho ``publish`` call, or None when the
        client has not been created yet.
        """
        if not self.client:
            return None
        payload = MqttClient.protcal.Byte(op)
        return self.client.publish(topical, payload)

    @classmethod
    def single(cls, stopic, payload, host, port):
        """Publish one JSON-encoded message without a persistent client."""
        d = cls.protcal.Byte(payload)
        try:
            publish.single(stopic, payload=d, hostname=host, client_id="lora1",
                           port=port, protocol=mqtt.MQTTv311)
        except Exception as e:
            print("publish.single err:", e)
            return
        # Log outside the try block so that a logging problem is never
        # misreported as a publish failure.  Only a class-level logger is
        # reachable here because there is no instance.
        if cls.l:
            cls.l.logInfo('推送主题:[%s][%s]' % (stopic, str(payload)))

 

(3) 在需要用到的地方导入类

# Create the client; the MqttClient constructor starts its background thread.
mq = MqttClient(
    obj=self.obj,
    host=mqttIp,
    port=mqttPort,
    keepalive=60,
)

(4) 发送消息:

# The publishing method on MqttClient is spelled ``Send`` (capital S);
# the message is JSON-encoded before it is published.
mq.Send("topicl", "message")

(5) 接收消息

class CHMTestThread(threading.Thread):
    """Worker thread that takes messages off ``dat.q`` and handles them.

    ``dat`` must expose a queue ``q`` (its ``get`` is called with a
    ``timeout`` keyword and is expected to raise ``queue.Empty`` on
    timeout - presumably a ``queue.Queue``; verify against the owner) and a
    ``handle(op)`` method.  Set ``RunFlag`` to False to stop the loop.
    """

    # Seconds to wait on the queue before re-checking RunFlag.  A plain
    # blocking get() would never notice RunFlag being cleared while idle.
    POLL_INTERVAL = 0.5

    def __init__(self, dat):
        threading.Thread.__init__(self)
        self.Dat = dat
        self.RunFlag = True

    def run(self):
        """Dispatch queued messages to ``handle`` until RunFlag is cleared."""
        # Local import keeps this block self-contained.
        from queue import Empty

        obj = self.Dat
        q = obj.q
        while self.RunFlag:
            try:
                op = q.get(timeout=self.POLL_INTERVAL)
            except Empty:
                continue  # nothing arrived; loop to re-check RunFlag
            # Falsy items (such as None) are skipped.
            if op:
                obj.handle(op)

(6) 在主对象中提供一个处理函数。

class obj(object):
    """Minimal main object that provides the message handler.

    NOTE(review): MqttClient's constructor also reads ``q``, ``topical`` and
    ``client_id`` from the object passed to it; this minimal example does
    not define them.
    """

    def __init__(self):
        print("init")

    def handle(self, message):
        """Handle one received message; here it is simply printed."""
        print(message)

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM