Druid 0.17 入門(3)—— 數據接入指南


file

在快速開始中,我們演示了接入本地示例數據方式,但Druid其實支持非常豐富的數據接入方式。比如批處理數據的接入和實時流數據的接入。本文我們將介紹這幾種數據接入方式。

  • 文件數據接入:從文件中加載批處理數據
  • 從Kafka中接入流數據:從Kafka中加載流數據
  • Hadoop數據接入:從Hadoop中加載批處理數據
  • 編寫自己的數據接入規范:自定義新的接入規范

本文主要介紹前兩種最常用的數據接入方式。

1、Loading a file——加載文件

Druid提供以下幾種方式加載數據:

  • 通過頁面數據加載器

  • 通過控制台

  • 通過命令行

  • 通過Curl命令調用

1.1、數據加載器

Druid提供了一個示例數據文件,其中包含2015年9月12日發生的Wiki的示例數據。

此樣本數據位於quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz

示例數據大概是這樣:

{
  "timestamp":"2015-09-12T20:03:45.018Z",
  "channel":"#en.wikipedia",
  "namespace":"Main",
  "page":"Spider-Man's powers and equipment",
  "user":"foobar",
  "comment":"/* Artificial web-shooters */",
  "cityName":"New York",
  "regionName":"New York",
  "regionIsoCode":"NY",
  "countryName":"United States",
  "countryIsoCode":"US",
  "isAnonymous":false,
  "isNew":false,
  "isMinor":false,
  "isRobot":false,
  "isUnpatrolled":false,
  "added":99,
  "delta":99,
  "deleted":0,
}

Druid加載數據分為以下幾種:

  • 加載文件
  • 從kafka中加載數據
  • 從hadoop中加載數據
  • 自定義加載方式

我們這樣演示一下加載示例文件數據

1.1.1、進入localhost:8888 點擊load data

file

1.1.2、選擇local disk

file

1.1.3、選擇Connect data

file

1.1.4、預覽數據

Base directory輸入quickstart/tutorial/

File filter輸入 wikiticker-2015-09-12-sampled.json.gz

然后點擊apply預覽 就可以看見數據了 點擊Next:parse data解析數據

file

1.1.5、解析數據

可以看到json數據已經被解析了 繼續解析時間

file

1.1.6、解析時間

解析時間成功 之后兩步是transform和filter 這里不做演示了 直接next

file

1.1.7、確認Schema

這一步會讓我們確認Schema 可以做一些修改

由於數據量較小 我們直接關掉Rollup 直接下一步

file

1.1.8、設置分段

這里可以設置數據分段 我們選擇hour next

file

1.1.9、確認發布

file

file

1.1.10、發布成功 開始解析數據

file

等待任務成功

file

1.1.11、查看數據

選擇datasources 可以看到我們加載的數據

可以看到數據源名稱 Fully是完全可用 還有大小等各種信息

file

1.1.12、查詢數據

點擊query按鈕

我們可以寫sql查詢數據了 還可以將數據下載

file

1.2 控制台

在任務視圖中,單擊Submit JSON task

file

這將打開規格提交對話框,粘貼規范

{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "dimensionsSpec" : {
        "dimensions" : [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "timestampSpec": {
        "column": "time",
        "format": "iso"
      },
      "metricsSpec" : [],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2015-09-12/2015-09-13"],
        "rollup" : false
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/tutorial/",
        "filter" : "wikiticker-2015-09-12-sampled.json.gz"
      },
      "inputFormat" :  {
        "type": "json"
      },
      "appendToExisting" : false
    },
    "tuningConfig" : {
      "type" : "index_parallel",
      "maxRowsPerSegment" : 5000000,
      "maxRowsInMemory" : 25000
    }
  }
}

file

查看加載任務即可。

1.3 命令行

為了方便起見,Druid提供了一個加載數據的腳本

bin/post-index-task

我們可以運行命令

bin/post-index-task --file quickstart/tutorial/wikipedia-index.json --url http://localhost:8081

看到如下輸出:

Beginning indexing data for wikipedia
Task started: index_wikipedia_2018-07-27T06:37:44.323Z
Task log:     http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/log
Task status:  http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/status
Task index_wikipedia_2018-07-27T06:37:44.323Z still running...
Task index_wikipedia_2018-07-27T06:37:44.323Z still running...
Task finished with status: SUCCESS
Completed indexing data for wikipedia. Now loading indexed data onto the cluster...
wikipedia loading complete! You may now query your data

查看加載任務即可。

1.4 CURL

我們可以通過直接調用CURL來加載數據

curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/tutorial/wikipedia-index.json http://localhost:8081/druid/indexer/v1/task

提交成功

{"task":"index_wikipedia_2018-06-09T21:30:32.802Z"}

2、Load from Apache Kafka——從Apache Kafka加載流數據

Apache Kafka是一個高性能的消息系統,由Scala 寫成。是由Apache 軟件基金會開發的一個開源消息系統項目。

Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2012 年10 月從Apache Incubator 畢業。該項目的目標是為處理實時數據提供一個統一、高通量、低等待(低延時)的平台。

更多kafka相關請查看Kafka入門寶典(詳細截圖版)

2.1 安裝kafka

我們安裝一個最新的kafka

curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0

啟動kafka

./bin/kafka-server-start.sh config/server.properties

創建一個topic

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia

2.2 將數據寫入Kafka

向kafka的topic為wikipedia寫入數據

cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json

在kafka目錄中運行命令 {PATH_TO_DRUID}替換為druid目錄

export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json

2.3 加載kafka數據到Druid

druid加載kafka的數據也有多種方式

  • 數據加載器
  • 控制台
  • CURL

2.3.1 數據加載器

2.3.1.1 進入localhost:8888 點擊load data

選擇Apache Kafka並單擊Connect data

file

2.3.1.2 輸入kafka服務器localhost:9092
輸入topic wikipedia 可以預覽數據 然后下一步

file

2.3.1.3 解析數據

file

2.3.1.4 解析時間戳 設置轉換 設置過濾

file
file
file

2.3.1.4 這步比較重要 確定統計的范圍

file

file

2.3.1.5 發布

file

2.3.1.6 等待任務完成

file
file

2.3.1.7 去查詢頁面查看

file

2.3.2 控制台

在任務視圖中,單擊Submit JSON supervisor以打開對話框。

file

粘貼進去如下指令

{
  "type": "kafka",
  "spec" : {
    "dataSchema": {
      "dataSource": "wikipedia",
      "timestampSpec": {
        "column": "time",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "metricsSpec" : [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "rollup": false
      }
    },
    "tuningConfig": {
      "type": "kafka",
      "reportParseExceptions": false
    },
    "ioConfig": {
      "topic": "wikipedia",
      "inputFormat": {
        "type": "json"
      },
      "replicas": 2,
      "taskDuration": "PT10M",
      "completionTimeout": "PT20M",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      }
    }
  }
}

2.3.3 CURL

我們也可以通過直接調用CURL來加載kafka數據

curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor

大數據流動 專注於大數據實時計算,數據治理,數據可視化等技術分享與實踐。
請在后台回復關鍵字下載相關資料。相關學習交流群已經成立,歡迎加入~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM