一、概述
一)基本概念
使用回調處理從MQTT代理返回的數據,要使用回調需要先定義回調函數然后將其指派給客戶端實例(client)。
例如:
# 定義一個回調函數
def on_connect(client, userdata, flags, rc):
print("Connection returned " + str(rc))
# 將回調函數指派給客戶端實例
client.on_connect = on_connect
所有的回調函數都有client和userdata參數。
client是調用回調的客戶端實例;
userdata可以使任何類型的用戶數據,可以在創建新客戶端實例時設置或者使用user_data_set(userdata)設置。
二)paho-mqtt總的說來分為三部分:
**種類:**
1.服務器連接on_connect()/服務器斷開 on_disconnect()
2.信息的回調 on_message()
3.信息的發布on_publish()/信息的訂閱on_subscribe()
**介紹:**
1. 使用connect()/connect_async() 連接MQTT代理
2. 頻繁的調用loop()來維持與MQTT代理之間的流量
2.1. 或者使用loop_start()來設置一個線程為你調用loop()
2.2. 或者在一個阻塞的函數中調用loop_forever()來為你調用loop()
3.使用subscribe()訂閱一個主題(topic)並接受消息(messages)
4.使用publish()來發送消息
5.使用disconnect()來斷開與MQTT代理的連接
二、paho-mqtt 在Python中的安裝方法
pip install paho-mqtt
三、on_connect()回調函數介紹
當代理響應連接請求時調用。
on_connect(client, userdata, flags, rc):
rc的值決定了連接成功或者不成功:
值 連接情況
0 連接成功
1 協議版本錯誤
2 無效的客戶端標識
3 服務器無法使用
4 錯誤的用戶名或密碼
5 未經授權
import paho.mqtt.client as mqtt
#定義一個on_connect方法
def on_connect(client,userdata,flags,rc):
return str(rc)
class IotSubDevViewSet(viewsets.ModelViewSet):
#系統啟動后,會把SUBSCRIBED狀態的設備加入訂閱進程
def init_subscribe():
iotsubdevs = IotSubDev.objects.all()
for iotsubdev in iotsubdevs:
try:
devices_pk = iotsubdev.device.id
client = mqtt.Client()
client.username_pw_set(username=settings.MQTT_USERNAME, password=settings.MQTT_PASSWORD)#設置mqtt服務器用戶名和密碼
client.on_connect = on_connect
client.on_message = on_message
rc = client.connect(settings.MQTT_HOST, port=1883, keepalive=60)
if(rc==0 and iotsubdev.status=="SUBSCRIBED"):
print("初始化開始sub")
client.subscribe(topic=str(devices_pk),qos=0)
client.loop_start()
print("初始化sub結束")
else:
# print("連接失敗")
pass
except:
pass
init_subscribe()
三、on_message()回調函數介紹
import json
def on_message(client, userdata, msg):
msg = msg.payload #將信息轉換成json格式
try:
params = json.loads(msg)
except:
return (False)
#Beilai BL102
if tmp =="sensorDatas":
for dc_tmp in params[tmp]:
print(dc_tmp)
timestamp = datetime.now()
try:
ctrlchannel = CtrlChannel.objects.filter(id=dc_tmp['flag']).first()
metricdata = MetricData(ctrlchannel=ctrlchannel,
timestamp=timestamp,
value=dc_tmp['value'],
direction="UP")
metricdata.save()
except:
pass
else:
tmp = "Wrong Parameters"
return tmp
print("Subscribed is OK")
return True
四、on_publish()回調函數介紹
import paho.mqtt.publish as publish
class MetricDataViewSet(viewsets.ModelViewSet):
""""
list:
查詢數據點信息列表
create:
創建數據點信息
如果方向為DOWN,支持MQTT發布信息
retrieve:
查詢數據點信息詳情
update:
更新數據點信息,不建議使用
partial_update:
更新數據點信息的部分屬性,不建議使用
destroy:
刪除數據點信息
"""
serializer_class = MetricDataSerializer
permission_classes = (permissions.IsAuthenticated,)
# pagination_class = StanderResultsSetPagination
authentication_classes = (authentication.JWTAuthentication,)
queryset = MetricData.objects.all()
def create(self,request , *args, **kwargs):
serializer =self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
self.perform_create(serializer)
headers = self.get_success_headers(serializer.data)
try:
if request.data['direction'] == 'DOWN':
ctrlchannels_id = request.data["ctrlchannel"]
ctrlchannels_value = request.data["value"]
# payload = json.dumps(request.data)
#beilai BL102
jsonload ={"sensorDatas":[{"sensorsId":100,
"flag":ctrlchannels_id,
"value": str(ctrlchannels_value)}],
"down":"down"}
print(jsonload)
payload = json.dumps(jsonload)
# print(ctrlchannels_id)
publish.single(topic=ctrlchannels_id+"/100",
payload=payload,
hostname=settings.MQTT_HOST,
auth={'username':settings.MQTT_USERNAME, 'password':settings.MQTT_PASSWORD})
# self.perform_create(serializer)
except:
pass
return Response(serializer.data,status=status.HTTP_201_CREATED,headers=headers)
return Response(serializer.errors,status=status.HTTP_400_BAD_REQUEST)