每天一點小進步(7):Mqtt客戶端理解


網上找了很多關於Mqtt 客戶端變成的文章,基本是翻譯官方文檔為主,從小白角度來看,這些文檔還不足以讓一個小白能夠快速上手並完成簡單客戶端的開發。

於是自己總結了一些內容,用於練習並熟練記憶。

第一步:安裝mqtt庫

pip install paho-mqtt

第二步:創建mqtt客戶端腳本

創建一個test_mqtt_client.py文件

第三步:在文件中編寫腳本

1、簡單的客戶端腳本-Mqtt服務器需要做任何校驗

import paho.mqtt.client as mqtt

MQTTHOST = "101.200.46.138"
MQTTPORT = 1883
mqttClient = mqtt.Client()
# 連接MQTT服務器
def on_mqtt_connect():
  mqttClient.connect(MQTTHOST, MQTTPORT, 60)
  mqttClient.loop_start()
# publish 消息
def on_publish(topic, payload, qos):
  mqttClient.publish(topic, payload, qos)
# 消息處理函數
def on_message_come(lient, userdata, msg):
  print(msg.topic + " " + ":" + str(msg.payload))
# subscribe 消息
def on_subscribe():
  mqttClient.subscribe("/server", 1)
  mqttClient.on_message = on_message_come # 消息到來處理函數
def main():
  on_mqtt_connect()
  on_publish("/test/server", "Hello Python!", 1)
  on_subscribe()
  while True:
    pass
if __name__ == '__main__':
  main()

2、一般項目中使用的Mqtt服務都需要認證,以下是認證客戶端腳本

# -*- coding:utf-8 -*-

# /usr/bin/env python 

__author__ = 'Five'

import json
import time

import paho.mqtt.client as mqtt

from models.model_wx_pc import PCShow

pc = PCShow()

# 鑒權服務獲取秘鑰
def getuserTicker(uid):
	content = pc.userTicker(uid)
	return content["data"]["ext"], content["data"]["ticker"]


def on_connect(mqttc, obj, flags, rc):
	print("Connected with result code " + str(rc) + ' '+str(flags) + obj)


def on_message(mqttc, obj, msg):
	print("message: "+ msg)


def on_subscribe(mqttc, obj, mid, granted_qos):
	print(obj + "訂閱Subscribed: " + str(mid) + " " + str(granted_qos))


def on_log(mqttc, obj, level, string):
	print(obj +level+ u'日志' + string)


def on_publish(mqttc, obj, mid):
	print(obj + u"發送消息" + str(mid))


def client(host='101.200.46.138', uid="1"):
	ext, ticker = getuserTicker(uid)
	topic = 'default'
	port = 1883
	payload = {
		'cmd':'inroom',
		'msgid':1
}
        # 初始化mqtt實例時需要根據mqtt服務端具體協議版本指定protocol
        # MQTTv31 = 3,MQTTv311 = 4,MQTTv5 = 5一定要匹配,不然無法連接服務器,其他參數詳解看源碼即可
	mqttc = mqtt.Client(client_id="1", clean_session=True, protocol=3)
       # 設置mqtt實例的認證信息
	mqttc.username_pw_set(ticker, ticker)
       # 設置mqtt實例中自定義的消息內容
	mqttc.user_data_set(u'無敵是多么寂寞')
	mqttc.on_message = on_message
	mqttc.on_connect = on_connect
	mqttc.on_subscribe = on_subscribe
	mqttc.on_publish = on_publish
	mqttc.on_log=on_log
	mqttc.connect(host, port, 60)
        # 發布
	mqttc.publish(topic, payload, 0)
        # 訂閱
	mqttc.subscribe(topic, 0)
        # loop_forever調用會阻塞主線程,將永遠不會終止
	mqttc.loop_forever()
			

if __name__ == '__main__':
	client()

3、第2種方式用使用了loop_forever,導致主線程一直阻塞無法完成其他的事情,優化腳本使用loop_start()(Loop_start在另一個線程中啟動一個循環,如果您需要在主線程中執行其他操作,則讓主線程繼續運行其他內容。),並在主線中打印消息

# -*- coding:utf-8 -*-

# /usr/bin/env python 

__author__ = 'Five'

import json
import time

import paho.mqtt.client as mqtt

from models.model_wx_pc import PCShow

from queue import Queue

q = Queue()
msgs = []

pc = PCShow(1083722)


def getuserTicker(uid=607538990):
	content = PCShow().userTicker(uid)
	return content["data"]["ext"], content["data"]["ticker"]


def on_connect(mqttc, obj, flags, rc):
	print("Connected with result code " + str(rc) + ' '+str(flags) + obj)


def on_message(mqttc, obj, msg):
	# if json.loads(msg.payload.decode('utf-8'))["cmd"] == "onpwsysgiftbox":
	# 	print(json.loads(msg.payload.decode('utf-8'))['activity_ret']['data'])
	m = "message received  ", str(msg.payload.decode("utf-8"))
	msgs.append(m)
	q.put(m)


def on_subscribe(mqttc, obj, mid, granted_qos):
	print(obj + "訂閱Subscribed: " + str(mid) + " " + str(granted_qos))


def on_log(mqttc, obj, level, string):
	print(obj +level+ u'日志' + string)


def on_publish(mqttc, obj, mid):
	print(obj + u"發送消息" + str(mid))


def client(host='101.200.46.138', uid="1"):
	ext, ticker = getuserTicker(uid)
	topic = 'default'
	port = 1883
	payload = {
		"Cmd": "inroom",
		"Platform": "Android",
		"MsgId": 99,
	}
	mqttc = mqtt.Client(client_id="1", clean_session=True, protocol=3)
	mqttc.username_pw_set(ticker, ticker)
	mqttc.user_data_set(u'無敵是多么寂寞')
	mqttc.on_message = on_message
	mqttc.on_connect = on_connect
	mqttc.on_subscribe = on_subscribe
	mqttc.on_publish = on_publish
	mqttc.on_log=on_log
	mqttc.connect(host, port, 60)
	mqttc.publish(topic, payload, 0)
	mqttc.subscribe(topic, 0)
	mqttc.loop_start()
	while True:
		while len(msgs) > 0:
			print("message: ",msgs.pop(0))
		while not q.empty():
			message = q.get()
			print("queue: ", message)
if message == 'quit':
break
	mqttc.loop_stop()
if __name__ == '__main__': client()

  輸出結果如下:

 

 

 新線程中獲得消息並存儲,主線程中進行打印。實現新線程和主線程之間的通信。

你還可以在主線程中做一些其他可以做的事情。

 

mqtt其他相關資料參考:

http://www.steves-internet-guide.com/into-mqtt-python-client/

https://blog.csdn.net/youshenmebutuo/article/details/79779387


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM