EMQ X 規則引擎系列(九)- 消息寫入到 TDEngine


TDEngine 是什么

TDengine 是濤思數據(北京濤思數據科技有限公司)推出的一款開源的專為物聯網、車聯網、工業互聯網、IT 運維等設計和優化的大數據平台。除核心的快 10 倍以上的時序數據庫功能外,還提供緩存、數據訂閱、流式計算等功能,最大程度減少研發和運維的復雜度。

TDengine 作為時序處理引擎,可以完全不用 Kafka、HDFS/HBase/Spark、Redis 等軟件,大幅簡化大數據平台的設計,降低研發成本和運營成本。因為需要集成的開源組件少,因而系統可以更加健壯,也更容易保證數據的一致性。

TDEngine 提供社區版、企業版和雲服務版,安裝/使用教程詳見 TDEngine 使用文檔 https://www.taosdata.com/cn/products/

場景介紹

本文以通過 MQTT 協議接入 EMQ X 的智能門鎖為例進行說明。

智能門鎖已經成為了智能家居的重點關注產品,為了保證用戶更安全的開鎖體驗,智能門鎖通常可以實現指紋開鎖、密碼開鎖、IC卡開鎖、鑰匙開鎖、遠程開鎖等功能。智能門鎖每個業務環節都涉及到操作敏感指令和狀態數據的發送、傳輸,這些數據在應當存儲起來以備后續審計使用。

采集流程

智能門鎖下發指令與上報數據通過 MQTT 協議經 EMQ X 傳輸,可選在 EMQ X 上使用規則引擎篩選或設置消費客戶端處理,將滿足條件的數據寫入 TDEngine 數據平台,整個數據流轉流程如下:

該場景中擬設智能門鎖通過 lock/:id/control_receipt 主題( id 為門鎖連接客戶端的 clientid,同門鎖 id) 上報操作回執與狀態信息,數據格式為如下 JSON 消息:

{
  "id": "51dc0c50f55d11e9a4fec59e26b058d5", // 門鎖 id
  "longitude": 102.8622543, // 當前位置經度
  "latitude": 24.8614503, // 當前位置緯度
  "command": "unlock", // 指令
  "LockState": 0, // 門鎖狀態
  "LockType": 0, // 開鎖方式
  "KeyNickName": "", // 鑰匙昵稱
  "KeyID": "c944c8d0f55e11e9a4fec59e26b058d5", // 鑰匙 ID
  "ErrorCode": 0, // 執行故障代碼
  "pid": "84a2e10f55d11e9a4fec59e26b058d5", // 下發的指令 ID
  "alarm": "", // 當前告警信息
  "ts": 1570838400000 // 執行時間
}

准備

盡管 TDEngine 是關系型數據庫模型,但要求每個采集設備單獨建表,因此我們按照門鎖 id 每個門鎖建表一張,同時浮點數據壓縮比相對整型數據壓縮比很差,經度緯度通常精確到小數點后 7 位,因此將經度緯度增大 1E7 倍轉為長整型存儲

創建數據庫的語句為:

create database db cache 8192 ablocks 2 tblocks 1000 tables 10000;
use db;

創建超級表的SQL語句為:

create table lock(
  ts timestamp,
  id nchar(50),
  pid nchar(50),
  longitude bigint,
  latitude bigint,
  command nchar(50),
  LockState smallint,
  LockType smallint,
  KeyNickName nchar(255),
  KeyID nchar(255),
  ErrorCode smallint,
  alarm nchar(255)
) tags(card int, model binary(10));

TDEngine 是關系型數據庫模型,但要求每個采集設備單獨建表,以門鎖 id 作為采集表表名,例如 id 為 51dc0c50f55d11e9a4fec59e26b058d5,那么創建數據表的語句為:

-- 使用 using 指定其所屬 超級表
create table "v_51dc0c50f55d11e9a4fec59e26b058d5" using lock tags('51dc0c50f55d11e9a4fec59e26b058d5', 0);

在該數據模型下,以門鎖 id 51dc0c50f55d11e9a4fec59e26b058d5 為例,寫入一條記錄到表 v_51dc0c50f55d11e9a4fec59e26b058d5 的 SQL 語句為:

insert into v_51dc0c50f55d11e9a4fec59e26b058d5 values(
  1570838400000,
  '51dc0c50f55d11e9a4fec59e26b058d5',
  'e84a2e10f55d11e9a4fec59e26b058d5',
  1028622543,
  248614503,
  'unlock',
  0,
  0,
  '',
  'c944c8d0f55e11e9a4fec59e26b058d5',
  0,
  '[]',
);

實際使用中請先依次給每個智能門鎖建表

數據寫入方式

目前 EMQ X 消息數據直接寫入 TDEngine 的功能還在規划中,但得益於 TDEngine 提供了諸多連接器,我們選用以下兩種方式完成數據寫入:

  • 使用 TDEngine 的 RESTful Connector:通過 REST API 調用,將數據拼接為 SQL 語句發送到 TDEngine 執行寫入,規則引擎內置表達式與函數可以預處理數據;
  • 通過 TDEngine 提供的客戶端庫/連接器,編寫代碼通過訂閱/消費的方式獲取 EMQ X 消息,處理后轉發寫入到 TDEngine 中。

使用規則引擎寫入數據

資源准備

EMQ X Dashboard 中點擊 規則 主菜單,在 資源 頁面新建一個 WebHook 資源,用於向 TDEngine RESTful Connector 發送數據,新增請求頭:

  • Authorization:值為 TDEngine 請求 TOKEN 用於連接認證,為 {username}:{password} 經過 Base64 編碼之后的字符串。

有關 RESTful Connector 使用教程詳見:TDEngine RESTful Connector

點擊 測試連接,測試通過后點擊 確定 按鈕完成創建。

創建規則

資源創建完畢后我們可以進行規則創建,規則引擎 --> 規則 頁面中點擊 新建 按鈕進入規則創建頁面。

選擇 消息發布 事件,處理傳感器消息上報(發布)時的數據。根據 可用字段 提示,傳感器等信息可以從 payload 中選取。

由於需要將浮點值處理為整型,我們使用簡單計算功能,請留意 SQL 中的注釋項,最終整個 SQL 語句如下:

SELECT
  -- JSON 數據解碼
  json_decode(payload) as p,
  -- 經緯度放大 10E7 倍存儲
  p.longitude * 10000000 as p.longitude,
  p.latitude * 10000000 as p.latitude
FROM
  "message.publish"
WHERE
  -- 通過 topic 篩選數據源
  topic =~ 'lock/+/control_receipt' 

使用 SQL 測試功能,輸入原始上報數據與相關變量,得到如下輸出結果:

{
  "p": {
    "ErrorCode": 0,
    "KeyID": "c944c8d0f55e11e9a4fec59e26b058d5",
    "KeyNickName": "",
    "LockState": 0,
    "LockType": 0,
    "alarm": "",
    "command": "unlock",
    "id": "51dc0c50f55d11e9a4fec59e26b058d5",
    "latitude": 248614503,
    "longitude": 1028622543,
    "pid": "84a2e10f55d11e9a4fec59e26b058d5",
    "ts": 1570838400000
  }
}

從輸出結果看,經緯度浮點值已經轉為整型,說明該步操作正確,可以進行后續操作。

響應動作

點擊創建頁面下方 添加動作 按鈕,在彈出的 新增動作 彈框里動作類型選擇 發送數據到 Web 服務使用資源 選擇上一步中創建的資源,消息內容模板 內容模板里面,使用 ${} 語法提取 條件 SQL 篩選出來的數據,拼接寫入 SQL 語句如下:

insert into db.v_${p.id} values(
  ${p.ts},
  '${p.id}',
  '${p.pid}',
  ${p.longitude},
  ${p.latitude},
  '${p.command}',
  ${p.LockState},
  ${p.LockType},
  '${p.KeyNickName}',
  '${p.KeyID}',
  ${p.ErrorCode},
  '${p.alarm}',
);

點擊 創建 完成規則的創建,智能門鎖上報數據時數據將寫入到 DBEngine,整個工作和業務流程如下:

  • 智能門鎖上報數據至 EMQ X
  • message.publish 事件觸發規則引擎 ,開始按照條件 SQL 中的 where 條件匹配 topicpayload 數據字段
  • 規則命中后觸發響應動作列表,按照響應動作中的消息內容模板拼接出該動作所需請求參數,在這個規則中請求參數是一個 SQL 語句,包含有智能門鎖的上報數據信息
  • 按照動作類型和使用的資源發起請求, 調用 RESTful API 將指令發送到 TDEngine 執行,完成數據寫入。

使用 TDEngine SDK 寫入數據

TDEngine 提供多種語言平台適用的 SDK,程序可以通過訂閱 MQTT 主題或消費消息中間件數據獲取智能門鎖上報到 EMQ X 的數據,隨后將數據拼接成寫入 SQL 最終寫入到 TDEngine 中。

本文使用訂閱 MQTT 主題的方式獲取智能門鎖上報數據。考慮到消息量可能增長到單個訂閱客戶端無法承受的數據量,我們使用 共享訂閱 的方式來消費數據。

在共享訂閱中,訂閱同一個主題的客戶端會輪流的收到這個主題下的消息,也就是說同一個消息不會發送到多個訂閱者,從而實現訂閱端的多個節點之間的負載均衡。

代碼示例

該示例使用 Node.js 平台,借助 TDEngine 的 RESTful Connector 實現數據寫入操作。

使用方式:安裝 Node.js、安裝 npm、安裝依賴、修改相應參數並運行執行

// index.js
const mqtt = require("mqtt");
const axios = require("axios");

/**
 * 通過 RESTful Connector 執行 TDEngine 操作
 * @param {string} 需要執行的 sql
 */
function exec(sql = "") {
  return axios({
    method: "post",
    url: "http://127.0.0.1:6020/rest/sql",
    auth: {
      username: "root",
      password: "taosdata"
    },
    data: sql
  });
}

// MQTT 處理訂閱消息回調
async function handleMessage(topic, message) {
  try {
    // JSON 轉對象
    const p = JSON.parse(message.toString());
    // 處理浮點數據
    p.longitude = p.longitude * 10e7;
    p.latitude = p.latitude * 10e7;
    const resp = await exec(`
      INSERT INTO db.v_${p.id} values(
        ${p.ts},
        '${p.id}',
        '${p.pid}',
        ${p.longitude},
        ${p.latitude},
        '${p.command}',
        ${p.LockState},
        ${p.LockType},
        '${p.KeyNickName}',
        '${p.KeyID}',
        ${p.ErrorCode},
        '${p.alarm}',
      );`);
    console.log(`Exec success:`, resp.data);
  } catch (e) {
    console.log(
      "exec insert error:",
      e.message,
      e.response ? e.response.data : ""
    );
  }
}

function createConsumer(config = {}) {
  const client = mqtt.connect("mqtt://127.0.0.1:1883", config);

  client.on("connect", () => {
    // 使用共享訂閱 $share/ 前綴
    client.subscribe("$share//lock/+/control_receipt", (err, granded = []) => {
      if (!err && granded[0].qos <= 2) {
        console.log("Consumer client ready");
      }
    });
  });

  client.on("message", handleMessage);
}

// 創建 10 個共享訂閱消費者
for (let i = 0; i < 10; i++) {
  createConsumer();
}

測試

通過 EMQ X Dashboard 內置的 MQTT 客戶端(WebSocket)可以快速模擬測試規則可用性。打開 工具 -> WebSocket 頁面,輸入按照智能門鎖連接信息建立連接,在 發布 功能里面輸入上報主題、上報數據點擊發布進行模擬測試:

  • 發布主題:lock/${id}/control_receipt

  • Payload:

      {
        "id": "51dc0c50f55d11e9a4fec59e26b058d5",
        "longitude": 102.8622543,
        "latitude": 24.8614503,
        "command": "unlock",
        "LockState": 0,
        "LockType": 0,
        "KeyNickName": "", 
        "KeyID": "c944c8d0f55e11e9a4fec59e26b058d5",
        "ErrorCode": 0,
        "pid": "84a2e10f55d11e9a4fec59e26b058d5",
        "alarm": "",
        "ts": 1570838400000
      }
    

發布多次,在 規則引擎 列表里,點擊 監控 圖標可以快速查看當前規則執行數據,由下圖可見 4 條消息命中 3 次,成功 3 次:

在 TDEngine 控制台查看 db.v_51dc0c50f55d11e9a4fec59e26b058d5 中的數據,此時有 3 條數據:

use db;
select count(*) from v_51dc0c50f55d11e9a4fec59e26b058d5;

taos> select count(*) from v_51dc0c50f55d11e9a4fec59e26b058d5;
      count(*)       |
======================
                    3|
Query OK, 1 row(s) in set (0.000612s)

刪除該條規則,啟動 TDEngine SDK 寫入代碼,重復該上述測試操作,可以看到程序打印日志如下:

{ status: 'succ', head: [ 'affected_rows' ], data: [ [ 1 ] ], rows: 1 }
{ status: 'succ', head: [ 'affected_rows' ], data: [ [ 1 ] ], rows: 1 }
{ status: 'succ', head: [ 'affected_rows' ], data: [ [ 1 ] ], rows: 1 }

至此,寫入 EMQ X 數據到 TDEngine 的整個功能已開發/配置完成。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM