==背景==
數據庫:我們的生產環境中有一個設備運行的數據庫使用的是InfluxDB,這里面存儲了所有設備上報上來的實時運行數據,數據量增速較快。
功能需求:產品有一個叫趨勢分析的功能,用來按照不同的算子(mean、max等),不同的時間段(1分鍾、30分鍾)等對數據進行聚合。
==版本==
1.7.1、單機版
==問題==
經過壓力測試之后,發現當聚合時間選擇1分鍾、5分鍾等細粒度的時間的是偶,聚合的速度非常的慢。
概括一句話:基於原始數據進行實時聚合,不合理
==解決思路==
InfluxDB提供了連續查詢的高級功能,嘗試在每天凌晨的時候將數據聚合好,
官方文檔:https://docs.influxdata.com/influxdb/v1.7/query_language/continuous_queries/
強烈建議把官方文檔從頭到尾瀏覽一遍,是學習一門技術最好的入門方法。
==初次嘗試==
1、創建存儲聚合結果的數據庫
create database rexel_analysis
2、為數據庫創建保存策略
設置數據留存時間為1年(365天)。
create retention policy one_year on rexel_analysis duration 365d replication 1 default
3、創建數據庫權限
出於安全考慮,為數據庫做了ACL權限。
GRANT read ON rexel_analysis TO devread
GRANT write ON rexel_analysis TO devwrite
GRANT all ON rexel_analysis TO devall
4、創建一個連續查詢
CREATE CONTINUOUS QUERY cq_mean_1m ON rexel_private BEGIN SELECT mean(*) INTO rexel_analysis.one_year.data_up_1m FROM rexel_private.one_year.device_data_up GROUP BY time(1m) END
5、查看已有連續查詢
SHOW CONTINUOUS QUERIES
6、查看連續查詢的計算結果
從結果上可以看到,連續查詢按照我預設的每分鍾執行1次,並將結果插入到了另一個數據庫中。
use rexel_analysis show measurements select mean_AI01_0001, mean_AR03_0256 from data_up_1m order by desc tz('Asia/Shanghai')
7、刪除連續查詢
DROP CONTINUOUS QUERY cq_mean_1m ON rexel_private
8、修改連續查詢
根據官網的介紹,創建CQ之后,無法進行更改,如果需要更改需要drop掉之后重新create。
9、查詢連續查詢的日志
待補充
==初次嘗試體驗==
以上是初次嘗試InfluxDB的連續查詢的過程,有幾個體驗:
【好的體驗】
1、可以看到連續查詢會按照指定的時間計划對數據進行聚合,並將結果保存到指定的地方,是一個很好的解決性能的思路。
2、表中的字段有好幾千個,使用帶有通配符(*)的函數和INTO查詢的反向引用語法,可以自動對數據庫中所有度量和數字字段中的數據進行降采樣。
【不好的體驗】
1、每次連續查詢時間間隔很短(時間間隔 = now() - group by time())
2、查詢結果的字段別名比較惡心,比如原來字段叫AI01_0001,因為計算的是mean,結果庫中的字段名就變為了mean_AI01_0001。
==配置采樣頻率與時間范圍==
連續查詢提供了高級語法:RESAMPLE EVERY FOR
CREATE CONTINUOUS QUERY <cq_name> ON <database_name> [RESAMPLE [EVERY <interval>] [FOR <interval>]] BEGIN SELECT <function>(<stuff>)[,<function>(<stuff>)] INTO <different_measurement> FROM <current_measurement> [WHERE <stuff>] GROUP BY time(<interval>)[,<stuff>] END
RESAMPLE EVERY :采樣執行頻次。如RESAMPLE EVERY 30m:表示30分鍾執行一次。
RESAMPLE FOR :采樣時間范圍。如RESAMPLE FOR 60m:時間范圍 = now() - for間隔(60m)。
RESAMPLE EVERY 30m FOR 60m:表示每30分鍾執行一次60分鍾內的數據計算。
注意:
如果此時在<cq_query>中使用了GROUP BY time duration,那么FOR定義的duration必須大於或者等於GROUP BY指定的time duration,不然就會報錯。
反過來,如果EVERY定義的duration 大於GROUP BY指定的time duration,那么執行將按照EVERY定義的duration來執行。
例如:如果GROUP BY time(5m)且EVERY間隔為10m,則CQ每十分鍾執行一次
==語句樣例==
每1分鍾執行1次平均值計算,時間范圍1分鍾 CREATE CONTINUOUS QUERY cq_mean_1m ON rexel_private BEGIN SELECT mean(*) INTO rexel_analysis.one_year.data_up_1m FROM rexel_private.one_year.device_data_up GROUP BY time(1m) END 每1分鍾執行1次平均值計算,時間范圍1天 CREATE CONTINUOUS QUERY cq_mean_1m ON rexel_private RESAMPLE FOR 1d BEGIN SELECT mean(*) INTO rexel_analysis.one_year.data_up_1m FROM rexel_private.one_year.device_data_up GROUP BY time(1m) END
==項目實踐==
經過上面一番體驗之后,對連續查詢已經有了基本的了解,那么實際中如何使用呢?
我們的場景:
1、可選的時間組(共8個):1分鍾、5分鍾、30分鍾、1小時、6小時、12小時、1天、1周
2、可選的聚合模式(共8個):最老值(last)、最新值(first)、最大值(max)、最小值(min)、平均值(mean)、中間值(median)、極差值(spread)、累加值(sum)
3、時間范圍:最多3個月
那么,連續查詢策略該如何設計呢?
【方案一】
按照時間組和聚合模式的排列組合創建查詢策略。如下圖所示,這種方案一共需要創建64個連續查詢,感覺有些啰嗦。
【方案二】
按照和時間組創建查詢策略。如下圖所以,每一行的查詢策略是一樣的,各個聚合方法的結果放在同一張表中。
這樣減少了連續查詢的數量,維護也方便了很多。
表中的數據大概是這個樣子的
【方案三】
將方案二工具化,在mysql中創建一個關於influxdb連續查詢的字典表,根據這個表來自動創建連續查詢。(思想:讓機器做的更多)
建表語句及數據如下:
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for influx_cq_dict -- ---------------------------- DROP TABLE IF EXISTS `influx_cq_dict`; CREATE TABLE `influx_cq_dict` ( `cq_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '連續查詢的名稱', `from_database` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '源數據庫', `from_retention_policy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '源存儲策略', `from_measurement` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '源表名', `to_database` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '目標數據庫', `to_retention_policy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '目標存儲策略', `to_measurement` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '目標表名', `for_interval` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '時間間隔', `every` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '執行頻率', `field` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '查詢字段', `func` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '聚合功能', `group_by_time` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'GROUP BY指定的time duration', `fill` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '空白填充方式', `is_delete` varchar(1) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '0' COMMENT '是否刪除 0,未刪除;1:刪除', PRIMARY KEY (`cq_name`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 146 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'InfluxDB連續查詢字典表' ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of influx_cq_dict -- ---------------------------- INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_12h', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_12h', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '12h', 'none', '0'); INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_1d', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_1d', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '1d', 'none', '0'); INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_1h', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_1h', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '1h', 'none', '0'); INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_1m', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_1m', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '1m', 'none', '0'); INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_1w', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_1w', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '1w', 'none', '0'); INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_30m', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_30m', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '30m', 'none', '0'); INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_5m', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_5m', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '5m', 'none', '0'); INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_5m_test', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_5m_test', '1h', '5m', 'AI01_0001,AI01_0002', 'last,first,max,min,mean,median,spread,sum', '5m', 'none', '1'); INSERT INTO `influx_cq_dict` VALUES ('cq_device_data_up_6h', 'rexel_online', 'one_year', 'device_data_up', 'rexel_online_analysis', 'one_year', 'device_data_up_6h', '1d', '1d', '*', 'last,first,max,min,mean,median,spread,sum', '6h', 'none', '0'); SET FOREIGN_KEY_CHECKS = 1;
==Java代碼==
1、Controller類
package com.rexel.backstage.project.tool.init.controller; import com.rexel.backstage.project.tool.init.service.IInfluxCqDictService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import com.rexel.backstage.framework.web.controller.BaseController; import com.rexel.backstage.framework.web.domain.AjaxResult; /** * InfluxDB連續查詢Controller * * @date 2020-07-30 */ @RestController @RequestMapping("/rexel/tool/influx/continuousQuery") public class InfluxCqDictController extends BaseController { private IInfluxCqDictService influxCqDictService; @Autowired public void setInfluxCqDictService(IInfluxCqDictService influxCqDictService) { this.influxCqDictService = influxCqDictService; } /** * 創建InfluxDB連續查詢 */ @PostMapping("/refresh") public AjaxResult refresh(@RequestParam("type") String type) { return AjaxResult.success(influxCqDictService.refreshContinuousQuery(type)); } }
2、Service接口類
package com.rexel.backstage.project.tool.init.service; import com.alibaba.fastjson.JSONObject; /** * InfluxDB連續查詢Service接口 * * @author admin * @date 2020-07-30 */ public interface IInfluxCqDictService { /** * 刷新InfluxDB連續查詢 * * @param type create/drop * @return 結果 */ JSONObject refreshContinuousQuery(String type); }
3、Service實現類
package com.rexel.backstage.project.tool.init.service.impl; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.rexel.backstage.project.tool.init.domain.InfluxCqDict; import com.rexel.backstage.project.tool.init.mapper.InfluxCqDictMapper; import com.rexel.backstage.project.tool.init.service.IInfluxCqDictService; import com.rexel.influxdb.InfluxUtils; import com.rexel.influxdb.constans.InfluxSql; import java.util.ArrayList; import java.util.List; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * InfluxDB連續查詢Service業務層處理 * * @author admin * @date 2020-07-30 */ @Service @Slf4j public class InfluxCqDictServiceImpl implements IInfluxCqDictService { private InfluxUtils influxUtils = InfluxUtils.getInstance(); private InfluxCqDictMapper influxCqDictMapper; private List<InfluxCqDict> influxCqDictList; private final static String INIT = "init"; private final static String DROP = "drop"; private final static String CREATE = "create"; @Autowired public void setInfluxCqDictMapper(InfluxCqDictMapper influxCqDictMapper) { this.influxCqDictMapper = influxCqDictMapper; } /** * 刷新InfluxDB連續查詢 * * @return 結果 */ @Override public JSONObject refreshContinuousQuery(String type) { influxCqDictList = influxCqDictMapper.selectInfluxCqDictList(); // 首次 if (INIT.toLowerCase().equals(type.toLowerCase())) { recreateDatabase(); dropAllCp(); createAllCp(); } // 刪除 if (DROP.toLowerCase().equals(type.toLowerCase())) { dropAllCp(); } // 創建 if (CREATE.toLowerCase().equals(type.toLowerCase())) { dropAllCp(); createAllCp(); } return new JSONObject(); } /** * 獲取源數據庫列表 * * @return 列表 */ private List<String> getDatabaseFrom() { List<String> result = new ArrayList<>(); for(InfluxCqDict influxCqDict : influxCqDictList) { String database = influxCqDict.getFromDatabase(); if (!result.contains(database)) { result.add(database); } } return result; } /** * 獲取目標數據庫列表 * * @return 列表 */ private List<String> getDatabaseTo() { List<String> result = new ArrayList<>(); for(InfluxCqDict influxCqDict : influxCqDictList) { String database = influxCqDict.getToDatabase(); if (!result.contains(database)) { result.add(database); } } return result; } /** * 重新創建database */ private void recreateDatabase() { List<String> dbList = getDatabaseTo(); for(String database : dbList) { influxUtils.dropDatabase(database); influxUtils.createDatabase(database); influxUtils.createRetentionPolicy(database); } } /** * 刪除所有連續查詢 */ private void dropAllCp() { JSONArray jsonArray = influxUtils.getContinuousQueries(); List<String> dbList = getDatabaseFrom(); for(String database : dbList) { for (int i = 0; i < jsonArray.size(); i++) { JSONObject jsonObject = jsonArray.getJSONObject(i); influxUtils.dropContinuousQuery(jsonObject.getString("name"), database); } } } /** * 創建所有連續查詢 */ private void createAllCp() { for(InfluxCqDict influxCqDict : influxCqDictList) { String createCqStr = makeOneCqStr(influxCqDict); influxUtils.createContinuousQuery(createCqStr); } } /** * 生成單個連續查詢語句 * * @param influxCqDict InfluxCqDict * @return 連續查詢語句 */ private String makeOneCqStr(InfluxCqDict influxCqDict) { String every = makeEvery(influxCqDict); String fields = makeFields(influxCqDict); String groupBy = makeGroupBy(influxCqDict); JSONObject paramJson = new JSONObject(); paramJson.put("cqName", influxCqDict.getCqName()); paramJson.put("onDatabase", influxCqDict.getFromDatabase()); paramJson.put("every", every); paramJson.put("forInterval", influxCqDict.getForInterval()); paramJson.put("fields", fields); paramJson.put("toDatabase", influxCqDict.getToDatabase()); paramJson.put("toRetentionPolicy", influxCqDict.getToRetentionPolicy()); paramJson.put("toMeasurement", influxCqDict.getToMeasurement()); paramJson.put("fromDatabase", influxCqDict.getFromDatabase()); paramJson.put("fromRetentionPolicy", influxCqDict.getFromRetentionPolicy()); paramJson.put("fromMeasurement", influxCqDict.getFromMeasurement()); paramJson.put("groupBy", groupBy); paramJson.put("fill", influxCqDict.getFill()); return InfluxUtils.formatSql(InfluxSql.CREATE_CONTINUOUS_QUERY, paramJson); } /** * 生成語句Field字段 * * @param influxCqDict InfluxCqDict * @return Field字段 */ private String makeFields(InfluxCqDict influxCqDict) { String[] fields = influxCqDict.getField().split(","); String[] funcs = influxCqDict.getFunc().split(","); StringBuilder sb = new StringBuilder(); for (String field : fields) { for (String func : funcs) { sb.append(func).append("(").append(field).append("),"); } } return sb.substring(0, sb.length() - 1); } /** * 生成GroupBy字段 * * @param influxCqDict InfluxCqDict * @return GroupBy字段 */ private String makeGroupBy(InfluxCqDict influxCqDict) { List<String> tagKeys = influxUtils.getMeasurementTagKeys( influxCqDict.getFromDatabase(), influxCqDict.getFromMeasurement()); StringBuilder sb = new StringBuilder(); sb.append("time(").append(influxCqDict.getGroupByTime()).append(")"); if (tagKeys.size() > 0) { sb.append(","); } for (String tagKey : tagKeys) { sb.append(tagKey).append(","); } return sb.substring(0, sb.length() - 1); } /** * 生成EVERY字段 * * @param influxCqDict InfluxCqDict * @return EVERY字段 */ private String makeEvery(InfluxCqDict influxCqDict) { String every = influxCqDict.getEvery(); if (every != null && !every.isEmpty()) { return " EVERY " + every; } return ""; } }
4、Domain類
package com.rexel.backstage.project.tool.init.domain; import lombok.Data; /** * InfluxDB連續查詢domain類 * * @author admin * @date 2020-07-30 */ @Data public class InfluxCqDict { /** 連續查詢的名稱 */ private String cqName; /** 源數據庫 */ private String fromDatabase; /** 源存儲策略 */ private String fromRetentionPolicy; /** 源表名 */ private String fromMeasurement; /** 目標數據庫 */ private String toDatabase; /** 目標存儲策略 */ private String toRetentionPolicy; /** 目標表名 */ private String toMeasurement; /** 時間間隔 */ private String forInterval; /** 執行頻率 */ private String every; /** 查詢字段 */ private String field; /** 聚合功能 */ private String func; /** GROUP BY指定的time duration */ private String groupByTime; /** 空白填充方式 */ private String fill; }
5、Mapper類
package com.rexel.backstage.project.tool.init.mapper; import com.rexel.backstage.project.tool.init.domain.InfluxCqDict; import java.util.List; import org.springframework.stereotype.Repository; /** * InfluxDB連續查詢Mapper接口 * * @author admin * @date 2020-07-30 */ @Repository public interface InfluxCqDictMapper { /** * 查詢InfluxDB連續查詢 * * @return InfluxDB連續查詢列表 */ List<InfluxCqDict> selectInfluxCqDictList(); /** * 新增InfluxDB連續查詢 * * @param influxCqDict InfluxDB連續查詢 * @return 結果 */ int insertInfluxCqDict(InfluxCqDict influxCqDict); /** * 刪除InfluxDB連續查詢 * * @param database 源數據庫名 * @return 結果 */ int deleteInfluxCqDictByDatabase(String database); }
6、MyBatis的XML文件
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.rexel.backstage.project.tool.init.mapper.InfluxCqDictMapper"> <resultMap type="com.rexel.backstage.project.tool.init.domain.InfluxCqDict" id="InfluxCqDictResult"> <result property="cqName" column="cq_name"/> <result property="fromDatabase" column="from_database"/> <result property="fromRetentionPolicy" column="from_retention_policy"/> <result property="fromMeasurement" column="from_measurement"/> <result property="toDatabase" column="to_database"/> <result property="toRetentionPolicy" column="to_retention_policy"/> <result property="toMeasurement" column="to_measurement"/> <result property="forInterval" column="for_interval"/> <result property="every" column="every"/> <result property="field" column="field"/> <result property="func" column="func"/> <result property="groupByTime" column="group_by_time"/> <result property="fill" column="fill"/> </resultMap> <sql id="selectInfluxCqDictVo"> select cq_name, from_database, from_retention_policy, from_measurement, to_database, to_retention_policy, to_measurement, for_interval, every, field, func, group_by_time, fill from influx_cq_dict </sql> <select id="selectInfluxCqDictList" resultMap="InfluxCqDictResult"> <include refid="selectInfluxCqDictVo"/> where is_delete = 0; </select> <insert id="insertInfluxCqDict" parameterType="com.rexel.backstage.project.tool.init.domain.InfluxCqDict"> insert into influx_cq_dict <trim prefix="(" suffix=")" suffixOverrides=","> <if test="cqName != null and cpName != ''">cq_name,</if> <if test="fromDatabase != null and fromDatabase != ''">from_database,</if> <if test="fromRetentionPolicy != null and fromRetentionPolicy != ''">from_retention_policy,</if> <if test="fromMeasurement != null and fromMeasurement != ''">from_measurement,</if> <if test="toDatabase != null and toDatabase != ''">to_database,</if> <if test="toRetentionPolicy != null and toRetentionPolicy != ''">to_retention_policy,</if> <if test="toMeasurement != null and toMeasurement != ''">to_measurement,</if> <if test="for != null and for != ''">for,</if> <if test="every != null and every != ''">every,</if> <if test="field != null and field != ''">field,</if> <if test="func != null and func != ''">func,</if> <if test="groupByTime != null and groupByTime != ''">group_by_time,</if> <if test="fill != null and fill != ''">fill,</if> </trim> <trim prefix="values (" suffix=")" suffixOverrides=","> <if test="cqName != null and cqName != ''">#{qpName},</if> <if test="fromDatabase != null and fromDatabase != ''">#{fromDatabase},</if> <if test="fromRetentionPolicy != null and fromRetentionPolicy != ''">#{fromRetentionPolicy},</if> <if test="fromMeasurement != null and fromMeasurement != ''">#{fromMeasurement},</if> <if test="toDatabase != null and toDatabase != ''">#{toDatabase},</if> <if test="toRetentionPolicy != null and toRetentionPolicy != ''">#{toRetentionPolicy},</if> <if test="toMeasurement != null and toMeasurement != ''">#{toMeasurement},</if> <if test="for != null and for != ''">#{for},</if> <if test="every != null and every != ''">#{every},</if> <if test="field != null and field != ''">#{field},</if> <if test="func != null and func != ''">#{func},</if> <if test="groupByTime != null and groupByTime != ''">#{groupByTime},</if> <if test="fill != null and fill != ''">#{fill},</if> </trim> </insert> <delete id="deleteInfluxCqDictByDatabase" parameterType="String"> delete from influx_cq_dict where from_database = #{fromDatabase} </delete> </mapper>
7、InfluxUtils類
package com.rexel.influxdb; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.rexel.influxdb.constans.InfluxSql; import com.rexel.influxdb.query.QueryDeviceMeta; import com.rexel.influxdb.query.QueryDeviceMetaResult; import com.rexel.influxdb.query.QueryProductMeta; import com.rexel.influxdb.query.QueryProductMetaResult; import com.rexel.utils.times.TimeUtils; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import lombok.extern.slf4j.Slf4j; import okhttp3.OkHttpClient; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; import org.influxdb.dto.QueryResult.Result; import org.influxdb.dto.QueryResult.Series; /** * @ClassName InfluxUtils * @Description InfluxDB共通類 * @Author: chunhui.qu * @Date: 2020/6/26 */ @Slf4j public class InfluxUtils { private InfluxDB influxDb; private volatile Map<String, JSONObject> productMetaData = new HashMap<>(); private volatile Map<String, JSONObject> deviceMetaData = new HashMap<>(); /** * 構造函數 */ private InfluxUtils() { // do nothing } /** * 單例模式 */ private static class SingletonInstance { private static final InfluxUtils INSTANCE = new InfluxUtils(); } /** * 獲取對象句柄 */ public static InfluxUtils getInstance() { return SingletonInstance.INSTANCE; } /** * 創建InfluxDB連接 * * @return InfluxDB */ public InfluxDB connect() { if (influxDb != null) { return influxDb; } Properties properties = getProperties(); String url = properties.getProperty("influx.url"); String username = properties.getProperty("influx.username"); String password = properties.getProperty("influx.password"); log.info("influx.url=" + url); log.info("influx.username=" + username); log.info("influx.password=" + password); OkHttpClient.Builder client = new OkHttpClient.Builder().readTimeout(100, TimeUnit.SECONDS); influxDb = InfluxDBFactory.connect(url, username, password, client); return influxDb; } /** * 創建database * * @param database database */ public void createDatabase(String database) { connect(); JSONObject params = new JSONObject(); params.put("database", database); String sql = formatSql(InfluxSql.CREATE_DATA_BASE, params); QueryResult queryResult = influxDb.query(new Query(sql)); log.info(queryResult.toString()); } /** * 刪除database * * @param database database */ public void dropDatabase(String database) { connect(); JSONObject params = new JSONObject(); params.put("database", database); String sql = formatSql(InfluxSql.DROP_DATA_BASE, params); QueryResult queryResult = influxDb.query(new Query(sql)); log.info(queryResult.toString()); } /** * 創建數據保存策略 * * @param database database */ public void createRetentionPolicy(String database) { connect(); JSONObject params = new JSONObject(); params.put("database", database); String sql = formatSql(InfluxSql.CREATE_RETENTION_POLICY, params); QueryResult queryResult = influxDb.query(new Query(sql)); log.info(queryResult.toString()); } /** * 查詢連續查詢 * * @return 結果 */ public JSONArray getContinuousQueries() { connect(); QueryResult queryResult = influxDb.query(new Query(InfluxSql.SHOW_CONTINUOUS_QUERIES)); return convert(queryResult, false); } /** * 刪除指定連續查詢 * * @param cpName 連續查詢名稱 * @param database database */ public void dropContinuousQuery(String cpName, String database) { connect(); JSONObject params = new JSONObject(); params.put("cpName", cpName); params.put("database", database); String sql = formatSql(InfluxSql.DROP_CONTINUOUS_QUERY, params); QueryResult queryResult = influxDb.query(new Query(sql)); log.info(queryResult.toString()); } /** * 創建連續查詢 * * @param createCqStr 創建語句 */ public void createContinuousQuery(String createCqStr) { connect(); QueryResult queryResult = influxDb.query(new Query(createCqStr)); log.info(queryResult.toString()); } /** * 查詢指定measurement的tag key * * @param database database * @param measurement measurement * @return tag key列表 */ public List<String> getMeasurementTagKeys(String database, String measurement) { connect(); JSONObject params = new JSONObject(); params.put("database", database); params.put("measurement", measurement); String sql = formatSql(InfluxSql.SHOW_TAG_KEYS, params); QueryResult queryResult = influxDb.query(new Query(sql)); JSONArray jsonArray = convert(queryResult, false); List<String> tagKeys = new ArrayList<>(); for (int i = 0; i < jsonArray.size(); i++) { JSONObject jsonObject = jsonArray.getJSONObject(i); String tagKey = jsonObject.getString("tagKey"); if (!tagKeys.contains(tagKey)) { tagKeys.add(tagKey); } } return tagKeys; } /** * InfluxQL格式化 * * @param sql 原始SQL * @param params 參數 * @return 格式化結果 */ public static String formatSql(String sql, JSONObject params) { Set<Entry<String, Object>> set = params.entrySet(); for (Entry<String, Object> entry : set) { String param = "{" + entry.getKey() + "}"; sql = sql.replace(param, String.valueOf(entry.getValue())); } return sql; } /** * 轉換QueryResult * * @param queryResult QueryResult * @return JSONArray */ public static JSONArray convert(QueryResult queryResult, boolean removeTime) { JSONArray jsonArray = new JSONArray(); List<Result> results = queryResult.getResults(); for (Result result : results) { List<Series> seriesList = result.getSeries(); if (seriesList == null) { continue; } for (Series series : seriesList) { List<List<Object>> valuesList = series.getValues(); if (valuesList == null) { continue; } for (List<Object> values : valuesList) { List<String> columns = series.getColumns(); JSONObject jsonObject = new JSONObject(); for (int i = 0; i < columns.size(); i++) { String column = columns.get(i); if ("time".equals(column)) { if (!removeTime) { jsonObject.put(column, TimeUtils.time8ToDateString(values.get(i).toString())); } } else { Object value = values.get(i); if (value != null) { jsonObject.put(column, value); } } } jsonArray.add(jsonObject); } } } return jsonArray; } /** * 讀取資源文件 * * @return Properties */ private Properties getProperties() { Properties props = new Properties(); try(InputStream is = InfluxUtils.class .getClassLoader().getResourceAsStream("application.properties")) { props.load(is); } catch (IOException e) { log.error("[讀取資源文件異常:]",e); } return props; } }
8、接口地址
http://localhost:9200/rexel/tool/influx/continuousQuery/refresh?type=init http://localhost:9200/rexel/tool/influx/continuousQuery/refresh?type=drop http://localhost:9200/rexel/tool/influx/continuousQuery/refresh?type=create
9、實現結果
==相關配置==
在整個過程中有幾個相關的配置需要注意一下:
1、coordinator
query-timeout = "0s"
不要設置查詢超時時間(因為首次查詢90天的數據,是很有可能超時的,后面按需再設置)
2、continuous_queries
enabled = true:開啟連續查詢
log-enabled = true:開啟連續查詢日志
query-stats-enabled = true:將使用有關連續查詢的運行時間及其持續時間的信息來寫入數據_internal
==遇到的坑==
【坑1】
發生時間:2020年7月31日
問題描述:查看連續查詢的日志((/var/log/messages)),存在error=timeout的問題,
我在配置文件中已經把query-timeout設置為0了,依然出現這個問題。暫時還不知道原因。。。。很是惆悵。
2020年8月3日 追記:
未能在組件本身上找到原因及解決辦法,嘗試着將一個大的連續查詢拆解為多個小的連續查詢之后,問題得以解決。
拆解前:
CREATE CONTINUOUS QUERY cq_device_data_up_sum_6h ON rexel_online RESAMPLE EVERY 1d FOR 1d BEGIN SELECT first(*), last(*), max(*), mean(*), median(*), min(*), spread(*), sum(*) INTO rexel_online_analysis.one_year.device_data_up_sum_6h FROM rexel_online.one_year.device_data_up GROUP BY time(6h), deviceName, event, productKey fill(none) END
拆解后:
CREATE CONTINUOUS QUERY cq_device_data_up_sum_6h ON rexel_online RESAMPLE EVERY 1d FOR 1d BEGIN SELECT sum(*) INTO rexel_online_analysis.one_year.device_data_up_sum_6h FROM rexel_online.one_year.device_data_up GROUP BY time(6h), deviceName, event, productKey fill(none) END
--END--