遇到的問題:
查看了EMQ文檔發現並不提供消息的持久化功能,MQTT協議是按照設備一直在線設計的,數據都是保存在內存里的,但是考慮到用戶上傳傳感器數據不可能接收了就扔掉,那樣就沒法查看歷史數據了,所以用戶上傳的消息必須要能夠保存下來,以便查看歷史數據,這樣一來持久化功能就需要我們自己來實現。
另外還會出現一個問題,當兩個設備注冊的主題名一樣的時候,不能分出是哪一個設備發出的消息,在接收訂閱消息的時候發現沒辦法獲取到發送消息的clientID,而且其他設備也可以訂閱到任意設備的消息,對於敏感信息來說存在安全性。
解決方法:
初步打算是,用戶需要在后台注冊自己的設備和數據流信息,后台會對所有注冊的信息進行訂閱接收到消息后,后台會把消息寫入到對應的表中,另外設備發布主題只能使用(clientID/主題名)命名方式,以便后台能夠區分是哪一個設備發送過來的消息。對於MQTT了解還是不夠深,只能使用這樣的笨辦法來解決了,以后若是找到其他的方法在進行改進。
解決問題:
首先需要通過python建立mqtt連接監聽所有注冊的主題信息,這里使用了paho-mqtt庫來實現,為了方便以后的調用將其封裝成一個類,最開始的時候想把一些常用的操作也封裝進去,單獨測試完全可以,但是一旦放到Django請求中處理的時候,mqtt能夠正常返回成功信息,但是實際上並沒有正確執行,這一點始終沒有找到原因,最終只能簡化,只包含最基礎的功能。
class MqClient(object):
def __init__(self, client_id, username, password):
self.client = client.Client(client_id=client_id,
clean_session=True) # 初始化,clean_session為false的時候EMQ會保存訂閱狀態,可以不再次訂閱
self.client.username_pw_set(username, password) # 設置連接用戶名
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self._client_status = False # 連接狀態
self._cloop = None
self._connect() # 實例化會自動連接
def _connect(self, host="your IP ", port=1883, keepalive=60):
"""連接服務器"""
self.client.connect_async(host, port, keepalive)
# 開啟線程執行
self._cloop = threading.Thread(target=self.client.loop_start())
self._cloop.start()
def on_connect(self, client, userdata, flags, rc):
"""連接成功的回調函數"""
# 修改客戶端狀態
if rc == 0:
self._client_status = True
def init_sub(self):
# 讀取數據庫中所有的已經注冊過的topic並且訂閱
for i in models.Device.objects.all():
for j in i.dev_stream.all():
self.client.subscribe(str(i.device_id) + '/' + j.name, j.qos)
@staticmethod
def on_message(client, userdata, msg):
client_id = msg.topic.split('/')[0]
stream = msg.topic.split('/')[1]
data = msg.payload.decode()
# 接收訂閱信息寫入到數據庫中
models.DataStream.objects.filter(device__device_id=client_id).filter(name=stream).first().data.add(
models.Data.objects.create(data=data))
有了封裝好的類,現在我們需要做的是:在Django項目啟動完成之后自動執行監聽任務的,最開始的時候打算放到setting或者__init__里面,但是因為類里面封裝了model操作,那時候項目還沒有加載完model會報錯,所以最終新建了一個app,然后放到其下的urls,這樣當項目啟動完成的時候就會自動加載了。
from utils.mqtt_client import MqClient
MQClient = MqClient(your client ID, username, password)
MQClient.init_sub()
接下來測試一下實時新增訂閱的功能,先從urls文件導入示例化之后的對象,調用client的subscribe方法
from mqtt.urls import MQClient
class Test(APIView):
def post(self, request):
topic = request.POST.get('topic')
qos = int(request.POST.get('qos'))
if topic:
MQClient.client.subscribe(topic,qos)
return HttpResponse("ok")
最后就剩下把數據存入數據庫中了,這個操作已經寫在那個類中了。簡單說明一下 ,當paho接收了mqtt請求的時候會產生一個回調,執行下面這個函數,接收到的類容包含在msg中,msg主要有topic和payload兩個屬性,topic是訂閱的主題名,payload則是具體的消息內容,按照之前的規定,主題名為client/stream,對topic內容拆分獲取到client_id和stream,最后就是數據庫的插入操作了,涉及到多表操作,簡單點說就是,先插入一個data數據,然后根據client_id和stream來確定stream,最后再通過add方法將兩者關聯起來,這樣就完成了消息的保存了。
def on_message(client, userdata, msg):
client_id = msg.topic.split('/')[0]
stream = msg.topic.split('/')[1]
data = msg.payload.decode()
# 接收訂閱信息寫入到數據庫中
models.DataStream.objects.filter(device__device_id=client_id).filter(name=stream).first().data.add(
models.Data.objects.create(data=data))
測試一下,數據庫里面已經准備一些client和stream數據,還是使用EMQ的websocket來測試,發送主題為123456/hum,消息內容為654321,在來看一下數據庫中數據是否插入成功。



更新:
最終對這一部分做了修改,沒有將MQTT相關的東西放到Django里面,獨立出來了,這樣也方便日后的擴展和管理,數據庫操作改用了sqlachemy實現,其他內容基本不變