InfluxDB 增刪改查源碼分析


influxdb介紹

influxdb是db-engines中當前排行第一的時序數據庫,本文針對influxdb源碼的閱讀,簡單介紹influxdb的內部模塊設計,實現機制等原理,水平有限,歡迎糾正。

influxdb基礎概念

https://docs.influxdata.com/influxdb/v1.4/concepts/glossary

時序數據庫的標准數據模型:
influxdb和我們使用的普通關系型數據庫一樣,存在database, table(在influxdb中稱之為measurement),每條數據的數據模型設計:

timestamp
tags:
  tag_key: tag_value
  tag_key2: tag_value2
fileds:
   field_key: value

我們監控cpu使用量的insert語句如:

insert cpu_info,host=10.1.1.1,cpu_core=1 value=0.2,sys=0.1,user=0.1 1515413665013940928

那么數據結果為:

timestamp: 1515413665013940928.
tags:
  host(tag_key): 10.1.1.1(tag_value)
  cpu_core: 1
fields:
  value: 0.2
  sys: 0.1
  user: 0.1 

influxdb內部的數據存儲結構仍然是key/value結構:

  • key 為:measurement+timestamp+tags+filed_key,
  • value 為: field_value。

influxdb提供了類似sql的語句查詢db中存儲的時序數據,influxql:如

select * from cpu_info where time>0 and host="10.1.1.1"

其他語句可見官方文檔:https://docs.influxdata.com/influxdb/v1.4/query_language/data_download/

模塊設計

img

  • httpd:influxdb內部所有的api請求均通過httpd接口對外提供服務。

  • influxsql:influxdb內部實現了一個sql parser 模塊,在數據讀取寫入過程中會對輸入的sql進行解析。

  • meta:metainfo記錄了influxdb所有的元信息,並且dump到某一個file中,元信息包括database name, retention policy, shard groups, user等。

  • index:tags的索引信息

  • retention:自動清理過期數據功能。

  • tsdb:influxdb中最核心的模塊:存儲引擎層,influxdb引擎層的思路基本類似於lsm tree,influxdb將其稱之為tsm tree, lsm tree的介紹文章非常的多,這里不詳細進行描述。

下文我們會詳細描述存儲引擎各個模塊的工作機制。

influxdb將數據按照時間分區存儲,每個分區稱之為shard, 每個shard有各自的存儲引擎存儲數據,且相互獨立。作者在官方文檔注明這么做的原因是為了快速刪除過期數據,拆分為shard后刪除數據只需要直接清理shard所有的數據文件即可。

刪除過期數據是時序數據庫一個比較重要的的特性,如性能數據只保持最近一個月或者幾個月的數據的需求。

創建數據庫流程

根據influxdb的模塊設計,創建一個數據庫的流程如下:

  1. 進入httpd模塊,根據用戶調用的請求 POST + /query 路由到相應的函數中。

  2. 通過influxsql parse用戶輸入,獲取對象 inflxql.CreateDatabaseStatement

  3. 根據inflxql.CreateDatabaseStatement 進入meta模塊,修改元信息並且重新生成meta.db file覆蓋原先的文件。

meta模塊主要存儲維護了兩方面元信息:

  1. Database -> measurement -> retention policy -> shardGroups 數據存儲的元信息,很重要的是記錄了每個 shard 的 startTime+endTime, 這樣每次用戶的 select 查詢需要通過 metainfo 定位到哪些shard 去查詢數據。
  2. User信息,如哪個用戶是admin用戶,以及user是否對db擁有讀寫權限等,influxdb的賬戶權限體系還是比較簡單的,詳細可參考官方文檔。

數據插入流程

以下面數據為例:

name: cpu_info
time                core_num host     sys user value
----                -------- ----     --- ---- -----
1516326021157346829 2        10.1.1.1 0.1 0.1  0.2
1516326032353077597 1        10.1.1.1 0.2 0.1  0.3
1516326046959517000 3        10.1.1.1 0.1 0.4  0.5
1516326094282729378 1        10.1.1.2 0.3 0.6  0.9

當執行insert時:

insert cpu_info,host=10.1.1.1,core_num=4 value=0.2,user=0.1,sys=0.1

流程如下:

img

流程解析:

  1. 根據meta.db信息判斷寫入哪個shard, 或者是否需要新建一個shard.

  2. 獲得shard后,數據寫到Cache中。 Cache的數據會在用戶select數據時被用戶讀取,當數據量達到閾值后,落地生成tsm file,同時對應的wal數據也會被相應刪除。

  3. 數據寫入到wal中,wal的數據一般情況上不會使用,當influxdb宕機重啟時,wal數據會被讀取load成為Cache,保證數據不丟失。

  4. 當數據成功寫入到wal中后,才返回給調用者表示插入成功。

Insert數據直接寫入到Cache內存中,同時寫入到Wal (write ahead log) 文件中,所以influxdb的數據寫入性能非常優秀

數據庫查詢流程

查詢的流程涉及內容比較多,比insert復雜的多,因為會涉及到索引,所以復雜度提升。下面以查詢語句為例進行說明:

# select
select * from cpu_info where time>1000000 and host='10.1.1.1'

查詢流程如下:

img

解析influxql

select請求被httpd模塊獲取到后,經過Influxql parser后,構造為 SelectStatement structure。如上述中的influxql: select * from cpu_info where time>1000000 and host='10.1.1.1', 會被parse為幾部分:

  1. Sources: cpu_info, 為influxql中的Measurement對象,其中包含屬性,Database, RetentionPolicy,Measurement Name.

  2. Fields: Wildcard, 既上述influxql中的* .

  3. Condition: 條件表達式,在influxdb中為 Expr 對象。 根據where條件,Condition如圖:

img

// BinaryExpr represents an operation between two expressions.
type BinaryExpr struct {
    Op  Token
    LHS Expr
    RHS Expr
}

我們描述的是influxql最簡單的場景,實際上influxql目前版本位置已經是較為完善結構化查詢語言,支持很多時序場景下的常用聚合等功能。

獲取相應的Shards

根據傳入的time condition + Sources measurement, 從meta.db中獲取對應的shards,下述流程將會到每個shard中讀取指定的數據。

read seriesKey from memory index

我們根據Condition中的time>0, host='10.1.1.1', 需要解析成一個sereisKey: 這個seriesKey在influxdb內部存儲格式為string: measurementName,tag_key1=tag_value1,tag_key2=tag_value2,... 這個seriesKey就是influxdb中的索引key。

如上述請求中我們解析出來的seriesKey為三個:

cpu_info,host=10.1.1.1,cpu_cores=1
cpu_info,host=10.1.1.1,cpu_cores=2
cpu_info,host=10.1.1.1,cpu_cores=3

我們介紹下如何根據輸入的host='10.1.1.1' 的直接定位到上述的seriesKey。

influxdb中 存在一個 index 的模塊,index中存儲着 tag_key/tag_value 定位到 seriesKey 的元信息,數據存儲模型總結大致如下:

measurement_name ->
{tag_key1: {tag_value1: [seriesKey1, seriesKey2], ...}, ...}

可以預見上述的存儲結構當 tags 大量存在的情況下,會占用相當多的內存。所以在我們使用influxdb的過程中,tags的設計極為重要。

read block data

獲取到seriesKey后,根據我們調用的select * from cpu_info where time>1000000 and host='10.1.1.1' 語句解析的SelectStatement:

fields: * 
Condition: time > 1000000

和我們通過index獲取的SeriesKeys:

cpu_info,host=10.1.1.1,cpu_cores=1
cpu_info,host=10.1.1.1,cpu_cores=2
cpu_info,host=10.1.1.1,cpu_cores=3

由此從tsm file中獲取到我們想要的block data。

tsm file的存儲結構

提到block data 這里簡單介紹下tsm file的存儲結構。官方文檔中描述的存儲結構如下:

img

其實主要就分為了Blocks和Index兩個部分。 Blocks存儲了實際的數據塊,Index存儲了到實際Blocks的映射。索引數據結構如下:

img

其中:

  • Index中的Key 即為我們在上述步驟中獲取到的SeriesKey + field key.
  • Index中的 Offset指向的是tsm file中對應Block的position。

擁有這兩個屬性,我們就能通過SeriesKey定位到具體的block.

如何從seriesKey定位到Index在tsm file中的position?

  1. 內存中還存儲了一個數據結構IndirectIndex, 間接索引。

  2. IndirectIndex中Offsets存儲了每一條Index record start position。

  3. 獲取到Index position的流程是通過二分查找法在offsets所有的position獲取到Index中的key, 和當前select條件中的sereiesKey進行比較,當然這個前提是tsm file中index是有序存儲的。代碼如下:

// We use a binary search across our indirect offsets (pointers to all the keys
// in the index slice).
i := sort.Search(len(d.offsets), func(i int) bool {
    // i is the position in offsets we are at so get offset it points to
    offset := d.offsets[i]

    // It's pointing to the start of the key which is a 2 byte length
    keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))

    // See if it matches
        // 傳入key和 d.b[offset+2:offset+2+keyLen] 進行比較。
    return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0
})

獲取到index position后,tsm index中的Offset記錄了對應的Block data在 tsm file中的偏移位置,這樣就可以順利讀取到values.

read from cache

cache中的存儲結構比較簡單,就是一個hashtable,hash key即為seriesKey, 所以根據seriesKey獲取values的時間復雜度為O(1),非常的快。

為何其他lsm tree存儲引擎實現中cache使用linked list (leveldb中稱之為Memtable)這種數據結構,而influxdb中卻是用hashtable方法,我猜測是leveldb中了支持范圍搜索的需求,需要可以進行范圍查詢的數據結構。 而influxdb的select 語句中,是不允許tag key > 或 < 等條件的,所有的tag value均存儲為string格式,且需要精確匹配,沒有這方面的需求。

merge block files and cache data

Influxdb 的 tsm tree 存儲引擎擁有所有類lsm tree存儲引擎的通病,既高並發的寫入是犧牲了一定的查詢性能的。

  • Merge Blocks data:influxdb數據寫入到cache中后,當數據寫入量達到一定閾值后會dump寫入到磁盤中形成tsm file. 並且存在compaction level的概念,如第一次寫入到磁盤中compaction level為1。隨着后續數據的合並,低層級tsm file進行compaction合並為一個新的高一級的tsm file,compaction level 會遞增,如下圖為shardId為126的shard存在兩個tsm file. 000000717-000000002.tsm和000000717-000000003.tsm,其中00002.tsm表示為level 2的compaction tsm file, 00003.tsm file表示為level 3的compaction tsm file.
├── ps_retention_10m
│   ├── 126
│   │   ├── 000000717-000000002.tsm
│   │   └── 000000717-000000003.tsm

同一個seriesKey 可能會存儲在兩個compaction level的tsm file中,所以數據讀取需要遍歷兩個block塊數據,並進行去重處理。compaction的流程是tsm tree存儲引擎中最重要的組成部分,篇幅有限,暫不多做描述。

除了tsm files中的block datas合並,還需要和cache data進行合並,按照后寫覆蓋先寫的原則,進行去重。

數據刪除流程

假設用戶執行 influxql 如下:

delete from cpu_info where host='10.1.1.1' and time>10000000;

跟 select 流程類似,流程如下:

  1. 根據condition, 通過metainfo映射到所有的shards,對下述的shard均執行以下操作

  2. 根據condition從index中獲取到相應的seriesKeys

  3. 刪除cache數據:根據seriesKeys和time區間刪除cache中的數據,並且寫入到wal 中

  4. 刪除tsm file中的數據:此時創建了tombstone file, 標識哪些數據已經被刪除了,並不會修改實際上的tsm file。

  5. 判斷dalete完成后,是否seriesKey對應的所有的數據被刪除,則同時刪除index數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM