一、前言
時隔一年半,技術變化特別快,學習也要跟上才行。以前寫過EMQ數據轉存問題,當時用了比較笨的方法,通過寫插件的方式,把MQTT里面的數據發送到數據庫進行存儲。當時也是為了學習erlang和emq。現在隨着對物聯網的深入,也結合實際需求,不停的學習。
下面將介紹我實驗測試可行的物聯網數據分析解決方案。采用的還是開源方案。通過訂閱MQTT的根Topic,把所有物聯網數據轉存到InfluxDB時序數據庫,然后通過Grafana進行圖表顯示。這應該是目前比較流行的方案。
二、安裝InfluxDB
InfluxDB是時序數據庫,特別適合做數據監控和物聯網數據存儲。【也可以說適合我現在參與架構的物聯網平台的技術選型】
針對InfluxDB也沒有什么可以多說的,詳細可以查閱官方文檔,或者網上的博客文章。我寫的都是平時實踐過程的操作記錄,寫博客,主要是為了以后忘記的時候,回看查閱用的。另一方面是加強跟同行讀者交流的渠道。有一點要注意,一開始為了新,我用InfluxDB 2.0 版本,發現不行,那個太新的,很多對應的開發庫沒有完善好。所以還是采用InfluxDB 1.x版本。這樣在spring boot 里面也有自帶的starter庫可以使用,操作起來特別方便。
InfluxDB官方文檔: https://docs.influxdata.com/influxdb/v1.7/ 安裝:
1 wget -qO- https://repos.influxdata.com/influxdb.key | sudo apt-key add - 2 echo "deb https://repos.influxdata.com/debian stretch stable" | sudo tee /etc/apt/sources.list.d/influxdb.list 3 apt-get update 4 apt-get install influxdb
三、InfluxDB基礎命令使用
修改配置文件 /etc/influxdb/influxdb.conf
1 [http] 2 enabled = true 3 bind-address = ":8086" 4 auth-enabled = false 5 log-enabled = true 6 write-tracing = false 7 pprof-enabled = true
這里先設置不授權,等一下創建用戶后,再修改為 auth-enabled=true,這個一般也是屬於內部應用,不用ssl加密了。即使要也是通過Nginx進行反向代理。
用戶管理
1 --顯示所有用戶: 2 show users 3 --新增用戶: 4 --普通用戶 (注意:用戶名用雙引號,密碼用單引號) 5 create user "user" with password 'user' 6 --管理員用戶 7 create user "admin" with password 'admin' with all privileges 8 --刪除用戶 9 drop user "user"
創建好后,注意修改influxdb.conf 中的 auth-enable=true, 然后重啟服務 service influxdb restart
1 --創建數據庫 2 create database wunaozai 3 --創建好后,就可以不用管了。一些簡單的操作,可以參考其他博客資料。 4 --刪除數據庫 5 drop database wunaozai 6 --切換使用數據庫 7 use wunaozai
1 --顯示所有表 2 show measurements 3 --新建表(往表里面插入數據,就是新建表了) 4 --插入數據的語法有點特殊,采用的是InfluxDB特有的語法: 5 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>] 6 insert wnztable,tag=mqtt value=33 7 --刪除表 8 drop measurements wunaozai
其他的高級語法,不如查詢還有策略就不展開,暫時不是重點,等以后深入研究后,在寫博客介紹。
四、EMQ轉存InfluxDB
EMQ如何把消息轉存到InfluxDB呢,就是本章節的重點,利用上一篇博客中提到的,SpringBoot客戶端監聽EMQ的根Topic,然后把需要進行轉存的Topic及其對應的Payload,構造成InfluxDB表數據,然后插入到InfluxDB中。
下面介紹一下用到的InfluxDB工具類
先在pom.xml中引入InfluxDB相關jar包
1 <!-- https://mvnrepository.com/artifact/org.influxdb/influxdb-java --> 2 <dependency> 3 <groupId>org.influxdb</groupId> 4 <artifactId>influxdb-java</artifactId> 5 <version>2.15</version> 6 </dependency>
相關工具類代碼
1 import org.influxdb.InfluxDB; 2 import org.influxdb.InfluxDBFactory; 3 import org.influxdb.dto.Point; 4 5 /** 6 * 數據緩存至InfluxDB 7 * @author wunaozai 8 * 9 */ 10 public class InfluxDBService { 11 12 private static String INFLUXDB_URL = "http://127.0.0.1:8086"; 13 private static String INFLUXDB_USERNAME = "admin"; 14 private static String INFLUXDB_PASSWORD = "admin"; 15 private static String INFLUXDB_DATABASE = "wunaozai"; //注意這里對應數據庫,一般要先在命令行中創建數據庫 16 private static InfluxDB influxDB = null; 17 18 private InfluxDBService(){ 19 20 } 21 22 public static InfluxDB getInstance(){ 23 if(influxDB == null){ 24 influxDB = InfluxDBFactory.connect(INFLUXDB_URL, INFLUXDB_USERNAME, INFLUXDB_PASSWORD); 25 influxDB.setDatabase(INFLUXDB_DATABASE); 26 influxDB.setLogLevel(InfluxDB.LogLevel.BASIC); 27 } 28 return influxDB; 29 } 30 public static int writePoint(Point point){ 31 getInstance().write(point); 32 return 0; 33 } 34 }
在上一篇博客中的MqttPushCallback.java中的
public void messageArrived(String topic, MqttMessage message);
這個函數來轉存。
1 @Override 2 public void messageArrived(String topic, MqttMessage message) throws Exception { 3 try{ 4 System.out.println(topic); 5 String json = new String(message.getPayload()); 6 MQTTProtocolVoModel protocol = BaseModel.parseJSON(json, MQTTProtocolVoModel.class); 7 8 String cmd = protocol.getCmd(); 9 String customer_id = protocol.getProfile().getCustomer_id(); //廠商ID 10 String product_id = protocol.getProfile().getProduct_id(); //產品ID 11 String device_sn = protocol.getProfile().getDevice_sn(); //設備ID 12 Map<String, String> para = protocol.getDatapoint().getPara(); 13 Map<String, Object> fields = new HashMap<>(); //這里是客戶端傳過來的數據點,就是需要被顯示和監控的數據 14 for (Map.Entry<String, String> entry : para.entrySet()) { 15 fields.put(entry.getKey(), entry.getValue()); 16 } 17 Map<String, String> tag = new HashMap<>(); 18 tag.put("customer_id", customer_id); 19 tag.put("product_id", product_id); 20 tag.put("device_sn", device_sn); 21 //這里可以添加很多Tag,為了簡單演示,這里隱藏部分Tag 22 //構造數據點 23 Point point = Point.measurement("datapoint") 24 .tag(tag).fields(fields).build(); 25 InfluxDBService.writePoint(point); 26 }catch (Exception e) { 27 e.printStackTrace(); 28 } 29 }
這里可以通過EMQ Dashboard自帶的Websocket進行發送,也可以通過前面小節用到的PC工具,網上Web端MQTT客戶也很多,可以通過任意MQTT工具進行測試。
下面這個是查詢InfluxDB得到的表數據。
參考資料:
https://www.cnblogs.com/jason1990/p/11076310.html
https://blog.csdn.net/caodanwang/article/details/51967393
https://docs.influxdata.com/influxdb/v1.7/
https://www.cnblogs.com/shhnwangjian/p/6897216.html
本文地址: https://www.cnblogs.com/wunaozai/p/11160730.html