EMQ X 持久化插件系列(二)- InfluxDB 數據存儲


InfluxDB 是一個由 InfluxData 開發的開源時序型數據庫。 它由 Go 寫成,着力於高性能地查詢與存儲時序型數據,相比上一期中介紹的 OpenTSDB 數據庫 InfluxDB 較為輕量,在 InfluxData 官方給出的各項指標基准測試用 InfluxDB 都強於 OpenTSDB。

面對大規模快速增長的物聯網傳感器采集、交易記錄等數據,時間序列數據累計速度非常快,時序數據庫通過提高效率來處理這種大規模數據,並帶來性能的提升,包括:更高的容納率(Ingest Rates)、更快的大規模查詢(盡管有一些比其他數據庫支持更多的查詢)以及更好的數據壓縮。

本文以 CentOS 7.2 系統中的實際例子來說明如何通過 InfluxDB 來存儲相關的信息。

安裝與驗證 InfluxDB 服務器

讀者可以參考 InfluxDB 官方文檔(https://docs.influxdata.com/influxdb) 或 Docker (https://hub.docker.com/_/influxdb) 來下載安裝 InfluxDB 服務器,本文使用 InfluxDB 1.7 版本。

配置 EMQ X 服務器

通過 RPM 方式安裝的 EMQ X,InfluxDB 相關的配置文件位於 /etc/emqx/plugins/emqx_backend_influxdb.conf,考慮到功能定位,InfluxDB 插件僅支持消息存儲功能。

配置連接地址與連接池大小:

## InfluxDB UDP Server
## 僅使用 UDP 接入
backend.influxdb.pool1.server = 127.0.0.1:8089

## InfluxDB Pool Size
backend.influxdb.pool1.pool_size = 5

## Whether or not set timestamp when encoding InfluxDB line
backend.influxdb.pool1.set_timestamp = trues

**InfluxDB Backend 消息存儲規則參數: **

通過 topic 過濾器,設置需要存儲消息的主題,pool 參數區別多個數據源:

## Store Publish Message
backend.influxdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

啟動該插件,啟動插件的方式有 命令行控制台兩種方式,用戶可以任選其一。

消息模板

由於 MQTT Message 無法直接寫入 InfluxDB, InfluxDB Backend 提供了 emqx_backend_influxdb.tmpl 模板文件將 MQTT Message 轉換為可寫入 InfluxDB 的 DataPoint。

消息模板功能需要重啟 EMQ X 才能應用更改。

tmpl 文件位於 data/templates/emqx_backend_influxdb_example.tmpl,使用 json 格式, 用戶可以為不同 Topic 定義不同的 Template, 類似:

{
    "timestamp": <Where is value of timestamp>
		"measurement": <Where is value of measurement>,
    "tags": {
        <Tag Key>: <Where is value of tag>
    },
		"fields": {
    	<Field Key>: <Where is value of field>
    }
}

其中, measurement 與 fields 為必選項, tags 與 timestamp 為可選項。 支持通過占位符如 $key 提取變量名為 key 的變量,支持的變量如下:

  • qos: 消息 QoS
  • form: 發布者信息
  • topic: 發布主題
  • timestamp: 時間戳
  • payload.*: JSON 消息體內任意變量,如 { "data": [{ "temp": 1 }] } 使用 ["$payload", "data", "temp"] 可以提取出 1

本示例設定模板如下:

{
    "sample": {
        "measurement": "$topic",
        "tags": {
            "host": ["$payload", "data", "$0", "host"],
            "region": ["$payload", "data", "$0", "region"],
            "qos": "$qos",
            "from": "$from"
        },
        "fields": {
            "temperature": ["$payload", "data", "$0", "temp"]
        },
        "timestamp": "$timestamp"
    }
}

當 Topic 為 "sample" 的 MQTT Message 擁有以下 Payload 時:

{
  "data": [
    {
      "temp": 1,
      "host": "serverA",
      "region": "hangzhou"
    },
    {
      "temp": 2,
      "host": "serverB",
      "region": "ningbo"
    }
  ]
}

Backend 會將 MQTT Message 轉換為:

[
  {
    "measurement": "sample",
    "tags": {
      "from": "mqttjs_ebcc36079a",
      "host": "serverA",
      "qos": "0",
      "region": "hangzhou"
    },
    "fields": {
      "temperature": "1"
    },
    "timestamp": "1560743513626681000"
  },
  {
    "measurement": "sample",
    "tags": {
      "from": "mqttjs_ebcc36079a",
      "host": "serverB",
      "qos": "0",
      "region": "ningbo"
    },
    "fields": {
      "temperature": "2"
    },
    "timestamp": "1560743513626681000"
  }
]

使用示例

EMQ X 管理控制台 WebSocket 頁面中,向 sample 主題發布如上格式消息消息,消息將解析存儲到 InfluxDB udp 數據庫對應的 measurement 中。

總結

讀者在理解了 InfluxDB 中所存儲的數據結構,學習使用消息模板配置寫入消息字段格式后可以結合 InfluxDB 拓展相關應用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM