
本文以常見物聯網使用場景為例,介紹了如何利用 EMQ X MQTT 服務器 + InfluxDB + Grafana 構建物聯網數據可視化平台,將物聯網設備上傳的時序數據便捷地展現出來。
在物聯網項目中接入平台的設備數據和數據存儲方案有以下特點:
- 數據采集的維度、頻率、以及設備數量都比較多,采集的數據量比較大,對消息服務器的接入吞吐量、后端數據庫的存儲空間消耗有很大壓力。
- 數據按照采集周期進行上報、傳輸、存儲一般都按照時間序列。
因此在物聯網項目中使用時序數據庫是比較好的選擇。時序數據庫 可以帶來顯著的性能提升,包括更高的容納率、更快的大規模查詢,以及更好的數據壓縮率等。數據入庫后,往往需要數據可視化平台將數據按照規則統計、展現出來,實現數據的監控、指標統計等業務需求,以便充分發揮數據的價值。
業務場景
假設現有一批設備,每個設備有一個 Client ID,所有設備均通過 MQTT 協議 往 MQTT 服務器 上相應的主題發送數據,主題的設計如下:
devices/{client_id}/messages
每個設備發送的數據格式為 JSON,發送的通過該傳感器采集的溫度與濕度數據。
{
"temperature": 30,
"humidity" : 20
}
現在需要實時存儲數據以便在后續任意時間查看,具體需求如下:
- 每個設備按照每 5 秒鍾一次的頻率進行數據上報,數據庫需存儲每條數據;
- 通過可視化系統查看 任意時間區間內 的溫度/濕度平均值、最大值、最小值,與 所有時間段內 溫度/濕度的平均值。
可視化平台最終的展示效果如下圖。儀表盤右上角可以選擇時間區間、自動刷新時間,此時設備持續發送數據,儀表盤數據值會隨之變化,實現了功能比較全面的可視化效果。

方案介紹
目前市面上已有多款物聯網消息中間件、時序數據庫和數據可視化產品,結合數據的采集上報、聯網接入、消息存儲與可視化功能來看,EMQ X(高性能物聯網 MQTT 消息中間件) + InfluxDB(時序數據庫)+ Grafana(美觀、強大的可視化監控指標展示工具)組合無疑是最佳的物聯網數據可視化集成方案。
方案整體架構如下圖所示:

- EMQ X:EMQ X 是基於高並發的 Erlang/OTP 語言平台開發,支持百萬級連接和分布式集群架構,發布訂閱模式的開源 MQTT 消息服務器。EMQ X 內置了大量開箱即用的功能,其企業版 EMQ X Enterprise 支持通過規則引擎或消息持久化插件將設備消息高性能地存儲到 InfluxDB,開源用戶需自行處理消息存儲環節。
- InfluxDB:InfluxDB 是一個由 InfluxData 開源的時序型數據庫。它由 Go 寫成,着力於高性能地查詢與存儲時序型數據。InfluxDB 被廣泛應用於存儲系統的監控數據,IoT 行業的實時數據等場景。
- Grafana: Grafana 是一個跨平台、開源的度量分析和可視化工具,可以通過靈活的配置查詢采集到的數據並進行可視化展示。它可以快速靈活的創建客戶端圖表,官方庫中具有豐富的儀表盤插件,比如熱圖、折線圖、圖表等多種展示方式。支持 Graphite,InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch 和 KairosDB 等數據源。可以創建自定義告警規則並通知到其他消息處理服務或組件中。
實現步驟
本文所用各個組件均有 Docker 鏡像,除 EMQ X 需要使用下載安裝外(方便修改部分配置),InfluxDB 與 Grafana 均使用 Docker 搭建,詳細的安裝步驟本文不再贅述。
三大部件的官網均有不同操作系統的安裝包資源與教程:
EMQ X 安裝
安裝
訪問 EMQ X 下載 頁面下載適合您操作系統的安裝包,由於數據持久化是企業功能,您需要下載 EMQ X 企業版(可以申請 License 試用)。 寫本文的時候 EMQ X 企業版最新版本為 v3.4.5,本教程需要使用該版本及以上版本,下載后的啟動步驟如下 :
## 解壓下載好的安裝包
unzip emqx-ee-macosx-v3.4.4.zip
cd emqx
## 將 License 文件復制到 EMQ X 指定目錄 etc/, License 需自行申請試用或通過購買授權獲取
cp ../emqx.lic ./etc
## 以 console 模式啟動 EMQ X
./bin/emqx console
修改配置
本文中需要用到的配置文件如下:
-
License 文件,EMQ X 企業版 License 文件,使用可用的 License 覆蓋:
etc/emqx.lic -
EMQ X InfluxDB 消息存儲插件配置文件,用於配置 InfluxDB 連接信息、選取入庫 Topic:
etc/plugins/emqx_backend_influxdb.conf根據部署實際情況填寫插件配置信息如下:
backend.influxdb.pool1.server = 127.0.0.1:8089 backend.influxdb.pool1.pool_size = 5 ## Whether or not set timestamp when encoding InfluxDB line backend.influxdb.pool1.set_timestamp = true ## Store Publish Message ## 由於業務僅需 devices/{client_id}/messages 主題,此處修改默認配置的主題過濾器 backend.influxdb.hook.message.publish.1 = {"topic": "devices/+/messages", "action": {"function": "on_message_publish"}, "pool": "pool1"} -
EMQ X InfluxDB 消息存儲插件消息模板文件,用於定義消息解析入庫模板:
## 模板文件 data/templates/emqx_backend_influxdb_example.tmpl ## 重命名修改為 data/templates/emqx_backend_influxdb.tmpl由於 MQTT Message 無法直接寫入 InfluxDB, EMQ X 提供了 emqx_backend_influxdb.tmpl 模板文件將 MQTT Message 轉換為可寫入 InfluxDB 的 DataPoint:
{ "devices/+/messages": { "measurement": "devices", "tags": { "client_id": "$client_id" }, "fields": { "temperature": ["$payload", "temperature"], "humidity": ["$payload", "humidity"] }, "timestamp": "$timestamp" } }關於 EMQ X InfluxDB 使用詳細教程見 [ InfluxDB 數據存儲](
InfluxDB 安裝
通過 Docker 進行安裝,映射數據文件夾與 8089 udp 端口與 8086 端口(Grafana 使用):
EMQ X 僅支持 InfluxDB UDP 通道,需要 influx_udp 插件支持,且數據庫名稱指定為 db
## 使用 influx_udp 插件
git clone https://github.com/palkan/influx_udp.git
## 進入插件目錄
cd influx_udp
## 通過插件配置創建並啟動容器
docker run --name=influxdb --rm -d -p 8086:8086 -p 8089:8089/udp \
-v ${PWD}/files/influxdb.conf:/etc/influxdb/influxdb.conf \
-e INFLUXDB_DB=db \
influxdb:latest
## 啟動后檢查容器運行狀態
docker ps -a
至此,可以重啟 EMQ X 並啟動插件以應用以上配置:
./bin/emqx stop
./bin/emqx start
## 或使用 console 模式可以看到更多信息
./bin/emqx console
## 啟動插件
./bin/emqx_ctl plugins load emqx_backend_influxdb
## 啟動成功后會有以下提示
Plugin emqx_backend_influxdb loaded successfully.
Grafana 安裝
使用以下命令通過 Docker 安裝並啟動 Grafana:
docker run -d --name=grafana -p 3000:3000 grafana/grafana
啟動成功后瀏覽器訪問 http://127.0.0.1:3000 訪問 Grafana 可視化面板,使用 admin admin 默認用戶名密碼完成初次登錄,登錄后按照提示修改密碼使用新密碼登錄進入主界面:

寫入模擬數據
進行可視化配置之前需要寫入模擬數據,方便配置過程中進行效果預覽。
以下腳本模擬完成了 100 個設備在過去 12 小時內、每隔 5 秒鍾上報一條模擬溫濕度數據並發送到 EMQ X 的場景,讀者安裝 Node.js 平台后可以通過以下命令啟動:
npm install mqtt mockjs --save
node mock.js
模擬腳本執行完畢后,數據將寫入 InfluxDB db 數據庫中,通過以下命令進入 InfluxDB 容器並查看數據:
## 進入 docker 容器
docker exec -it influxdb bash
## 進入 influxdb 命令行
root@581bde65650d:/# influx
## 切換到 db 數據庫
use db;
## 查詢數據
select * from devices limit 1;
## 查詢結果
name: devices
time client_id humidity temperature
---- --------- -------- -----------
1574578725608000000 mock_client_1 54.33 98.5
附:模擬腳本如下:
// Node.js
// mock.js
const mqtt = require('mqtt')
const Mock = require('mockjs')
class MockData {
constructor(clientNum = 20) {
this.EMQX_SERVER = 'mqtt://localhost:1883'
this.clientNum = clientNum
this.clients = {}
this.startMock()
}
async startMock() {
const now = Date.now()
// last 12h every 5s
for (let ts = now - 12 * 3600 * 1000; ts <= now; ts += 5 * 1000) {
for (let i = 0; i < this.clientNum; i++) {
const clientId = `mock_client_${i}`
const client = this.clients[clientId] || await this.createClient(clientId)
this.clients[clientId] = client
const mockData = this.getMockData()
client.publish(`devices/${clientId}/messages`, JSON.stringify(mockData))
console.log(`${clientId} send temperature ${mockData.temperature} humidity ${mockData.humidity}`)
}
}
}
/**
* Init a virtual mqtt client
* @param {string} clientId ClientID
*/
createClient(clientId) {
return new Promise((resolve, reject) => {
const client = mqtt.connect(this.EMQX_SERVER, {
clientId,
})
client.on('connect', () => {
console.log('client s% connected', clientId)
resolve(client)
})
client.on('error', (e) => {
reject(e)
})
})
}
/**
* Generate mock data
*/
getMockData() {
return {
temperature: parseFloat(Mock.Random.float(22, 100).toFixed(2)),
humidity: parseFloat(Mock.Random.float(12, 86).toFixed(2)),
}
}
}
// startup
new MockData(100)
可視化配置
組件安裝完成,模擬數據寫入成功后,按照 Grafana 可視化界面的操作指引,完成業務所需數據可視化配置。
添加數據源(Add data source)
添加數據源,即顯示的數據源信息。選取 InfluxDB 類型數據源,輸入連接參數進行配置,默認情況下,關鍵配置信息如下:
- URL:填寫 InfluxDB 連接地址,由於我們使用 Docker 安裝,Grafana 由於 InfluxDB 容器網絡不互通,此處可以輸入當前服務器內網/局域網地址而非
127.0.0.1或localhost; - Auth:InfluxDB 默認啟動無認證方式,根據實際情況填寫;
- Database:填寫
db,為 EMQ X 默認寫入數據庫名。
添加儀表盤
添加好數據源后,添加需要顯示的數據儀表盤信息。儀表盤為多個可視化面板的集合,點擊 New Dashboard 后,選擇 Add Query 通過查詢來添加數據面板:

創建面板需要四個步驟,分別是 Queries(查詢)、Visualization(可視化)、General(圖表配置)、Alert(告警),下面按照業務需求完成創建。
溫、濕度平均值面板
使用 Grafana 的可視化查詢構建工具,查詢出所有設備的平均值:
- FROM:選取數據的
measurement,按照emqx_backend_influxdb.tmpl文件配置,此處measurement為devices; - SELECT:選取、計算的字段,此處兩個查詢需要使用 Aggregation 功能處理,分別選擇
temperaturemean和humiditymean,查詢並計算溫度、濕度字段的平均值; - GROUP BY:默認使用時間區間聚合。
time($__interval)函數表示取$__interval時間區間內的數據,如time(5s)表示從每5秒時間區間原始數據內取出值來進行計算(SELECT 中的計算)fill參數表示沒有值時候的默認值,為null的時候該數據點不會在圖表顯示出來;tag可選,按照指定 tag 進行顯示。
- ALIAS BY:該查詢的別名,方便可視化查看。
Visualization 默認不做更改,General 里面修改面板名稱為 Device temperature and humidity mean value,如果需要對業務進行監控告警,可以在 Alert 里編排告警規則,此處僅做可視化展示,不使用此功能。

完成創建后,點擊左上角返回按鈕,該 Dashboard 里成功添加一個數據面板。點擊頂部導航欄保存圖標,輸入 Dashboard 名稱完成 Dashboard 的創建。

溫、濕度最大、最小值面板
繼續點擊 Dashboard 的 Add panel 按鈕,添加溫度最大值、最小值圖表。操作步驟同添加平均值,僅對查詢中 SELECT 統計方法字段做出調整,調整為 Selectors 功能中的 max 和 min 方法。
溫、濕度總平均值、數據條數面板
繼續點擊 Dashboard 的 Add panel 按鈕,添加溫、濕度總平均值,數據條數面板。操作步驟近似上面兩個步驟,分別使用 count 和 mean 方法對指定字段操作,取消 GROUP BY 字段即可完成查詢。Visualization 配置中選擇圖表類型為 Gauge(儀表) 即可。
保存儀表盤,拖拽調整每個數據面板大小、位置,最終得到一個視覺效果較好的數據儀表盤。最終報表完成后,呈現的就是文章開頭展示的效果。
總結
至此我們完成了 EMQ X + InfluxDB + Grafana 物聯網數據可視化平台的搭建。通過本文,讀者可以了解到利用 EMQ X 豐富的拓展能力在數據可視化解決方案里可以非常快速、靈活地開發出基於 InfluxDB + Grafana 的可視化系統,實現海量數據存儲、計算分析與展現。深入學習掌握 Grafana 的其他功能后,用戶可以定制出更完善的數據可視化乃至監控告警系統。
版權聲明: 本文為 EMQ 原創,轉載請注明出處。
