時序數據基礎
時序數據特點
時序數據TimeSeries
是一連串隨時間推移而發生變化的相關事件。
以下圖的 CPU 監控數據為例,同個 IP 的相關監控數據組成了一條時序數據,不相關數據則分布在不同的時間序列上。
常見時序數據有:
- 監控日志:機器的 CPU 負載變化
- 用戶行為:用戶在電商網站上的訪問記錄
- 金融行情:股票的日內成交記錄
這類數據具有以下特點:
- 必然帶有時間戳,可能存在時效性
- 數據量巨大,並且生成速度極快
- 更關注數據變化的趨勢,而非數據本身
關系型數據庫的不足
當面對時序數據時,傳統的關系型數據庫就顯得有些力不從心。
關系型數據庫模型 | 時序數據庫需求 |
---|---|
數據按主鍵索引組織、存儲 | 數據按時間戳進行組織、存儲,便於按時間維度查詢 |
數據持久化后永久存在 | 數據具有的生命周期,定期清理過期數據 |
支持復雜的 OLTP 功能(點查、改、刪) | 支持的 OLAP 操作(基於時間窗口) |
並發修改加鎖,提供事務保證 | Last Write Win 解決寫沖突,無需事務 |
兩者之間的存在的沖突:
- 如果主鍵設計的不好,時序數據的順序插入可能變為隨機寫入,影響寫入性能
- 傳統關系型數據為提高數據生命周期管理功能,需要定時執行清理任務,避免磁盤空間耗盡
- 關系型數據庫對事務的支持,對於時序數據來說略顯多余,還會影響寫入能力
除此之外,關系型數據庫還存在以下天然短板:
- 需要通過分表分庫
sharding
實現橫向擴展
- 寫時模式
schema on write
靈活度不足
Redis 的不足
存儲時序數據庫的另一個挑戰就是其誇張的數據生成速度。以用戶行為數據為例,如果一個接口的QPS是1萬。就意味着一秒鍾內會生成1萬條用戶行為記錄。假設這樣的接口有100個,那么每秒鍾生成的記錄數可達100萬。
一種解決方式是將消息積壓至 Kafka 這類中間件,然后進行異步處理。但對於某些業務場景來說,這一處理方式則顯得不合時宜。以股票成交數據為例,為了保障行情的時效性,無法采用異步批處理的方式實現。為了實現極高的寫入吞吐量,通常會考慮使用 Redis 實現這一功能。
然而這一方案也存在以下問題:
- redis 不適合存儲大 Key,刪除 Key 的內存釋放操作可能導致長時間的阻塞
- 假設數據以 list 的形式存儲,執行 lrange 命令的時間復雜度為O(S+N),訪問性能較差
- 內存空間有限,無法存儲大量的數據
時序數據庫
時序數據庫是一類專門用於存儲時序數據的數據管理系統,這類數據庫的設計思想大致可以總結為下面幾條:
- 使用特殊設計的外存索引來組織數據
- 強制使用 timestamp 作為唯一的主鍵
- 不檢查寫沖突,避免加鎖,提高寫入性能
- 對按時間順序寫入進行優化,提高寫入性能
- 不支持細粒度的數據刪除功能,提高查詢寫入性能
- 犧牲強一致性來提高系統的查詢吞吐量,提高查詢性能
- 提供基於時間窗口的 OLAP 操作,放棄關聯查詢等高級功能
- 通過無模式
schemaless
設計使系統更易於水平擴展
時序數據模型
類似於關系型數據庫,時序數據庫也有自己的數據模型,並且兩者直接存在不少相似之處:
關系模型 | 時序模型 | 含義 |
---|---|---|
table | metric / measurement | 表 → 指標(時間序列集合) |
column | value / field | 列 → 值(無索引列) |
index | tag | 索引 → 標簽(索引列) |
row | point | 記錄行 → 數據點(時間序列中某個時刻的數據) |
primary key | timestamp | 行主鍵 → 點時間戳(時間序列內唯一標識) |
其中 tag 的概念較為重要:
- tag 是一個字符串類型的鍵值對
- tag 並不是時序數據,而是元數據
- tag 的唯一組合確定一個時間序列
- tag 可以方便實現粗粒度的聚合
- tag 決定了索引的組織形式
- tag 組合的數量數量不宜過多
常見的時序數據庫
時序數據庫排行榜中介紹了不少的時序數據庫,其中比較具有代表性的有以下兩款。
OpenTSDB
OpenTSDB 是一種基於 HBase 來構建的分布式、可擴展的時間序列數據庫。OpenTSDB 被廣泛應用於存儲、索引和服務從大規模計算機系統(網絡設備、操作系統、應用程序)采集來的監控指標數據,並且使這些數據易於訪問和可視化。
OpenTSDB 由時間序列守護程序 (TSD) 以及一組命令行實用程序組成。每個 TSD 都是獨立的。 沒有主節點,沒有共享狀態。
-
優點:
TSD 是無狀態的,所有狀態數據保存在 HBase 中,天然支持水平擴展 -
缺點:
-
Hadoop 全家桶運維難度較大,需要專人負責
- 新版本的 OpenTSDB 底層支持 Cassandra
-
存儲模型過於簡化
- 單值模型且只能存儲數值類型數據
- 單個 metric 最多支持 8 個 tag key
-
雖然利用了 HBase 作為底層存儲,但是沒有提供對 MapReduce 的支持。
-
InfluxDB
時序數據庫 InfluxDB 是一款專門處理高寫入和查詢負載的時序數據庫,基於 InfluxDB 能夠快速構建具有海量時序數據處理能力的分析和監控軟件。
該項目的發起者是 influxdata 公司,該公司提供了一套用於處理時序數據的完整解決方案,InfluxDB 是該解決方案中的核心產品。
-
優點:
- 開箱即用,運維簡單
- 多值存儲模型、豐富的數據類型
- 提供了類 SQL 的查詢語言
- 獨創 TSM 索引
-
缺點:
- 開源版本不支持集群部署,對於大規模應用來說,使用前需要慎重考慮
深入InfluxDB
數據模型
InfluxDB 的數據模型已經很接近傳統的關系模型:
|
![]() |
保留策略retention policy
用於管理數據生命周期,其中包含了:
-
持續時間
duration
:指定了數據保留時間,過期的數據將自動從數據庫中刪除 -
副本個數
replication factor
:指定了集群模式下,需要維護數據副本的個數(僅在集群模式下有效) -
分片粒度
hard duration)
:指定了 shard group 的時間跨度(影響數據分片大小)
保留策略與 database 存在以下關系:
- 一個 database 可以有多個 RP,每個 RP 只屬於一個 database
1:N
- 創建 point 時可以指定 RP,一個 measurement 可以有不同的 RP
N:N
這意味着:
Series
時間序列 Series 在 InfluxDB 中也是個核心概念:
series key | measurement + tag + field key |
series | 時間序列,serial key 相同的數據集合 |
series cardinality | 序列基數,series key 的數量 |
為了唯一標識一個時間序列,InfluxDB 引入了 Serieskey 的概念:
Serieskey 的數量稱為序列基數 series cardinality
:
上圖的 series cardinality 為 4,其中包含以下 series key:
measurement + tags + field |
---|
census, location=1, scientist=langstroth, butterflier census, location=1, scientist=langstroth, honeybees census, location=1, scientist=perpetual, butterflier census, location=1, scientist=perpetual, honeybees |
注意:即便兩條記錄的 measurement、time、tag、field 完全一致,但只要使用的是不同的 RP,那么它們就屬於不同的 series,會被存儲在不同的 bucket 中。
查詢語言
InfluxDB 提供了兩種查詢語言:
- InfluxQL:類 SQL 的聲明式查詢語言,同時具有 DDL 與 DML 功能
- Flux:函數式查詢語言,僅支持 DML 功能,能支持復雜的查詢條件,但不支持增刪改操作
下面通過一些實際操作來熟悉一下 InfluxQL:
# 創建數據庫
CREATE DATABASE "sample_data"
USE sample_data
# 插入樣例數據,格式參考:https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial
INSERT census,location=1,scientist=langstroth butterflies=12i,honeybees=23i 1439827200000000000
INSERT census,location=1,scientist=perpetua butterflies=1i,honeybees=30i 1439827200000000000
INSERT census,location=1,scientist=langstroth butterflies=11i,honeybees=28i 1439827560000000000
INSERT census,location=1,scientist=perpetua butterflies=3i,honeybees=28i 1439827560000000000
INSERT census,location=2,scientist=langstroth butterflies=2i,honeybees=11i 1439848440000000000
INSERT census,location=2,scientist=langstroth butterflies=1i,honeybees=10i 1439848800000000000
INSERT census,location=2,scientist=perpetua butterflies=8i,honeybees=23i 1439849160000000000
INSERT census,location=2,scientist=perpetua butterflies=7i,honeybees=22i 1439849520000000000
# 顯示數據庫中的表
# measurement 無需預先定義,由 InfluxDB 動態創建
SHOW MEASUREMENTS
# 顯示數據庫中的 field key
SHOW FIELD KEYS
# 顯示數據庫中的 tag key
SHOW TAG KEYS
# 顯示數據庫中的 tag value
SHOW TAG VALUES WITH KEY = scientist
# 查詢所有數據
SELECT * FROM census;
# 對 location = 1 的數據求和
SELECT SUM(butterflies) AS butterflies, SUM(honeybees) AS honeybees FROM census WHERE location = '1';
# 刪除 location = 1 的數據
DELETE FROM census WHERE location = '1';
SELECT * FROM census;
# 更新特定數據
SELECT * FROM census;
INSERT census,location=2,scientist=perpetua butterflies=10i,honeybees=50i 1439849520000000000
SELECT * FROM census;
# 更新數據時要保證數據類型一致,否則會報錯
INSERT census,location=2,scientist=perpetua butterflies=abc,honeybees=efg 1439849520000000000
# 刪除數據庫
DROP DATABASE sample_data;
Flux 無命令行支持,只能通過 http 接口請求。有興趣可以參考下面的腳本,動手嘗試一下:
curl -XPOST 127.0.0.1:8086/api/v2/query -sS \
-H 'Accept:application/csv' \
-H 'Content-type:application/vnd.flux' \
-H 'Authorization: Token root:123456' \
-d '
from(bucket:"sample_data")
|> range(start:time(v: 1439827200000), stop:time(v: 143984952000))
|> filter(fn:(r) => r._measurement == "census" and r.location == "1" and (r._field == "honeybees" or r._field == "butterflies"))
|> limit(n: 100)'
整體架構
在了解完查詢語言之后,接下來看看 InfluxDB 的整體架構:
上圖將 InfluxDB 分為了 4 層,上面 database 與 RP 這兩層之前已經介紹過,我們重點關注下面兩層:
shard | 存儲的時序數據的磁盤文件 |
shard group | shard 容器,責管理數據的生命周期,清除過期的數據 |
shard duration | shard duration |
由於時序數據的數據量通常十分巨大,因此 InfluxDB 的設計中引入了分片的策略。並且同時采用了兩種分片策略:
- shard group 層采用了基於時間的分片策略,方便實現按照時間條件范圍查詢
- shard 層則是基於 hashmod 進行分片,避免出現寫熱點產生性能瓶頸
每個 shard 由 WAL、Cache、TSM文件 3 部分組成:
整個數據的寫入流程簡化為 3 個步驟:
- 先寫入 WAL
- 然后寫入 Cache
- 最終持久化為 TSM File
WAL
預寫日志Write-Ahead-Log
是一種常見的提高數據庫優化手段,能夠在保證數據安全的同時,提升系統的寫入性能。
InfluxDB WAL 由一組定長的 segement 文件構成,每個文件大小約為 10MB。這些 segment 文件只允許追加,不允許修改。
Cache
Cache 是 WAL 的一個內存快照,保證 WAL 中的數據對用戶實時可見。
當 Cache 空閑或者過滿時,對應的 WAL 將被壓縮並轉換為 TSM,最終釋放內存空間。
每次重啟時會根據 WAL 重新構造 Cache。
TSM File
TSM 是一組存儲在磁盤上的外存索引文件,細節將在后續進行介紹。
它們之間的關系可以簡單描述為:
- Cache = WAL
- Cache + TSM = 完整的數據
存儲引擎發展史
在討論 TSM 之前,線回顧一下 InfluxDB 存儲引擎的發展歷程:
LSM tree 時代 (0.8.x)
- 引擎:LevelDB、RocksDB、HyperLevelDB、LMDB
- 優點:極高的寫入吞吐量,且支持數據壓縮
- 缺點:
- 懶刪除機制導致刪除操作耗時,且過期數據無法及時清理
- 按照時間維度分庫可以規避上述問題,但是又會導致新問題:
- 單個進程打開過多的文件導致句柄耗盡
- 過多的 WAL 可能會令順序追加退化為隨機寫入
B+Tree 時代 (0.9.x)
- 引擎:BoltDB
- 優點:單文件模型,穩定性更好,Go 實現更易嵌入
- 缺點:
- 文件較大時,寫放大效應會導致 IOPS 會極劇上升
- 通過在 Bolt 前嵌入自研的 WAL 模塊緩解了這一問題:
- 合並多個相鄰的寫入操作,減少 fsync
- 將隨機寫入變為順序追加,減少寫放大
TSM-Tree 時代 (0.9.5)
- 引擎:Time Structured Merge Tree
- 特點:
- 整體實現借鑒 LSM tree 架構,能有效規避寫放大
- 更少的數據庫文件,避免順序寫退化為隨機寫,不再出現文件句柄耗盡的情況
- 針對時序數據特性,采用了更具針對性的數據壓縮算法
關於 LSM-Tree 與 B-Tree 的寫放大分析,可以參考這篇文章:https://www.cnblogs.com/buttercup/p/12991585.html
TSM 解析
數據組織
TSM 是一個列存引擎columnar storage
,內部按照 SeriesKey 對時序數據進行組織:
- 每個 SeriesKey 對應一個數組,里面存儲着 time,value 構成的時間點數據
- 同個 SeriesKey 的數據存儲在一起,不同的 SeriesKey 的數據分開存儲
列存引擎的優勢
-
高效處理動態 schema 與稀疏數據
新增列時,對現有的數據無影響。並且由於不同列相互分離,可以直接忽略 null 值,不需要耗費空間存儲標記
-
同類型的數據能夠進行高效的壓縮
同個列的數據必然具有相同的數據類型,可以采取不同的壓縮手段進行優化。
-
查詢時能減少不必要的 I/O
查詢時能夠指定要返回的數據列,可以按需遍歷用戶指定的列,對於 OLAP 操作更友好。
列存引擎的劣勢
-
存儲稠密數據需要付出額外代價
當多個列具有相同的時間戳時,timestamp 會被重復存儲。
-
數據變更操作需要更多的 I/O
列分開存儲后,查、改、刪操作可能要同時修改多個文件。
-
無法提供原子性操作,事務實現困難
無法實現高效的悲觀鎖,如果使用場景中需要用到事務,建議使用行存引擎。
文件格式
TSM File 是一組列存格式的只讀文件,每個文件對應一組特定的 SeriesKey。
每個文件由 4 部分構成:
- Header:幻數 + 版本號
- DataBlock:時序數據塊
- IndexBlock:時序數據索引
- Footer:索引塊指針
Block 與 Series 的對應關系:
- 每個 IndexBlock 只屬於一個 Series
- 每個 DataBlock 只保存一個 Field 的數據
DataBlock 的結構較為簡單,其中存儲了壓縮過的時序數據:
DataBlock | |
---|---|
Type | 數據類型 |
Length | 時間戳數據長度 |
Timestamps | 時間戳列表(壓縮后) |
Values | 的時序數據列表(壓縮后) |
IndexBlock 則較為復雜,每個 IndexBlock 由一個 Meta 和多個 Entry 構成:
|
|
|
Meta 中存儲了 IndexBlock 對應的 SeriesKey 以及對應的 Entry 數量。
每個 Entry 對應一個 DataBlock,描述了這個 DataBlock 對應的時間區間,以及實際的存儲地址。
當需要查找 TSM 中的數據時,只需要將 IndexBlock 加載到內存中。就可以定位到相應的數據,提高查詢效率。
壓縮算法
InfluxDB 中的數據類型可以分為五種 timestamp
,float
, int
, bool
, string
。為了達到最優的壓縮效果,InfluxDB 針對不同類型的數據,使用了不同的壓縮算法。不過這些壓縮算法的原理都大同小異:使用變長編碼來保存數據有效位,避免存儲無效的 0 bit。
timestamp
時序數據都是按照時間順序進行排序的,因此首先會使用 delta-delta
編碼精簡數據:
若時間按固定區間分布,優先使用游程編碼run-length encoding
進行壓縮:
264
個時間戳
若編碼后所有值均小於260
,則使用simple8b
編碼,將多個值打包進單個 64bit 整數中:
- selector(4bit) 用於指定剩余 60bit 中存儲的整數的個數與有效位長度
- payload(60bit) 則是用於存儲多個定長的整數
根據一個查找表,將數據模式匹配到最優的 selector,然后將多個數據編碼至 payload
如果無法不滿足以上壓縮條件,則直接存儲原始數據。
float
Facebook 工程師通過觀察時序數據,發現相鄰時序數據進行異或操作后,僅有中間一小部分發生了變化。
根據這個規律,發明了一個簡單高效的浮點數壓縮算法:先異或求值,然后存儲中間的有效數據。
通過這一算法,他們將浮點數據的平均存儲空間壓縮至 1.37 字節。
算法過程可以參考這篇論文,或者直接參考下面的實現:
@Data
@Accessors(fluent = true)
@ToString
static class Block {
int leadingZero;
int tailingZero;
int blockSize;
long value;
boolean valueOf(int i) {
Validate.isTrue(i < blockSize);
return ((value >>> (blockSize-1-i)) & 0x1) > 0;
}
boolean fallInSameBlock(Block block) {
return block != null && block.leadingZero == leadingZero && block.tailingZero == tailingZero;
}
}
static Block calcBlock(double x, double y) {
long a = Double.doubleToRawLongBits(x);
long b = Double.doubleToRawLongBits(y);
long xor = a ^ b;
Block block = new Block().
leadingZero(Long.numberOfLeadingZeros(xor)).
tailingZero(Long.numberOfTrailingZeros(xor));
return block.value(xor >>> block.tailingZero()).
blockSize(block.value() == 0 ? 0 : 64 - block.leadingZero() - block.tailingZero());
}
static Pair<Long, Pair<Integer, byte[]>> encode(double[] values) {
int offset = 0;
BitSet buffer = new BitSet();
boolean ctrlBit;
double previous = values[0];
Block prevBlock = null;
for (int n=1; n<values.length; n++) {
Block block = calcBlock(previous, values[n]);
if (block.value() == 0) {
buffer.clear(offset++);
} else {
buffer.set(offset++);
buffer.set(offset++, ctrlBit = ! block.fallInSameBlock(prevBlock));
if (ctrlBit) {
int leadingZero = block.leadingZero();
int blockSize = block.blockSize();
Validate.isTrue(leadingZero < (1 << 6));
Validate.isTrue(blockSize < (1 << 7));
for (int i = 5; i > 0; i--) {
buffer.set(offset++, ((leadingZero >> (i - 1)) & 0x1) > 0);
}
for (int i = 6; i > 0; i--) {
buffer.set(offset++, ((blockSize >> (i - 1)) & 0x1) > 0);
}
}
for (int i = 0; i < block.blockSize(); i++) {
buffer.set(offset++, block.valueOf(i));
}
}
previous = values[n];
prevBlock = block;
}
return Pair.of(Double.doubleToLongBits(values[0]), Pair.of(offset, buffer.toByteArray()));
}
static List<Double> decode(Pair<Long, Pair<Integer, byte[]>> data) {
long previous = data.getLeft();
int dataLen = data.getRight().getKey();
BitSet buffer = BitSet.valueOf(data.getRight().getValue());
List<Double> values = new ArrayList<>();
values.add(Double.longBitsToDouble(previous));
int offset = 0;
Block blockMeta = null;
while (offset < dataLen) {
if (! buffer.get(offset++)) {
values.add(0d);
} else {
boolean ctrlBit = buffer.get(offset++);
if (ctrlBit) {
int leadingZero = 0;
int blockSize = 0;
for (int i = 0; i < 5; i++) {
leadingZero = (leadingZero << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
}
for (int i = 0; i < 6; i++) {
blockSize = (blockSize << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
}
blockMeta = new Block().leadingZero(leadingZero).blockSize(blockSize).
tailingZero(64 - leadingZero - blockSize);
}
Validate.notNull(blockMeta);
long value = 0;
for (int i = 0; i < blockMeta.blockSize(); i++) {
value = (value << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
}
previous ^= (value << blockMeta.tailingZero());
values.add(Double.longBitsToDouble(previous));
}
}
Validate.isTrue(offset == dataLen);
return values;
}
public static void main(String[] args) {
double[] values = new double[]{15.5, 14.0625, 3.25, 8.625, 13.1, 0, 25.5};
Pair<Long, Pair<Integer, byte[]>> data = encode(values);
System.out.println(data.getRight().getKey()); // 編碼后的數據長度,單位 bits
System.out.println(decode(data)); // 解碼后的數據
}
int
對於整形數據,首先會使用 ZigZag 編碼精簡數據。
然后嘗試使用 RLE 或 simple8b 對精簡后的數據進行壓縮。
如果無法不滿足壓縮條件,則存儲原始數據。
bool
直接使用 Bitmap 對數據進行編碼。
string
將多個字符串拼接在一起,然后使用 Snappy 進行壓縮