特點
- 基於時間序列,支持與時間有關的相關函數(如
window()
,mean()
,rate()
等); - 可度量性:你可以實時對大量數據進行計算;
- 無結構(無模式):可以是任意數量的列;
- 支持min, max, sum, count, mean, median 等一系列函數;
- 內置http支持,使用http讀寫;
強大的類SQL語法;函數式語言Flux- 自帶管理界面,方便使用
數據模型
Timestamp
InfluxDB中存儲的所有數據都有一個存儲時間戳的_time
列。在磁盤上,時間戳以納秒格式存儲。InfluxDB格式時間戳以[RFC 3339](https://tools.ietf.org/html/rfc3339)
(如:2020-01-01T00:00:00.00Z
)格式顯示與數據關聯的日期和時間。
Measurement
度量名稱,字符串類型,充當標記、字段和時間戳的容器。
Fields
必需,非索引。
Field key
字段鍵是表示字段名稱的字符串。
Field value
字段值表示關聯字段的值。值的類型為string, float, integer, uInteger, boolean.
Field set
字段集是與時間戳關聯的字段鍵值對的集合。
Tags
可選,索引。
Tag key
索引鍵
Tag value
索引值
Tag set
tag key 和 tag value 組成的 set 集合
Series
Series = Measurement + Tag set + Field key
Point
Point = Series + Field value + Timestamp
Bucket
所有的數據都存儲在一個Bucket中。Bucket結合了Database和Retention period(每個數據點過期的時間)的概念。Bucket 屬於一個organization。
Organization
一組 dashboard,task, bucket 和 users 屬於一個 organization.
物理模型
Write Ahead Log (WAL)
當存儲引擎收到寫入請求時:
- 在WAL文件中追加一條寫入操作
- 數據以
fsync()
寫入磁盤 - 更新內存中的
Cache
- 數據成功寫入磁盤,返回響應
WAL
文件的內容與內存中的Cache
相同,其作用是為了持久化數據(防止數據丟失),當系統崩潰后可以通過WAL
文件恢復還沒有寫入到TSM
文件中的數據。
Cache
Cache是wal文件在內存中的副本。特點如下:
- 按照
Series
組織數據並存儲在其自己的時間順序范圍內 - 存儲未壓縮的數據
- InfluxDB啟動時,會遍歷所有的
WAL
文件,重新構造Cache
,即使系統出現故障,也不會導致數據丟失。 - 插入數據時,是往
Cache
與WAL
中寫入數據,可以認為Cache
是WAL
文件中的數據在內存中的緩存 - 查詢:對存儲引擎的查詢將
Cache
中的數據與TSM
文件中的數據合並。查詢在查詢處理時對從Cache
生成的數據副本執行。寫數據不影響查詢的結果。
Cache
中的數據不是無限增長的,有一個 maxSize 參數(默認上限為25MB)用於控制當Cache
中的數據占用多少內存后就會將數據寫入 TSM
文件。每當 Cache
中的數據達到閥值后,會將當前的 Cache
進行一次快照,之后清空當前Cache
中的內容,再創建一個新的WAL
文件用於寫入,剩下的WAL
文件最后會被刪除,快照中的數據會經過排序寫入一個新的TSM
文件中。
Time-Structured Merge Tree (TSM)
為了有效地壓縮和存儲數據,存儲引擎按Series
對字段值進行分組,然后按時間對這些字段值進行排序。
存儲引擎使用TSM存儲數據,TSM
文件以列格式存儲壓縮的序列數據。為了提高效率,存儲引擎只存儲一系列值之間的差異(或增量)。
單個TSM
file大小最大為2GB,用於存放數據。
Time Series Index (TSI)
一種服務機制,隨着數據增長,保證一定的查詢效率。每隔1秒會檢查一次是否有需要壓縮合並的數據。主要進行兩種操作:
Cache
中的數據大小達到閥值后,進行快照,之后轉存到一個新的TSM
文件中- 合並當前的
TSM
文件,將多個小的TSM
文件合並成一個,使每一個文件盡量達到單個文件的最大大小,減少文件的數量,並且一些數據的刪除操作也是在這個時候完成。
文件結構
Engine path
- /data:存儲TSM文件
- /wal:存儲WAL文件
Blot Path
Boltdb數據庫的存儲路徑,存儲格式是 Go 的key-value,數據是非時間序列的,包括InfluxDB user, dashboard task等等。
Shard & Shard group
Shards
shard包含由Shard group duration定義的給定時間范圍內編碼壓縮的時間序列數據。在指定的shard group duration內,同一 Series 的所有 Point 都存儲在同一個shard中。單個shard包含多個series、磁盤上的一個或多個TSM文件,並且屬於一個Shard group。
Shard group
一個shard group 屬於一個 bucket,包含由shard group duration定義的特定時間范圍的 series 。
Shard group duration
指定每個shard group的時間范圍,並確定創建新shard group的頻率
Shard group diagram
下圖表示一個4天保留時間,shard group持續時間為1天的bucket
Shard life-cycle
Shard precreation
InfluxDB shard precreation服務根據shard組持續時間為每個shard組預先創建具有未來開始和結束時間的shard。precreator服務不會為過去的時間范圍預創建碎片。在回填歷史數據時,InfluxDB會根據需要為過去的時間范圍創建碎片,從而暫時降低寫入吞吐量。
Shard writes
InfluxDB 將時間序列數據寫入未壓縮或 "hot" shard 。當一個 shard 長時間不被寫入時,InfluxDB會壓縮 shard 數據,從而產生 "cold" shard。
通常,InfluxDB會將數據寫入最近的 shard group("hot" shard),但在回填歷史數據時,InfluxDB 會將數據寫入先解壓縮的舊碎片。回填完成后,InfluxDB 重新壓縮舊碎片。
Shard compaction
InfluxDB 有以下四個壓縮級別:
- L1:InfluxDB 將內存Cache中保存的所有新寫入的數據刷新到磁盤
- L2:InfluxDB 通過將 L1 產生的包含相同series的多個塊組合到一個或多個新文件中的較少塊中
- L3:InfluxDB 遍歷 L2 產生壓縮文件塊(超過一定大小),並將包含相同 series 的多個塊組合到新文件中的一個塊中。
- L4:完全壓縮,InfluxDB 對 L3 產生的壓縮文件塊進行遍歷,並將包含相同 series 的多個塊合並到新文件中的一個塊中。
Shard deletion
InfluxDB 的 retention enforcement service 定期檢查早於其存儲桶保留期的 shard group。一旦 shard group 的開始時間超過bucket的保留期,InfluxDB 就會刪除 shard group 以及關聯的 shard 和 TSM 文件。
優化策略
- 控制series的數量;
- 使用批量寫;
- 使用恰當的時間粒度;
- 存儲的時候盡量對 Tag 進行排序;
- 根據數據情況,調整shard的 duration;
- 無關的數據寫不同的 bucket;
- 控制Tag Key與Tag Value值的大小;
- 存儲分離,將 wal 目錄與 data 目錄分別映射到不同的磁盤上,以減少讀寫操作的相互影響。
Influx CLI
Bucket
influx bucket create \
--org dx \
--token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
--name newBucket
influx bucket list \
--token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
--org dx
influx bucket delete \
--org dx \
--token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
--name newBucket
Write
influx write --bucket demo \
--org dx \
--token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
'word_count,word=cc word_count=10'
Query
influx query \
--org dx \
--token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
'from(bucket: "demo")
|> range(start: -1d, stop: -0s)
|> filter(fn: (r) => r["_measurement"] == "world_count")
|> drop(columns: ["_start","_stop"])'
FluxQL
from(bucket: "demo")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "word_count")
// |> window(every: 5m)
// |> mean()
// |> duplicate(column: "_stop", as: "_time")
// |> window(every: inf)
|> aggregateWindow(every:5m, fn: mean, createEmpty: false)
|> yield(name: "mean") // |> sort(columns:["_value"])
// |> group(columns:["_value"])
// |> keep(columns:["_value"])
// |> drop(columns: ["_start","_stop"])
// |> limit(n: 3, offset: 2)
// |> yield(name: "mean")
Delete
influx delete --bucket demo \
--org dx \
--token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
--start '1970-01-01T00:00:00Z' \
--stop $(date +"%Y-%m-%dT%H:%M:%SZ") \
--predicate '_measurement="myMeasurement"'
InfluxDB API
通過HTTP的方式對 InfluxDB 進行管理
寫入
curl --request POST "http://121.5.133.125:8086/api/v2/write?org=dx&bucket=demo&precision=s" \
--header "Authorization: Token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g==" \
--data-raw "
word_count,word=dd word_count=3i 1624986861
word_count,word=dd word_count=4i 1624987862
word_count,word=dd word_count=6i 1624988863
word_count,word=dd word_count=2i 1624989864
word_count,word=ee word_count=2i 1624990865
word_count,word=ee word_count=4i 1624991866
"
查詢
curl --location --request POST 'http://121.5.133.125:8086/api/v2/query?org=dx' \
--header 'Authorization: Token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g==' \
--header 'Accept: application/csv' \
--header 'Content-type: application/vnd.flux' \
--data-raw 'from(bucket: "demo")
|> range(start: -1d, stop: -0s)
|> filter(fn: (r) => r["_measurement"] == "world_count")'
刪除
curl --location --request POST 'http://121.5.133.125:8086/api/v2/delete/?org=dx&bucket=demo' \
--header 'Authorization: Token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g==' \
--header 'Content-Type: application/json' \
--data-raw '{
"start": "2021-06-29T00:00:00Z",
"stop": "2021-07-01T00:00:00Z",
"predicate": "_measurement=word_count AND word=cc"
}'
Java API
依賴
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>2.3.0</version>
</dependency>
應用
public class BaseInfluxDBTest {
protected static InfluxDBClient influxDBClient;
private static char[] token = "2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g==".toCharArray();
@BeforeAll
static void open() {
influxDBClient = InfluxDBClientFactory.create("http://121.5.133.125:8086", token, "dx", "demo");
}
@AfterAll
static void close() {
influxDBClient.close();
}
}
Telegraf
Telegraf是一個插件驅動的服務器代理,用於從數據庫、系統和物聯網傳感器收集和發送度量和事件。Telegraf是用Go編寫的,可以編譯成一個沒有外部依賴關系的二進制文件,並且需要非常小的內存占用。
應用:
- 數據庫:連接到MongoDB、MySQL、Redis等數據源,收集和發送度量數據。
- 系統:從雲平台、容器和編排器的現代堆棧中收集度量。
- 物聯網傳感器:從物聯網傳感器和設備收集關鍵狀態數據(壓力水平、溫度水平等)。