1、簡介
Apache Druid是針對時間序列數據提供的低延時數據寫入以及快速交互式查詢的分布式OLAP數據庫。其兩大關鍵點是:首先,Druid主要針對時間序列數據提供低延時數據寫入和快速聚合查詢;其次,Druid是一款分布式OLAP引擎。
2、主要特性
Druid是一個用於大數據實時查詢和分析的高容錯、高性能開源分布式系統,旨在快速處理大規模的數據,並能夠實現快速查詢和分析。分析和存儲系統,提供極具成本效益並且永遠在線的實時數據攝取和任意數據處理。
為分析而設計——Druid是為OLAP工作流的探索性分析而構建。它支持各種filter、aggregator和查詢類型,並為添加新功能提供了一個框架。用戶已經利用Druid的基礎設施開發了高級K查詢和直方圖功能。
交互式查詢——Druid的低延遲數據攝取架構允許事件在它們創建后毫秒內查詢,因為Druid的查詢延時通過只讀取和掃描有必要的元素被優化。Aggregate和 filter沒有坐等結果。
高可用性——Druid是用來支持需要一直在線的SaaS的實現。你的數據在系統更新時依然可用、可查詢。規模的擴大和縮小不會造成數據丟失。
可伸縮——現有的Druid部署每天處理數十億事件和TB級數據。Druid被設計成PB級別。
3.、Druid使用場景,優缺點
應用場景:Druid應用最多的如廣告分析、互聯網廣告系統監控以及網絡監控等。當業務中出現以下情況時,Druid是一個很好的技術方案選擇。
Druid 常見應用的領域:
- 網頁點擊流分析
- 網絡流量分析
- 監控系統、APM
- 數據運營和營銷
- BI分析/OLAP
什么時候需要使用Druid
如果你的case滿足下面一些特征那么Druid應該是一個好的選擇
a)具有大量數據時,如每天數億事件的新增、每天數10T數據的增加;
b)需要一個高可用、高容錯、高性能數據庫時。
c)插入數據的頻次非常高,但是修改非常少
d)你的意願是希望查詢延遲在100ms到幾秒之間
e)你的數據有時間的屬性(Druid包含一些特殊的設計和優化對於時間序列)
f)你可能有不止一個表,而且每個查詢僅命中一些大的分布式的表.查詢可能也會命中不止一個小的lookup表.(就像儲存在內存里的一個小字典,可以不用的)
g)你需要在一些高基數的列上面(比如URLS,user IDs)做一些快速的計算和排序
h)你需要從Kafka,HDFS,flat files或者對象存儲比如Amszon S3上加載數據
下面一些情況你可能不太適合用Druid
a)你需要對已經存在的記錄利用主鍵進行低延遲的更新操作.Druid支持流式插入,但是不是更新(一般用后台的批處理任務來進行更新)
b)你正在構建一個線下的報表系統而且對查詢延遲不是非常在意
c)你想做一些大的表的關聯(比如連接大的事實表和另外一個大的事實表).
4、 數據存儲
Realtime Node為LSM-Tree架構,與Hbase實現不同,Druid未提供WAL功能,犧牲數據可靠性,換寫入速度。
實時數據到達Realtime Node后,被直接加載到堆緩沖區(Hbase memtable -> RB-Tree),當堆緩沖區大小達到閾值,數據被沖洗到磁盤,形成一個Segment(SSTable),同時Realtime Node立即將新生成的Segment加載到內存非堆區。堆區與非堆區都可以被Broker Node查詢。
同時,Realtime Node周期性掃描磁盤Segment,將同一時間段生成的所有Segment合並為一個大的Segment。這個過程叫Segment Merge(相當於hbase中的compaction)。合並完的Segment被Realtime Node上傳到DeepStorage,隨后Coordinator Node通知一個Historical Node去Deepstorage將新生成的Segment下載到本地,並加載到內存(盡可能使用內存,不足時LRU淘汰),成功加載后,Historical Node通知Coordinator Node,聲明其接管這個Segment的查詢操作,Realtime Node收到聲明后,也向Coordinator Node聲明,其不再提供此Segment查詢。
Druid有着自己的數據存儲的邏輯和格式(主要是DataSource和Segment)
DataSource包含以下內容:
時間列(TimeStamp): 表明每行數據的時間值,默認使用 UTC 時間格式且精確到毫秒級別。這個列是數據聚合與范圍查詢的重要維度。
維度列(Dimension):維度來自於OLAP的概念,用來標識數據行的各個類別信息。
指標列(Metric):指標對應於OLAP概念中的Fact,是用於聚合和計算的列這些指標列通常是一些數字,計算操作通常包括 Count、Sum 和 Mean 等。
無論是實時數據消費還是批量數據處理,Druid 在基於 DataSource 結構存儲數據時即可選擇對任意的指標列進行聚合(Roll Up)操作。
相對於其他時序數據庫,Druid 在數據存儲時便可對數據進行聚合操作是其一大特點, 並且該特點使得 Druid 不僅能夠節省存儲空間,而且能夠提高聚合查詢的效率。
DataSource 是一個邏輯概念,Segment 卻是數據的實際物理存儲格式,Druid 正是通過 Segment 實現了對數據的橫縱向切割(Slice and Dice)操作。
從數據按時間分布的角度來看, 通過參數 segmentGranularity 的設置,Druid 將不同時間范圍內的數據存儲在不同的 Segment 數據塊中,這便是所謂的數據橫向切割。
這種設計為 Druid 帶來一個顯而易見的優點:
按時間范圍查詢數據時,僅需要訪問對應時間段內的這些 Segment 數據塊,而不需要進行全表數據范圍查詢,這使效率得到了極大的提高。
同時,在 Segment 中也面向列進行數據壓縮存儲,這便是所謂的數據縱向切割。而且對Segment 中的維度列使用了 Bitmap 技術對其數據的訪問進行了優化。
其中,Druid會為每一維度列存儲所有列值、創建字典(用來存儲所有列值對應的ID)以及為每一個列值創建其bitmap索引以幫助快速定位哪些行擁有該列值。
6、Apache Druid查詢速度快原因
數據的預聚合
Druid 可以按照給定的時間粒度和所有維度列,進行最細粒度的指標聚合運算,並加以保存為原始數據。
列式存儲
對部分列進行查詢時可以顯著提高效率。
Bitmap 索引
利用位圖對所有維度列構建索引,可以快速定位數據行。
mmap
通過內存映射文件的方式加快對於 Segment 的訪問。
查詢結果的中間緩存
支持對於查詢級別和 segment 級別的緩存。
7、Druid的數據源和分段
Druid的數據存儲在"DataSource"中,這其實類似於傳統的RDBMS中的表.每一個數據源按照時間進行分段,當然你還可以選擇其他屬性進行分段.每一個時間區間被稱為一個"Chunk".(舉個列子,一天的時間區間的Chunk,如果你的數據源是按天進行分段的).在一個Chunk內,數據被分成一個或者多個"segments".每個segment是一個單獨的文件,它由數以百萬的數據行構成.因為segment是組織在時間chunk里的,所以按照時間曲線有助於理解segments,像下面這樣的
一個數據源剛開始由幾個segments一直擴展到幾百幾千甚至上百萬個segments.每個segment的生命周期始於被MiddleManager創建,這個時候segment是可變的沒有被提交的.一個segment的構建包含以下列出來的步驟,這種設計是為了滿足一個可以支持壓縮並可以被快速查詢的文件格式
- 轉換成列式存儲格式
- 利用bitmap建立索引
- 利用多種算法進行壓縮
segments會周期性的提交和發布.此時它會被寫入deep storage然后狀態改為不可變的.隨后它會被從MiddleManager移動到Historical進程中去.與此同時關於這個segment的一個條目也會被寫入元數據存儲.這個條目是描述該segment的元數據,包含segment的schema,大小,以及它在deep storage上的存儲位置.所有這些類似的條目都會被Coordinator用來尋找對應的數據是否在集群上是可用狀態的。
8、Druid的數據采集格式
Druid可以采集非標准化的數據諸如JSON,CSV或者以某種分隔符隔開的TSV格式,當然還支持自定義格式.雖然大部分的文檔使用JSON格式,但是通過druid來配置支持其他的限定格式也不是很難。
當前支持的格式化數據
- 列表項
JSON
{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
CSV
2013-08-31T01:02:33Z,"Gypsy Danger","en","nuclear", "true","true","false","false","article","North America", "United States","Bay Area","San Francisco",57,200,-143
TSV
2013-08-31T01:02:33Z "Gypsy Danger" "en" "nuclear" "true" "true" "false" "false" "article" "North America" "United States" "Bay Area" "San Francisco" 57 200 -143
需要注意的是CSV,TSV不能包含列頭,這點在數據采集的時候一定要注意
自定義格式
Druid支持使用正則解析和JavaScript來自定義數據格式.但是這種方式並沒有自己實現的Java解析器或者額外的流式處理工具效率更高。
配置數據采集的schema
什么是data schema?其實就是Druid的index數據攝取任務需要的數據源的描述的元數據.它主要描述要采集的數據類型,數據由哪些列構成,哪些是指標列,哪些是維度列,時間的粒度等。
以CSV格式舉例
"parseSpec": { "format" : "csv", "timestampSpec" : { "column" : "timestamp" }, "columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"], "dimensionsSpec" : { "dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"] }}
parseSpec指明了數據源格式,這里是format中表明是CSV格式,然后說明時間戳字段名是timestamp,數據字段名是columns里面那一堆,dimensionsSpec則代表哪些字段可以作為維度。
實例參考:https://my.oschina.net/u/2460844/blog/649466
9、數據加載方式
a)本地批處理攝取文件
具體操作請參考:https://druid.apache.org/docs/latest/tutorials/tutorial-batch.html
b)從Apache Kafka加載流數據
具體操作請參考:https://druid.apache.org/docs/latest/tutorials/tutorial-kafka.html
c) 使用Apache Hadoop加載文件
具體操作請參考: https://druid.apache.org/docs/latest/tutorials/tutorial-batch-hadoop.html
流式數據源 (實時導入)
實時數據首先會被直接加載到實時節點內存中的堆結構換從去,當條件滿足時,緩存區里的數據會被寫到硬盤上行程一個數據塊(Segment),同事實時節點又會立即將新生成的數據塊加載到內存中的非堆區,因此無論是堆緩存區還是非堆區里的數據,都能被查詢節點(Broker Node)查詢;
同時實時節點會周期性的將磁盤上同一時間段內生成的數據塊合並成一個大的Segment,這個過程在實時節點中的操作叫做Segment Merge,合並后的大Segment會被實時節點上傳到數據文件存儲(DeepStorage)中,上傳到DeepStorage后,協調節點(Coordination Node)會指定一個歷史節點(Historical Node)去文件存儲庫將剛剛上傳的Segment下載到本地磁盤,該歷史節點加載成功后,會通過協調節點(Coordination Node)在集群中聲明該Segment可以提供查詢了,當實時節點收到這條聲明后會立即向集群聲明實時節點不再提供該Segment的查詢,而對於全局數據來說,查詢節點(Broker Node)會同時從實時節點與歷史節點分別查詢,對結果整合后返回用戶。
靜態數據源(離線數據導入)
首先通過Indexing Service提交Indexing作業,將輸入數據轉換為Segments文件。然后需要一個計算單元來處理每個Segment的數據分析,這個計算單元就是Druid中的 歷史節點(Historical Node),
Historical Node是Druid最主要的計算節點,用於處理對Segments的查詢。在能夠服務Segment之前, Historical Node需要首先將Segment從HDFS、S3、本地文件等下載到本地磁盤,然后將Segment數據文件mmap到進程的地址空間。
Historical Node采用了Shared-Nothing架構,狀態信息記錄在Zookeeper中,可以很容易地進行伸縮。 Historical Node在Zookeeper中宣布自己和所服務的Segments,也通過Zookeeper接收加載/丟棄Segment的命令。
最后,由於Segment的不可變性,可以通過復制Segment到多個 Historical Node來實現容錯和負載均衡,這也體現了druid的擴展性和高可用。
10、數據導入流程說明
Parse data——確定正確的解析器。在這種情況下,它將成功確定json
。隨意使用不同的解析器選項來預覽Druid如何解析您的數據。
Parse time——要一個主時間戳列(內部存儲在名為的列中__time
)。如果您的數據中沒有時間戳,請選擇Constant value
。在我們的示例中,數據加載器將確定time
原始數據中的列是可用作主時間列的唯一候選者。
Configure schema——您可以配置將哪些維度和指標攝入到Druid中。數據一經攝取,就會在Druid中顯示出來。由於我們的數據集非常小,因此請繼續並Rollup
通過單擊開關並確認更改將其關閉。
Partition——您可以調整如何在Druid中將數據拆分為多個段。由於這是一個很小的數據集,因此在此步驟中無需進行任何調整。
Tune——在Tune
步驟是非常重要的設置Use earliest offset
到True
,因為我們要從流的開始消耗數據。沒有其他需要更改的內容,因此請單擊Next: Publish
以轉到Publish
步驟。
Publish——發布
Edit JSON spec——可以隨意返回並在之前的步驟中進行更改,以查看更改將如何更新規范。同樣,您也可以直接編輯規范,並在前面的步驟中看到它。
11、歷史數據更新
數據不能作更改操作,想要做更新需要通過hadoop任務整體覆蓋
12、Druid連接JAVA
1)pom中引入
<dependency> <groupId>org.apache.calcite.avatica</groupId> <artifactId>avatica-core</artifactId> <version>1.15.0</version> </dependency>
2)連接
public class DruidJdbcUtil { public static AvaticaConnection connection() throws SQLException{ String urlStr = "jdbc:avatica:remote:url="【http://ip:8082】/druid/v2/sql/avatica/"; Properties connectionProperties = new Properties(); AvaticaConnection connection = DriverManager.getConnection(urlStr, connectionProperties); AvaticaStatement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { String aa = resultSet.getString("aaa); ...... } } }
13、注意事項
1)Druid適用於寬表,不用Join方式
2)時序化數據:Druid 可以理解為時序數據庫,所有的數據必須有時間字段。
3)實時數據接入可容忍丟數據(tranquility):目前 tranquility 有丟數據的風險,所以建議實時和離線一起用,實時接當天數據,離線第二天把今天的數據全部覆蓋,保證數據完備性。
14、druid 丟失數據有幾個情況
1、peon讀取了加載數據, 但寫不到深度儲存里, 那這些數據只能在短暫時間內查詢得到, 但過了maxTime, 就會被丟棄了。
2、深度儲存丟失數據,
3、meta db(元數據,MySQL和PostgreSQL是更適合生產的元數據存儲) 丟失數據。
而這些條件,都在druid的控制范圍之外,所以說,druid自己不會丟失數據