python代碼實現
安裝:pip install paho-mqtt
實現Publish-發送消息:

1 #!/usr/bin/env python 2 # encoding: utf-8 3 """ 4 @version: v1.0 5 @author: W_H_J 6 @license: Apache Licence 7 @contact: 415900617@qq.com 8 @software: PyCharm 9 @file: clicentMqttTest.py 10 @time: 2019/2/22 14:19 11 @describe: mqtt客戶端 12 """ 13 import json 14 import sys 15 import os 16 import paho.mqtt.client as mqtt 17 import time 18 19 sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..')) 20 sys.path.append("..") 21 22 TASK_TOPIC = 'test' # 客戶端發布消息主題 23 24 client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) 25 """ 26 client_id是連接到代理。如果client_id的長度為零或為零,則行為為由使用的協議版本定義。如果使用MQTT v3.1.1, 27 那么一個零長度的客戶機id將被發送到代理,代理將被發送為客戶端生成一個隨機變量。如果使用MQTT v3.1,那么id將是 28 隨機生成的。在這兩種情況下,clean_session都必須為True。如果這在這種情況下不會產生ValueError。 29 注意:一般情況下如果客戶端服務端啟用兩個監聽那么客戶端client_id 不能與服務器相同,如這里用時間"20190222142358"作為它的id, 30 如果與服務器id相同,則無法接收到消息 31 """ 32 client = mqtt.Client(client_id, transport='tcp') 33 34 client.connect("127.0.0.1", 1883, 60) # 此處端口默認為1883,通信端口期keepalive默認60 35 client.loop_start() 36 37 38 def clicent_main(message: str): 39 """ 40 客戶端發布消息 41 :param message: 消息主體 42 :return: 43 """ 44 time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time())) 45 payload = {"msg": "%s" % message, "data": "%s" % time_now} 46 # publish(主題:Topic; 消息內容) 47 client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False)) 48 print("Successful send message!") 49 return True 50 51 52 if __name__ == '__main__': 53 msg = "我是一條測試數據!" 54 clicent_main(msg)
實現Subscribe-訂閱

1 #!/usr/bin/env python 2 # encoding: utf-8 3 """ 4 @version: v1.0 5 @author: W_H_J 6 @license: Apache Licence 7 @contact: 415900617@qq.com 8 @software: PyCharm 9 @file: serverMqttTest.py 10 @time: 2019/2/22 14:35 11 @describe: mqtt 服務端 12 """ 13 import json 14 import sys 15 import os 16 import time 17 import paho.mqtt.client as mqtt 18 sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..')) 19 sys.path.append("..") 20 21 REPORT_TOPIC = 'test' # 主題 22 23 24 def on_connect(client, userdata, flags, rc): 25 print('connected to mqtt with resurt code ', rc) 26 client.subscribe(REPORT_TOPIC) # 訂閱主題 27 28 29 def on_message(client, userdata, msg): 30 """ 31 接收客戶端發送的消息 32 :param client: 連接信息 33 :param userdata: 34 :param msg: 客戶端返回的消息 35 :return: 36 """ 37 print("Start server!") 38 payload = json.loads(msg.payload.decode('utf-8')) 39 print(payload) 40 41 42 def server_conenet(client): 43 client.on_connect = on_connect # 啟用訂閱模式 44 client.on_message = on_message # 接收消息 45 client.connect("127.0.0.1", 1883, 60) # 鏈接 46 # client.loop_start() # 以start方式運行,需要啟動一個守護線程,讓服務端運行,否則會隨主線程死亡 47 client.loop_forever() # 以forever方式阻塞運行。 48 49 50 def server_stop(client): 51 client.loop_stop() # 停止服務端 52 sys.exit(0) 53 54 55 def server_main(): 56 client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) 57 client = mqtt.Client(client_id, transport='tcp') 58 server_conenet(client) 59 60 61 if __name__ == '__main__': 62 # 啟動監聽 63 server_main()
--------------------- 原作者寫得非常好,記下來,怕自己以后找不到
作者:涼城的夜
來源:CSDN
原文:https://blog.csdn.net/sinat_32651363/article/details/87876978
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!