Druid.io系列(九):數據攝入


1. 概述

Druid的數據攝入主要包括兩大類: 
1. 實時輸入攝入:包括Pull,Push兩種 
- Pull:需要啟動一個RealtimeNode節點,通過不同的Firehose攝取不同種類的數據源。 
- Push:需要啟動Tranquility或是Kafka索引服務。通過HTTP調用的方式進行數據攝入 
2. 離線數據攝入:可以通過Realtime節點攝入,也可以通過索引節點啟動任務攝入

本文演示環節主要基於上一章部署的集群來進行

2. 實時數據攝入

2.1 Pull

由於Realtime Node 沒有提供高可用,可伸縮等特性,對於比較重要的場景推薦使用 Tranquility Server or 或是Tranquility Kafka索引服務

2.2 Push

Indexing service在前文已經介紹過了,Tranquility 是一個Scala庫,它通過索引服務實現數據實時的攝入。它之所以存在,是因為Indexing service API屬於低層面的。Tranquility是對索引服務進行抽象封裝, 對使用者屏蔽了 創建任務,處理分區、復制、服務發現和shema rollover等環節。

通過Tranquility 的數據攝入,可以分為兩種方式

    • Tranquility Server:發送方可以通過Tranquility Server 提供的HTTP接口,向Druid發送數據。
    • Tranquility Kafka:發送發可以先將數據發送到Kafka,Tranquility Kafka會根據配置從Kafka獲取數據,並寫到Druid中。

2.2.1 Tranquility Server配置

配置流程如下 
1. 開啟Tranquility Server,在數據節點上編輯conf/supervise/data-with-query.conf 文件,將Tranquility Server注釋放開

# Uncomment to use Tranquility Server                                                                                                                                                          
!p95 tranquility-server bin/tranquility server -configFile conf/tranquility/server.json 

2. 拷貝quick里面的server.json

root@druid:~/imply-2.3.8# cp conf-quickstart/tranquility/server.json conf/tranquility/

3. 啟動服務

root@druid:~/imply-2.3.8# bin/supervise -c conf/supervise/data-with-query.conf

啟動信息如下:

[Fri Dec  8 15:41:39 2017] Running command[tranquility-server], logging to[/root/imply-2.3.8/var/sv/tranquility-server.log]: bin/tranquility server -configFile conf/tranquility/server.json

4. 發送數據

bin/generate-example-metrics | curl -XPOST -H'Content-Type: application/json' --data-binary @- http://localhost:8200/v1/post/tutorial-tranquility-server

如果成功會打印出,表名產生了25條數據到druid里

{"result":{"received":25,"sent":25}}

5. 查詢數據

root@druid:~/imply-2.3.8/bin#./plyql -h localhost -p 8082 -q "SELECT server, SUM("count") AS "events", COUNT(*) AS "rows" FROM "tutorial-tranquility-server" GROUP BY server;"

┌──────────────────┬────────┬──────┐
│ server           │ events │ rows │
├──────────────────┼────────┼──────┤
│ www1.example.com │ 11    │
│ www2.example.com │ 54    │
│ www3.example.com │ 72    │
│ www4.example.com │ 52    │
│ www5.example.com │ 77    │
└──────────────────┴────────┴──────┘

6. 重啟Tranquility Server:

bin/service –restart tranquility-server

 

2.2.2 Tranquility Kafka配置

配置流程如下 
1. 開啟Tranquility Kafka,在數據節點上編輯conf/supervise/data-with-query.conf 文件,將Tranquility Kafka注釋放開

# Uncomment to use Tranquility Server                                                                                                                                                          
!p95 tranquility-server bin/tranquility server -configFile conf/tranquility/server.json 

2. 拷貝quick里面的kafka.json

root@druid:~/imply-2.3.8# cp conf-quickstart/tranquility/kafka.json conf/tranquility/

詳細配置可參考:http://druid.io/docs/0.12.1/tutorials/tutorial-kafka.html

3. 在kafa集群中創建topic

root@druid:/opt/PaaS/Talas/lib/Kafka/bin#./kafka-topics.sh --create --zookeeper native-lufanfeng-2-5-24-138:2181,native-lufanfeng-3-5-24-139:2181,native-lufanfeng-4-5-24-140:2181 --replication-factor 1 --partitions 1 --topic tutorial-tranquility-kafka

4. 啟動服務

root@druid:~/imply-2.3.8# bin/supervise -c conf/supervise/data-with-query.conf

啟動信息如下:

[Tue Dec 12 10:43:28 2017] Running command[tranquility-kafka], logging to[/root/imply-2.3.8/var/sv/tranquility-kafka.log]: bin/tranquility kafka -configFile conf/tranquility/kafka.json

5. 使用kafka自帶的工具發送數據

root@druid:/opt/PaaS/Talas/lib/Kafka/bin# ./kafka-console-producer.sh --broker-list native-lufanfeng-2-5-24-138:9092,native-lufanfeng-3-5-24-139:9092,native-lufanfeng-4-5-24-140:9092 --topic tutorial-tranquility-kafka
{"unit": "milliseconds", "http_method": "GET", "value": 107, "timestamp": "2017-12-12T05:55:59Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www1.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 19, "timestamp": "2017-12-12T05:55:59Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www1.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 135, "timestamp": "2017-12-12T05:55:59Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www5.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 103, "timestamp": "2017-12-12T05:55:59Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www4.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 93, "timestamp": "2017-12-12T05:55:59Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www3.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 89, "timestamp": "2017-12-12T05:55:59Z", "http_code": "200", "page": "/list", "metricType": "request/latency", "server": "www2.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 7, "timestamp": "2017-12-12T05:55:59Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www5.example.com"}
{"unit": "milliseconds", "http_method": "GET", "value": 65, "timestamp": "2017-12-12T05:55:59Z", "http_code": "200", "page": "/", "metricType": "request/latency", "server": "www3.example.com"}

此時觀察kafka-server.log的日志會發現類似於如下輸出

2017-12-12 06:21:37,241 [KafkaConsumer-CommitThread] INFO  c.m.tranquility.kafka.KafkaConsumer - Flushed {tutorial-tranquility-kafka={receivedCount=0, sentCount=8,droppedCount=8, unparseableCount=0}} pending messages in 0ms and committed offsets in 0ms. 

在datasource中,windowPeriod設置成了P10M,timestamp不在當前時間10M內的數據都會被過濾,由於上面的數據的timestamp和執行時間相差了大概26分鍾左右,所以都會被drop調,為了達到演示效果,可以對bin/generate-example-metrics-main 的腳本進行調整。代碼如下:

# Copyright 2017 Imply Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import json
import random
import sys
from datetime import datetime
from kafka import KafkaProducer
from kafka import KafkaClient

hosts="native-lufanfeng-2-5-24-138:9092,native-lufanfeng-3-5-24-139:9092,native-lufanfeng-4-5-24-140:9092"
# hosts="10.48.253.104:9092"
topic='tutorial-tranquility-kafka'

class KafkaSender():

    def __init__(self):
        self.client=KafkaClient(hosts)
        self.producer=KafkaProducer(bootstrap_servers=hosts)
        self.client.ensure_topic_exists(topic)
    def send_messages(self,msg):
        self.producer.send(topic,msg)
        self.producer.r

def main():
  parser = argparse.ArgumentParser(description='Generate example page request latency metrics.')
  parser.add_argument('--count', '-c', type=int, default=25, help='Number of events to generate (negative for unlimited)')
  args = parser.parse_args()

  count = 0
  sender = KafkaSender()
  while args.count < 0 or count < args.count:
    timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")

    r = random.randint(1, 4)
    if r == 1 or r == 2:
      page = '/'
    elif r == 3:
      page = '/list'
    else:
      page = '/get/' + str(random.randint(1, 99))

    server = 'www' + str(random.randint(1, 5)) + '.example.com'

    latency = max(1, random.gauss(80, 40))

    record = json.dumps({
      'timestamp': timestamp,
      'metricType': 'request/latency',
      'value': int(latency),

      # Additional dimensions
      'page': page,
      'server': server,
      'http_method': 'GET',
      'http_code': '200',
      'unit': 'milliseconds'
    })
    sender.send_messages(record)
    print 'Send:%s Successful!' % record
    count += 1

try:
  main()
except KeyboardInterrupt:
  sys.exit(1)

3. 離線數據攝入

3.1 靜態文件攝入

使用自帶的攝入機制,可以在數據節點攝入本地文件,方法如下:

bin/post-index-task --file quickstart/wikiticker-index.json

wikiticker-index.json 文件中既包括datasource的定義,也包括數據文件位置的配置

3.2 HDFS文件攝入

配置過程可參考:http://druid.io/docs/0.12.1/ingestion/batch-ingestion.html

4. 配置參考

通用配置:https://github.com/druid-io/tranquility/blob/master/docs/configuration.md 
數據攝入通用配置:http://druid.io/docs/latest/ingestion/index.html 
Tranquility Kafka:https://github.com/druid-io/tranquility/blob/master/docs/kafka.md

5. 其他注意事項

5.1 數據分片

Druid的分片基本都是通過配置tunningConfig來配置的,實時,批量配置的方式會存在一定的差異

實時加載包括下面兩種類型 
- Linear分片: 
- 添加新節點時,原節點的配置不需要調整 
- 當存在分片時數據也能被查詢 
- Numbered分片 
- 所有分片存在時,才能查詢 
- 需要制定分片總數

本地文件加載包括下面兩種類型 
- 按照Partition大小分片 
- 設置總的分片數

Hadoop文件加載包括下面兩種類型 
- 哈希分片 
- 范圍分片

5.2 高基數維度優化

對於需要統計維度基數的需求,如果某個維度的基數很大,可能會存在下列問題。維度基數統計主要包括下面兩種類型 
- Cardinality: 基於HyperLogLog算法,只在查詢階段做了優化,不能減少存儲容量,基數大時,效率可能會有問題 
- HyperUnique: 在攝入階段進行優化,對於不需要對高基數維度進行過濾,分組的業務場景可以使用該類型


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM