基於MQTT(EMQ)的消息發布訂閱-python實現


python代碼實現 

安裝:pip install paho-mqtt

實現Publish-發送消息:

 1 #!/usr/bin/env python  
 2 # encoding: utf-8  
 3 """ 
 4 @version: v1.0 
 5 @author: W_H_J 
 6 @license: Apache Licence  
 7 @contact: 415900617@qq.com 
 8 @software: PyCharm 
 9 @file: clicentMqttTest.py 
10 @time: 2019/2/22 14:19 
11 @describe: mqtt客戶端
12 """
13 import json
14 import sys
15 import os
16 import paho.mqtt.client as mqtt
17 import time
18  
19 sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
20 sys.path.append("..")
21  
22 TASK_TOPIC = 'test'  # 客戶端發布消息主題
23  
24 client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
25 """
26 client_id是連接到代理。如果client_id的長度為零或為零,則行為為由使用的協議版本定義。如果使用MQTT v3.1.1,
27 那么一個零長度的客戶機id將被發送到代理,代理將被發送為客戶端生成一個隨機變量。如果使用MQTT v3.1,那么id將是
28 隨機生成的。在這兩種情況下,clean_session都必須為True。如果這在這種情況下不會產生ValueError。
29 注意:一般情況下如果客戶端服務端啟用兩個監聽那么客戶端client_id 不能與服務器相同,如這里用時間"20190222142358"作為它的id,
30 如果與服務器id相同,則無法接收到消息
31 """
32 client = mqtt.Client(client_id, transport='tcp')
33  
34 client.connect("127.0.0.1", 1883, 60)  # 此處端口默認為1883,通信端口期keepalive默認60
35 client.loop_start()
36  
37  
38 def clicent_main(message: str):
39     """
40     客戶端發布消息
41     :param message: 消息主體
42     :return:
43     """
44     time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))
45     payload = {"msg": "%s" % message, "data": "%s" % time_now}
46     # publish(主題:Topic; 消息內容)
47     client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False))
48     print("Successful send message!")
49     return True
50  
51  
52 if __name__ == '__main__':
53     msg = "我是一條測試數據!"
54     clicent_main(msg)
View Code

實現Subscribe-訂閱

 1 #!/usr/bin/env python  
 2 # encoding: utf-8  
 3 """ 
 4 @version: v1.0 
 5 @author: W_H_J 
 6 @license: Apache Licence  
 7 @contact: 415900617@qq.com 
 8 @software: PyCharm 
 9 @file: serverMqttTest.py 
10 @time: 2019/2/22 14:35 
11 @describe: mqtt 服務端
12 """
13 import json
14 import sys
15 import os
16 import time
17 import paho.mqtt.client as mqtt
18 sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
19 sys.path.append("..")
20  
21 REPORT_TOPIC = 'test'  # 主題
22  
23  
24 def on_connect(client, userdata, flags, rc):
25     print('connected to mqtt with resurt code ', rc)
26     client.subscribe(REPORT_TOPIC)  # 訂閱主題
27  
28  
29 def on_message(client, userdata, msg):
30     """
31     接收客戶端發送的消息
32     :param client: 連接信息
33     :param userdata: 
34     :param msg: 客戶端返回的消息
35     :return: 
36     """
37     print("Start server!")
38     payload = json.loads(msg.payload.decode('utf-8'))
39     print(payload)
40  
41  
42 def server_conenet(client):
43     client.on_connect = on_connect  # 啟用訂閱模式
44     client.on_message = on_message  # 接收消息
45     client.connect("127.0.0.1", 1883, 60)  # 鏈接
46     # client.loop_start()   # 以start方式運行,需要啟動一個守護線程,讓服務端運行,否則會隨主線程死亡
47     client.loop_forever()   # 以forever方式阻塞運行。
48  
49  
50 def server_stop(client):
51     client.loop_stop()  # 停止服務端
52     sys.exit(0)
53  
54  
55 def server_main():
56     client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
57     client = mqtt.Client(client_id, transport='tcp')
58     server_conenet(client)
59  
60  
61 if __name__ == '__main__':
62     # 啟動監聽
63     server_main()
View Code

--------------------- 原作者寫得非常好,記下來,怕自己以后找不到

作者:涼城的夜
來源:CSDN
原文:https://blog.csdn.net/sinat_32651363/article/details/87876978
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM