Solving Aggregation Performance Problems with InfluxDB Continuous Queries


==Background==

Database: our production environment uses InfluxDB for device runtime data. It stores all of the real-time operating data reported by our devices, and the volume is growing fast.

Requirement: the product has a "trend analysis" feature that aggregates the data with different functions (mean, max, etc.) over different time intervals (1 minute, 30 minutes, etc.).
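For a rough sense of the workload, a trend-analysis request boils down to a query of this shape (the field and measurement names here are placeholders patterned on those used later in this post):

```sql
-- 1-minute averages of one field over the last 90 days
SELECT mean(AI01_0001) FROM device_data_up WHERE time > now() - 90d GROUP BY time(1m)
```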

 

==Version==

1.7.1, standalone (single node)

 

==Problem==

Stress testing revealed that when a fine-grained aggregation interval such as 1 minute or 5 minutes is selected, aggregation is extremely slow.

In one sentence: aggregating over the raw data in real time is not a reasonable approach.

 

==Approach==

InfluxDB offers continuous queries (CQ) as an advanced feature; the idea is to have the data pre-aggregated, for example in the early hours of each day.

Official documentation: https://docs.influxdata.com/influxdb/v1.7/query_language/continuous_queries/

I strongly recommend reading the official documentation from beginning to end; it is the best way to get started with any technology.

 

==First Attempt==

1. Create a database to store the aggregated results

create database rexel_analysis

 

2. Create a retention policy for the database

Set the retention period to one year (365 days).

create retention policy one_year on rexel_analysis duration 365d replication 1 default
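To verify the policy, the database's retention policies can be listed:

```sql
SHOW RETENTION POLICIES ON rexel_analysis
```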

 

3. Grant database permissions

For security, ACL permissions were set up on the database.

GRANT read ON rexel_analysis TO devread
GRANT write ON rexel_analysis TO devwrite
GRANT all ON rexel_analysis TO devall
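Note that GRANT assumes these users already exist; if not, they need to be created first (a sketch; the passwords are placeholders):

```sql
CREATE USER devread WITH PASSWORD 'changeme'
CREATE USER devwrite WITH PASSWORD 'changeme'
CREATE USER devall WITH PASSWORD 'changeme'
```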

 

4. Create a continuous query

CREATE CONTINUOUS QUERY cq_mean_1m ON rexel_private BEGIN SELECT mean(*) INTO rexel_analysis.one_year.data_up_1m FROM rexel_private.one_year.device_data_up GROUP BY time(1m) END

 

5. List existing continuous queries

SHOW CONTINUOUS QUERIES

 

6. Inspect the results of the continuous query

The results show that the continuous query ran once per minute, as configured, and inserted its results into the other database.

use rexel_analysis
show measurements
select mean_AI01_0001, mean_AR03_0256 from data_up_1m order by time desc tz('Asia/Shanghai')

 

7. Drop a continuous query

DROP CONTINUOUS QUERY cq_mean_1m ON rexel_private

 

8. Modify a continuous query

According to the official documentation, a CQ cannot be altered once created; to change it you must drop it and create it again.
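In practice, then, "modifying" a CQ is a drop-and-recreate; for example, switching cq_mean_1m to 5-minute buckets would look roughly like this (a sketch built from the statements above; the new CQ name and target measurement are hypothetical):

```sql
DROP CONTINUOUS QUERY cq_mean_1m ON rexel_private
CREATE CONTINUOUS QUERY cq_mean_5m ON rexel_private BEGIN SELECT mean(*) INTO rexel_analysis.one_year.data_up_5m FROM rexel_private.one_year.device_data_up GROUP BY time(5m) END
```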

 

9. Check the continuous query logs

To be added.

 

==First Impressions==

That was my first pass at InfluxDB continuous queries. A few observations:

[The good]

1. A continuous query aggregates the data on the specified schedule and stores the results in the specified destination, which is a promising way to solve the performance problem.

2. Our measurement has several thousand fields. Combining wildcard (*) functions with the backreference syntax of the INTO query automatically downsamples the data in all measurements and numeric fields of the database.
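The backreference mentioned above can even fan a single CQ out over every measurement; a sketch of that form (the CQ name is hypothetical; :MEASUREMENT is InfluxQL's backreference placeholder):

```sql
CREATE CONTINUOUS QUERY cq_mean_all_1m ON rexel_private BEGIN SELECT mean(*) INTO rexel_analysis.one_year.:MEASUREMENT FROM rexel_private.one_year./.*/ GROUP BY time(1m) END
```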

 

[The not so good]

1. Each execution of the continuous query covers only a short interval by default (time range = now() - GROUP BY time() interval).

2. The field aliases in the results are ugly: a field originally named AI01_0001 ends up as mean_AI01_0001 in the result database because the aggregate computed is mean.
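For a known field, the mean_ prefix can be avoided with an explicit alias; this does not help with the wildcard form, but it works when the fields can be enumerated (field name reused from above; the CQ name is hypothetical):

```sql
CREATE CONTINUOUS QUERY cq_mean_ai01_1m ON rexel_private BEGIN SELECT mean(AI01_0001) AS AI01_0001 INTO rexel_analysis.one_year.data_up_1m FROM rexel_private.one_year.device_data_up GROUP BY time(1m) END
```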

 

==Configuring Execution Frequency and Time Range==

Continuous queries support the advanced syntax RESAMPLE EVERY ... FOR ...:

CREATE CONTINUOUS QUERY <cq_name> ON <database_name> 
[RESAMPLE [EVERY <interval>] [FOR <interval>]] 
BEGIN SELECT <function>(<stuff>)[,<function>(<stuff>)] INTO <different_measurement> 
FROM <current_measurement> [WHERE <stuff>] GROUP BY time(<interval>)[,<stuff>] 
END

RESAMPLE EVERY: execution frequency. For example, RESAMPLE EVERY 30m runs the query every 30 minutes.

RESAMPLE FOR: the time range to recompute. For example, RESAMPLE FOR 60m covers the range from now() - 60m to now().

RESAMPLE EVERY 30m FOR 60m: every 30 minutes, recompute over the last 60 minutes of data.
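Putting EVERY and FOR together with the earlier example (database and measurement names reused from this post), a sketch:

```sql
-- Every 30 minutes, recompute the last 60 minutes of 30-minute means
CREATE CONTINUOUS QUERY cq_mean_30m ON rexel_private
RESAMPLE EVERY 30m FOR 60m
BEGIN
  SELECT mean(*) INTO rexel_analysis.one_year.data_up_30m
  FROM rexel_private.one_year.device_data_up
  GROUP BY time(30m)
END
```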

 

Notes:

If the <cq_query> uses GROUP BY time(<duration>), the FOR duration must be greater than or equal to the GROUP BY time duration; otherwise creating the CQ fails with an error.

Conversely, if the EVERY duration is greater than the GROUP BY time duration, execution follows the EVERY duration.

For example, with GROUP BY time(5m) and an EVERY interval of 10m, the CQ executes every ten minutes.

 

==Sample Statements==

Compute the mean every minute, covering a 1-minute range:
CREATE CONTINUOUS QUERY cq_mean_1m ON rexel_private  BEGIN SELECT mean(*) INTO rexel_analysis.one_year.data_up_1m FROM rexel_private.one_year.device_data_up GROUP BY time(1m) END

Compute the mean every minute, covering a 1-day range:
CREATE CONTINUOUS QUERY cq_mean_1m ON rexel_private RESAMPLE FOR 1d BEGIN SELECT mean(*) INTO rexel_analysis.one_year.data_up_1m FROM rexel_private.one_year.device_data_up GROUP BY time(1m) END

 

==Putting It into Practice==

After the experiments above I had a basic grasp of continuous queries. How should they be applied in our project?

Our scenario:

1. Selectable time intervals (8): 1 minute, 5 minutes, 30 minutes, 1 hour, 6 hours, 12 hours, 1 day, 1 week

2. Selectable aggregation modes (8): oldest value (first), newest value (last), maximum (max), minimum (min), average (mean), median (median), range (spread), sum (sum)

3. Time range: up to 3 months

 

So how should the continuous query strategy be designed?

[Option 1]

Create one continuous query per combination of time interval and aggregation mode. That makes 8 × 8 = 64 continuous queries, which feels rather unwieldy.

 

[Option 2]

Create one continuous query per time interval, computing all aggregation functions in a single statement, so the results of every aggregation mode for a given interval land in the same measurement.

This sharply reduces the number of continuous queries and makes them much easier to maintain.
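For illustration, option 2 expressed as a single CQ for the 1-minute interval (a sketch assembled from the dictionary rows below; all eight functions in one SELECT):

```sql
CREATE CONTINUOUS QUERY cq_device_data_up_1m ON rexel_online
RESAMPLE EVERY 1d FOR 1d
BEGIN
  SELECT first(*), last(*), max(*), min(*), mean(*), median(*), spread(*), sum(*)
  INTO rexel_online_analysis.one_year.device_data_up_1m
  FROM rexel_online.one_year.device_data_up
  GROUP BY time(1m), deviceName, event, productKey fill(none)
END
```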

 

[Option 3]

Turn option 2 into a tool: keep a dictionary table in MySQL describing the InfluxDB continuous queries, and create the queries automatically from that table. (Idea: let the machine do more of the work.)

The table definition and data are as follows:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for influx_cq_dict
-- ----------------------------
DROP TABLE IF EXISTS `influx_cq_dict`;
CREATE TABLE `influx_cq_dict`  (
  `cq_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'Continuous query name',
  `from_database` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'Source database',
  `from_retention_policy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'Source retention policy',
  `from_measurement` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'Source measurement',
  `to_database` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'Target database',
  `to_retention_policy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'Target retention policy',
  `to_measurement` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'Target measurement',
  `for_interval` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'RESAMPLE FOR interval',
  `every` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'RESAMPLE EVERY interval',
  `field` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'Fields to query',
  `func` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'Aggregation functions',
  `group_by_time` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'GROUP BY time duration',
  `fill` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'Fill mode for empty intervals',
  `is_delete` varchar(1) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '0' COMMENT 'Deleted flag: 0 = not deleted; 1 = deleted',
  PRIMARY KEY (`cq_name`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'InfluxDB continuous query dictionary' ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of influx_cq_dict
-- ----------------------------
INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_12h', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_12h', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '12h', 'none', '0');
INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_1d', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_1d', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '1d', 'none', '0');
INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_1h', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_1h', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '1h', 'none', '0');
INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_1m', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_1m', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '1m', 'none', '0');
INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_1w', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_1w', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '1w', 'none', '0');
INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_30m', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_30m', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '30m', 'none', '0');
INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_5m', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_5m', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '5m', 'none', '0');
INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_5m_test', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_5m_test', '1h', '5m', 'AI01_0001,AI01_0002', 'last,first,max,min,mean,median,spread,sum', '5m', 'none', '1');
INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_6h', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_6h', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '6h', 'none', '0');

SET FOREIGN_KEY_CHECKS = 1;

 

==Java Code==

1. Controller

package com.rexel.backstage.project.tool.init.controller;

import com.rexel.backstage.project.tool.init.service.IInfluxCqDictService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.rexel.backstage.framework.web.controller.BaseController;
import com.rexel.backstage.framework.web.domain.AjaxResult;

/**
 * InfluxDB continuous query controller
 *
 * @date 2020-07-30
 */
@RestController
@RequestMapping("/rexel/tool/influx/continuousQuery")
public class InfluxCqDictController extends BaseController {
    private IInfluxCqDictService influxCqDictService;

    @Autowired
    public void setInfluxCqDictService(IInfluxCqDictService influxCqDictService) {
        this.influxCqDictService = influxCqDictService;
    }

    /**
     * Create InfluxDB continuous queries
     */
    @PostMapping("/refresh")
    public AjaxResult refresh(@RequestParam("type") String type) {
        return AjaxResult.success(influxCqDictService.refreshContinuousQuery(type));
    }
}

 

2. Service interface

package com.rexel.backstage.project.tool.init.service;

import com.alibaba.fastjson.JSONObject;

/**
 * InfluxDB continuous query service interface
 *
 * @author admin
 * @date 2020-07-30
 */
public interface IInfluxCqDictService {
    /**
     * Refresh InfluxDB continuous queries
     *
     * @param type init/create/drop
     * @return result
     */
    JSONObject refreshContinuousQuery(String type);
}

 

3. Service implementation

package com.rexel.backstage.project.tool.init.service.impl;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.rexel.backstage.project.tool.init.domain.InfluxCqDict;
import com.rexel.backstage.project.tool.init.mapper.InfluxCqDictMapper;
import com.rexel.backstage.project.tool.init.service.IInfluxCqDictService;
import com.rexel.influxdb.InfluxUtils;
import com.rexel.influxdb.constans.InfluxSql;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * InfluxDB continuous query service implementation
 *
 * @author admin
 * @date 2020-07-30
 */
@Service
@Slf4j
public class InfluxCqDictServiceImpl implements IInfluxCqDictService {
    private InfluxUtils influxUtils = InfluxUtils.getInstance();
    private InfluxCqDictMapper influxCqDictMapper;
    private List<InfluxCqDict> influxCqDictList;
    private final static String INIT = "init";
    private final static String DROP = "drop";
    private final static String CREATE = "create";

    @Autowired
    public void setInfluxCqDictMapper(InfluxCqDictMapper influxCqDictMapper) {
        this.influxCqDictMapper = influxCqDictMapper;
    }

    /**
     * Refresh InfluxDB continuous queries
     *
     * @param type init/create/drop
     * @return result
     */
    @Override
    public JSONObject refreshContinuousQuery(String type) {
        influxCqDictList = influxCqDictMapper.selectInfluxCqDictList();

        // init: recreate the target databases first, then recreate all CQs
        if (INIT.equalsIgnoreCase(type)) {
            recreateDatabase();
            dropAllCp();
            createAllCp();
        }

        // drop: remove all CQs
        if (DROP.equalsIgnoreCase(type)) {
            dropAllCp();
        }

        // create: drop and recreate all CQs
        if (CREATE.equalsIgnoreCase(type)) {
            dropAllCp();
            createAllCp();
        }

        return new JSONObject();
    }

    /**
     * Get the list of source databases
     *
     * @return list
     */
    private List<String> getDatabaseFrom() {
        List<String> result = new ArrayList<>();
        for(InfluxCqDict influxCqDict : influxCqDictList) {
            String database = influxCqDict.getFromDatabase();
            if (!result.contains(database)) {
                result.add(database);
            }
        }
        return result;
    }

    /**
     * Get the list of target databases
     *
     * @return list
     */
    private List<String> getDatabaseTo() {
        List<String> result = new ArrayList<>();
        for(InfluxCqDict influxCqDict : influxCqDictList) {
            String database = influxCqDict.getToDatabase();
            if (!result.contains(database)) {
                result.add(database);
            }
        }
        return result;
    }

    /**
     * Drop and recreate the target databases
     */
    private void recreateDatabase() {
        List<String> dbList = getDatabaseTo();
        for(String database : dbList) {
            influxUtils.dropDatabase(database);
            influxUtils.createDatabase(database);
            influxUtils.createRetentionPolicy(database);
        }
    }

    /**
     * Drop all continuous queries
     */
    private void dropAllCp() {
        JSONArray jsonArray = influxUtils.getContinuousQueries();
        List<String> dbList =  getDatabaseFrom();
        for(String database : dbList) {
            for (int i = 0; i < jsonArray.size(); i++) {
                JSONObject jsonObject = jsonArray.getJSONObject(i);
                influxUtils.dropContinuousQuery(jsonObject.getString("name"), database);
            }
        }
    }

    /**
     * Create all continuous queries
     */
    private void createAllCp() {
        for(InfluxCqDict influxCqDict : influxCqDictList) {
            String createCqStr = makeOneCqStr(influxCqDict);
            influxUtils.createContinuousQuery(createCqStr);
        }
    }

    /**
     * Build a single CREATE CONTINUOUS QUERY statement
     *
     * @param influxCqDict InfluxCqDict
     * @return the CQ statement
     */
    private String makeOneCqStr(InfluxCqDict influxCqDict) {
        String every = makeEvery(influxCqDict);
        String fields = makeFields(influxCqDict);
        String groupBy = makeGroupBy(influxCqDict);

        JSONObject paramJson = new JSONObject();
        paramJson.put("cqName", influxCqDict.getCqName());
        paramJson.put("onDatabase", influxCqDict.getFromDatabase());
        paramJson.put("every", every);
        paramJson.put("forInterval", influxCqDict.getForInterval());
        paramJson.put("fields", fields);
        paramJson.put("toDatabase", influxCqDict.getToDatabase());
        paramJson.put("toRetentionPolicy", influxCqDict.getToRetentionPolicy());
        paramJson.put("toMeasurement", influxCqDict.getToMeasurement());
        paramJson.put("fromDatabase", influxCqDict.getFromDatabase());
        paramJson.put("fromRetentionPolicy", influxCqDict.getFromRetentionPolicy());
        paramJson.put("fromMeasurement", influxCqDict.getFromMeasurement());
        paramJson.put("groupBy", groupBy);
        paramJson.put("fill", influxCqDict.getFill());
        return InfluxUtils.formatSql(InfluxSql.CREATE_CONTINUOUS_QUERY, paramJson);
    }

    /**
     * Build the field list (function(field) pairs)
     *
     * @param influxCqDict InfluxCqDict
     * @return field list
     */
    private String makeFields(InfluxCqDict influxCqDict) {
        String[] fields = influxCqDict.getField().split(",");
        String[] funcs = influxCqDict.getFunc().split(",");

        StringBuilder sb = new StringBuilder();
        for (String field : fields) {
            for (String func : funcs) {
                sb.append(func).append("(").append(field).append("),");
            }
        }
        return sb.substring(0, sb.length() - 1);
    }

    /**
     * Build the GROUP BY clause
     *
     * @param influxCqDict InfluxCqDict
     * @return GROUP BY clause
     */
    private String makeGroupBy(InfluxCqDict influxCqDict) {
        List<String> tagKeys = influxUtils.getMeasurementTagKeys(
            influxCqDict.getFromDatabase(), influxCqDict.getFromMeasurement());

        StringBuilder sb = new StringBuilder();
        sb.append("time(").append(influxCqDict.getGroupByTime()).append(")");
        // Prepend the separator to each tag key; the original trailing-comma
        // trick chopped the closing ")" when there were no tag keys.
        for (String tagKey : tagKeys) {
            sb.append(",").append(tagKey);
        }
        return sb.toString();
    }

    /**
     * Build the EVERY clause
     *
     * @param influxCqDict InfluxCqDict
     * @return EVERY clause (or empty string)
     */
    private String makeEvery(InfluxCqDict influxCqDict) {
        String every = influxCqDict.getEvery();
        if (every != null && !every.isEmpty()) {
            return " EVERY " + every;
        }
        return "";
    }
}

 

4. Domain class

package com.rexel.backstage.project.tool.init.domain;

import lombok.Data;

/**
 * InfluxDB continuous query domain class
 *
 * @author admin
 * @date 2020-07-30
 */
@Data
public class InfluxCqDict {
    /** Continuous query name */
    private String cqName;

    /** Source database */
    private String fromDatabase;

    /** Source retention policy */
    private String fromRetentionPolicy;

    /** Source measurement */
    private String fromMeasurement;

    /** Target database */
    private String toDatabase;

    /** Target retention policy */
    private String toRetentionPolicy;

    /** Target measurement */
    private String toMeasurement;

    /** RESAMPLE FOR interval */
    private String forInterval;

    /** RESAMPLE EVERY interval */
    private String every;

    /** Fields to query */
    private String field;

    /** Aggregation functions */
    private String func;

    /** GROUP BY time duration */
    private String groupByTime;

    /** Fill mode for empty intervals */
    private String fill;
}

 

5. Mapper interface

package com.rexel.backstage.project.tool.init.mapper;

import com.rexel.backstage.project.tool.init.domain.InfluxCqDict;
import java.util.List;
import org.springframework.stereotype.Repository;

/**
 * InfluxDB continuous query Mapper interface
 *
 * @author admin
 * @date 2020-07-30
 */
@Repository
public interface InfluxCqDictMapper {
    /**
     * Query the list of CQ dictionary records
     *
     * @return list of InfluxCqDict
     */
    List<InfluxCqDict> selectInfluxCqDictList();

    /**
     * Insert a CQ dictionary record
     *
     * @param influxCqDict InfluxCqDict
     * @return affected rows
     */
    int insertInfluxCqDict(InfluxCqDict influxCqDict);

    /**
     * Delete CQ dictionary records by source database
     *
     * @param database source database name
     * @return affected rows
     */
    int deleteInfluxCqDictByDatabase(String database);
}

 

6. MyBatis mapper XML

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.rexel.backstage.project.tool.init.mapper.InfluxCqDictMapper">
    
    <resultMap type="com.rexel.backstage.project.tool.init.domain.InfluxCqDict" id="InfluxCqDictResult">
        <result property="cqName" column="cq_name"/>
        <result property="fromDatabase" column="from_database"/>
        <result property="fromRetentionPolicy" column="from_retention_policy"/>
        <result property="fromMeasurement" column="from_measurement"/>
        <result property="toDatabase" column="to_database"/>
        <result property="toRetentionPolicy" column="to_retention_policy"/>
        <result property="toMeasurement" column="to_measurement"/>
        <result property="forInterval" column="for_interval"/>
        <result property="every" column="every"/>
        <result property="field" column="field"/>
        <result property="func" column="func"/>
        <result property="groupByTime" column="group_by_time"/>
        <result property="fill" column="fill"/>
    </resultMap>

    <sql id="selectInfluxCqDictVo">
        select cq_name, from_database, from_retention_policy, from_measurement, to_database, to_retention_policy, to_measurement, for_interval, every, field, func, group_by_time, fill from influx_cq_dict
    </sql>

    <select id="selectInfluxCqDictList" resultMap="InfluxCqDictResult">
        <include refid="selectInfluxCqDictVo"/>
        where is_delete = '0'
    </select>

    <insert id="insertInfluxCqDict" parameterType="com.rexel.backstage.project.tool.init.domain.InfluxCqDict">
        insert into influx_cq_dict
        <trim prefix="(" suffix=")" suffixOverrides=",">
            <if test="cqName != null  and cqName != ''">cq_name,</if>
            <if test="fromDatabase != null  and fromDatabase != ''">from_database,</if>
            <if test="fromRetentionPolicy != null  and fromRetentionPolicy != ''">from_retention_policy,</if>
            <if test="fromMeasurement != null  and fromMeasurement != ''">from_measurement,</if>
            <if test="toDatabase != null  and toDatabase != ''">to_database,</if>
            <if test="toRetentionPolicy != null  and toRetentionPolicy != ''">to_retention_policy,</if>
            <if test="toMeasurement != null  and toMeasurement != ''">to_measurement,</if>
            <if test="forInterval != null  and forInterval != ''">for_interval,</if>
            <if test="every != null  and every != ''">every,</if>
            <if test="field != null  and field != ''">field,</if>
            <if test="func != null  and func != ''">func,</if>
            <if test="groupByTime != null  and groupByTime != ''">group_by_time,</if>
            <if test="fill != null  and fill != ''">fill,</if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides=",">
            <if test="cqName != null  and cqName != ''">#{cqName},</if>
            <if test="fromDatabase != null  and fromDatabase != ''">#{fromDatabase},</if>
            <if test="fromRetentionPolicy != null  and fromRetentionPolicy != ''">#{fromRetentionPolicy},</if>
            <if test="fromMeasurement != null  and fromMeasurement != ''">#{fromMeasurement},</if>
            <if test="toDatabase != null  and toDatabase != ''">#{toDatabase},</if>
            <if test="toRetentionPolicy != null  and toRetentionPolicy != ''">#{toRetentionPolicy},</if>
            <if test="toMeasurement != null  and toMeasurement != ''">#{toMeasurement},</if>
            <if test="forInterval != null  and forInterval != ''">#{forInterval},</if>
            <if test="every != null  and every != ''">#{every},</if>
            <if test="field != null  and field != ''">#{field},</if>
            <if test="func != null  and func != ''">#{func},</if>
            <if test="groupByTime != null  and groupByTime != ''">#{groupByTime},</if>
            <if test="fill != null  and fill != ''">#{fill},</if>
        </trim>
    </insert>

    <delete id="deleteInfluxCqDictByDatabase" parameterType="String">
        delete from influx_cq_dict where from_database = #{fromDatabase}
    </delete>
</mapper>

 

7. InfluxUtils utility class

package com.rexel.influxdb;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.rexel.influxdb.constans.InfluxSql;
import com.rexel.influxdb.query.QueryDeviceMeta;
import com.rexel.influxdb.query.QueryDeviceMetaResult;
import com.rexel.influxdb.query.QueryProductMeta;
import com.rexel.influxdb.query.QueryProductMetaResult;
import com.rexel.utils.times.TimeUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.dto.QueryResult.Result;
import org.influxdb.dto.QueryResult.Series;

/**
 * @ClassName InfluxUtils
 * @Description InfluxDB utility class
 * @Author: chunhui.qu
 * @Date: 2020/6/26
 */
@Slf4j
public class InfluxUtils {
    private InfluxDB influxDb;
    private volatile Map<String, JSONObject> productMetaData = new HashMap<>();
    private volatile Map<String, JSONObject> deviceMetaData = new HashMap<>();

    /**
     * Private constructor (singleton)
     */
    private InfluxUtils() {
        // do nothing
    }

    /**
     * Lazy-loaded singleton holder
     */
    private static class SingletonInstance {
        private static final InfluxUtils INSTANCE = new InfluxUtils();
    }

    /**
     * Get the singleton instance
     */
    public static InfluxUtils getInstance() {
        return SingletonInstance.INSTANCE;
    }

    /**
     * Create (or reuse) the InfluxDB connection
     *
     * @return InfluxDB
     */
    public InfluxDB connect() {
        if (influxDb != null) {
            return influxDb;
        }

        Properties properties = getProperties();
        String url = properties.getProperty("influx.url");
        String username = properties.getProperty("influx.username");
        String password = properties.getProperty("influx.password");
        log.info("influx.url=" + url);
        log.info("influx.username=" + username);
        // deliberately not logging the password in plain text

        OkHttpClient.Builder client =
            new OkHttpClient.Builder().readTimeout(100, TimeUnit.SECONDS);
        influxDb = InfluxDBFactory.connect(url, username, password, client);

        return influxDb;
    }

    /**
     * Create a database
     *
     * @param database database name
     */
    public void createDatabase(String database) {
        connect();
        JSONObject params = new JSONObject();
        params.put("database", database);
        String sql =  formatSql(InfluxSql.CREATE_DATA_BASE, params);
        QueryResult queryResult = influxDb.query(new Query(sql));
        log.info(queryResult.toString());
    }

    /**
     * Drop a database
     *
     * @param database database name
     */
    public void dropDatabase(String database) {
        connect();
        JSONObject params = new JSONObject();
        params.put("database", database);
        String sql =  formatSql(InfluxSql.DROP_DATA_BASE, params);
        QueryResult queryResult = influxDb.query(new Query(sql));
        log.info(queryResult.toString());
    }

    /**
     * Create the retention policy
     *
     * @param database database name
     */
    public void createRetentionPolicy(String database) {
        connect();
        JSONObject params = new JSONObject();
        params.put("database", database);
        String sql =  formatSql(InfluxSql.CREATE_RETENTION_POLICY, params);
        QueryResult queryResult = influxDb.query(new Query(sql));
        log.info(queryResult.toString());
    }

    /**
     * List existing continuous queries
     *
     * @return result
     */
    public JSONArray getContinuousQueries() {
        connect();
        QueryResult queryResult = influxDb.query(new Query(InfluxSql.SHOW_CONTINUOUS_QUERIES));
        return convert(queryResult, false);
    }

    /**
     * Drop the specified continuous query
     *
     * @param cpName continuous query name
     * @param database database name
     */
    public void dropContinuousQuery(String cpName, String database) {
        connect();
        JSONObject params = new JSONObject();
        params.put("cpName", cpName);
        params.put("database", database);
        String sql =  formatSql(InfluxSql.DROP_CONTINUOUS_QUERY, params);
        QueryResult queryResult = influxDb.query(new Query(sql));
        log.info(queryResult.toString());
    }

    /**
     * Create a continuous query
     *
     * @param createCqStr CREATE CONTINUOUS QUERY statement
     */
    public void createContinuousQuery(String createCqStr) {
        connect();
        QueryResult queryResult = influxDb.query(new Query(createCqStr));
        log.info(queryResult.toString());
    }

    /**
     * List the tag keys of the specified measurement
     *
     * @param database database name
     * @param measurement measurement name
     * @return list of tag keys
     */
    public List<String> getMeasurementTagKeys(String database, String measurement) {
        connect();
        JSONObject params = new JSONObject();
        params.put("database", database);
        params.put("measurement", measurement);
        String sql =  formatSql(InfluxSql.SHOW_TAG_KEYS, params);
        QueryResult queryResult = influxDb.query(new Query(sql));
        JSONArray jsonArray = convert(queryResult, false);

        List<String> tagKeys = new ArrayList<>();
        for (int i = 0; i < jsonArray.size(); i++) {
            JSONObject jsonObject = jsonArray.getJSONObject(i);
            String tagKey = jsonObject.getString("tagKey");
            if (!tagKeys.contains(tagKey)) {
                tagKeys.add(tagKey);
            }
        }
        return tagKeys;
    }

    /**
     * Format InfluxQL by substituting {name} placeholders
     *
     * @param sql SQL template
     * @param params parameters
     * @return formatted statement
     */
    public static String formatSql(String sql, JSONObject params) {
        Set<Entry<String, Object>> set = params.entrySet();
        for (Entry<String, Object> entry : set) {
            String param = "{" + entry.getKey() + "}";
            sql = sql.replace(param, String.valueOf(entry.getValue()));
        }
        return sql;
    }

    /**
     * Convert a QueryResult into a JSONArray
     *
     * @param queryResult QueryResult
     * @param removeTime whether to drop the time column
     * @return JSONArray
     */
    public static JSONArray convert(QueryResult queryResult, boolean removeTime) {
        JSONArray jsonArray = new JSONArray();
        List<Result> results = queryResult.getResults();
        for (Result result : results) {
            List<Series> seriesList = result.getSeries();
            if (seriesList == null) {
                continue;
            }
            for (Series series : seriesList) {
                List<List<Object>> valuesList = series.getValues();
                if (valuesList == null) {
                    continue;
                }
                for (List<Object> values : valuesList) {
                    List<String> columns = series.getColumns();
                    JSONObject jsonObject = new JSONObject();
                    for (int i = 0; i < columns.size(); i++) {
                        String column = columns.get(i);
                        if ("time".equals(column)) {
                            if (!removeTime) {
                                jsonObject.put(column, TimeUtils.time8ToDateString(values.get(i).toString()));
                            }
                        } else {
                            Object value = values.get(i);
                            if (value != null) {
                                jsonObject.put(column, value);
                            }
                        }
                    }
                    jsonArray.add(jsonObject);
                }
            }
        }

        return jsonArray;
    }
    /**
     * Read the properties file
     *
     * @return Properties
     */
    private Properties getProperties() {
        Properties props = new Properties();
        try(InputStream is = InfluxUtils.class
            .getClassLoader().getResourceAsStream("application.properties")) {
            props.load(is);
        } catch (IOException e) {
            log.error("[Failed to read properties file]", e);
        }
        return props;
    }
}

 

8. Endpoint URLs

http://localhost:9200/rexel/tool/influx/continuousQuery/refresh?type=init
http://localhost:9200/rexel/tool/influx/continuousQuery/refresh?type=drop
http://localhost:9200/rexel/tool/influx/continuousQuery/refresh?type=create

 

9. Results

 

==Related Configuration==

A few configuration items need attention along the way:

1. coordinator

query-timeout = "0s"

Do not set a query timeout: the initial query over 90 days of data can easily exceed any limit; tune this later as needed.

 

2. continuous_queries

enabled = true: enables continuous queries

log-enabled = true: enables continuous query logging

query-stats-enabled = true: writes runtime information about continuous queries (when they run and how long they take) to the _internal database
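In influxdb.conf these settings live under the [coordinator] and [continuous_queries] sections; a minimal sketch:

```toml
[coordinator]
  # no query timeout (the initial 90-day query may run long)
  query-timeout = "0s"

[continuous_queries]
  enabled = true              # run continuous queries
  log-enabled = true          # log CQ execution
  query-stats-enabled = true  # write CQ runtime stats to _internal
```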

 

==Pitfalls==

[Pitfall 1]

Date: July 31, 2020

Problem: the continuous query log (/var/log/messages) contained error=timeout entries.

query-timeout was already set to 0 in the configuration file, yet the error still occurred. The cause was unknown at the time, which was rather frustrating.

 

Follow-up, August 3, 2020:

I could not find the cause or a fix in InfluxDB itself. After splitting one large continuous query into several smaller ones, the problem was resolved.

Before splitting:

CREATE CONTINUOUS QUERY cq_device_data_up_sum_6h ON rexel_online RESAMPLE EVERY 1d FOR 1d BEGIN SELECT first(*), last(*), max(*), mean(*), median(*), min(*), spread(*), sum(*) INTO rexel_online_analysis.one_year.device_data_up_sum_6h FROM rexel_online.one_year.device_data_up GROUP BY time(6h), deviceName, event, productKey fill(none) END

After splitting:

CREATE CONTINUOUS QUERY cq_device_data_up_sum_6h ON rexel_online RESAMPLE EVERY 1d FOR 1d BEGIN SELECT sum(*) INTO rexel_online_analysis.one_year.device_data_up_sum_6h FROM rexel_online.one_year.device_data_up GROUP BY time(6h), deviceName, event, productKey fill(none) END
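Splitting means one CQ per aggregate function, each following the same pattern; for example the mean variant might look like this (a sketch; the CQ and target measurement names are hypothetical, following the naming scheme above):

```sql
CREATE CONTINUOUS QUERY cq_device_data_up_mean_6h ON rexel_online RESAMPLE EVERY 1d FOR 1d BEGIN SELECT mean(*) INTO rexel_online_analysis.one_year.device_data_up_mean_6h FROM rexel_online.one_year.device_data_up GROUP BY time(6h), deviceName, event, productKey fill(none) END
```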

 

--END--

