Druid:一個用於大數據實時處理的開源分布式系統


Druid是一個用於大數據實時查詢和分析的高容錯、高性能開源分布式系統,旨在快速處理大規模的數據,並能夠實現快速查詢和分析。尤其是當發生代碼部署、機器故障以及其他產品系統遇到宕機等情況時,Druid仍能夠保持100%正常運行。創建Druid的最初意圖主要是為了解決查詢延遲問題,當時試圖使用Hadoop來實現交互式查詢分析,但是很難滿足實時分析的需要。而Druid提供了以交互方式訪問數據的能力,並權衡了查詢的靈活性和性能而采取了特殊的存儲格式。

Druid功能介於PowerDrillDremel之間,它幾乎實現了Dremel的所有功能,並且從PowerDrill吸收一些有趣的數據格式。Druid允許以類似Dremel和PowerDrill的方式進行單表查詢,同時還增加了一些新特性,如為局部嵌套數據結構提供列式存儲格式、為快速過濾做索引、實時攝取和查詢、高容錯的分布式體系架構等。從官方得知,Druid的具有以下主要特征:

  • 為分析而設計——Druid是為OLAP工作流的探索性分析而構建,它支持各種過濾、聚合和查詢等類;
  • 快速的交互式查詢——Druid的低延遲數據攝取架構允許事件在它們創建后毫秒內可被查詢到;
  • 高可用性——Druid的數據在系統更新時依然可用,規模的擴大和縮小都不會造成數據丟失;
  • 可擴展——Druid已實現每天能夠處理數十億事件和TB級數據。

Druid應用最多的是類似於廣告分析創業公司Metamarkets中的應用場景,如廣告分析、互聯網廣告系統監控以及網絡監控等。當業務中出現以下情況時,Druid是一個很好的技術方案選擇:

  • 需要交互式聚合和快速探究大量數據時;
  • 需要實時查詢分析時;
  • 具有大量數據時,如每天數億事件的新增、每天數10T數據的增加;
  • 對數據尤其是大數據進行實時分析時;
  • 需要一個高可用、高容錯、高性能數據庫時。

 

Historical節點  :對非實時數據進行處理存儲和查詢

Realtime節:實時攝取數據、監聽輸入數據流

Coordinator節點:監控Historical節點

Broker節點:接收來自外部客戶端的查詢和將查詢轉發到Realtime和Historical節點

Indexer節點:負責索引服務

一個Druid集群有各種類型的節點(Node)組成,每個節點都可以很好的處理一些的事情,這些節點包括對非實時數據進行處理存儲和查詢的Historical節點、實時攝取數據、監聽輸入數據流的Realtime節、監控Historical節點的Coordinator節點、接收來自外部客戶端的查詢和將查詢轉發到Realtime和Historical節點的Broker節點、負責索引服務的Indexer節點

 

 

查詢操作中數據流和各個節點的關系如下圖所示:

如下圖是Druid集群的管理層架構,該圖展示了相關節點和集群管理所依賴的其他組件(如負責服務發現的ZooKeeper集群)的關系:

 

 

 

 

一、Druid簡介
二、Druid架構組成及相關依賴
三、Druid集群配置
四、Druid集群啟動
五、Druid查詢
六、后記

一、Druid簡介

Druid是一個為大型冷數據集上實時探索查詢而設計的開源數據分析和存儲系統,提供極具成本效益並且永遠在線的實時數據攝取和任意數據處理。

主要特性:

  • 為分析而設計——Druid是為OLAP工作流的探索性分析而構建。它支持各種filter、aggregator和查詢類型,並為添加新功能提供了一個框架。用戶已經利用Druid的基礎設施開發了高級K查詢和直方圖功能。
  • 交互式查詢——Druid的低延遲數據攝取架構允許事件在它們創建后毫秒內查詢,因為Druid的查詢延時通過只讀取和掃描有必要的元素被優化。Aggregate和 filter沒有坐等結果。
  • 高可用性——Druid是用來支持需要一直在線的SaaS的實現。你的數據在系統更新時依然可用、可查詢。規模的擴大和縮小不會造成數據丟失。
  • 可伸縮——現有的Druid部署每天處理數十億事件和TB級數據。Druid被設計成PB級別。

就系統而言,Druid功能位於PowerDrill和Dremel之間。它實現幾乎所有Dremel提供的工具(Dremel處理任意嵌套數據結構,而Druid只允許一個基於數組的嵌套級別)並且從PowerDrill吸收一些有趣的數據格式和壓縮方法。

Druid對於需要實時單一、海量數據流攝取產品非常適合。特別是如果你面向無停機操作時,如果你對查詢查詢的靈活性和原始數據訪問要求,高於對速度和無停機操作,Druid可能不是正確的解決方案。在談到查詢速度時候,很有必要澄清“快速”的意思是:Druid是完全有可能在6TB的數據集上實現秒級查詢。

二、Druid架構組成及其他依賴

海量數據實時OLAP分析系統-Druid.io安裝配置和體驗

2.1 Overlord Node (Indexing Service)

Overlord會形成一個加載批處理和實時數據到系統中的集群,同時會對存儲在系統中的數據變更(也稱為索引服務)做出響應。另外,還包含了Middle Manager和Peons,一個Peon負責執行單個task,而Middle Manager負責管理這些Peons。

2.2 Coordinator Node

監控Historical節點組,以確保數據可用、可復制,並且在一般的“最佳”配置。它們通過從MySQL讀取數據段的元數據信息,來決定哪些數據段應該在集群中被加載,使用Zookeeper來確定哪個Historical節點存在,並且創建Zookeeper條目告訴Historical節點加載和刪除新數據段。

2.3 Historical Node

是對“historical”數據(非實時)進行處理存儲和查詢的地方。Historical節點響應從Broker節點發來的查詢,並將結果返回給broker節點。它們在Zookeeper的管理下提供服務,並使用Zookeeper監視信號加載或刪除新數據段。

2.4 Broker Node

接收來自外部客戶端的查詢,並將這些查詢轉發到Realtime和Historical節點。當Broker節點收到結果,它們將合並這些結果並將它們返回給調用者。由於了解拓撲,Broker節點使用Zookeeper來確定哪些Realtime和Historical節點的存在。

2.5 Real-time Node

實時攝取數據,它們負責監聽輸入數據流並讓其在內部的Druid系統立即獲取,Realtime節點同樣只響應broker節點的查詢請求,返回查詢結果到broker節點。舊數據會被從Realtime節點轉存至Historical節點。

2.6 ZooKeeper

為集群服務發現和維持當前的數據拓撲而服務;

2.7 MySQL

用來維持系統服務所需的數據段的元數據;

2.8 Deep Storage

保存“冷數據”,可以使用HDFS。

 

海量數據實時OLAP分析系統-Druid.io安裝配置和體驗

三、Druid集群配置

3.1 環境信息

我這里有兩台機器,node1有32G內存,上面部署了Histotical Node和Coordinator Node;node2有72G內存,上面部署了其他四個服務。

海量數據實時OLAP分析系統-Druid.io安裝配置和體驗

3.2 通用配置(Common Configuration)

##創建MySQL數據庫

CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
grant all on druid.* to druid@’%’ identified by ‘druid1234′ WITH GRANT OPTION;
flush privileges;

##配置文件

cd $DRUID_HOME/config/_common
vi common.runtime.properties(所有節點)

##使用Mysql存儲元數據
druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-eight", "io.druid.extensions:mysql-metadata-storage"]

##zookeeper
druid.zk.service.host=zkNode1:2181,zkNode2:2181,zkNode3:2181

##Mysql配置
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://node1:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd1234

##配置deep storage到HDFS
druid.storage.type=hdfs
druid.storage.storageDirectory=hdfs://cdh5/tmp/druid/storage

##配置查詢緩存,暫用本地,可配置memcached
druid.cache.type=local
druid.cache.sizeInBytes=10737418240

##配置監控
druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]

##配置Indexing service的名字
druid.selectors.indexing.serviceName=druid/overlord

##
druid.emitter=logging

3.3 Overlord Node(Indexing Service)

在運行Overlord Node節點上:

cd $DRUID_HOME/config/overlord
vi runtime.properties

druid.host=node2
druid.port=8090
druid.service=druid/overlord

# Only required if you are autoscaling middle managers
druid.indexer.autoscale.doAutoscale=true
druid.indexer.autoscale.strategy=ec2
druid.indexer.autoscale.workerIdleTimeout=PT90m
druid.indexer.autoscale.terminatePeriod=PT5M
druid.indexer.autoscale.workerVersion=0

# Upload all task logs to deep storage
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog

# Run in remote mode
druid.indexer.runner.type=remote
druid.indexer.runner.minWorkerVersion=0

# Store all task state in the metadata storage
druid.indexer.storage.type=metadata

3.4 MiddleManager Node

在運行MiddleManager Node節點上:
cd $DRUID_HOME/config/middleManager
vi runtime.properties

druid.host=node2
druid.port=8091
druid.service=druid/middlemanager

druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog

# Resources for peons
druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
druid.indexer.task.baseTaskDir=/tmp/persistent/task/

3.5 Coordinator Node

在運行Coordinator Node節點上:
cd $DRUID_HOME/config/coordinator
vi runtime.properties

druid.host=node1
druid.port=8081
druid.service=coordinator

druid.coordinator.startDelay=PT5M

3.6 Historical Node

在運行Historical Node節點上:
cd $DRUID_HOME/config/historical
vi runtime.properties

druid.host=node1
druid.port=8082
druid.service=druid/historical

druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true

druid.processing.buffer.sizeBytes=1073741824
druid.processing.numThreads=9

druid.server.http.numThreads=9
druid.server.maxSize=300000000000

druid.segmentCache.locations=[{"path": " /tmp/druid/indexCache", "maxSize": 300000000000}]

druid.monitoring.monitors=["io.druid.server.metrics.HistoricalMetricsMonitor", "com.metamx.metrics.JvmMonitor"]

3.7 Broker Node

在運行Broker Node節點上:
cd $DRUID_HOME/config/broker
vi runtime.properties

druid.host=node2
druid.port=8092
druid.service=druid/broker

druid.broker.http.numConnections=20
druid.broker.http.readTimeout=PT5M

druid.processing.buffer.sizeBytes=2147483647
druid.processing.numThreads=11

druid.server.http.numThreads=20

3.8 Real-time Node

在運行Real-time Node節點上:
cd $DRUID_HOME/config/realtime
vi runtime.properties

druid.host=node2
druid.port=8093
druid.service=druid/realtime

druid.processing.buffer.sizeBytes=1073741824
druid.processing.numThreads=5

# Override emitter to print logs about events ingested, rejected, etc
druid.emitter=logging

druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor", "com.metamx.metrics.JvmMonitor"]

四、Druid集群啟動

首次啟動時候,可以遵循下面的啟動順序。

4.1 Broker Node

cd $DRUID_HOME/
cp run_druid_server.sh run_broker.sh
vi run_broker.sh

替換以下內容:

SERVER_TYPE=broker

# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms5g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=24g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17071 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Ddruid.extensions.localRepository=${MAVEN_DIR}"

執行./run_broker.sh啟動Broker Node:

海量數據實時OLAP分析系統-Druid.io安裝配置和體驗

4.2 Historical Node

cd $DRUID_HOME/
cp run_druid_server.sh run_historical.sh

vi run_historical.sh

替換以下內容:

SERVER_TYPE=historical

# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=16g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

執行命令./run_historical.sh啟動Historical Node:

海量數據實時OLAP分析系統-Druid.io安裝配置和體驗

4.3 Coordinator Node

cd $DRUID_HOME/
cp run_druid_server.sh run_coordinator.sh
vi run_coordinator.sh

替換以下內容:

SERVER_TYPE=coordinator

# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

執行命令./run_coordinator.sh啟動Coordinator Node.

4.4 Middle Manager

cd $DRUID_HOME/
cp run_druid_server.sh run_middleManager.sh
vi run_middleManager.sh

替換以下內容:

SERVER_TYPE=middleManager
# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid -Ddruid.extensions.localR
epository=${MAVEN_DIR}"

執行命令./run_middleManager.sh啟動MiddleManager Node。

4.5 Overlord Node

cd $DRUID_HOME/
cp run_druid_server.sh run_overlord.sh
vi run_overlord.sh

替換以下內容:

SERVER_TYPE=overlord
# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx4g -Xms4g -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

執行命令./run_overlord.sh啟動Overlord Node:

海量數據實時OLAP分析系統-Druid.io安裝配置和體驗

4.6 Real-time Node

cd $DRUID_HOME/
cp run_druid_server.sh run_realtime.sh
vi run_realtime.sh
替換以下內容:

SERVER_TYPE=realtime

# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx13g -Xms13g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=9g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -
XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError"
JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17072 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremot
e.ssl=false"
JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

##特別需要注意參數:

-Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec

啟動RealTime Node需要指定一個realtime數據源的配置文件,本文中使用example提供的wikipedia_realtime.spec,啟動后,該數據源從irc.wikimedia.org獲取實時數據。

關於RealTime Node的配置,后續文章將會詳細介紹。

執行命令./run_realtime.sh啟動RealTime Node。

五、Druid查詢

第四部分中啟動RealTime Node時候使用了例子中自帶的配置文件wikipedia_realtime.spec,啟動后,該RealTime Node會從irc.wikimedia.org獲取實時數據,本章將以該數據源為例,學習幾種最常見的查詢。

5.1 select查詢

首先編輯查詢配置文件select_query.json

{
   "queryType": "select",
   "dataSource": "wikipedia",
   "dimensions":[],
   "metrics":[],
   "granularity": "all",
   "intervals": [
     "2015-11-01/2015-11-20"
   ],
   "pagingSpec":{"pagingIdentifiers": {}, "threshold":10}
 }

該配置文件的含義是從數據源”wikipedia”進行select查詢所有列,時間區間為2015-11-01/2015-11-20,每10條記錄一個分頁。

執行命令查詢:

curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @select_query.json

瞬間返回結果:

海量數據實時OLAP分析系統-Druid.io安裝配置和體驗

5.2 基於時間序列的查詢Timeseries query

編輯查詢配置文件timeseries.json

{
    "queryType": "timeseries",
    "dataSource": "wikipedia",
    "intervals": [ "2010-01-01/2020-01-01" ],
    "granularity": "minute",
    "aggregations": [
        {"type": "longSum", "fieldName": "count", "name": "edit_count"},
        {"type": "doubleSum", "fieldName": "added", "name": "chars_added"}
    ]
}

該配置文件的含義是:從數據源” wikipedia”中進行時間序列查詢,區間為2010-01-01/2020-01-01,按分鍾匯總結果,匯總字段為count和added;

執行查詢命令:

curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @timeseries.json

同樣瞬間返回結果:

海量數據實時OLAP分析系統-Druid.io安裝配置和體驗

5.3 TopN查詢

編輯查詢文件topn.json

{
  "queryType": "topN",
  "dataSource": "wikipedia",
  "granularity": "all",
  "dimension": "page",
  "metric": "edit_count",
  "threshold" : 10,
  "aggregations": [
    {"type": "longSum", "fieldName": "count", "name": "edit_count"}
  ],
  "filter": { "type": "selector", "dimension": "country", "value": "United States" },
  "intervals": ["2012-10-01T00:00/2020-01-01T00"]
}

該文件含義是:從數據源” wikipedia”進行TopN查詢,其中N=10,維度為page,指標為edit_count,也就是,在page維度上將edit_count匯總后取Top 10.

執行查詢命令:

curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @topn.json

結果為:

海量數據實時OLAP分析系統-Druid.io安裝配置和體驗

六、后記

Druid目前已經有很多公司用於實時計算和實時OLAP,而且效果很好。雖然它的配置和查詢都比較復雜和繁瑣,但如果是真正基於海量數據的實時OLAP,它的威力還是很強大的。我將持續學習和分享Druid的相關技術,驗證它在海量數據實時OLAP上的效果,敬請關注我的博客

參考文章:

http://druid.io

http://www.csdn.net/article/2014-10-30/2822381/2

 

 

 

 

 

參考 http://www.infoq.com/cn/news/2015/04/druid-data/

http://druid.io/

http://www.open-open.com/lib/view/open1447852962978.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM