InfluxDB 聚合函數實用案例


文章大綱

![](https://images.cnblogs.com/cnblogs_com/itdragon/1501306/o_191120051650InfluxDB 聚合函數實用案例.png)

InfluxDB 簡介

InfluxDB是GO語言編寫的分布式時間序列化數據庫,非常適合對數據(跟隨時間變化而變化的數據)的跟蹤、監控和分析。在我們的項目中,主要是用來收集設備實時上傳的值。從而分析該設備值的趨勢圖和各個設備的能耗占比等一系列功能。InfluxDB的功能很強大,文檔也很詳細。可美中不足的是,它的單機性能並不是很理想。因為InfluxDB存儲的數據量本身是非常巨大的,在執行一些時間范圍比較大的sql語句,耗時會很長,甚至直接崩潰。而開源的InfluxDB目前已經不再支持集群。若要通過搭建集群提升性能問題,可以考慮企業版。當然,我們寫的程序也有很大的性能優化空間。

能耗趨勢圖分析

需求:統計指定設備、指定區域、指定分項或者指定能耗類型的能耗趨勢圖。如下圖所示,縱坐標是能耗值,橫坐標是時刻(每小時、每天、每周、每月)。

1574150324712

分析:獲取某個區間時刻的值,可以用GROUP BY time 進行時間分組。再用聚合函數LAST或者SUM統計。但這個看似很簡單的需求卻暗藏殺機。SQL語句如下

SELECT LAST("currentValue"), * FROM "$TABLE_NAME" 
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' 
GROUP BY time($timeSpan) 
ORDER BY time DESC

第一:先要清楚,數據是通過什么規則保存到InfluxDB數據庫

為了記錄設備能耗的實時數據,我們會通過訂閱MQTT通道,當值發生變化后存儲到InfluxDB數據庫中,或者在指定時間范圍內沒有變化也會上傳。這樣做的好處可以避免一些冗余數據,同時也埋下了一個坑。

例如:一台設備在InfluxDB數據庫中最后一次記錄的時間是15分鍾前。但是sql語句是從5分鍾前開始統計。這會導致該設備的其點值就是null。簡單來說:設備的存儲的值正好在分組統計的時間范圍外。解決方法有很多:比如用FILL(previous)函數填充;比如使用time(time_interval,offset_interval)進行時間推移等。但是我比較推薦下面的方法:

先獲取指定開始時間之前的最后值(lastValue),然后再根據返回值是否為null,來決定是否替換或者更新lastValue。偽代碼如下。

## 獲取該設備的最后記錄值
val lastValue = "SELECT LAST("currentValue") FROM "$TABLE_NAME" WHERE time <= '$startTime'"
## 遍歷查詢結果,將currentValue為 null的值替換
"SELECT LAST("currentValue"), * FROM "$TABLE_NAME" 
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' 
GROUP BY time($timeSpan) 
ORDER BY time DESC".forEach {
	lastValue = currentValue?: lastValue
	result[time] = currentValue?: lastValue
}

你以為這樣就結束了嗎?還不夠,返回的time格式化后,你會發現有8小時的時區問題。

第二:解決InfluxDB時區問題

InfluxDB 默認以UTC時間存儲並返回時間戳,查詢返回的時間戳對應的也是UTC時間。我們需要通過tz()子句指定時區名稱,比如Asia/Shanghai。若InfluxDB安裝在Windows環境上,可能還會出現 error parsing query: unable to find time zone 錯誤,解決方法是安裝GO語言環境,文章也詳細介紹過。

SELECT LAST("currentValue"), * FROM "$TABLE_NAME" 
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' 
GROUP BY time($timeSpan) 
ORDER BY time DESC tz('Asia/Shanghai')

實用tz() 子句后,返回的時間格式:"2019-11-18T13:50:00+08:00"。需要通過 "yyyy-MM-dd'T'HH:mm:ss" 將其格式化。

第三:GROUP BY time 自然月

group by time 支持秒、分鍾、小時、天和周,卻唯獨不支持自然月。如果對數據的精准性要求不高,可以考慮使用30d實現。或者分12次統計。或者有更好的方法,請不吝賜教😲!

Spring 整合 InfluxDB

初始化配置

整合分三步:導包、配置、初始化連接

compile('org.influxdb:influxdb-java:2.8')
influx.server=http://IP
influx.port=8086
influx.username=admin
influx.password=admin
influx.dbname=database
import org.influxdb.InfluxDB
import org.influxdb.InfluxDBFactory
import org.influxdb.dto.Point
import org.influxdb.dto.Query
import org.influxdb.impl.InfluxDBResultMapper
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy

@Component
class InfluxDbConnector {

    val logger: Logger = LoggerFactory.getLogger(InfluxDbConnector::class.java)

    @Value("\${influx.server}")
    lateinit var serverUrl: String

    @Value("\${influx.port}")
    lateinit var serverPort: String

    @Value("\${influx.db-name}")
    lateinit var dbName: String

    @Value("\${influx.user-name}")
    lateinit var userName: String

    @Value("\${influx.password}")
    lateinit var password: String

    lateinit var connection: InfluxDB
    val resultMapper: InfluxDBResultMapper = InfluxDBResultMapper()

    @PostConstruct
    fun initConnection() {
        val connectionUrl = "$serverUrl:$serverPort"
        connection = InfluxDBFactory.connect(connectionUrl, userName, password)
        connection.setDatabase(dbName)
        connection.enableBatch(1000, 1000, TimeUnit.MILLISECONDS)
    }

    @PreDestroy
    fun closeConnection() {
        connection.close()
    }


    fun <T> query(sql: String, type: Class<T>): List<T> {
        logger.info("exec influx query: {}", sql)
        val result = connection.query(Query(sql, dbName))
        return resultMapper.toPOJO(result, type)
    }

    fun query(sql: String) {
        logger.info("exec influx query: {}", sql)
        connection.query(Query(sql, dbName))
    }

    fun save(points: List<Point>) {
        points.forEach { connection.write(it) }
    }
}

存儲和查詢數據

定義實體

import java.time.Instant;

@Measurement(name = "tableName")
public class StringVariableResultJ {

    @Column(name = "currentValue")
    public String value;
    @Column(name = "time")
    public Instant time;

    // ......

}

批量保存數據

val points = equipmentEnergies.map {
    Point.measurement(TABLE_NAME_EQUIPMENT)
    .tag("equipmentId", it.equipmentId)
    .tag("locationId", it.locationId)
    .tag("subItemInstanceId", it.subItemInstanceId)
    .tag("subItemId", it.subItemId)
    .tag("projectId", it.projectId)
    .time(it.lastSavedTime?.toEpochMilli()?:0, TimeUnit.MILLISECONDS)
    .addField("currentValue", it.value.toString().toBigDecimalOrNull()).build()
}
influxDbConnector.save(points)

查詢數據

influxDbConnector.query(sql, StringVariableResultJ::class.java).sortedBy { it.time }

項目是用kotlin寫的,可是在用InfluxDBResultMapper.toPOJO 時會出現數據轉換異常的問題。若換成Java的實體類就沒有問題。原因目前沒有找到。

刪除數據

我在官網文檔上並沒有找到刪除數據的內容,只有修改數據庫存儲策略。但實際上執行delete sql語句是生效的😂。數據保留策略目的是讓InfluxDB能夠知道哪些數據是可以丟棄的,從而節省空間,更高效的處理數據。默認是不限制。以下是常見的命令。

# 查看庫存儲規則
> SHOW RETENTION POLICIES ON 數據庫名稱;
[out]: 
name    duration shardGroupDuration replicaN default
----    -------- ------------------ -------- -------
autogen 720h0m0s 168h0m0s           1        true
# 修改存儲規則
> ALTER RETENTION POLICY autogen ON 數據庫名稱 DURATION 0;
# 設為默認
> ALTER RETENTION POLICY autogen ON 數據庫名稱 DEFAULT;
#創建規則
> CREATE RETENTION POLICY "規則名" ON 數據庫名稱 DURATION 360h REPLICATION 1;
# 刪除規則
> DROP RETENTION POLICY 規則名 ON 數據庫名稱;

duration 表示在這個時間外的數據將不會被保留,0表示不限制。default 表示是否為默認規則。其它含義沒有深究。

實際場景中,不同表的數據需要保留的時間也不一樣。此時可以考慮用sql語句,用程序定時刪除數據。

influxDbConnector.query("DELETE FROM \"tableName" WHERE time < '$時間' ")

查詢性能優化

對於免費版的InfluxDB是不支持集群,並且默認單次查詢結果最大不超過一萬條。考慮到性能問題,一般通道分頁查詢來減輕服務器壓力。但是對於聚合函數的操作,普通的limit 和 offset並不能滿足其需求,我采取的是分時間端查詢,減少每次查詢的時間范圍。獲取下一次查詢時間點方法。

    /**
     * 分時間端查詢,減輕Influxdb服務器壓力
     */
    fun getInfluxNextEndTime(startTime: Instant, timeSpan: String, number: Long = 2): Instant {
        val currentTime = Instant.now()
        val localStartTime = LocalDateTime.ofInstant(startTime, ZoneId.systemDefault())
        val span = timeSpan.substring(timeSpan.length - 1)
        var nextEndTime = when (span) {
            "s", "S" -> {
                localStartTime.plusHours(number).atZone(ZoneOffset.systemDefault()).toInstant()
            }
            "m", "M" -> {
                localStartTime.plusDays(number).atZone(ZoneOffset.systemDefault()).toInstant()
            }
            else -> {
                currentTime
            }
        }
        if (nextEndTime.isAfter(currentTime)) {
            nextEndTime = currentTime
        }

        return nextEndTime
    }

文章到這里就結束了,更多的聚合函數可以看官方文檔:https://docs.influxdata.com/influxdb/v1.7/query_language/functions/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM