Druid:通過 Kafka 加載流數據


**## 開始

本教程演示了如何使用 Druid 的 Kafka indexing 服務從 Kafka 流中加載數據至 Druid。

在本教程中,我們假設你已經按照 quickstart 文檔中使用micro-quickstart單機配置所描述的下載了 Druid,並在本機運行了 Druid。你不需要加載任何數據。

下載並啟動 Kafka

Apache Kafka是一種高吞吐量消息總線,可與 Druid 很好地配合使用。在本教程中,我們將使用 Kafka 2.1.0。在終端運行下面命令下載 Kafka:

curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0

在終端運行下面命令啟動 kafka broker:

./bin/kafka-server-start.sh config/server.properties

運行下面命令創建名為wikipedia的 topic,我們將向其發送數據:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia

向 Kafka 加載數據

wikipedia topic 啟動一個 kafka producer,並發送數據。

在 Druid 目錄下,運行下面命令:

cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json

在 Kafka 目錄下運行下面命令,將{PATH_TO_DRUID}替換成你的 Kafka 路徑:

export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json

上面命令會向 kakfa 的wikiapedia topic 發送 events。之后,我們將使用 Druid 的 Kafka indexing 服務從 Kafka topic 中提取數據。

通過 data loader 加載數據

導航至 localhost:8080 並單擊控制台頂部的Load data

選擇 Apache Kafka 並單擊 Connect data.

輸入 bootstrap:localhost:9092和 topic:wikipedia

單擊Preview並確定你看到的數據正確。

找到數據后,可以單擊"Next: Parse data"進入下一步。

data loader 會嘗試自動選擇正確的數據解析器。在本示例中,將選擇json解析器。你可以嘗試選擇其他解析器,看看 Druid 是如何解析數據的。

選擇json解析器,點擊Next: Parse time進入下一步,來確定 timestamp 列。

Druid 需要一個主 timestamp 列(內部將存儲在__time 列)。如果你的數據中沒有 timestamp 列,選擇Constant value。在我們的示例中,將選擇time列,因為它是數據之中唯一可以作為主時間列的候選者。

單擊Next: ...兩次以跳過TransformFilter步驟。

您無需在這些步驟中輸入任何內容,因為應用提取數據的時間變換和過濾器不在本教程范圍內。

Configure schema步驟中,你可以配置哪些維度和指標可以攝入 Druid。這是數據被攝入 Druid 后呈現的樣子。由於我們的數據集比較小,點擊Rollup開關關閉 rollup 功能。

對 schema 配置滿意后,單擊Next進入Partition步驟,以調整數據至 segment 的分區。

在這里,您可以調整如何在 Druid 中將數據拆分為多個段。由於這是一個很小的數據集,因此在此步驟中無需進行任何調整。

單擊Tune步驟后,進入發布步驟。

Publish步驟中,我們可以指定 Druid 中的數據源名稱。我們將此數據源命名為wikipedia。最后,單擊Next以查看 spec。

這是你構建的 spec。嘗試隨意返回並在之前的步驟中進行更改,以查看變動將如何更新 spec。同樣,你也可以直接編輯 spec,並在前面的步驟中看到它。

對 spec 滿意后,點擊Submit創建攝取任務。

你將進入任務視圖,重點關注新創建的任務。任務視圖設置為自動刷新,等待任務成功。

當一項任務成功完成時,意味着它建立了一個或多個 segment,這些 segment 將由數據服務器接收。

Datasources從標題導航到視圖。

等待直到你的數據源(wikipedia)出現。加載 segment 時可能需要幾秒鍾。

一旦看到綠色(完全可用)圓圈,就可以查詢數據源。此時,你可以轉到Query視圖以對數據源運行 SQL 查詢。

運行SELECT * FROM "wikipedia"查詢以查看結果。

通過控制台提交 supervisor

在控制台中,單擊Submit supervisor打開提交 supervisor 窗口。

粘貼以下 spec 並點擊提交:

{
  "type": "kafka",
  "spec" : {
    "dataSchema": {
      "dataSource": "wikipedia",
      "timestampSpec": {
        "column": "time",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "metricsSpec" : [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "rollup": false
      }
    },
    "tuningConfig": {
      "type": "kafka",
      "reportParseExceptions": false
    },
    "ioConfig": {
      "topic": "wikipedia",
      "inputFormat": {
        "type": "json"
      },
      "replicas": 2,
      "taskDuration": "PT10M",
      "completionTimeout": "PT20M",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      }
    }
  }
}

這將啟動 supervisor,並分化出 task 監聽數據流入。

直接提交 supervisor

為了直接啟動服務,我們需要在 Druid 包根目錄下運行下面命令提交一個 supervisor spec 給 Druid overlord:

curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor

如果 supervisor 成功創建,你將得到一個包含 supervisor ID 的響應。在我們的示例中,將返回{"id":"wikipedia"}

你可以在控制台中查看當前 supervisor 和 tasks: http://localhost:8888/unified-console.html#tasks.

查詢數據

當數據發送給 Kafka stream 后,立刻就可以查詢數據。

本文翻譯自 Druid 官方文檔

請關注我們。一起學習 Druid 知識。

碼哥字節
**


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM