文章大綱

InfluxDB 簡介
InfluxDB是GO語言編寫的分布式時間序列化數據庫,非常適合對數據(跟隨時間變化而變化的數據)的跟蹤、監控和分析。在我們的項目中,主要是用來收集設備實時上傳的值。從而分析該設備值的趨勢圖和各個設備的能耗占比等一系列功能。InfluxDB的功能很強大,文檔也很詳細。可美中不足的是,它的單機性能並不是很理想。因為InfluxDB存儲的數據量本身是非常巨大的,在執行一些時間范圍比較大的sql語句,耗時會很長,甚至直接崩潰。而開源的InfluxDB目前已經不再支持集群。若要通過搭建集群提升性能問題,可以考慮企業版。當然,我們寫的程序也有很大的性能優化空間。
能耗趨勢圖分析
需求:統計指定設備、指定區域、指定分項或者指定能耗類型的能耗趨勢圖。如下圖所示,縱坐標是能耗值,橫坐標是時刻(每小時、每天、每周、每月)。
分析:獲取某個區間時刻的值,可以用GROUP BY time 進行時間分組。再用聚合函數LAST或者SUM統計。但這個看似很簡單的需求卻暗藏殺機。SQL語句如下
SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC
第一:先要清楚,數據是通過什么規則保存到InfluxDB數據庫
為了記錄設備能耗的實時數據,我們會通過訂閱MQTT通道,當值發生變化后存儲到InfluxDB數據庫中,或者在指定時間范圍內沒有變化也會上傳。這樣做的好處可以避免一些冗余數據,同時也埋下了一個坑。
例如:一台設備在InfluxDB數據庫中最后一次記錄的時間是15分鍾前。但是sql語句是從5分鍾前開始統計。這會導致該設備的其點值就是null。簡單來說:設備的存儲的值正好在分組統計的時間范圍外。解決方法有很多:比如用FILL(previous)函數填充;比如使用time(time_interval,offset_interval)進行時間推移等。但是我比較推薦下面的方法:
先獲取指定開始時間之前的最后值(lastValue),然后再根據返回值是否為null,來決定是否替換或者更新lastValue。偽代碼如下。
## 獲取該設備的最后記錄值
val lastValue = "SELECT LAST("currentValue") FROM "$TABLE_NAME" WHERE time <= '$startTime'"
## 遍歷查詢結果,將currentValue為 null的值替換
"SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC".forEach {
lastValue = currentValue?: lastValue
result[time] = currentValue?: lastValue
}
你以為這樣就結束了嗎?還不夠,返回的time格式化后,你會發現有8小時的時區問題。
第二:解決InfluxDB時區問題
InfluxDB 默認以UTC時間存儲並返回時間戳,查詢返回的時間戳對應的也是UTC時間。我們需要通過tz()子句指定時區名稱,比如Asia/Shanghai。若InfluxDB安裝在Windows環境上,可能還會出現 error parsing query: unable to find time zone 錯誤,解決方法是安裝GO語言環境,文章也詳細介紹過。
SELECT LAST("currentValue"), * FROM "$TABLE_NAME"
WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id'
GROUP BY time($timeSpan)
ORDER BY time DESC tz('Asia/Shanghai')
實用tz() 子句后,返回的時間格式:"2019-11-18T13:50:00+08:00"。需要通過 "yyyy-MM-dd'T'HH:mm:ss" 將其格式化。
第三:GROUP BY time 自然月
group by time 支持秒、分鍾、小時、天和周,卻唯獨不支持自然月。如果對數據的精准性要求不高,可以考慮使用30d實現。或者分12次統計。或者有更好的方法,請不吝賜教😲!
Spring 整合 InfluxDB
初始化配置
整合分三步:導包、配置、初始化連接
compile('org.influxdb:influxdb-java:2.8')
influx.server=http://IP
influx.port=8086
influx.username=admin
influx.password=admin
influx.dbname=database
import org.influxdb.InfluxDB
import org.influxdb.InfluxDBFactory
import org.influxdb.dto.Point
import org.influxdb.dto.Query
import org.influxdb.impl.InfluxDBResultMapper
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy
@Component
class InfluxDbConnector {
val logger: Logger = LoggerFactory.getLogger(InfluxDbConnector::class.java)
@Value("\${influx.server}")
lateinit var serverUrl: String
@Value("\${influx.port}")
lateinit var serverPort: String
@Value("\${influx.db-name}")
lateinit var dbName: String
@Value("\${influx.user-name}")
lateinit var userName: String
@Value("\${influx.password}")
lateinit var password: String
lateinit var connection: InfluxDB
val resultMapper: InfluxDBResultMapper = InfluxDBResultMapper()
@PostConstruct
fun initConnection() {
val connectionUrl = "$serverUrl:$serverPort"
connection = InfluxDBFactory.connect(connectionUrl, userName, password)
connection.setDatabase(dbName)
connection.enableBatch(1000, 1000, TimeUnit.MILLISECONDS)
}
@PreDestroy
fun closeConnection() {
connection.close()
}
fun <T> query(sql: String, type: Class<T>): List<T> {
logger.info("exec influx query: {}", sql)
val result = connection.query(Query(sql, dbName))
return resultMapper.toPOJO(result, type)
}
fun query(sql: String) {
logger.info("exec influx query: {}", sql)
connection.query(Query(sql, dbName))
}
fun save(points: List<Point>) {
points.forEach { connection.write(it) }
}
}
存儲和查詢數據
定義實體
import java.time.Instant;
@Measurement(name = "tableName")
public class StringVariableResultJ {
@Column(name = "currentValue")
public String value;
@Column(name = "time")
public Instant time;
// ......
}
批量保存數據
val points = equipmentEnergies.map {
Point.measurement(TABLE_NAME_EQUIPMENT)
.tag("equipmentId", it.equipmentId)
.tag("locationId", it.locationId)
.tag("subItemInstanceId", it.subItemInstanceId)
.tag("subItemId", it.subItemId)
.tag("projectId", it.projectId)
.time(it.lastSavedTime?.toEpochMilli()?:0, TimeUnit.MILLISECONDS)
.addField("currentValue", it.value.toString().toBigDecimalOrNull()).build()
}
influxDbConnector.save(points)
查詢數據
influxDbConnector.query(sql, StringVariableResultJ::class.java).sortedBy { it.time }
項目是用kotlin寫的,可是在用InfluxDBResultMapper.toPOJO 時會出現數據轉換異常的問題。若換成Java的實體類就沒有問題。原因目前沒有找到。
刪除數據
我在官網文檔上並沒有找到刪除數據的內容,只有修改數據庫存儲策略。但實際上執行delete sql語句是生效的😂。數據保留策略目的是讓InfluxDB能夠知道哪些數據是可以丟棄的,從而節省空間,更高效的處理數據。默認是不限制。以下是常見的命令。
# 查看庫存儲規則
> SHOW RETENTION POLICIES ON 數據庫名稱;
[out]:
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 720h0m0s 168h0m0s 1 true
# 修改存儲規則
> ALTER RETENTION POLICY autogen ON 數據庫名稱 DURATION 0;
# 設為默認
> ALTER RETENTION POLICY autogen ON 數據庫名稱 DEFAULT;
#創建規則
> CREATE RETENTION POLICY "規則名" ON 數據庫名稱 DURATION 360h REPLICATION 1;
# 刪除規則
> DROP RETENTION POLICY 規則名 ON 數據庫名稱;
duration 表示在這個時間外的數據將不會被保留,0表示不限制。default 表示是否為默認規則。其它含義沒有深究。
實際場景中,不同表的數據需要保留的時間也不一樣。此時可以考慮用sql語句,用程序定時刪除數據。
influxDbConnector.query("DELETE FROM \"tableName" WHERE time < '$時間' ")
查詢性能優化
對於免費版的InfluxDB是不支持集群,並且默認單次查詢結果最大不超過一萬條。考慮到性能問題,一般通道分頁查詢來減輕服務器壓力。但是對於聚合函數的操作,普通的limit 和 offset並不能滿足其需求,我采取的是分時間端查詢,減少每次查詢的時間范圍。獲取下一次查詢時間點方法。
/**
* 分時間端查詢,減輕Influxdb服務器壓力
*/
fun getInfluxNextEndTime(startTime: Instant, timeSpan: String, number: Long = 2): Instant {
val currentTime = Instant.now()
val localStartTime = LocalDateTime.ofInstant(startTime, ZoneId.systemDefault())
val span = timeSpan.substring(timeSpan.length - 1)
var nextEndTime = when (span) {
"s", "S" -> {
localStartTime.plusHours(number).atZone(ZoneOffset.systemDefault()).toInstant()
}
"m", "M" -> {
localStartTime.plusDays(number).atZone(ZoneOffset.systemDefault()).toInstant()
}
else -> {
currentTime
}
}
if (nextEndTime.isAfter(currentTime)) {
nextEndTime = currentTime
}
return nextEndTime
}
文章到這里就結束了,更多的聚合函數可以看官方文檔:https://docs.influxdata.com/influxdb/v1.7/query_language/functions/