從零開始搭建物聯網平台(6):消息的持久化


遇到的問題:

查看了EMQ文檔發現並不提供消息的持久化功能,MQTT協議是按照設備一直在線設計的,數據都是保存在內存里的,但是考慮到用戶上傳傳感器數據不可能接收了就扔掉,那樣就沒法查看歷史數據了,所以用戶上傳的消息必須要能夠保存下來,以便查看歷史數據,這樣一來持久化功能就需要我們自己來實現。

另外還會出現一個問題,當兩個設備注冊的主題名一樣的時候,不能分出是哪一個設備發出的消息,在接收訂閱消息的時候發現沒辦法獲取到發送消息的clientID,而且其他設備也可以訂閱到任意設備的消息,對於敏感信息來說存在安全性。

解決方法:

初步打算是,用戶需要在后台注冊自己的設備和數據流信息,后台會對所有注冊的信息進行訂閱接收到消息后,后台會把消息寫入到對應的表中,另外設備發布主題只能使用(clientID/主題名)命名方式,以便后台能夠區分是哪一個設備發送過來的消息。對於MQTT了解還是不夠深,只能使用這樣的笨辦法來解決了,以后若是找到其他的方法在進行改進。

解決問題:

首先需要通過python建立mqtt連接監聽所有注冊的主題信息,這里使用了paho-mqtt庫來實現,為了方便以后的調用將其封裝成一個類,最開始的時候想把一些常用的操作也封裝進去,單獨測試完全可以,但是一旦放到Django請求中處理的時候,mqtt能夠正常返回成功信息,但是實際上並沒有正確執行,這一點始終沒有找到原因,最終只能簡化,只包含最基礎的功能。

class MqClient(object):
    def __init__(self, client_id, username, password):
        self.client = client.Client(client_id=client_id,
                                    clean_session=True)  # 初始化,clean_session為false的時候EMQ會保存訂閱狀態,可以不再次訂閱
        self.client.username_pw_set(username, password)  # 設置連接用戶名
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self._client_status = False  # 連接狀態
        self._cloop = None
        self._connect()  # 實例化會自動連接

    def _connect(self, host="your IP ", port=1883, keepalive=60):
        """連接服務器"""
        self.client.connect_async(host, port, keepalive)
        # 開啟線程執行
        self._cloop = threading.Thread(target=self.client.loop_start())
        self._cloop.start()

    def on_connect(self, client, userdata, flags, rc):
        """連接成功的回調函數"""
        # 修改客戶端狀態
        if rc == 0:
            self._client_status = True

    def init_sub(self):
        # 讀取數據庫中所有的已經注冊過的topic並且訂閱
        for i in models.Device.objects.all():
            for j in i.dev_stream.all():
                self.client.subscribe(str(i.device_id) + '/' + j.name, j.qos)

    @staticmethod
    def on_message(client, userdata, msg):
        client_id = msg.topic.split('/')[0]
        stream = msg.topic.split('/')[1]
        data = msg.payload.decode()
        # 接收訂閱信息寫入到數據庫中
        models.DataStream.objects.filter(device__device_id=client_id).filter(name=stream).first().data.add(
            models.Data.objects.create(data=data))

有了封裝好的類,現在我們需要做的是:在Django項目啟動完成之后自動執行監聽任務的,最開始的時候打算放到setting或者__init__里面,但是因為類里面封裝了model操作,那時候項目還沒有加載完model會報錯,所以最終新建了一個app,然后放到其下的urls,這樣當項目啟動完成的時候就會自動加載了。

from utils.mqtt_client import MqClient

MQClient = MqClient(your client ID, username, password)
MQClient.init_sub()

接下來測試一下實時新增訂閱的功能,先從urls文件導入示例化之后的對象,調用client的subscribe方法

from mqtt.urls import MQClient
class Test(APIView):
    def post(self, request):
        topic = request.POST.get('topic')
        qos = int(request.POST.get('qos'))
        if topic:
            MQClient.client.subscribe(topic,qos)
        return HttpResponse("ok")

最后就剩下把數據存入數據庫中了,這個操作已經寫在那個類中了。簡單說明一下 ,當paho接收了mqtt請求的時候會產生一個回調,執行下面這個函數,接收到的類容包含在msg中,msg主要有topic和payload兩個屬性,topic是訂閱的主題名,payload則是具體的消息內容,按照之前的規定,主題名為client/stream,對topic內容拆分獲取到client_id和stream,最后就是數據庫的插入操作了,涉及到多表操作,簡單點說就是,先插入一個data數據,然后根據client_id和stream來確定stream,最后再通過add方法將兩者關聯起來,這樣就完成了消息的保存了。

def on_message(client, userdata, msg):
        client_id = msg.topic.split('/')[0]
        stream = msg.topic.split('/')[1]
        data = msg.payload.decode()
        # 接收訂閱信息寫入到數據庫中
        models.DataStream.objects.filter(device__device_id=client_id).filter(name=stream).first().data.add(
            models.Data.objects.create(data=data))

測試一下,數據庫里面已經准備一些client和stream數據,還是使用EMQ的websocket來測試,發送主題為123456/hum,消息內容為654321,在來看一下數據庫中數據是否插入成功。

 

data表
stream表
stream和data關聯表

 

更新:

最終對這一部分做了修改,沒有將MQTT相關的東西放到Django里面,獨立出來了,這樣也方便日后的擴展和管理,數據庫操作改用了sqlachemy實現,其他內容基本不變

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM