時序數據庫永遠的難關 — 時間線膨脹(高基數 Cardinality)問題的解決方案


簡介: 本文主要討論 influxdb 在遇到寫入的數據出現高基數 Cardinality 問題時,一些可行的解決方案。

作者 | 徐建偉 (竹影)

前序

隨着移動端發展走向飽和,現在整個 IT 行業都期待着“萬物互聯”的物聯網時代。在物聯網場景中,往往有許多各類不同的終端設備,布署在不同的位置,去采集各種數據,比如某一區域有 10萬個 loT 設備,每個 loT 設備每 5 秒發送一次數據。那么每年會產生 6307億 個數據點。而這些數據都是順序產生的,並且 loT 設備產生數據的格式全部是一致的,並且沒有刪除和修改的需求。針對這樣按時海量寫入無更新場景,時序數據庫應運而生。

時序數據庫在假定沒有數據插入和更新需求,數據結構穩定的前提下,極限追求快速寫入,高壓縮,快速檢索數據。時序數據的 Label(tag)會建立索引,以提高查詢性能,以便你可以快速找到與所有指定標簽匹配的值。如果 Label(tag)值的數量過多時(高基數 Cardinality 問題),索引會出現各種各樣的問題, 本文主要討論 influxdb 在遇到寫入的數據出現高基數 Cardinality 問題時,一些可行的解決方案。

高基數Cardinality問題(時間線膨脹)

時序數據庫主要存儲的是 metric 數據,每一條數據稱為一個樣本(sample),樣本由以下三部分組成:

  • 指標(時間線 time-series):metric name 和描述當前樣本特征的 labelsets;
  • 時間戳(timestamp):一個精確到毫秒的時間戳;
  • 樣本值(value):表示當前樣本的值。

<-------------- time-series="" --------=""><-timestamp -----=""> <-value->

node_cpu{cpu=“cpu0”,mode=“idle”} @1627339366586 70

node_cpu{cpu=“cpu0”,mode=“sys”} @1627339366586 5

node_cpu{cpu=“cpu0”,mode=“user”} @1627339366586 25

通常情況下, time-series 中的 lablelsets 是有限的,可枚舉的,比如上面的例子 model 可選值為 idle,sys,user。

prometheus 官方文檔中對於 Label 的建議:

CAUTION: Remember that every unique combination of key-value label pairs represents a new time series, which can dramatically increase the amount of data stored. Do not use labels to store dimensions with high cardinality (many different label values), such as user IDs, email addresses, or other unbounded sets of values.

時序數據庫的設計時,也是假定在時間線低基數的前提下。但是隨着 metric 的廣泛使用,在很多場景下無法避免出現時間線膨脹。

比如,在雲原生場景下 tag 出現 pod/container ID之類,也有些 tag 出現 userId,甚至有些 tag 是 url,而這些 tag 組合時,時間線膨脹得非常厲害。

這個矛盾出現是必然的,怎么解決呢?是寫入數據方調整寫入數據時,控制寫入的 time-series的數量,還是時序數據庫去更改設計來適用這種場景?這個問題沒有完美的解決方案,我們需要做出平衡。

從實際情況出發,如果時間線膨脹后,時序數據庫不會出現不可用,性能也不會出現指數級別下降。也就是說時間線不膨脹時,性能優秀。時間線膨脹后,性能能達到良好或者及格就好。

那怎么讓時序數據庫在時間線膨脹的情況下性能還能良好呢?接下來我們通過influxdb的源碼來討論這個問題。

時間線的處理邏輯

influxdb 的 tsm 結構,主要的邏輯處理過程類似 lsm。數據上報后,會添加到 cache 和日志文件(wal)。為了加快檢索速度或者壓縮比例,會對上報的數據進行 compaction(數據文件合並,重新構建索引)。

索引涉及到三個方面:

  • TSI(Time Series Index)檢索Measurement,tag,tagval,time
  • TSM(Time-Structured Merge Tree)用來檢索time-series -> value
  • Series Segment Index 用來檢索 time-series key <–> time-series Id

具體influxdb的索引實現可以參照官方文章。

https://github.com/influxdata/influxdb/blob/master/tsdb/index/tsi1/doc.go

1.jpg

 

當時間線膨脹后,TSI 和 TSM 的檢索性能下降並不嚴重,問題主要是出現在 Series Segment Index 里。

這節我們會討論influxdb的時間線文件的正排索引(time-series key ->id, id->time-series key):

  • SeriesFile 是 Database(bucket)級別的。
  • SeriesIndex 主要處理 key->Id, key->id 的索引映射。
  • SeriesSegment 主要存放的是 Series 的 Id 和 key。
  • SeriesIndex 里面是存放 Series 的 Id 和 key 等索引。(可以理解是兩個 hashmap)
  • keyIDMap 通過 key 來查找對應的 Id。
  • idOffsetMap 通過 Id 查到到 offset,通過這個 offset(對應 SeriesSegment 的位置)來查找 SeriesSegment 文件獲取 key。

2.jpg

2.jpg

2.jpg

2.jpg

具體的代碼(influxdb 2.0.7)如下:

tsdb/series_partition.go:30 // SeriesPartition represents a subset of series file data. type SeriesPartition struct { ... segments []*SeriesSegment index *SeriesIndex seq uint64 // series id sequence .... } tsdb/series_index.go:36 // SeriesIndex represents an index of key-to-id & id-to-offset mappings. type SeriesIndex struct { path string ... data []byte // mmap data keyIDData []byte // key/id mmap data idOffsetData []byte // id/offset mmap data // In-memory data since rebuild. keyIDMap *rhh.HashMap idOffsetMap map[uint64]int64 tombstones map[uint64]struct{} }

對 series key 進行檢索時,會先在內存 map 中查找,然后在磁盤的 map 上查找,具體的實現代碼如下:

tsdb/series_index.go:185 func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64 { // 內存map查找 if v := idx.keyIDMap.Get(key); v != nil { if id, _ := v.(uint64); id != 0 && !idx.IsDeleted(id) { return id } } if len(idx.data) == 0 { return 0 } hash := rhh.HashKey(key) for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask { // 磁盤map查找offset elem := idx.keyIDData[(pos * SeriesIndexElemSize):] elemOffset := int64(binary.BigEndian.Uint64(elem[:8])) if elemOffset == 0 { return 0 } // 通過offset獲取對於的id elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize) elemHash := rhh.HashKey(elemKey) if d > rhh.Dist(elemHash, pos, idx.capacity) { return 0 } else if elemHash == hash && bytes.Equal(elemKey, key) { id := binary.BigEndian.Uint64(elem[8:]) if idx.IsDeleted(id) { return 0 } return id } } }

這里補充一個知識點,將內存 hashmap 轉成磁盤 hashmap 的實現。我們都知道 hashmap 的存儲是數組,influfxdb 中的實現是通過 mmap 方式映射磁盤空間(見 SeriesIndex 的 keyIDData),然后通過 hash 訪問數組地址,采用的 Robin Hood Hashing,符合內存局部性原理(查找邏輯的代碼如上 series_index.go 中)。將 Robin Hood Hashtable 純手動移植磁盤 hashtable, 開發人員還是花了不少心思。

那內存 map 和磁盤 map 是如何生成的,為什么需要兩個 map?

influxdb 的做法是將新增的 series key 先放到內存 hashmap 里面,當內存 hashmap 增長大於閾值時,將內存 hashmap 和磁盤 hashmap 進行 merge(遍歷所有 SeriesSegment,過濾已經刪除的 series key)生成一個新的磁盤 hashmap,這個過程叫做 compaction。compation 結束后內存 hashmap 被清空,然后繼續存放新增的 series key。

3.jpg

tsdb/series_partition.go:200 // Check if we've crossed the compaction threshold. if p.compactionsEnabled() && !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) && p.compactionLimiter.TryTake() { p.compacting = true log, logEnd := logger.NewOperation(context.TODO(), p.Logger, "Series partition compaction", "series_partition_compaction", zap.String("path", p.path)) p.wg.Add(1) go func() { defer p.wg.Done() defer p.compactionLimiter.Release() compactor := NewSeriesPartitionCompactor() compactor.cancel = p.closing if err := compactor.Compact(p); err != nil { log.Error("series partition compaction failed", zap.Error(err)) } logEnd() // Clear compaction flag. p.mu.Lock() p.compacting = false p.mu.Unlock() }() }

tsdb/series_partition.go:569 func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error { hdr := NewSeriesIndexHeader() hdr.Count = seriesN hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor) // Allocate space for maps. keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) // Reindex all partitions. var entryN int for _, segment := range segments { errDone := errors.New("done") if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error { ... // Save max series identifier processed. hdr.MaxSeriesID, hdr.MaxOffset = id, offset // Ignore entry if tombstoned. if index.IsDeleted(id) { return nil } // Insert into maps. c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset) return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id) }); err == errDone { break } else if err != nil { return err } }

這樣設計有兩個缺陷:

  1. 做 compaction 時,當 io 訪問 SeriesSegments 文件, 內存加載所有的 series key,會構建一個新的 hashtable,然后將這個 hashtable mmap 存儲到磁盤,當 series key 超過幾千萬或者更多時,會出現內存不夠,oom 問題。
     
  2. 做 compaction 時, 對於已經刪除的 series key(tombstone 標記)做了過濾,不生成 series index,但是 SeriesSegment 中已經刪除 series key 只有做了 tombstone 標記,不會做物理刪除,這樣會導致 SeriesSegment 一直膨脹,在實際生產環境一個 partition 下的所有 segmeng 文件超過幾十 G,做 compaction 時,會產生大量 io 訪問。

可行的解決方案

1、增加partition或者database

influxdb 的正排索引是 database 級別的,有兩個方式可以減少 compaction 時的內存,一個是增加 partition 數量或者將多個 Measurement 划到不同的 database 里面。

但這樣做的問題是,已經存在數據的 influxdb 不好調整兩個數據。

2、修改時間線存儲策略

我們知道 hash 索引是 O1 的查詢,效率非常高,但是對於增長性的數據,存在擴容問題。那我們做個折中的選擇。當 partition 大於某個閾值時,將 hash 索引變成 b+tree 索引。b+tree 對於數據膨脹性能下降有限,更適合高基數問題,而且不再需要全局的 compaction。

3、將series key的正排索引下沉到shard級別

influxdb 里面每個 shard 都是有時間區間的,某個時間區間內的時間線數據並不大。比如 database 里面保存的是 180天 的 series key,而 shard 一般只有一天甚至 1 個小時的跨度,兩者存放的 series key 存在 1~ 2 個數量級的差距。另外將 series key 正排索引下沉到 shard 級別對刪除操作更友好,當 shard 過期刪除時,會將當前 shard 的所有 series key 和其他 shard 做 diff,當 series key 不存在時再去刪除 series key。

4、根據Measurement修改時間線存儲策略

在實際生產環境中,時間線膨脹和 Measurement 有很大關系,一般是少數的 Measurement 存在時間線膨脹問題,但是絕大部分的 Measurement 不存在時間線爆炸的問題。

我們可以對做 series key 的正排索引的 compaction 時,可以添加 Measurement 時間線統計,如果某個 Measurement 的時間線膨脹時,可以將這個 Measurement 的所有 series key 切換到 B+ tree。而不膨脹的 series key 繼續保留走 hash 索引。這樣方案性能比第二個方案更好,開發成本會更高一些。

目前高基數問題主要體現在 series key 正排索引。個人覺得短期先做第二個方案過度到第四個方案的方式。這樣可以比較好的解決時間線增長的問題,性能下降不多,成本不高。第三個方案改動比較大,設計更合理,可以作為一個長期修復方案。

總結

本文主要通過 influxdb 來講解時序數據庫的高基數 Cardinality 問題,以及可行的方案。metric 的維度爆炸導致數據線膨脹問題,很多同學都認為這是對時序數據庫的誤用或者是濫用。但是信息數據爆炸的今天,讓數據維度收斂,不發散成本非常高,甚至遠高於數據存儲成本。

個人覺得需要對這個問題進行分而治之的方式,提升時序數據庫對維度爆炸的容忍度。換句話說,出現時間線膨脹后,時序數據庫不會出現崩潰情況,對時間線未膨脹的 metric 繼續高效運行,而出現時間線膨脹的 metic 可以出現性能下降,單不會線性下降。提升對時間線膨脹的容忍度,控制時間線膨脹的爆炸半徑,將會成為時序數據庫的核心能力。

原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM