InfluxDB 是一個由 InfluxData 開發的開源時序型數據庫。 它由 Go 寫成,着力於高性能地查詢與存儲時序型數據,相比上一期中介紹的 OpenTSDB 數據庫 InfluxDB 較為輕量,在 InfluxData 官方給出的各項指標基准測試用 InfluxDB 都強於 OpenTSDB。
面對大規模快速增長的物聯網傳感器采集、交易記錄等數據,時間序列數據累計速度非常快,時序數據庫通過提高效率來處理這種大規模數據,並帶來性能的提升,包括:更高的容納率(Ingest Rates)、更快的大規模查詢(盡管有一些比其他數據庫支持更多的查詢)以及更好的數據壓縮。
本文以 CentOS 7.2
系統中的實際例子來說明如何通過 InfluxDB 來存儲相關的信息。
安裝與驗證 InfluxDB 服務器
讀者可以參考 InfluxDB 官方文檔(https://docs.influxdata.com/influxdb) 或 Docker (https://hub.docker.com/_/influxdb) 來下載安裝 InfluxDB 服務器,本文使用 InfluxDB 1.7 版本。
配置 EMQ X 服務器
通過 RPM 方式安裝的 EMQ X,InfluxDB 相關的配置文件位於 /etc/emqx/plugins/emqx_backend_influxdb.conf
,考慮到功能定位,InfluxDB 插件僅支持消息存儲功能。
配置連接地址與連接池大小:
## InfluxDB UDP Server
## 僅使用 UDP 接入
backend.influxdb.pool1.server = 127.0.0.1:8089
## InfluxDB Pool Size
backend.influxdb.pool1.pool_size = 5
## Whether or not set timestamp when encoding InfluxDB line
backend.influxdb.pool1.set_timestamp = trues
**InfluxDB Backend 消息存儲規則參數: **
通過 topic 過濾器,設置需要存儲消息的主題,pool 參數區別多個數據源:
## Store Publish Message
backend.influxdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
啟動該插件,啟動插件的方式有 命令行
和 控制台
兩種方式,用戶可以任選其一。
消息模板
由於 MQTT Message 無法直接寫入 InfluxDB, InfluxDB Backend 提供了 emqx_backend_influxdb.tmpl 模板文件將 MQTT Message 轉換為可寫入 InfluxDB 的 DataPoint。
消息模板功能需要重啟 EMQ X 才能應用更改。
tmpl 文件位於 data/templates/emqx_backend_influxdb_example.tmpl
,使用 json 格式, 用戶可以為不同 Topic 定義不同的 Template, 類似:
{
"timestamp": <Where is value of timestamp>
"measurement": <Where is value of measurement>,
"tags": {
<Tag Key>: <Where is value of tag>
},
"fields": {
<Field Key>: <Where is value of field>
}
}
其中, measurement 與 fields 為必選項, tags 與 timestamp 為可選項。
$key
提取變量名為
key
的變量,支持的變量如下:
- qos: 消息 QoS
- form: 發布者信息
- topic: 發布主題
- timestamp: 時間戳
- payload.*: JSON 消息體內任意變量,如
{ "data": [{ "temp": 1 }] }
使用["$payload", "data", "temp"]
可以提取出1
來
本示例設定模板如下:
{
"sample": {
"measurement": "$topic",
"tags": {
"host": ["$payload", "data", "$0", "host"],
"region": ["$payload", "data", "$0", "region"],
"qos": "$qos",
"from": "$from"
},
"fields": {
"temperature": ["$payload", "data", "$0", "temp"]
},
"timestamp": "$timestamp"
}
}
當 Topic 為 "sample" 的 MQTT Message 擁有以下 Payload 時:
{
"data": [
{
"temp": 1,
"host": "serverA",
"region": "hangzhou"
},
{
"temp": 2,
"host": "serverB",
"region": "ningbo"
}
]
}
Backend 會將 MQTT Message 轉換為:
[
{
"measurement": "sample",
"tags": {
"from": "mqttjs_ebcc36079a",
"host": "serverA",
"qos": "0",
"region": "hangzhou"
},
"fields": {
"temperature": "1"
},
"timestamp": "1560743513626681000"
},
{
"measurement": "sample",
"tags": {
"from": "mqttjs_ebcc36079a",
"host": "serverB",
"qos": "0",
"region": "ningbo"
},
"fields": {
"temperature": "2"
},
"timestamp": "1560743513626681000"
}
]
使用示例
EMQ X 管理控制台 WebSocket 頁面中,向 sample
主題發布如上格式消息消息,消息將解析存儲到 InfluxDB udp
數據庫對應的 measurement
中。
總結
讀者在理解了 InfluxDB 中所存儲的數據結構,學習使用消息模板配置寫入消息字段格式后可以結合 InfluxDB 拓展相關應用。