InfluxDB 2.0


特點

  • 基於時間序列,支持與時間有關的相關函數(如window()mean(),rate()等);
  • 可度量性:你可以實時對大量數據進行計算;
  • 無結構(無模式):可以是任意數量的列;
  • 支持min, max, sum, count, mean, median 等一系列函數;
  • 內置http支持,使用http讀寫;
  • 強大的類SQL語法;函數式語言Flux
  • 自帶管理界面,方便使用

數據模型

image.png
Timestamp
InfluxDB中存儲的所有數據都有一個存儲時間戳的_time列。在磁盤上,時間戳以納秒格式存儲。InfluxDB格式時間戳以[RFC 3339](https://tools.ietf.org/html/rfc3339)(如:2020-01-01T00:00:00.00Z)格式顯示與數據關聯的日期和時間。
Measurement
度量名稱,字符串類型,充當標記、字段和時間戳的容器。

Fields
必需,非索引。
Field key
字段鍵是表示字段名稱的字符串。
Field value
字段值表示關聯字段的值。值的類型為string, float, integer, uInteger, boolean.
Field set
字段集是與時間戳關聯的字段鍵值對的集合。

Tags
可選,索引。
Tag key
索引鍵
Tag value
索引值
Tag set
tag key 和 tag value 組成的 set 集合

Series
Series = Measurement + Tag set + Field key
Point
Point = Series + Field value + Timestamp
Bucket
所有的數據都存儲在一個Bucket中。Bucket結合了Database和Retention period(每個數據點過期的時間)的概念。Bucket 屬於一個organization。
Organization
一組 dashboard,task, bucket 和 users 屬於一個 organization.

物理模型

Write Ahead Log (WAL)

當存儲引擎收到寫入請求時:

  • 在WAL文件中追加一條寫入操作
  • 數據以fsync()寫入磁盤
  • 更新內存中的Cache
  • 數據成功寫入磁盤,返回響應

WAL文件的內容與內存中的Cache相同,其作用是為了持久化數據(防止數據丟失),當系統崩潰后可以通過WAL文件恢復還沒有寫入到TSM文件中的數據。

Cache

Cache是wal文件在內存中的副本。特點如下:

  • 按照 Series 組織數據並存儲在其自己的時間順序范圍內
  • 存儲未壓縮的數據
  • InfluxDB啟動時,會遍歷所有的WAL文件,重新構造Cache,即使系統出現故障,也不會導致數據丟失。
  • 插入數據時,是往CacheWAL中寫入數據,可以認為CacheWAL文件中的數據在內存中的緩存
  • 查詢:對存儲引擎的查詢將Cache中的數據與TSM文件中的數據合並。查詢在查詢處理時對從Cache生成的數據副本執行。寫數據不影響查詢的結果。

Cache中的數據不是無限增長的,有一個 maxSize 參數(默認上限為25MB)用於控制當Cache中的數據占用多少內存后就會將數據寫入 TSM 文件。每當 Cache 中的數據達到閥值后,會將當前的 Cache 進行一次快照,之后清空當前Cache中的內容,再創建一個新的WAL文件用於寫入,剩下的WAL文件最后會被刪除,快照中的數據會經過排序寫入一個新的TSM文件中。

Time-Structured Merge Tree (TSM)

為了有效地壓縮和存儲數據,存儲引擎按Series對字段值進行分組,然后按時間對這些字段值進行排序。
存儲引擎使用TSM存儲數據,TSM文件以列格式存儲壓縮的序列數據。為了提高效率,存儲引擎只存儲一系列值之間的差異(或增量)。
單個TSM file大小最大為2GB,用於存放數據。

Time Series Index (TSI)

一種服務機制,隨着數據增長,保證一定的查詢效率。每隔1秒會檢查一次是否有需要壓縮合並的數據。主要進行兩種操作:

  • Cache中的數據大小達到閥值后,進行快照,之后轉存到一個新的TSM文件中
  • 合並當前的TSM文件,將多個小的TSM文件合並成一個,使每一個文件盡量達到單個文件的最大大小,減少文件的數量,並且一些數據的刪除操作也是在這個時候完成。

文件結構

image.png

Engine path

  • /data:存儲TSM文件
  • /wal:存儲WAL文件

Blot Path

Boltdb數據庫的存儲路徑,存儲格式是 Go 的key-value,數據是非時間序列的,包括InfluxDB user, dashboard task等等。

Shard & Shard group

Shards

shard包含由Shard group duration定義的給定時間范圍內編碼壓縮的時間序列數據。在指定的shard group duration內,同一 Series 的所有 Point 都存儲在同一個shard中。單個shard包含多個series、磁盤上的一個或多個TSM文件,並且屬於一個Shard group。

Shard group

一個shard group 屬於一個 bucket,包含由shard group duration定義的特定時間范圍的 series 。
Shard group duration
指定每個shard group的時間范圍,並確定創建新shard group的頻率
image.png
Shard group diagram
下圖表示一個4天保留時間,shard group持續時間為1天的bucket
image.png

Shard life-cycle

Shard precreation
InfluxDB shard precreation服務根據shard組持續時間為每個shard組預先創建具有未來開始和結束時間的shard。precreator服務不會為過去的時間范圍預創建碎片。在回填歷史數據時,InfluxDB會根據需要為過去的時間范圍創建碎片,從而暫時降低寫入吞吐量。
Shard writes
InfluxDB 將時間序列數據寫入未壓縮或 "hot" shard 。當一個 shard 長時間不被寫入時,InfluxDB會壓縮 shard 數據,從而產生 "cold" shard。
通常,InfluxDB會將數據寫入最近的 shard group("hot" shard),但在回填歷史數據時,InfluxDB 會將數據寫入先解壓縮的舊碎片。回填完成后,InfluxDB 重新壓縮舊碎片。
Shard compaction
InfluxDB 有以下四個壓縮級別:

  • L1:InfluxDB 將內存Cache中保存的所有新寫入的數據刷新到磁盤
  • L2:InfluxDB 通過將 L1 產生的包含相同series的多個塊組合到一個或多個新文件中的較少塊中
  • L3:InfluxDB 遍歷 L2 產生壓縮文件塊(超過一定大小),並將包含相同 series 的多個塊組合到新文件中的一個塊中。
  • L4:完全壓縮,InfluxDB 對 L3 產生的壓縮文件塊進行遍歷,並將包含相同 series 的多個塊合並到新文件中的一個塊中。

Shard deletion

InfluxDB 的 retention enforcement service 定期檢查早於其存儲桶保留期的 shard group。一旦 shard group 的開始時間超過bucket的保留期,InfluxDB 就會刪除 shard group 以及關聯的 shard 和 TSM 文件。

優化策略

  • 控制series的數量;
  • 使用批量寫;
  • 使用恰當的時間粒度;
  • 存儲的時候盡量對 Tag 進行排序;
  • 根據數據情況,調整shard的 duration;
  • 無關的數據寫不同的 bucket;
  • 控制Tag Key與Tag Value值的大小;
  • 存儲分離,將 wal 目錄與 data 目錄分別映射到不同的磁盤上,以減少讀寫操作的相互影響。

Influx CLI

Bucket

influx bucket create \
  --org dx \
  --token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
  --name newBucket

influx bucket list \
  --token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
  --org dx
  
influx bucket delete \
  --org dx \
  --token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
  --name newBucket

Write

influx write --bucket demo \
  --org dx \
  --token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
  'word_count,word=cc word_count=10'
  

Query

influx query \
  --org dx \
  --token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
  'from(bucket: "demo")
  |> range(start: -1d, stop: -0s)
  |> filter(fn: (r) => r["_measurement"] == "world_count")
  |> drop(columns: ["_start","_stop"])'

FluxQL

from(bucket: "demo")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "word_count")
//   |> window(every: 5m)
//   |> mean()
//   |> duplicate(column: "_stop", as: "_time")
//   |> window(every: inf)
  |> aggregateWindow(every:5m, fn: mean, createEmpty: false)
  |> yield(name: "mean") //   |> sort(columns:["_value"])
//   |> group(columns:["_value"])
//   |> keep(columns:["_value"])
//   |> drop(columns: ["_start","_stop"])
//   |> limit(n: 3, offset: 2)
//   |> yield(name: "mean")

Delete

influx delete --bucket demo \
  --org dx \
  --token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g== \
  --start '1970-01-01T00:00:00Z' \
  --stop $(date +"%Y-%m-%dT%H:%M:%SZ") \
  --predicate '_measurement="myMeasurement"'

InfluxDB API

通過HTTP的方式對 InfluxDB 進行管理

寫入

curl --request POST "http://121.5.133.125:8086/api/v2/write?org=dx&bucket=demo&precision=s" \
  --header "Authorization: Token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g==" \
  --data-raw "
word_count,word=dd word_count=3i 1624986861
word_count,word=dd word_count=4i 1624987862
word_count,word=dd word_count=6i 1624988863
word_count,word=dd word_count=2i 1624989864
word_count,word=ee word_count=2i 1624990865
word_count,word=ee word_count=4i 1624991866
"

查詢

curl --location --request POST 'http://121.5.133.125:8086/api/v2/query?org=dx' \
--header 'Authorization: Token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g==' \
--header 'Accept: application/csv' \
--header 'Content-type: application/vnd.flux' \
--data-raw 'from(bucket: "demo")
    |> range(start: -1d, stop: -0s)
    |> filter(fn: (r) => r["_measurement"] == "world_count")'

刪除

curl --location --request POST 'http://121.5.133.125:8086/api/v2/delete/?org=dx&bucket=demo' \
--header 'Authorization: Token 2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g==' \
--header 'Content-Type: application/json' \
--data-raw '{
    "start": "2021-06-29T00:00:00Z",
    "stop": "2021-07-01T00:00:00Z",
    "predicate": "_measurement=word_count AND word=cc"
  }'

Java API

依賴

<dependency>
  <groupId>com.influxdb</groupId>
  <artifactId>influxdb-client-java</artifactId>
  <version>2.3.0</version>
</dependency>

應用

public class BaseInfluxDBTest {

    protected static InfluxDBClient influxDBClient;

    private static char[] token = "2y7jSDzWL6PfafiQ1g_RrinwaI2SmgvY-Oo2AdBxkS4ULnUrgbvr1G-P05MixgBqt1pINAf3GmOa1mjBfHz39g==".toCharArray();

    @BeforeAll
    static void open() {
        influxDBClient = InfluxDBClientFactory.create("http://121.5.133.125:8086", token, "dx", "demo");
    }

    @AfterAll
    static void close() {
        influxDBClient.close();
    }


}

Telegraf

Telegraf是一個插件驅動的服務器代理,用於從數據庫、系統和物聯網傳感器收集和發送度量和事件。Telegraf是用Go編寫的,可以編譯成一個沒有外部依賴關系的二進制文件,並且需要非常小的內存占用。
APM-Diagram-1.webp
應用:

  • 數據庫:連接到MongoDB、MySQL、Redis等數據源,收集和發送度量數據。
  • 系統:從雲平台、容器和編排器的現代堆棧中收集度量。
  • 物聯網傳感器:從物聯網傳感器和設備收集關鍵狀態數據(壓力水平、溫度水平等)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM