Docker安裝InfluxDB1.x和InfluxDB2.x以及與SpringBoot整合


兩者區別:

1.x 版本使用 influxQL 查詢語言,2.x 和 1.8+(beta) 使用 flux 查詢語法;相比V1 移除了database 和 RP,增加了bucket。
V2具有以下幾個概念:
timestamp、field key、field value、field set、tag key、tag value、tag set、measurement、series、point、bucket、bucket schema、organization
新增的概念:
bucket:所有 InfluxDB 數據都存儲在一個存儲桶中。一個桶結合了數據庫的概念和存儲周期(時間每個數據點仍然存在持續時間)。一個桶屬於一個組織
bucket schema:具有明確的schema-type的存儲桶需要為每個度量指定顯式架構。測量包含標簽、字段和時間戳。顯式模式限制了可以寫入該度量的數據的形狀。
organization:InfluxDB組織是一組用戶的工作區。所有儀表板、任務、存儲桶和用戶都屬於一個組織。

一、InfluxDB1.x Docker安裝以及與Boot整合

A、docker安裝InfluxDB1.x (influxdb1.8.4)
1、安裝:
docker run -d  --name influxdb -p 8086:8086 influxdb:1.8.4 
2、查看

docker ps -a
3、進入docker的influx中
docker exec -it daf88772adc9 /bin/bash
4、直接輸入influx啟動
influx
5、修改賬戶密碼
# 顯示用戶
SHOW USERS
# 創建用戶
CREATE USER "username" WITH PASSWORD 'password'
# 賦予用戶管理員權限
GRANT ALL PRIVILEGES TO username
# 創建管理員權限的用戶
CREATE USER <username> WITH PASSWORD '<password>' WITH ALL PRIVILEGES
# 修改用戶密碼
SET PASSWORD FOR username = 'password'
# 撤消權限
REVOKE ALL ON mydb FROM username
# 查看權限
SHOW GRANTS FOR username
# 刪除用戶
DROP USER "username"

6、在配置文件啟用認證
默認情況下,influxdb的配置文件是禁用認證策略的,所以需要修改設置一下。
編輯配置文件vim /etc/influxdb/influxdb.conf,把 [http] 下的 auth-enabled 選項設置為 true
7、設置保存策略(多長時間之前的數據需要刪除)---默認為 autogen 永久不刪除
a、查看數據庫的保存策略
show retention policies on 數據庫名
例子:
# 選擇使用telegraf數據庫
> use influx_test;
Using database influx_test
# 查詢數據保存策略
> show retention policies on influx_test
name    duration shardGroupDuration replicaN default
----    -------- ------------------ -------- -------
autogen 0s       168h0m0s           1        true

name 策略名稱:默認autogen
duration 持續時間: 0s 代表無限制
shardGroupDuration shardGroup數據存儲時間:shardGroup是InfluxDB的一個基本存儲結構, 應該大於這個時間的數據在查詢效率上應該有所降低。
replicaN 副本個數:1 代表只有一個副本
default 是否默認策略:true 代表設置為該數據庫的默認策略
b、設置保存策略
# 新建一個策略
CREATE RETENTION POLICY "策略名稱" ON 數據庫名 DURATION 時長 REPLICATION 副本個數;
# 新建一個策略並且直接設置為默認策略
CREATE RETENTION POLICY "策略名稱" ON 數據庫名 DURATION 時長 REPLICATION 副本個數 DEFAULT;
例子:
# 創建新的默認策略role_01保留數據時長1小時
> CREATE RETENTION POLICY "1hour" ON influx_test DURATION 1h REPLICATION 1 DEFAULT;
c、修改保存策略

ALTER RETENTION POLICY "策略名稱" ON "數據庫名" DURATION 時長
ALTER RETENTION POLICY "策略名稱" ON "數據庫名" DURATION 時長 DEFAULT
d、刪除保存策略

drop retention POLICY "策略名" ON "數據庫名"
8、使用桌面可視化工具連接數據庫

如果剛才沒有設置密碼,這里可以不需要填寫密碼,如果有賬號密碼則需要勾上下面的Use SSL
連接成功后如下:

B、InfluxDB1.x與Spring整合(只列舉部分代碼,后面會放上整個項目的GitHub地址)
整個項目結構如下:

1、引入依賴 (其他依賴未顯示全,后面會放上整個項目的GitHub地址)

<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb-client-java</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.20</version>
</dependency>
2、新建yml文件

influx:
  url: 'http://xxx.xx.xxx.xx:8086'
  password: 'password'
  username: 'username'
3、連接配置 InfluxDBConfig

@Data
@Configuration
@ConfigurationProperties(prefix = "influx")
public class InfluxDBConfig {
    private String url;
    private String username;
    private String password;
    /**
      * description: 用於查詢
      * date: 2022/1/20 23:11
      * author: zhouhong
      * @param  * @param null
      * @return
      */
    @Bean(destroyMethod = "close")
    public InfluxDB influxDBClient(){
        return InfluxDBFactory.connect(this.url, this.username, this.password);
    }
    /**
      * description: 用於寫入
      * date: 2022/1/20 23:12
      * author: zhouhong
      * @param  * @param null
      * @return
      */
    @Bean(name = "influxDbWriteApi",destroyMethod = "close")
    public WriteApi influxDbWriteApi(){
        InfluxDBClient influxDBClient = InfluxDBClientFactory.createV1(this.url, this.username,
                this.password.toCharArray(), "influx_test", "autogen");
        return influxDBClient.getWriteApi();
    }
}
4、封裝用於查詢的方法

@Component
public class InfluxUtil {
    /**
      * description: 通用查詢
      * date: 2022/1/20 23:13
      * author: zhouhong
      * @param  * @param null
      * @return
      */
    public QueryResult query(String command, String database, InfluxDB influxDB) {
        Query query = new Query(command, database);
        return influxDB.query(query);
    }
}
5、新建需要寫入的數據的實體類、需要返回的類(省略,具體參考github示例)InsertParams.java InfluxResult.java
6、新建server層和impl實現類
InfluxServiceImpl.java 如下:

/**
  * description: 時序數據庫Impl
  * date: 2022/1/16 20:47
  * author: zhouhong
  */
@Service
@Slf4j
public class InfluxServiceImpl implements InfluxService {
    @Resource(name = "influxDbWriteApi")
    private WriteApi influxDbWriteApi;
    @Resource(name = "influxDBClient")
    private InfluxDB influxDBClient;
    @Autowired
    private InfluxUtil influxUtil;
    @Override
    public void insert(InsertParams insertParams) {
        influxDbWriteApi.writeMeasurement(WritePrecision.MS, insertParams);
    }
    @Override
    public Object queryAll(InsertParams insertParams) {
        List<InfluxResult> list = new ArrayList<>();
        InfluxResult influxResult = new InfluxResult();
        String sql = "SELECT * FROM \"influx_test\" WHERE time > '2022-01-16'  tz('Asia/Shanghai')";
        QueryResult queryResult = influxUtil.query(sql, "influx_test", influxDBClient);
        queryResult.getResults().get(0).getSeries().get(0).getValues().forEach(item -> {
            influxResult.setTime(item.get(0).toString());
            influxResult.setCurrent(item.get(1).toString());
            influxResult.setEnergyUsed(item.get(2).toString());
            influxResult.setPower(item.get(3).toString());
            influxResult.setVoltage(item.get(4).toString());
            list.add(influxResult);
        });
        return list;
    }
    @Override
    public Object querySumByOneDay(InsertParams insertParams) {
        String sql = "SELECT  SUM(voltage)  FROM \"influx_test\" WHERE time > '2022-01-18'  GROUP BY time(1d)  tz('Asia/Shanghai')";
        QueryResult queryResult = influxUtil.query(sql, "influx_test", influxDBClient);
        return queryResult.getResults().get(0).getSeries().get(0);
    }
}
7、controller層 InfluxDbController.java(返回結果是封裝過后的,詳情見github示例)

@RestController
public class InfluxDbController {
    @Autowired
    private InfluxService influxService;
    /**
      * description: 時序數據庫插入測試
      * date: 2022/1/16 23:00
      * author: zhouhong
      * @param  * @param null
      * @return
      */
    @PostMapping("/influxdb/insert")
    public ResponseData insert(@RequestBody InsertParams insertParams) {
        influxService.insert(insertParams);
        return new SuccessResponseData();
    }
    /**
     * description: 時序數據庫查詢全部數據測試
     * date: 2022/1/16 23:00
     * author: zhouhong
     * @param  * @param null
     * @return
     */
    @PostMapping("/influxdb/queryAll")
    public ResponseData query(@RequestBody InsertParams insertParams) {
        return new SuccessResponseData(influxService.queryAll(insertParams));
    }
    /**
     * description: 時序數據庫按天查詢當前電壓總和測試
     * date: 2022/1/16 23:00
     * author: zhouhong
     * @param  * @param null
     * @return
     */
    @PostMapping("/influxdb/queryByOneDay")
    public ResponseData queryByOneDay(@RequestBody InsertParams insertParams) {
        return new SuccessResponseData(influxService.querySumByOneDay(insertParams));
    }
}
8、PostMan測試(注意需要先新建一個 數據庫---influx_test)
8.1 插入測試 localhost:9998/influxdb/insert
入參:

{
    "energyUsed":243.78,
    "power":54.50,
    "current":783.34,
    "voltage":44.09
}
返回:

{
    "success": true,
    "code": 200,
    "message": "請求成功",
    "localizedMsg": "請求成功",
    "data": null
}
8.2、查詢全部(注意,這里返回結果我封裝了一下)localhost:9998/influxdb/queryAll
入參:

{
}
返回:

{
    "success": true,
    "code": 200,
    "message": "請求成功",
    "localizedMsg": "請求成功",
    "data": [
        {
            "energyUsed": "243.78",
            "power": "54.5",
            "current": "783.34",
            "voltage": "44.09",
            "time": "2022-01-20T23:44:00.626+08:00"
        },
        {
            "energyUsed": "243.78",
            "power": "54.5",
            "current": "783.34",
            "voltage": "44.09",
            "time": "2022-01-20T23:44:00.626+08:00"
        }
    ]
}
8.3聚合查詢(統計2022-01-18到現在,以天為單位每天的用電量之和) localhost:9998/influxdb/queryByOneDay 精度問題暫時沒處理
入參:
{ }
返回:

{
    "success": true,
    "code": 200,
    "message": "請求成功",
    "localizedMsg": "請求成功",
    "data": {
        "name": "influx_test",
        "tags": null,
        "columns": [
            "time",
            "sum"
        ],
        "values": [
            [
                "2022-01-18T00:00:00+08:00",
                null
            ],
            [
                "2022-01-19T00:00:00+08:00",
                null
            ],
            [
                "2022-01-20T00:00:00+08:00",
                481.07000000000005
            ]
        ]
    }
}
C、常見的查詢SQL 后面加上 tz('Asia/Shanghai') 解決時區差
1、查所指定時間之后的所有

SELECT * FROM "real_water_amount" where time  > '2022-01-01' tz('Asia/Shanghai')

2、查詢平均值 mean()

SELECT mean(value) FROM "real_water_amount" where time  > '2022-01-01' tz('Asia/Shanghai')

3、查詢最大最小值 max() min()

SELECT max(value) FROM "real_water_amount" where time  > '2022-01-01' tz('Asia/Shanghai')

4、按年、月、天、周、小時、分鍾、秒統計

SELECT sum(value) FROM "real_water_amount" where time  > '2022-01-01'  group by time(1d)  tz('Asia/Shanghai')

5、按照列過濾

SELECT * FROM "real_water_amount" where time  > '2022-01-01'   and  iotId = '8ecJY59UJd1jwPLBmJA5000000'

二、InfluxDB2.x Docker安裝以及與Boot整合

A、Docker安裝InfluxDB2.x
1、安裝:默認拉取最新版本
docker run -d --name influxdb -p 8086:8086 influxdb
2、查看
docker ps -a
3、瀏覽器訪問 IP:8086 (注意:部署在遠程服務器上需要開啟8086端口安全組)設置賬號密碼

從上到下為:賬號(zhouhong)、密碼(66668888)、確認密碼(66668888)、組織(my_influxdb)、Buucket(Tom);完了之后點擊 Quick Start
4、然后點擊 Data -- > Buucket 就可以看到我們剛才創建的 名字為 Tom 的 Buucket了

5、點擊 API Tokens 獲取當前用戶的 Token(整合時需要)

6、設置Bucket的保存策略

准備工作完成,開始整合
B、InfluxDB2.x與SpringBoot整合
1、依賴

<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb-client-java</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.20</version>
</dependency>
2、yml配置文件

influx:
  influxUrl: 'http://XXX.XX.XXX.XX:8086'
  bucket: 'tom'
  org: 'my_influxdb'
  token: 'Rt23UemGI_cfS-lFDrurtjh46P1enfhrji-KrZYR04wUR1Yxw_oBCZPL6GmFYSDn20Q9gM_P9DIBhHc2RJjNkA=='
3、配置類

@Setter
@Getter
public class InfluxBean{
    /**
     * 數據庫url地址
     */
    private String influxUrl;
    /**
     * 桶(表)
     */
    private String bucket;
    /**
     * 組織
     */
    private String org;
    /**
     * token
     */
    private String token;
    /**
     * 數據庫連接
     */
    private InfluxDBClient client;
    /**
     * 構造方法
     */
    public InfluxBean(String influxUrl, String bucket, String org, String token) {
        this.influxUrl = influxUrl;
        this.bucket = bucket;
        this.org = org;
        this.token = token;
        this.client = getClient();
    }
    /**
     * 獲取連接
     */
    private InfluxDBClient getClient() {
        if (client == null) {
            client  = InfluxDBClientFactory.create(influxUrl, token.toCharArray());
        }
        return client;
    }
    /**
     * 寫入數據(以秒為時間單位)
     */
    public void write(Object object){
        try (WriteApi writeApi = client.getWriteApi()) {
            writeApi.writeMeasurement(bucket, org, WritePrecision.NS, object);
        }
    }
    /**
     * 讀取數據
     */
    public List<FluxTable> queryTable(String fluxQuery){
        return client.getQueryApi().query(fluxQuery, org);
    }
}
@Data
@Configuration
@ConfigurationProperties(prefix = "influx")
public class InfluxConfig {
    /**
     * url地址
     */
    private String influxUrl;
    /**
     * 桶(表)
     */
    private String bucket;
    /**
     * 組織
     */
    private String org;
    /**
     * token
     */
    private String token;
    /**
     * 初始化bean
     */
    @Bean(name = "influx")
    public InfluxBean InfluxBean() {
        return new InfluxBean(influxUrl, bucket, org, token);
    }
}
4、實現類

@Service
@Slf4j
public class InfluxServiceImpl implements InfluxService {
    @Resource
    private InfluxBean influxBean;
    @Override
    public void insert(InsertParams insertParams) {
        insertParams.setTime(Instant.now());
        influxBean.write(insertParams);
    }
    @Override
    public List<InfluxResult> queue(){
        // 下面兩個 private 方法 賦值給 list 查詢對應的數據
        List<FluxTable> list = queryInfluxAll();
        List<InfluxResult> results = new ArrayList<>();
        for (int i = 0; i < list.size(); i++) {
            for (int j = 0; j < list.get(i).getRecords().size(); j++) {
                InfluxResult influxResult = new InfluxResult();
                influxResult.setCurrent(list.get(i).getRecords().get(j).getValues().get("current").toString());
                influxResult.setEnergyUsed(list.get(i).getRecords().get(j).getValues().get("energyUsed").toString());
                influxResult.setPower(list.get(i).getRecords().get(j).getValues().get("power").toString());
                influxResult.setVoltage(list.get(i).getRecords().get(j).getValues().get("voltage").toString());
                influxResult.setTime(list.get(i).getRecords().get(j).getValues().get("_time").toString());
                System.err.println(list.get(i).getRecords().get(j).getValues().toString());
                results.add(influxResult);
            }
        }
        return results;
    }
    /**
      * description: 查詢一小時內的InsertParams所有數據
      * date: 2022/1/21 13:44
      * author: zhouhong
      * @param  * @param null
      * @return
      */
    private List<FluxTable> queryInfluxAll(){
        String query = " from(bucket: \"tom\")" +
                "  |> range(start: -60m, stop: now())" +
                "  |> filter(fn: (r) => r[\"_measurement\"] == \"influx_test\")" +
                "  |> pivot( rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\" )";
        return influxBean.queryTable(query);
    }
    /**
      * description: 根據某一個字段的值過濾(查詢 用電量 energyUsed 為 322 的那條記錄)
      * date: 2022/1/21 12:44
      * author: zhouhong
      * @param  * @param null
      * @return
      */
    public List<FluxTable> queryFilterByEnergyUsed(){
        String query = " from(bucket: \"tom\")" +
                "  |> range(start: -60m, stop: now())" +
                "  |> filter(fn: (r) => r[\"_measurement\"] == \"influx_test\")" +
                "  |> filter(fn: (r) => r[\"energyUsed\"] == \"322\")" +
                "  |> pivot( rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\" )";
        return influxBean.queryTable(query);
    }
}
C、測試
1、插入 localhost:9998/inlfuxdb/insert
入參:

{
    "energyUsed":"23.12",
    "power":"321.60",
    "current":"782.72",
    "voltage":"67.43"
}
返回:
{
    "success": true,
    "code": 200,
    "message": "請求成功",
    "localizedMsg": "請求成功",
    "data": null
}
2、查詢所有
入參:
{}
返回:

{
    "success": true,
    "code": 200,
    "message": "請求成功",
    "localizedMsg": "請求成功",
    "data": [
        {
            "energyUsed": "23.12",
            "power": "321.60",
            "current": "782.72",
            "voltage": "67.43",
            "time": "2022-01-20T17:51:01.819Z"
        },
        {
            "energyUsed": "243.78",
            "power": "541.50",
            "current": "32.34",
            "voltage": "89.09",
            "time": "2022-01-20T17:33:47.246Z"
        }
    ]
}
D、Flux常見查詢語句
1、指定數據源:from(bucket:"tom")
指定時間范圍:
使用管道轉發運算符 ( |>) 將數據從數據源通過管道傳輸到range() 函數,該函數指定查詢的時間范圍。它接受兩個參數:start和stop。范圍可以是使用相對負持續時間 或使用絕對時間

//使用絕對時間
from(bucket:"tom")
  |> range(start: 2022-01-05T23:30:00Z, stop: 2022-01-21T00:00:00Z) 
//過去十五天的數據
from(bucket:"tom")
  |> range(start: -15d)

2、數據過濾
將范圍數據傳遞到filter()函數中,以根據數據屬性或列縮小結果范圍
// 根據 _measurement 和 _field 過濾 
 from(bucket:"tom")
  |> range(start: -15d)
  |> filter(fn: (r) =>
    r._measurement == "influx_test" and
    r._field == "power" and
    r.energyUsed == "23.12"
  )

3、數據轉換
使用函數,將數據聚合為平均值、下采樣數據等
from(bucket:"tom")
  |> range(start: -15d)
  |> filter(fn: (r) =>
    r._measurement == "influx_test"
  )
  |> window(every: 10m)
  
  from(bucket:"tom")
  |> range(start: -15d)
  |> filter(fn: (r) =>
    r._measurement == "influx_test"
  )
  |> window(every: 10m)
  |> mean()
其他查詢函數請查看官網: https://docs.influxdata.com/flux/v0.x/stdlib/universe/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM