OLAP之Druid之實時數據攝入


實時數據攝入

我們采用Kafka Indexing Service作為實時攝入數據的方案。

准備工作

  • 將數據實時灌入某個Kafka topic中
  • 與批量導入數據類似:考慮清楚數據中哪一列可以作為時間列、哪些列可以作為維度列、哪些列可以作為指標列(尤其是指標的聚合函數,包括countsummaxmin等,如果涉及UV、留存的計算,則需要使用HyperUnique或者Theta sketch
  • 考慮最小時間粒度(即queryGranularity)和數據分片的時間粒度(即segmentGranularity),在我們的使用經驗中最小時間粒度應該根據業務需求確定,而數據分片的時間粒度設為HOUR即可

提交攝取任務

官方提供的數據攝取JSON如下,可以以此為模版修改(用#開頭的是我們添加的注釋,正式提交的時候請將注釋刪除,否則不是合法JSON文件):

{
  "type": "kafka", #注意這里的作業類型,與批量導入時不一樣的
  "dataSchema": {
    "dataSource": "metrics-kafka", #導入druid之后的datasource名字,最好是可以識別團隊的前綴
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json", #原始數據的格式,可以是JSON、CSV、TSV
        "timestampSpec": { #指定導入數據的時間列以及時間格式
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": { #在此指定導入數據的維度
          "dimensions": [ #在此指定導入數據的維度
              "channel",
              "cityName",
              "comment",
              "countryIsoCode",
              "countryName",
              "isAnonymous",
              "isMinor",
              "isNew"
            ]
        }
      }
    },
    "metricsSpec": [ #指定導入數據的指標列,以及各指標列的聚合函數
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value", #fieldName是原始數據中的列名,name是在druid中的列名,可以不同
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR", #數據分片的時間粒度
      "queryGranularity": "NONE" #最小的查詢時間粒度, None則為毫秒級
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics", #指定Kafka中的topic名
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092", #指定Kafka的broker列表
      "group.id": "druidxx" # 可以指定一個消費kafka的身份,需要注意的是不能有兩個druid作業以同一個身份消費同一個topic
    },
    "taskCount": 1, #task作業並發數
    "replicas": 1, #task作業的副本數
    "taskDuration": "PT1H" #單個task進程運行的時間
  }
}
提交命令
curl -X 'POST' -H 'Content-Type:application/json' -d @my-index-task.json OVERLORD_IP:8090/druid/indexer/v1/supervisor
停止消費Kafka
curl -X 'POST' -H 'Content-Type:application/json' OVERLORD_IP:8090/druid/indexer/v1/supervisor/datasource名/shutdown
數據查詢

建議在首次查詢之前,向Broker提交TimeBoundary查詢,方便掌握Druid中數據的時間分布

{
    "queryType" : "timeBoundary",
    "dataSource": "sample_datasource" #datasource名字
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM