**## 開始
本教程演示了如何使用 Druid 的 Kafka indexing 服務從 Kafka 流中加載數據至 Druid。
在本教程中,我們假設你已經按照 quickstart 文檔中使用micro-quickstart
單機配置所描述的下載了 Druid,並在本機運行了 Druid。你不需要加載任何數據。
下載並啟動 Kafka
Apache Kafka是一種高吞吐量消息總線,可與 Druid 很好地配合使用。在本教程中,我們將使用 Kafka 2.1.0。在終端運行下面命令下載 Kafka:
curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0
在終端運行下面命令啟動 kafka broker:
./bin/kafka-server-start.sh config/server.properties
運行下面命令創建名為wikipedia
的 topic,我們將向其發送數據:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
向 Kafka 加載數據
為wikipedia
topic 啟動一個 kafka producer,並發送數據。
在 Druid 目錄下,運行下面命令:
cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json
在 Kafka 目錄下運行下面命令,將{PATH_TO_DRUID}替換成你的 Kafka 路徑:
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
上面命令會向 kakfa 的wikiapedia topic 發送 events。之后,我們將使用 Druid 的 Kafka indexing 服務從 Kafka topic 中提取數據。
通過 data loader 加載數據
導航至 localhost:8080 並單擊控制台頂部的Load data
。
選擇 Apache Kafka
並單擊 Connect data
.
輸入 bootstrap:localhost:9092
和 topic:wikipedia
。
單擊Preview
並確定你看到的數據正確。
找到數據后,可以單擊"Next: Parse data"進入下一步。
data loader 會嘗試自動選擇正確的數據解析器。在本示例中,將選擇json
解析器。你可以嘗試選擇其他解析器,看看 Druid 是如何解析數據的。
選擇json
解析器,點擊Next: Parse time
進入下一步,來確定 timestamp 列。
Druid 需要一個主 timestamp 列(內部將存儲在__time 列)。如果你的數據中沒有 timestamp 列,選擇Constant value
。在我們的示例中,將選擇time
列,因為它是數據之中唯一可以作為主時間列的候選者。
單擊Next: ...
兩次以跳過Transform
和Filter
步驟。
您無需在這些步驟中輸入任何內容,因為應用提取數據的時間變換和過濾器不在本教程范圍內。
在Configure schema
步驟中,你可以配置哪些維度和指標可以攝入 Druid。這是數據被攝入 Druid 后呈現的樣子。由於我們的數據集比較小,點擊Rollup
開關關閉 rollup 功能。
對 schema 配置滿意后,單擊Next
進入Partition
步驟,以調整數據至 segment 的分區。
在這里,您可以調整如何在 Druid 中將數據拆分為多個段。由於這是一個很小的數據集,因此在此步驟中無需進行任何調整。
單擊Tune
步驟后,進入發布步驟。
在Publish
步驟中,我們可以指定 Druid 中的數據源名稱。我們將此數據源命名為wikipedia
。最后,單擊Next
以查看 spec。
這是你構建的 spec。嘗試隨意返回並在之前的步驟中進行更改,以查看變動將如何更新 spec。同樣,你也可以直接編輯 spec,並在前面的步驟中看到它。
對 spec 滿意后,點擊Submit
創建攝取任務。
你將進入任務視圖,重點關注新創建的任務。任務視圖設置為自動刷新,等待任務成功。
當一項任務成功完成時,意味着它建立了一個或多個 segment,這些 segment 將由數據服務器接收。
Datasources
從標題導航到視圖。
等待直到你的數據源(wikipedia
)出現。加載 segment 時可能需要幾秒鍾。
一旦看到綠色(完全可用)圓圈,就可以查詢數據源。此時,你可以轉到Query
視圖以對數據源運行 SQL 查詢。
運行SELECT * FROM "wikipedia"
查詢以查看結果。
通過控制台提交 supervisor
在控制台中,單擊Submit supervisor
打開提交 supervisor 窗口。
粘貼以下 spec 並點擊提交:
{
"type": "kafka",
"spec" : {
"dataSchema": {
"dataSource": "wikipedia",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
},
"metricsSpec" : [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"rollup": false
}
},
"tuningConfig": {
"type": "kafka",
"reportParseExceptions": false
},
"ioConfig": {
"topic": "wikipedia",
"inputFormat": {
"type": "json"
},
"replicas": 2,
"taskDuration": "PT10M",
"completionTimeout": "PT20M",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
}
}
}
}
這將啟動 supervisor,並分化出 task 監聽數據流入。
直接提交 supervisor
為了直接啟動服務,我們需要在 Druid 包根目錄下運行下面命令提交一個 supervisor spec 給 Druid overlord:
curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor
如果 supervisor 成功創建,你將得到一個包含 supervisor ID 的響應。在我們的示例中,將返回{"id":"wikipedia"}
。
你可以在控制台中查看當前 supervisor 和 tasks: http://localhost:8888/unified-console.html#tasks.
查詢數據
當數據發送給 Kafka stream 后,立刻就可以查詢數據。
本文翻譯自 Druid 官方文檔
請關注我們。一起學習 Druid 知識。
**