python3和MQTT消息


1.MQTT介紹

MQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。

 

 

2.Mqtt實現方式:

3.實現協議需要

 

實現MQTT協議需要:客戶端和服務器端

 

MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。

 

MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分

Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload)

payload,可以理解為消息的內容,是指訂閱者具體要使用的內容

 

 

4.具體應用

 

使用token調用這個接口獲取mqtt連接信息,使用接口返回的loadBalanceuserIdtoken連接mqtt,訂閱/sys/{userId}/notify這個topic,收到的消息體內容是就是消息下發的內容

 

 

5.具體代碼

拆解第一步:獲取mqtt連接信息

def get_md5_passwd(password):

    """

    獲取密碼的md5

    """

    my_md5 = hashlib.md5()  # 獲取一個MD5的加密算法對象

    my_md5.update(password.encode("utf-8"))  # 得到MD5消息摘要

    my_md5_Digest = my_md5.hexdigest()  # 以16進制返回消息摘要,32位

    return my_md5_Digest

 

 

def get_user_token(number='13530540284', passwd='zhu942100'):

    """

    根據賬號密碼得到token

    """

    url = 'https://www-sit.tcljd.com/auth/auth/login' 

    body = {

        "channel": "1",

        "deviceId": "",

        "password": get_md5_passwd(passwd),

        "username": number

    }

res = requests.post(url=url, json=body, verify=False)

return res.json()['accessToken']

 

accessToken = get_user_token()

 

 

def topicsss():

    """

    topic獲取方式 是采用tcl+tokenssss 來獲取

    https://www.zx.tcljd.com/project/298/interface/api/13897

        """

    url = host + '/v1/auth/service/loadBalance'

    header = {

        'Content-Type': 'application/json',

        'accessToken': accessToken  # 7天有效期

    }

    result = requests.get(url=url, verify=False, headers=header)

    # print('獲取到的balance:{} useid:%s'.format(result.json()['data']['loadBalance']) % (result.json()['data']['userId']))

    return result.json()['data']['loadBalance'], result.json()['data']['userId'], accessToken

 

 

拆解第二步:連接mqtt 並進行topic訂閱

 

broker, user, pwd = topicsss()

broker = broker[6:30]

port = 1883

topic = "/sys/5219709/notify"

client_id = '5219709@' + str(random.randint(1, 9999))

 

 

def connect_mqtt() -> mqtt_client:

    def on_connect(client, userdata, flags, rc):

        if rc == 0:

            print("Connected to MQTT Broker! %s" % rc)

        else:

            print("Failed to connect, return code %d\n", rc)

 

    client = mqtt_client.Client(client_id)

    client.username_pw_set(user, pwd)

    client.on_connect = on_connect

    # 配置服務器證書驗證

    client.tls_set(tls_version=ssl.PROTOCOL_TLSv1_2)

    client.tls_insecure_set(True)

    client.connect(broker, port)

    return client

 

 

def subscribe(client: mqtt_client):

    def on_message(client, userdata, msg):

        mylog = open('D:\push\/recode.log', mode='a', encoding='utf-8')

        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic", file=mylog)

        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

        mylog.close()

 

    client.subscribe(topic)

    client.on_message = on_message

 

 

def run():

    client = connect_mqtt()

    client.username_pw_set(user, pwd)

    subscribe(client)

    client.loop_forever()

 

 

if __name__ == '__main__':

    run()

 

 

6.下發消息,結果查看

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM