物聯網架構成長之路(33)-EMQ數據存儲到influxDB


一、前言
  時隔一年半,技術變化特別快,學習也要跟上才行。以前寫過EMQ數據轉存問題,當時用了比較笨的方法,通過寫插件的方式,把MQTT里面的數據發送到數據庫進行存儲。當時也是為了學習erlang和emq。現在隨着對物聯網的深入,也結合實際需求,不停的學習。
下面將介紹我實驗測試可行的物聯網數據分析解決方案。采用的還是開源方案。通過訂閱MQTT的根Topic,把所有物聯網數據轉存到InfluxDB時序數據庫,然后通過Grafana進行圖表顯示。這應該是目前比較流行的方案。
二、安裝InfluxDB
  InfluxDB是時序數據庫,特別適合做數據監控和物聯網數據存儲。【也可以說適合我現在參與架構的物聯網平台的技術選型】
  針對InfluxDB也沒有什么可以多說的,詳細可以查閱官方文檔,或者網上的博客文章。我寫的都是平時實踐過程的操作記錄,寫博客,主要是為了以后忘記的時候,回看查閱用的。另一方面是加強跟同行讀者交流的渠道。有一點要注意,一開始為了新,我用InfluxDB 2.0 版本,發現不行,那個太新的,很多對應的開發庫沒有完善好。所以還是采用InfluxDB 1.x版本。這樣在spring boot 里面也有自帶的starter庫可以使用,操作起來特別方便。
  InfluxDB官方文檔: https://docs.influxdata.com/influxdb/v1.7/   安裝:

1 wget -qO- https://repos.influxdata.com/influxdb.key | sudo apt-key add -
2 echo "deb https://repos.influxdata.com/debian stretch stable" | sudo tee /etc/apt/sources.list.d/influxdb.list
3 apt-get update
4 apt-get install influxdb

 

三、InfluxDB基礎命令使用
  修改配置文件 /etc/influxdb/influxdb.conf

1 [http]
2     enabled = true
3     bind-address = ":8086"
4     auth-enabled = false
5     log-enabled = true
6     write-tracing = false
7     pprof-enabled = true

  這里先設置不授權,等一下創建用戶后,再修改為 auth-enabled=true,這個一般也是屬於內部應用,不用ssl加密了。即使要也是通過Nginx進行反向代理。

  用戶管理

1 --顯示所有用戶:
2 show users
3 --新增用戶:
4 --普通用戶 (注意:用戶名用雙引號,密碼用單引號)
5 create user "user" with password 'user'
6 --管理員用戶
7 create user "admin" with password 'admin' with all privileges
8 --刪除用戶
9 drop user "user"

  創建好后,注意修改influxdb.conf 中的 auth-enable=true, 然后重啟服務 service influxdb restart

1 --創建數據庫
2 create database wunaozai
3 --創建好后,就可以不用管了。一些簡單的操作,可以參考其他博客資料。
4 --刪除數據庫
5 drop database wunaozai
6 --切換使用數據庫
7 use wunaozai
1 --顯示所有表
2 show measurements
3 --新建表(往表里面插入數據,就是新建表了)
4 --插入數據的語法有點特殊,采用的是InfluxDB特有的語法:
5 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
6 insert wnztable,tag=mqtt value=33
7 --刪除表
8 drop measurements wunaozai

  其他的高級語法,不如查詢還有策略就不展開,暫時不是重點,等以后深入研究后,在寫博客介紹。

四、EMQ轉存InfluxDB
  EMQ如何把消息轉存到InfluxDB呢,就是本章節的重點,利用上一篇博客中提到的,SpringBoot客戶端監聽EMQ的根Topic,然后把需要進行轉存的Topic及其對應的Payload,構造成InfluxDB表數據,然后插入到InfluxDB中。
  下面介紹一下用到的InfluxDB工具類
  先在pom.xml中引入InfluxDB相關jar包

1 <!-- https://mvnrepository.com/artifact/org.influxdb/influxdb-java -->
2 <dependency>
3 <groupId>org.influxdb</groupId>
4 <artifactId>influxdb-java</artifactId>
5 <version>2.15</version>
6 </dependency>

  相關工具類代碼

 1 import org.influxdb.InfluxDB;
 2 import org.influxdb.InfluxDBFactory;
 3 import org.influxdb.dto.Point;
 4 
 5 /**
 6  * 數據緩存至InfluxDB
 7  * @author wunaozai
 8  *
 9  */
10 public class InfluxDBService {
11     
12     private static String INFLUXDB_URL = "http://127.0.0.1:8086";
13     private static String INFLUXDB_USERNAME = "admin";
14     private static String INFLUXDB_PASSWORD = "admin";
15     private static String INFLUXDB_DATABASE = "wunaozai"; //注意這里對應數據庫,一般要先在命令行中創建數據庫
16     private static InfluxDB influxDB = null;
17     
18     private InfluxDBService(){
19         
20     }
21     
22     public static InfluxDB getInstance(){
23         if(influxDB == null){
24             influxDB = InfluxDBFactory.connect(INFLUXDB_URL, INFLUXDB_USERNAME, INFLUXDB_PASSWORD);
25             influxDB.setDatabase(INFLUXDB_DATABASE);
26             influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
27         }
28         return influxDB;
29     }
30     public static int writePoint(Point point){
31         getInstance().write(point);
32         return 0;
33     }
34 }

  在上一篇博客中的MqttPushCallback.java中的

public void messageArrived(String topic, MqttMessage message);

  這個函數來轉存。

 1     @Override
 2     public void messageArrived(String topic, MqttMessage message) throws Exception {
 3         try{
 4             System.out.println(topic);
 5             String json = new String(message.getPayload());
 6             MQTTProtocolVoModel protocol = BaseModel.parseJSON(json, MQTTProtocolVoModel.class);
 7             
 8             String cmd = protocol.getCmd();
 9             String customer_id = protocol.getProfile().getCustomer_id(); //廠商ID
10             String product_id = protocol.getProfile().getProduct_id(); //產品ID
11             String device_sn = protocol.getProfile().getDevice_sn(); //設備ID
12             Map<String, String> para = protocol.getDatapoint().getPara();
13             Map<String, Object> fields = new HashMap<>(); //這里是客戶端傳過來的數據點,就是需要被顯示和監控的數據
14             for (Map.Entry<String, String> entry : para.entrySet()) {
15                 fields.put(entry.getKey(), entry.getValue());
16             }
17             Map<String, String> tag = new HashMap<>();
18             tag.put("customer_id", customer_id);
19             tag.put("product_id", product_id);
20             tag.put("device_sn", device_sn);
21             //這里可以添加很多Tag,為了簡單演示,這里隱藏部分Tag
22             //構造數據點
23             Point point = Point.measurement("datapoint")
24                     .tag(tag).fields(fields).build();
25             InfluxDBService.writePoint(point);
26         }catch (Exception e) {
27             e.printStackTrace();
28         }
29     }

  這里可以通過EMQ Dashboard自帶的Websocket進行發送,也可以通過前面小節用到的PC工具,網上Web端MQTT客戶也很多,可以通過任意MQTT工具進行測試。
  下面這個是查詢InfluxDB得到的表數據。

 


參考資料:
  https://www.cnblogs.com/jason1990/p/11076310.html
  https://blog.csdn.net/caodanwang/article/details/51967393
  https://docs.influxdata.com/influxdb/v1.7/
  https://www.cnblogs.com/shhnwangjian/p/6897216.html

本文地址: https://www.cnblogs.com/wunaozai/p/11160730.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM