簡析時序數據庫 InfluxDB


時序數據基礎

時序數據特點

 時序數據TimeSeries是一連串隨時間推移而發生變化的相關事件。

 以下圖的 CPU 監控數據為例,同個 IP 的相關監控數據組成了一條時序數據,不相關數據則分布在不同的時間序列上。

 常見時序數據有:

  • 監控日志:機器的 CPU 負載變化
  • 用戶行為:用戶在電商網站上的訪問記錄
  • 金融行情:股票的日內成交記錄

 這類數據具有以下特點:

  • 必然帶有時間戳,可能存在時效性
  • 數據量巨大,並且生成速度極快
  • 更關注數據變化的趨勢,而非數據本身

關系型數據庫的不足

 當面對時序數據時,傳統的關系型數據庫就顯得有些力不從心。

關系型數據庫模型 時序數據庫需求
數據按主鍵索引組織、存儲 數據按時間戳進行組織、存儲,便於按時間維度查詢
數據持久化后永久存在 數據具有的生命周期,定期清理過期數據
支持復雜的 OLTP 功能(點查、改、刪) 支持的 OLAP 操作(基於時間窗口)
並發修改加鎖,提供事務保證 Last Write Win 解決寫沖突,無需事務

 兩者之間的存在的沖突:

  • 如果主鍵設計的不好,時序數據的順序插入可能變為隨機寫入,影響寫入性能
  • 傳統關系型數據為提高數據生命周期管理功能,需要定時執行清理任務,避免磁盤空間耗盡
  • 關系型數據庫對事務的支持,對於時序數據來說略顯多余,還會影響寫入能力

 除此之外,關系型數據庫還存在以下天然短板:

  • 需要通過分表分庫sharding實現橫向擴展
分庫分表引入額外復雜度,需要維護映射關系。 此時 SQL 語言的查詢優勢不復存在,多數查詢都會退化為 KV 查找
  • 寫時模式schema on write靈活度不足
關系數據庫新增字段屬於 DDL 操作,會導致鎖表鎖庫。 頻繁的庫表變更影響系統穩定,無法適應快速的業務變更。

Redis 的不足

 存儲時序數據庫的另一個挑戰就是其誇張的數據生成速度。以用戶行為數據為例,如果一個接口的QPS是1萬。就意味着一秒鍾內會生成1萬條用戶行為記錄。假設這樣的接口有100個,那么每秒鍾生成的記錄數可達100萬。

 一種解決方式是將消息積壓至 Kafka 這類中間件,然后進行異步處理。但對於某些業務場景來說,這一處理方式則顯得不合時宜。以股票成交數據為例,為了保障行情的時效性,無法采用異步批處理的方式實現。為了實現極高的寫入吞吐量,通常會考慮使用 Redis 實現這一功能。

 然而這一方案也存在以下問題:

  • redis 不適合存儲大 Key,刪除 Key 的內存釋放操作可能導致長時間的阻塞
  • 假設數據以 list 的形式存儲,執行 lrange 命令的時間復雜度為O(S+N),訪問性能較差
  • 內存空間有限,無法存儲大量的數據

時序數據庫

 時序數據庫是一類專門用於存儲時序數據的數據管理系統,這類數據庫的設計思想大致可以總結為下面幾條:

  • 使用特殊設計的外存索引來組織數據
  • 強制使用 timestamp 作為唯一的主鍵
  • 不檢查寫沖突,避免加鎖,提高寫入性能
  • 對按時間順序寫入進行優化,提高寫入性能
  • 不支持細粒度的數據刪除功能,提高查詢寫入性能
  • 犧牲強一致性來提高系統的查詢吞吐量,提高查詢性能
  • 提供基於時間窗口的 OLAP 操作,放棄關聯查詢等高級功能
  • 通過無模式schemaless設計使系統更易於水平擴展

時序數據模型

 類似於關系型數據庫,時序數據庫也有自己的數據模型,並且兩者直接存在不少相似之處:

關系模型 時序模型 含義
table metric / measurement 表 → 指標(時間序列集合)
column value / field 列 → 值(無索引列)
index tag 索引 → 標簽(索引列)
row point 記錄行 → 數據點(時間序列中某個時刻的數據)
primary key timestamp 行主鍵 → 點時間戳(時間序列內唯一標識)

 其中 tag 的概念較為重要:

  • tag 是一個字符串類型的鍵值對
  • tag 並不是時序數據,而是元數據
  • tag 的唯一組合確定一個時間序列
  • tag 可以方便實現粗粒度的聚合
  • tag 決定了索引的組織形式
  • tag 組合的數量數量不宜過多

常見的時序數據庫

 時序數據庫排行榜中介紹了不少的時序數據庫,其中比較具有代表性的有以下兩款。

OpenTSDB

 OpenTSDB 是一種基於 HBase 來構建的分布式、可擴展的時間序列數據庫。OpenTSDB 被廣泛應用於存儲、索引和服務從大規模計算機系統(網絡設備、操作系統、應用程序)采集來的監控指標數據,並且使這些數據易於訪問和可視化。

 OpenTSDB 由時間序列守護程序 (TSD) 以及一組命令行實用程序組成。每個 TSD 都是獨立的。 沒有主節點,沒有共享狀態。

  • 優點:
    TSD 是無狀態的,所有狀態數據保存在 HBase 中,天然支持水平擴展

  • 缺點:

    • Hadoop 全家桶運維難度較大,需要專人負責

      • 新版本的 OpenTSDB 底層支持 Cassandra
    • 存儲模型過於簡化

      • 單值模型且只能存儲數值類型數據
      • 單個 metric 最多支持 8 個 tag key
    • 雖然利用了 HBase 作為底層存儲,但是沒有提供對 MapReduce 的支持。

InfluxDB

 時序數據庫 InfluxDB 是一款專門處理高寫入和查詢負載的時序數據庫,基於 InfluxDB 能夠快速構建具有海量時序數據處理能力的分析和監控軟件。
 該項目的發起者是 influxdata 公司,該公司提供了一套用於處理時序數據的完整解決方案,InfluxDB 是該解決方案中的核心產品。

  • 優點:

    • 開箱即用,運維簡單
    • 多值存儲模型、豐富的數據類型
    • 提供了類 SQL 的查詢語言
    • 獨創 TSM 索引
  • 缺點:

    • 開源版本不支持集群部署,對於大規模應用來說,使用前需要慎重考慮

深入InfluxDB

數據模型

 InfluxDB 的數據模型已經很接近傳統的關系模型:

database 命名空間,相互隔離
retention policy 保存策略,定義數據生命周期
bucket database + retention policy
measurement 相關時間序列集合
tag 標簽索引,索引列
field 時序數據,無索引列
point 數據點,時間序列中某個時刻的數據
time 時間戳,時間序列內唯一標識

 保留策略retention policy 用於管理數據生命周期,其中包含了:

  • 持續時間duration:指定了數據保留時間,過期的數據將自動從數據庫中刪除

  • 副本個數replication factor:指定了集群模式下,需要維護數據副本的個數(僅在集群模式下有效)

  • 分片粒度hard duration):指定了 shard group 的時間跨度(影響數據分片大小)

 保留策略與 database 存在以下關系:

  • 一個 database 可以有多個 RP,每個 RP 只屬於一個 database 1:N
  • 創建 point 時可以指定 RP,一個 measurement 可以有不同的 RP N:N

 這意味着:

同個 measurement 可能存在兩個有着完全相同的 time 的 point。為了解決數據重復的問題,InfluxDB 2 引入了一個 bucket 的概念,用於避免這一情況。

Series

時間序列 Series 在 InfluxDB 中也是個核心概念:

series key measurement + tag + field key
series 時間序列,serial key 相同的數據集合
series cardinality 序列基數,series key 的數量

 為了唯一標識一個時間序列,InfluxDB 引入了 Serieskey 的概念:

每個數據點都有 SerieskeySerieskey 相同的數據點屬於同個時間序列,會存儲在同一個文件中,方便后續查詢

 Serieskey 的數量稱為序列基數 series cardinality

序列基數是一個重要的性能指標,InfluxDB 會為每個 Serieskey 在內存中維護一個索引記錄,因此序列基數能夠直觀反映當前數據的內存壓力

 上圖的 series cardinality 為 4,其中包含以下 series key:

measurement + tags + field
census, location=1, scientist=langstroth, butterflier
census, location=1, scientist=langstroth, honeybees
census, location=1, scientist=perpetual, butterflier
census, location=1, scientist=perpetual, honeybees

 注意:即便兩條記錄的 measurement、time、tag、field 完全一致,但只要使用的是不同的 RP,那么它們就屬於不同的 series,會被存儲在不同的 bucket 中。


查詢語言

InfluxDB 提供了兩種查詢語言:

  • InfluxQL:類 SQL 的聲明式查詢語言,同時具有 DDL 與 DML 功能
  • Flux:函數式查詢語言,僅支持 DML 功能,能支持復雜的查詢條件,但不支持增刪改操作

下面通過一些實際操作來熟悉一下 InfluxQL:

# 創建數據庫
CREATE DATABASE "sample_data"
USE sample_data
 
# 插入樣例數據,格式參考:https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial
INSERT census,location=1,scientist=langstroth butterflies=12i,honeybees=23i 1439827200000000000
INSERT census,location=1,scientist=perpetua butterflies=1i,honeybees=30i 1439827200000000000
INSERT census,location=1,scientist=langstroth butterflies=11i,honeybees=28i 1439827560000000000
INSERT census,location=1,scientist=perpetua butterflies=3i,honeybees=28i 1439827560000000000
INSERT census,location=2,scientist=langstroth butterflies=2i,honeybees=11i 1439848440000000000
INSERT census,location=2,scientist=langstroth butterflies=1i,honeybees=10i 1439848800000000000
INSERT census,location=2,scientist=perpetua butterflies=8i,honeybees=23i 1439849160000000000
INSERT census,location=2,scientist=perpetua butterflies=7i,honeybees=22i 1439849520000000000

# 顯示數據庫中的表
# measurement 無需預先定義,由 InfluxDB 動態創建
SHOW MEASUREMENTS
 
# 顯示數據庫中的 field key
SHOW FIELD KEYS
 
 
# 顯示數據庫中的 tag key
SHOW TAG KEYS
 
# 顯示數據庫中的 tag value
SHOW TAG VALUES WITH KEY = scientist
 
# 查詢所有數據
SELECT * FROM census;
 
# 對 location = 1 的數據求和
SELECT SUM(butterflies) AS butterflies, SUM(honeybees) AS honeybees FROM census WHERE location = '1';
 
# 刪除 location = 1 的數據
DELETE FROM census WHERE location = '1';
SELECT * FROM census;
 
# 更新特定數據
SELECT * FROM census;
INSERT census,location=2,scientist=perpetua butterflies=10i,honeybees=50i 1439849520000000000
SELECT * FROM census;

# 更新數據時要保證數據類型一致,否則會報錯
INSERT census,location=2,scientist=perpetua butterflies=abc,honeybees=efg 1439849520000000000
 
# 刪除數據庫
DROP DATABASE sample_data;

Flux 無命令行支持,只能通過 http 接口請求。有興趣可以參考下面的腳本,動手嘗試一下:

curl -XPOST 127.0.0.1:8086/api/v2/query -sS \
-H 'Accept:application/csv' \
-H 'Content-type:application/vnd.flux' \
-H 'Authorization: Token root:123456' \
-d '
from(bucket:"sample_data")
|> range(start:time(v: 1439827200000), stop:time(v: 143984952000))
|> filter(fn:(r) => r._measurement == "census" and r.location == "1" and (r._field == "honeybees" or r._field == "butterflies"))
|> limit(n: 100)'

整體架構

在了解完查詢語言之后,接下來看看 InfluxDB 的整體架構:

 上圖將 InfluxDB 分為了 4 層,上面 database 與 RP 這兩層之前已經介紹過,我們重點關注下面兩層:

shard 存儲的時序數據的磁盤文件
shard group shard 容器,責管理數據的生命周期,清除過期的數據
shard duration shard duration

 由於時序數據的數據量通常十分巨大,因此 InfluxDB 的設計中引入了分片的策略。並且同時采用了兩種分片策略:

  • shard group 層采用了基於時間的分片策略,方便實現按照時間條件范圍查詢
  • shard 層則是基於 hashmod 進行分片,避免出現寫熱點產生性能瓶頸

 每個 shard 由 WAL、Cache、TSM文件 3 部分組成:

 整個數據的寫入流程簡化為 3 個步驟:

  1. 先寫入 WAL
  2. 然后寫入 Cache
  3. 最終持久化為 TSM File

WAL

 預寫日志Write-Ahead-Log是一種常見的提高數據庫優化手段,能夠在保證數據安全的同時,提升系統的寫入性能。
 InfluxDB WAL 由一組定長的 segement 文件構成,每個文件大小約為 10MB。這些 segment 文件只允許追加,不允許修改。

Cache

 Cache 是 WAL 的一個內存快照,保證 WAL 中的數據對用戶實時可見。
 當 Cache 空閑或者過滿時,對應的 WAL 將被壓縮並轉換為 TSM,最終釋放內存空間。
 每次重啟時會根據 WAL 重新構造 Cache。

TSM File

 TSM 是一組存儲在磁盤上的外存索引文件,細節將在后續進行介紹。

 它們之間的關系可以簡單描述為:

  • Cache = WAL
  • Cache + TSM = 完整的數據

存儲引擎發展史

 在討論 TSM 之前,線回顧一下 InfluxDB 存儲引擎的發展歷程:

LSM tree 時代 (0.8.x)

  • 引擎:LevelDB、RocksDB、HyperLevelDB、LMDB
  • 優點:極高的寫入吞吐量,且支持數據壓縮
  • 缺點:
    • 懶刪除機制導致刪除操作耗時,且過期數據無法及時清理
    • 按照時間維度分庫可以規避上述問題,但是又會導致新問題:
      • 單個進程打開過多的文件導致句柄耗盡
      • 過多的 WAL 可能會令順序追加退化為隨機寫入

B+Tree 時代 (0.9.x)

  • 引擎:BoltDB
  • 優點:單文件模型,穩定性更好,Go 實現更易嵌入
  • 缺點:
    • 文件較大時,寫放大效應會導致 IOPS 會極劇上升
    • 通過在 Bolt 前嵌入自研的 WAL 模塊緩解了這一問題:
      • 合並多個相鄰的寫入操作,減少 fsync
      • 將隨機寫入變為順序追加,減少寫放大

TSM-Tree 時代 (0.9.5)

  • 引擎:Time Structured Merge Tree
  • 特點:
    • 整體實現借鑒 LSM tree 架構,能有效規避寫放大
    • 更少的數據庫文件,避免順序寫退化為隨機寫,不再出現文件句柄耗盡的情況
    • 針對時序數據特性,采用了更具針對性的數據壓縮算法

 關於 LSM-Tree 與 B-Tree 的寫放大分析,可以參考這篇文章:https://www.cnblogs.com/buttercup/p/12991585.html


TSM 解析

數據組織

 TSM 是一個列存引擎columnar storage,內部按照 SeriesKey 對時序數據進行組織:

  • 每個 SeriesKey 對應一個數組,里面存儲着 time,value 構成的時間點數據
  • 同個 SeriesKey 的數據存儲在一起,不同的 SeriesKey 的數據分開存儲

 列存引擎的優勢

  • 高效處理動態 schema 與稀疏數據

    新增列時,對現有的數據無影響。並且由於不同列相互分離,可以直接忽略 null 值,不需要耗費空間存儲標記

  • 同類型的數據能夠進行高效的壓縮

    同個列的數據必然具有相同的數據類型,可以采取不同的壓縮手段進行優化。

  • 查詢時能減少不必要的 I/O

    查詢時能夠指定要返回的數據列,可以按需遍歷用戶指定的列,對於 OLAP 操作更友好。


 列存引擎的劣勢

  • 存儲稠密數據需要付出額外代價

    當多個列具有相同的時間戳時,timestamp 會被重復存儲。

  • 數據變更操作需要更多的 I/O

    列分開存儲后,查、改、刪操作可能要同時修改多個文件。

  • 無法提供原子性操作,事務實現困難

    無法實現高效的悲觀鎖,如果使用場景中需要用到事務,建議使用行存引擎。


文件格式

 TSM File 是一組列存格式的只讀文件,每個文件對應一組特定的 SeriesKey。

 每個文件由 4 部分構成:

  • Header:幻數 + 版本號
  • DataBlock:時序數據塊
  • IndexBlock:時序數據索引
  • Footer:索引塊指針

 Block 與 Series 的對應關系:

  • 每個 IndexBlock 只屬於一個 Series
  • 每個 DataBlock 只保存一個 Field 的數據

 DataBlock 的結構較為簡單,其中存儲了壓縮過的時序數據:

DataBlock
Type 數據類型
Length 時間戳數據長度
Timestamps 時間戳列表(壓縮后)
Values 的時序數據列表(壓縮后)

 IndexBlock 則較為復雜,每個 IndexBlock 由一個 Meta 和多個 Entry 構成:

IndexBlock
Index Meta 索引信息
Index Entry 索引記錄列表
IndexMeta
Series Key 索引所屬的 Series
Index Entry Count 索引記錄數量
IndexEntry
Min Time 數據開始時間
Max Time 數據結束時間
Offset 數據塊的起始位置
Size 數據塊的長度

 Meta 中存儲了 IndexBlock 對應的 SeriesKey 以及對應的 Entry 數量。
 每個 Entry 對應一個 DataBlock,描述了這個 DataBlock 對應的時間區間,以及實際的存儲地址。
 當需要查找 TSM 中的數據時,只需要將 IndexBlock 加載到內存中。就可以定位到相應的數據,提高查詢效率。


壓縮算法

 InfluxDB 中的數據類型可以分為五種 timestampfloat, int, bool, string。為了達到最優的壓縮效果,InfluxDB 針對不同類型的數據,使用了不同的壓縮算法。不過這些壓縮算法的原理都大同小異:使用變長編碼來保存數據有效位,避免存儲無效的 0 bit。

timestamp

 時序數據都是按照時間順序進行排序的,因此首先會使用 delta-delta 編碼精簡數據:

相鄰的兩個時間戳相減,減少了數據的有效位長度,有利於后續的壓縮

 若時間按固定區間分布,優先使用游程編碼run-length encoding進行壓縮:

如果時間戳間隔固定,則使用兩個 64bit 數據可以編碼 264個時間戳

 若編碼后所有值均小於260,則使用simple8b編碼,將多個值打包進單個 64bit 整數中:

simple8b 將 64 位整數分為兩部分:
  • selector(4bit) 用於指定剩余 60bit 中存儲的整數的個數與有效位長度
  • payload(60bit) 則是用於存儲多個定長的整數

根據一個查找表,將數據模式匹配到最優的 selector,然后將多個數據編碼至 payload

 如果無法不滿足以上壓縮條件,則直接存儲原始數據。

float

 Facebook 工程師通過觀察時序數據,發現相鄰時序數據進行異或操作后,僅有中間一小部分發生了變化。
 根據這個規律,發明了一個簡單高效的浮點數壓縮算法:先異或求值,然后存儲中間的有效數據。
 通過這一算法,他們將浮點數據的平均存儲空間壓縮至 1.37 字節。

 
 算法過程可以參考這篇論文,或者直接參考下面的實現:

@Data
@Accessors(fluent = true)
@ToString
static class Block {
    int leadingZero;
    int tailingZero;
    int blockSize;
    long value;
 
    boolean valueOf(int i) {
        Validate.isTrue(i < blockSize);
        return ((value >>> (blockSize-1-i)) & 0x1) > 0;
    }
 
    boolean fallInSameBlock(Block block) {
        return block != null && block.leadingZero == leadingZero && block.tailingZero == tailingZero;
    }
}
 
static Block calcBlock(double x, double y) {
    long a = Double.doubleToRawLongBits(x);
    long b = Double.doubleToRawLongBits(y);
    long xor = a ^ b;
 
    Block block = new Block().
            leadingZero(Long.numberOfLeadingZeros(xor)).
            tailingZero(Long.numberOfTrailingZeros(xor));
 
    return block.value(xor >>> block.tailingZero()).
           blockSize(block.value() == 0 ? 0 : 64 - block.leadingZero() - block.tailingZero());
}
 
static Pair<Long, Pair<Integer, byte[]>> encode(double[] values) {
    int offset = 0;
    BitSet buffer = new BitSet();
 
    boolean ctrlBit;
    double previous = values[0];
    Block prevBlock = null;
    for (int n=1; n<values.length; n++) {
        Block block = calcBlock(previous, values[n]);
        if (block.value() == 0) {
            buffer.clear(offset++);
        } else {
            buffer.set(offset++);
            buffer.set(offset++, ctrlBit = ! block.fallInSameBlock(prevBlock));
            if (ctrlBit) {
                int leadingZero = block.leadingZero();
                int blockSize = block.blockSize();
                Validate.isTrue(leadingZero < (1 << 6));
                Validate.isTrue(blockSize < (1 << 7));
                for (int i = 5; i > 0; i--) {
                    buffer.set(offset++, ((leadingZero >> (i - 1)) & 0x1) > 0);
                }
                for (int i = 6; i > 0; i--) {
                    buffer.set(offset++, ((blockSize >> (i - 1)) & 0x1) > 0);
                }
            }
            for (int i = 0; i < block.blockSize(); i++) {
                buffer.set(offset++, block.valueOf(i));
            }
        }
        previous = values[n];
        prevBlock = block;
    }
 
    return Pair.of(Double.doubleToLongBits(values[0]), Pair.of(offset, buffer.toByteArray()));
}
 
static List<Double> decode(Pair<Long, Pair<Integer, byte[]>> data) {
 
    long previous = data.getLeft();
    int dataLen = data.getRight().getKey();
    BitSet buffer = BitSet.valueOf(data.getRight().getValue());
 
    List<Double> values = new ArrayList<>();
    values.add(Double.longBitsToDouble(previous));
 
    int offset = 0;
    Block blockMeta = null;
    while (offset < dataLen) {
        if (! buffer.get(offset++)) {
            values.add(0d);
        } else {
            boolean ctrlBit = buffer.get(offset++);
            if (ctrlBit) {
                int leadingZero = 0;
                int blockSize = 0;
                for (int i = 0; i < 5; i++) {
                    leadingZero = (leadingZero << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
                }
                for (int i = 0; i < 6; i++) {
                    blockSize = (blockSize << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
                }
                blockMeta = new Block().leadingZero(leadingZero).blockSize(blockSize).
                        tailingZero(64 - leadingZero - blockSize);
            }
            Validate.notNull(blockMeta);
            long value = 0;
            for (int i = 0; i < blockMeta.blockSize(); i++) {
                value = (value << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
            }
            previous ^= (value << blockMeta.tailingZero());
            values.add(Double.longBitsToDouble(previous));
        }
    }
 
    Validate.isTrue(offset == dataLen);
    return values;
}
 
public static void main(String[] args) {
    double[] values = new double[]{15.5, 14.0625, 3.25, 8.625, 13.1, 0, 25.5};
    Pair<Long, Pair<Integer, byte[]>> data = encode(values);
    System.out.println(data.getRight().getKey()); // 編碼后的數據長度,單位 bits
    System.out.println(decode(data)); // 解碼后的數據
}

int

 對於整形數據,首先會使用 ZigZag 編碼精簡數據。
 然后嘗試使用 RLE 或 simple8b 對精簡后的數據進行壓縮。
 如果無法不滿足壓縮條件,則存儲原始數據。

bool

 直接使用 Bitmap 對數據進行編碼。

string

 將多個字符串拼接在一起,然后使用 Snappy 進行壓縮



參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM