Druid.io系列(四):索引過程分析


原文鏈接: https://blog.csdn.net/njpjsoftdev/article/details/52956083

 

Druid底層不保存原始數據,而是借鑒了Apache Lucene、Apache Solr以及ElasticSearch等檢索引擎的基本做法,對數據按列建立索引,最終轉化為Segment,用於存儲、查詢與分析。

首先,無論是實時數據還是批量數據在進入Druid前都需要經過Indexing Service這個過程。在Indexing Service階段,Druid主要做三件事:第一,將每條記錄轉換為列式(columnar format);第二,為每列數據建立位圖索引;第三,使用不同的壓縮算法進行壓縮,其中默認使用LZ4,對於字符類型列采用字典編碼(Dictionary encoding)進行壓縮,對於位圖索引采用Concise/Roaring bitmap進行編碼壓縮。最終的輸出結果也就是Segment。

下面,我們先講解Druid的索引過程中的幾個基本概念,再介紹實時索引的基本原理,最后結合我們在生產環境中使用過的兩種索引模式加深對原理的理解。

1 Segment粒度與時間窗口

Segment粒度(SegmentGranularity)表示每一個實時索引任務中產生的Segment所涵蓋的時間范圍。比如設置{”SegmentGranularity” : “HOUR”},表示每個Segment任務周期為1小時。

時間窗口(WindowPeriod)表示當前實時任務的時間跨度,對於落在時間窗口內的數據,Druid會將其“加工”成Segment,而任何早於或者晚於該時間窗口的數據都會被丟棄。

Segment粒度與時間窗口都是DruidReal-Time中重要的概念與配置項,因為它們既影響每個索引任務的存活時間,又影響數據停留在Real-TimeNode上的時長。所以,每個索引任務“加工”Segment的最長周期 =SegmentGranularity+WindowPeriod,在實際使用中,官方建議WindowPeriod<= SegmentGranularity,以避免創建大量的實時索引任務。

2 實時索引原理

Druid實時索引過程有三個主要特性:

  1. 主要面向流式數據(Event Stream)的攝取(ingest)與查詢,數據進入Real-TimeNode后可進行即席查詢。

  2. 實時索引面向一個小的時間窗口,落在窗口內的原始數據會被攝取,窗口外的原始數據則會被丟棄,已完成的Segments會被Handoff到HistoricalNode。

  3. 雖然Druid集群內的節點是彼此獨立的,但是整個實時索引過程通過Zookeeper進行協同工作。

實時索引過程可以划分為以下四個階段:

Ingest階段 
Real-TimeNode對於實時流數據,采用LSM-Tree(Log-Structured Merge-Tree )將數據持有在內存中(JVM堆中),優化數據的寫入性能。圖3.29中,Real-TimeNodes在13:37申明服務13:00-14:00這一小時內的所有數據。

Persist階段 
當到達一定閾值(0.9.0版本前,閾值是500萬行或10分鍾,為預防OOM,0.9.0版本后,閾值改為75000行或10分鍾)后,內存中的數據會被轉換為列式存儲物化到磁盤上,為了保證實時窗口內已物化的Smoosh文件依然可以被查詢,Druid使用內存文件映射方式(mmap)將Smoosh文件加載到直接內存 中,優化讀取性能。如圖3.29中所示,13:47、13:57、14:07都是Real-TimeNodes物化數據的時間點。 
這里寫圖片描述

圖3.28描述了Ingest階段與Persist階段內數據流走向以及內存情況。Druid對實時窗口內數據讀寫都做了大量優化,從而保證了實時海量數據的即席可查。

Merge階段 
對於Persist階段,會出現很多Smoosh碎片,小的碎片文件會嚴重影響后期的數據查詢工作,所以在實時索引任務周期的末尾(略少於SegmentGranularity+WindowPeriod時長),每個Real-TimeNode會產生back-groundtask,一方面是等待時間窗口內“掉隊”的數據,另一方面搜索本地磁盤所有已物化的Smoosh文件,並將其拼成Segment,也就是我們最后看到的index.zip。圖3.29中,當到達索引任務末期14:10分時,Real-TimeNodes開始merge磁盤上的所有文件,生成Segment,准備Handoff。

Handoff階段 
本階段主要由CoordinatorNodes負責,CoordinatorNodes會將已完成的Segment信息注冊到元信息庫、上傳DeepStorage,並通知集群內HistoricalNode去加載該Segment,同時每隔一定時間間隔(默認1分鍾)檢查Handoff狀態,如果成功,Real-TimeNode會在Zookeeper中申明已不服務該Segment,並執行下一個時間窗口內的索引任務;如果失敗,CoordinatorNodes會進行反復嘗試。圖3.29中,14:11分完成Handoff工作后,該Real-TimeNode申明不再為此時間窗口內的數據服務,開始下一個時間窗口內的索引任務。

這里寫圖片描述

下面,我們介紹Tranquility-Kafka索引過程與0.9.1.1版本中最新的Kafkaindexingservice索引過程。

3 Tranquility-Kafka

Tranquility是托管在GitHub上的開源Scala Library,主要負責協調實時索引任務中創建Indexing Service Tasks、處理partition、副本、服務發現以及更新schema。在集群中,我們可以啟動多個Tranquility-Kafka實例,所有實例均通過Zookeeper協同處理Indexing Service Tasks。

Tranquility的出現主要是因為Indexing Service API更偏向底層(low-level),就如同Kafka Producer和Consumer在low-level API(Scala API)的基礎上又封裝了high-level API(Java API),供開發者使用。

任務生命周期

Tranquility會為時間窗口內的每一個Segment啟動一個Indexing Service Task,其中Tranquility將數據以POST請求 的方式提交給EventReceiverFirehose(Firehose實現類,默認丟棄所有時間窗口外的數據),當到達任務最大時長(SegmentGranularity+WindowPeriod)時,TimedShutoffFirehose會自動關閉Firehose,此時Segment會進行合並、注冊元信息、存儲到Deep Storage中並等待Handoff,當某個Historical Node聲明自己已加載該Segment后,Indexing Service Task會正常退出。所以,每個Indexing Service Task的生命周期包括SegmentGranularity + WindowPeriod+push to Deep Storage + wait forHandoff。

Schema更新 
Schema更新表示我們增加或減少了原始數據中的維度數或度量數。Tranquility可以自動檢測Schema更新,並保存新老兩份Schema,對於先前創建的任務依然使用老Schema,當到達新的SegmentGranularity時,Tranquility則會使用新Schema攝取數據。

高可用性 
Tranquility的所有操作都是盡最大努力(best-effort),我們可以通過配置多個任務副本保證數據不丟失,但是在某些情況下,數據可能會丟失或出現重復:

  1. 早於或晚於時間窗口,數據一定會被丟棄。

  2. 失敗的Middle Manager數目多於配置的任務副本數,部分數據可能會丟失。

  3. IndexingService內部(Overlord、MiddleManager、Peon)通信長時間丟失,同時重試次數超過最大上限,或者運行周期已經超過了時間窗口,這種情況部分數據也會被丟棄。

  4. Tranquility一直未收到IndexingService的確認請求,那么Tranquility會切換到批量加載模式,數據可能會出現重復。

所以,Druid官方建議,如果使用Tranquility作為Real-TimeNodes,那么可以采用如下解決方案減少數據丟失或者重復的風險,從而保證Druid中數據的exactly-oncesemantics:

  1. 將數據備份到S3或者HDFS等存儲中;

  2. 晚間對備份數據運行Hadoopbatchindexingjob,從而對白天的數據重做Segment。

4 Kafka-Indexing-Service

Druid 0.9.1.1版本中新增了Experimental Features:KafkaIndexingService。之所以會增加這個新的特性,根據Druid官方博客:將Kafka集成進Druid,不僅是看重Kafka的高吞吐量以及高可靠性,同時也因為Kafka可以使流數據下游系統,也就是KafkaConsumer端能夠更好地實現exactly-oncesemantics。我們在使用過Tranquility-Kafka后可知,數據丟失可能不僅是因為集群節點問題,同樣可能是因為數據延遲從而造成沒有落在時間窗口內而“被丟失”。

采用Kafkaindexingservice主要有以下幾方面的考慮:

  1. 每一個進入Kafka的message都是有序、不變的,同時可以通過partition+offset的方式定位,而Druid作為Kafka的Consumer,可以通過該方式rewind到Kafka已存在的buffer中的任意一條message;

  2. Message是由Consumer端,也就是Druid自主地pull進入,而不是被KafkaBrokerpush進集群,push的方式我們知道,接收端無法控制接收速率,容易造成數據過載,而pull的方式Consumer端可以控制ingest速率,從而保證數據有序、穩定地進入Druid;

  3. Message中都包含了partition+offset標簽,這就保證了作為Consumer的Druid可以通過確認機制保證每一條message都被讀取,不會“被丟失”或“被重讀”。

所以,在Kafkaindexingservice中,每一個IndexingServiceTask都對應當前topic的一個partition,每一個partition都有對應的起止offset,那么Druid只需要按照offset順序遍歷讀取該partition中所有的數據即可。同時,在讀取過程中,Druid收到的每條message都會被確認,從而保證所有數據都被有序的讀取,作索引,“加工成”Segment。當到達SegmentGranularity時,當前partition被讀過的offset會被更新到元信息庫的druid_dataSource表中。

KafkaSupervisor 
KafkaSupervisor作為Kafkaindexingservice的監督者,運行在Overlord中,管理Kafka中某個topic對應的Druid中所有Kafkaindexingservicetasks生命周期。在生產環境中,我們通過構造Kafka Supervisor對應的spec文件,以JSON-OVER-HTTP 的方式發送給Overlord節點,Overlord啟動KafkaSupervisor,監控對應的Kafkaindexingservicetasks。

這里寫圖片描述

圖3.30給出了KafkaIndexingService中的數據流以及控制流。我們總結Supervisor的特性如下:

  1. Supervisor啟動后,會啟動最多不超過目標topic中最大partition數目的IndexingServiceTasks;

  2. 負責管理所有Indexing Service Tasks的生命周期,包括每個task的運行狀態、已運行時長(以秒為單位),剩余時長等;

  3. 重新創建失敗任務以及協調下一個SegmentGranularity內新任務的創建工作等;

  4. Overlord的重啟或Leader切換並不會影響Supervisor的工作;

  5. 對於Schema更新,Supervisor首先會自動停止所有以老Schema運行中的任務,發布Segment;然后使用新Schema重新創建Indexing Service Tasks,保證在此過程中沒有messages會被丟失或者重復讀取。

Kafka indexing service在生產環境中的使用說明 
這里寫圖片描述

這里寫圖片描述

在圖3.32中我們可以看到,該目錄下有眾多Kafkaindexingservicetasks子目錄,只有在restore.json文件中記錄的才是目前正在運行中的任務,而剩余的子目錄可能是因為各種原因(任務失敗、實例重啟)而未被刪除的文件夾。我們以第一個任務(index_kafka_XXX_0030af53edaf0a1_mephilnh)為例。

圖3.33中lock表示當前tasklock,task.json是當前索引任務的描述文件,log是當前peon日志信息。圖3.34中,work目錄下只有一層子目錄persist,表示當前索引任務已物化的“Segment碎片”。圖3.35展示了persist目錄下的具體情況,進入persist/${dataSource}_${intervalStart}_${intervalEnd}_${segment_generate_time}/目錄下,我們可以看到四個以數字命名的文件夾,這些文件夾是當前索引任務在實時時間窗口內按照一定規則(時間閾值10分鍾或者行數閾值75000行)物化的索引文件,每個文件夾內都由meta.smoosh、XXXXX.smoosh以及version.bin這三個文件構成,當索引任務運行到SegmentGranularity+WindowPeriod左右,當前文件夾下會生成一個以“merged”命名的文件夾,將所有以數字命名的文件夾下的文件合並歸一為descriptor.json和index.zip,也就是我們所說的Segment,等待publish和handoff。

這里寫圖片描述 
這里寫圖片描述

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM