Druid, a big-data time-series analytics component: examples of ingesting Kafka and HDFS data


1. Overview

a. Druid can ingest many kinds of data, including local batch files, HDFS data, and real-time Kafka streams. In big-data development on the Hadoop ecosystem, ingesting HDFS data and Kafka streams is the most common case, so this post focuses on examples for those two sources.
b. Whichever type of data you want to ingest must be enabled in the common configuration file (this assumes a Druid cluster or single-node deployment is already running; if not, see the previous post). Edit the file with vim ${DRUID_HOME}/conf/druid/cluster/_common/common.runtime.properties and make sure the required extensions are loaded:

druid.extensions.loadList=["druid-hdfs-storage","mysql-metadata-storage","druid-kafka-indexing-service"]
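As a quick sanity check, the extensions listed above should exist under the extensions directory. druid-hdfs-storage and druid-kafka-indexing-service are core extensions bundled with the distribution, while mysql-metadata-storage (and the MySQL JDBC driver) usually has to be downloaded separately, for example with the pull-deps tool. A minimal sketch, assuming the default distribution layout:

# List the extensions available locally
ls ${DRUID_HOME}/extensions/

# Fetch mysql-metadata-storage if it is missing (run from ${DRUID_HOME});
# the MySQL JDBC driver jar still has to be copied into extensions/mysql-metadata-storage/ afterwards
cd ${DRUID_HOME}
java -classpath "lib/*" org.apache.druid.cli.Main tools pull-deps \
  --no-default-hadoop \
  -c "org.apache.druid.extensions:mysql-metadata-storage"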

c. There are two ways to ingest data. The first is the wizard-style, step-by-step flow in the web console, as shown in the screenshot.

As long as the required fields are filled in correctly, just follow the arrows through each step.
The second way is to write the JSON ingestion spec yourself and submit it with a command. The two approaches are essentially the same; the console flow simply generates the JSON spec for you. In real development, however, the second approach is recommended, so the rest of this post uses it to ingest data from Kafka and from HDFS.

2. Ingesting a Kafka stream in real time

a. Druid ships with a sample Kafka ingestion spec, ${DRUID_HOME}/quickstart/tutorial/wikipedia-kafka-supervisor.json. Adjust it to match your own environment:

{
  "type": "kafka",
  "spec" : {
    "dataSchema": {
      "dataSource": "my-wikipedia",
      "timestampSpec": {
        "column": "time",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          {
            "name": "added",
            "type": "long"
          },
          {
            "name": "deleted",
            "type": "long"
          },
          {
            "name": "delta",
            "type": "long"
          }
        ]
      },
      "metricsSpec": [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "rollup": false
      }
    },
    "tuningConfig": {
      "type": "kafka",
      "reportParseExceptions": false
    },
    "ioConfig": {
      "topic": "my-wikipedia",
      "inputFormat": {
        "type": "json"
      },
      "replicas": 1,
      "taskDuration": "PT10M",
      "completionTimeout": "PT20M",
      "consumerProperties": {
        "bootstrap.servers": "master:9092"
      }
    }
  }
}

b. Run the command:

curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://master:8081/druid/indexer/v1/supervisor

If the command returns {"id":"my-wikipedia"}, the supervisor was submitted successfully.
The task should also show up in the Druid console, as in the screenshot below; seeing it running there means everything is working.
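The supervisor can also be checked through the Overlord API instead of the console; a minimal sketch, assuming the same host and port as the curl command above:

# List all supervisors known to the Overlord
curl http://master:8081/druid/indexer/v1/supervisor

# Detailed status of this supervisor; it should report RUNNING once tasks are consuming from Kafka
curl http://master:8081/druid/indexer/v1/supervisor/my-wikipedia/status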

c. Write sample data that matches the spec into the Kafka topic, and the ingested rows will appear on the Query page; see the sketch below.
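For example, the quickstart sample file can be replayed into the topic with the standard console producer. A minimal sketch, assuming ${KAFKA_HOME} points at your Kafka installation and that each JSON event carries the "time" field named in timestampSpec:

# Unpack the quickstart sample data (one JSON event per line, each with a "time" field)
gunzip -k ${DRUID_HOME}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz

# Replay the events into the topic the supervisor is reading from
${KAFKA_HOME}/bin/kafka-console-producer.sh \
  --broker-list master:9092 \
  --topic my-wikipedia \
  < ${DRUID_HOME}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json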

3. Ingesting HDFS data

a. Ingesting HDFS data differs from Kafka only in the spec. Druid also ships with a sample Hadoop ingestion spec, ${DRUID_HOME}/quickstart/tutorial/wikipedia-index-hadoop.json. I renamed my copy to my-wikipedia-index-hadoop.json and adjusted it to match my own environment:

{
  "type" : "index_hadoop",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "my-hdfs-wikipedia",
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json",
          "dimensionsSpec" : {
            "dimensions" : [
              "channel",
              "cityName",
              "comment",
              "countryIsoCode",
              "countryName",
              "isAnonymous",
              "isMinor",
              "isNew",
              "isRobot",
              "isUnpatrolled",
              "metroCode",
              "namespace",
              "page",
              "regionIsoCode",
              "regionName",
              "user",
              { "name": "added", "type": "long" },
              { "name": "deleted", "type": "long" },
              { "name": "delta", "type": "long" }
            ]
          },
          "timestampSpec" : {
            "format" : "auto",
            "column" : "time"
          }
        }
      },
      "metricsSpec" : [],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2015-09-12/2015-09-13"],
        "rollup" : false
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "/test-data/druid/wikiticker-2015-09-12-sampled.json.gz"
      }
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : 5000000
      },
      "forceExtendableShardSpecs" : true,
      "jobProperties" : {
        "fs.default.name" : "hdfs://master:8020",
        "fs.defaultFS" : "hdfs://master:8020/",
        "dfs.datanode.address" : "master",
        "dfs.client.use.datanode.hostname" : "true",
        "dfs.datanode.use.datanode.hostname" : "true",
        "yarn.resourcemanager.hostname" : "master",
        "yarn.nodemanager.vmem-check-enabled" : "false",
        "mapreduce.map.java.opts" : "-Duser.timezone=UTC -Dfile.encoding=UTF-8",
        "mapreduce.job.user.classpath.first" : "true",
        "mapreduce.reduce.java.opts" : "-Duser.timezone=UTC -Dfile.encoding=UTF-8",
        "mapreduce.map.memory.mb" : 1024,
        "mapreduce.reduce.memory.mb" : 1024
      }
    }
  },
  "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.8.5"]
}

Pay attention to the "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.8.5"] setting. It must match the hadoop-dependencies version bundled with Druid, here ${DRUID_HOME}/hadoop-dependencies/hadoop-client/2.8.5/. Also check that this version matches the version of your Hadoop cluster; if they differ, the task will fail, and in that case the cleanest fix is usually to switch to a Druid release whose bundled Hadoop client matches.
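Before submitting the task, the file referenced by ioConfig.inputSpec.paths must already exist on HDFS. A minimal sketch using the quickstart sample file; the target directory simply mirrors the paths value in the spec above:

# Create the target directory and upload the sample file
hdfs dfs -mkdir -p /test-data/druid
hdfs dfs -put ${DRUID_HOME}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz /test-data/druid/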

b. Run the command:

curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/my-wikipedia-index-hadoop.json http://master:8081/druid/indexer/v1/task
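A successful submission returns a task id such as {"task":"index_hadoop_my-hdfs-wikipedia_..."}. The task can then be polled until it reports SUCCESS; a minimal sketch, assuming the Coordinator and Overlord both run on master:8081 and with <taskId> standing in for the id actually returned:

# Poll the task status (replace <taskId> with the returned id)
curl http://master:8081/druid/indexer/v1/task/<taskId>/status

# Once the task succeeds, the new datasource should be listed here
curl http://master:8081/druid/coordinator/v1/datasources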

