實時數據攝入
我們采用Kafka Indexing Service
作為實時攝入數據的方案。
准備工作
- 將數據實時灌入某個Kafka topic中
- 與批量導入數據類似:考慮清楚數據中哪一列可以作為時間列、哪些列可以作為維度列、哪些列可以作為指標列(尤其是指標的聚合函數,包括
count
、sum
、max
、min
等,如果涉及UV、留存的計算,則需要使用HyperUnique
或者Theta sketch
) - 考慮最小時間粒度(即
queryGranularity
)和數據分片的時間粒度(即segmentGranularity
),在我們的使用經驗中最小時間粒度應該根據業務需求確定,而數據分片的時間粒度設為HOUR
即可
提交攝取任務
官方提供的數據攝取JSON如下,可以以此為模版修改(用#開頭的是我們添加的注釋,正式提交的時候請將注釋刪除,否則不是合法JSON文件):
{ "type": "kafka", #注意這里的作業類型,與批量導入時不一樣的 "dataSchema": { "dataSource": "metrics-kafka", #導入druid之后的datasource名字,最好是可以識別團隊的前綴 "parser": { "type": "string", "parseSpec": { "format": "json", #原始數據的格式,可以是JSON、CSV、TSV "timestampSpec": { #指定導入數據的時間列以及時間格式 "column": "timestamp", "format": "auto" }, "dimensionsSpec": { #在此指定導入數據的維度 "dimensions": [ #在此指定導入數據的維度 "channel", "cityName", "comment", "countryIsoCode", "countryName", "isAnonymous", "isMinor", "isNew" ] } } }, "metricsSpec": [ #指定導入數據的指標列,以及各指標列的聚合函數 { "name": "count", "type": "count" }, { "name": "value_sum", "fieldName": "value", #fieldName是原始數據中的列名,name是在druid中的列名,可以不同 "type": "doubleSum" }, { "name": "value_min", "fieldName": "value", "type": "doubleMin" }, { "name": "value_max", "fieldName": "value", "type": "doubleMax" } ], "granularitySpec": { "type": "uniform", "segmentGranularity": "HOUR", #數據分片的時間粒度 "queryGranularity": "NONE" #最小的查詢時間粒度, None則為毫秒級 } }, "tuningConfig": { "type": "kafka", "maxRowsPerSegment": 5000000 }, "ioConfig": { "topic": "metrics", #指定Kafka中的topic名 "consumerProperties": { "bootstrap.servers": "localhost:9092", #指定Kafka的broker列表 "group.id": "druidxx" # 可以指定一個消費kafka的身份,需要注意的是不能有兩個druid作業以同一個身份消費同一個topic }, "taskCount": 1, #task作業並發數 "replicas": 1, #task作業的副本數 "taskDuration": "PT1H" #單個task進程運行的時間 } }
提交命令
curl -X 'POST' -H 'Content-Type:application/json' -d @my-index-task.json OVERLORD_IP:8090/druid/indexer/v1/supervisor
停止消費Kafka
curl -X 'POST' -H 'Content-Type:application/json' OVERLORD_IP:8090/druid/indexer/v1/supervisor/datasource名/shutdown
數據查詢
建議在首次查詢之前,向Broker提交TimeBoundary
查詢,方便掌握Druid中數據的時間分布
{ "queryType" : "timeBoundary", "dataSource": "sample_datasource" #datasource名字 }