Apache Druid


基本概念:

概述:

Metamarkets 公司(一家為在線媒體或廣告公司提供數據分析服務的公司)推出的一個分布式內存實時分析系統,用於解決如何在大規模數據集下進行快速的、交互式的查詢和分析。
Druid 是一個開源的數據分析引擎工具,為實時和歷史數據的次秒級(多於一秒)查詢設計。主要應用於對數據的OLAP查詢,Druid 提供低延遲(實時)的數據攝取、靈活的數據探索、快速的數據聚合。現有的 Druid 部署已支持擴展到數萬億時間和 PB 級數據

對比kylin:

同樣是cube預計算
支持流式更靈活
Kylin 利用 Hadoop/HBase 做計算和存儲,使用 SQL 查詢,提供 JDBC/ODBC 驅動與常見 BI 工具集成
Druid 有自己獨立的分布式集群,能夠實時攝入數據,有自己的查詢接口(與BI兼容性較弱),通常多用於實時要求高的場景

核心特點:

Apache Druid是一個開源的、分布式、實時OLAP分析工具。Druid的核心設計結合了數據倉庫、時間序列數據庫和搜索系統的思想,適用於多種場景的高性能數據實時分析。
Druid將這三個系統中的每個系統的關鍵特征合並到其接收層、存儲格式、查詢層和核心體系結構中。
列式存儲
    Druid 獨立的存儲和壓縮每一列,只需要讀取特定查詢所需的內容,這可以支持快速掃描、排名和聚合
流式和批量攝取(Ingestion)
    支持 Apache Kafka、HDFS、AWS S3、stream processors 等現成連接器
本地的搜索索引
    Druid 為字符串創建倒排索引,以支持快速搜索和排序
靈活的 schema
    Druid 可以處理變化的 schema 和嵌套數據
基於時間優化 partition
    Druid 基於時間智能的對數據進行分區,基於時間的查詢比傳統數據庫要快得多
支持 SQL
    Druid 支持本機的 JSON 語言,還支持基於 HTTP 或者 JDBC 的 SQL
水平擴展性
    Druid 已經用戶生產環境中,每秒接收數百萬個事件,保存多年的數據並提供次秒級查詢
操作簡單
    只需要增加或刪除服務器即可擴展或縮小規模,Druid 會自動平衡,容錯架構通過服務器的故障進行路由

Ingestion(攝取):

Druid支持流式傳輸和批量攝取。Druid連接到數據源,包括:Kafka(用於流數據加載),或分布式文件系統,如HDFS(用於批處理數據加載)。
Druid在 “索引” 過程中將數據源中的原始數據轉換為支持高效讀取的優化格式(Segment,段)。

存儲:

Druid的數據存儲采用列式存儲格式。根據列的類型(字符串,數字等),應用不同的壓縮和編碼方法,根據列類型構建不同類型的索引。
Druid為字符串列構建倒排索引,以進行快速搜索和過濾。Druid可按時間對數據進行智能分區,以實現面向時間的快速查詢。
Druid在攝取數據時對數據進行預聚合,節省大量存儲空間。

優點:

對於大部分查詢場景可以亞秒級響應
事件流實時寫入與批量數據導入兼備
數據寫入前預聚合節省存儲空間,提升查詢效率
水平擴容能力強
社區活躍

適合場景:

處理時間序列事件
快速的聚合以及探索式分析
近實時分析亞秒級響應
存儲大量(TB級、PB級)可以預先定義若干維度的事件
無單點問題的數據存儲

架構:

服務:

Master:Coordinator & Overload 進程,管理數據可用性和數據攝取
Data:Historical & MiddleManager,執行提取工作負載並存儲所有可查詢數據
Query:Broker & Router,處理來自外部客戶端的查詢

外部依賴:

Deep Storage:深度存儲,例如HDFS或者S3。不是用來存儲查詢數據的。而是作為數據的備份或者進程間數據交換。所以需在HADOOP集群內。
Metadata Storage:元數據存儲,可以用RDBMS。
ZooKeeper:服務發現、leader選舉、服務協調。

運維部署:

下載:

cd /opt/lagou/software
wget http://apache.communilink.net/druid/0.19.0/apache-druid-0.19.0-bin.tar.gz
tar -zxvf apache-druid-0.19.0-bin.tar.gz

Nano-Quickstart:1個CPU,4GB RAM
    啟動命令: bin/start-nano-quickstart
    配置目錄: conf/druid/single-server/nano-quickstart/*
微型快速入門:4個CPU,16GB RAM
    啟動命令: bin/start-micro-quickstart
    配置目錄: conf/druid/single-server/micro-quickstart/*
小型:8 CPU,64GB RAM(〜i3.2xlarge)
    啟動命令: bin/start-small
    配置目錄: conf/druid/single-server/small/*
中:16 CPU,128GB RAM(〜i3.4xlarge)
    啟動命令: bin/start-medium
    配置目錄: conf/druid/single-server/medium/*
大型:32 CPU,256GB RAM(〜i3.8xlarge)
    啟動命令: bin/start-large
    配置目錄: conf/druid/single-server/large/*
大型X:64 CPU,512GB RAM(〜i3.16xlarge)
    啟動命令: bin/start-xlarge
    配置目錄: conf/druid/single-server/xlarge/*

訪問:登錄http://linux121:8888/查看頁面

架構說明:

主節點部署 Coordinator 和 Overlord進程
數據節點運行 Historical 和 MiddleManager進程
查詢節點 部署 Broker 和 Router 進程

安裝流程:

設置環境變量
MySQL中創建相關數據庫
配置Druid參數
    將hadoop配置文件core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml鏈接到conf/druid/cluster/_common/ 下
將MySQL的驅動程序,鏈接到 $DRUID_HOME/extensions/mysql-metadata-storage/ 下
修改配置文件($DRUID_HOME/conf/druid/cluster/_common/common.runtime.properties)
    配置ip、metadata等信息
配置主節點文件(參數大小根據實際情況配置)
    $DRUID_HOME/conf/druid/cluster/master/coordinator-overlord/jvm.config
配置數據節點文件(參數大小根據實際情況配置)
    $DRUID_HOME/conf/druid/cluster/data/historical/jvm.config
    $DRUID_HOME/conf/druid/cluster/data/historical/runtime.properties
    $DRUID_HOME/conf/druid/cluster/data/middleManager/jvm.config
配置查詢節點文件(參數大小根據實際情況配置)
    $DRUID_HOME/conf/druid/cluster/query/broker/jvm.config
    $DRUID_HOME/conf/druid/cluster/query/broker/runtime.properties
    $DRUID_HOME/conf/druid/cluster/query/router/jvm.config
啟動:
    在主節點上執行nohup start-cluster-master-no-zk-server &
    在數據節點上執行nohup start-cluster-data-server &
    在查詢節點上執行nohup start-cluster-query-server &

日志路徑:
    /data/druid_master/apache-druid-0.17.0/var/sv
    頁面點擊具體task也能看log

數據存儲:

概述:

Druid中的數據存儲在被稱為DataSource中,DataSource類似RDBMS中的 Table
每個DataSource按照時間划分,每個時間范圍稱為一個Chunk(比如按天分區,則一個chunk為一天)
在Chunk中數據被分為一個或多個Segment
Segment是數據實際存儲結構,Datasource、Chunk只是一個邏輯概念
Segment是按照時間組織成的Chunk,所以在按照時間查詢數據時,效率非常高
每個Segment都是一個單獨的文件,通常包含幾百萬行數據

數據分區:

Druid處理的是事件數據,每條數據都會帶有一個時間戳,可以使用時間進行分區
上圖指定了分區粒度為為天,那么每天的數據都會被單獨存儲和查詢

Segment內部存儲結構:

Druid采用列式存儲,每列數據都是在獨立的結構中存儲
Segment中的數據類型主要分為三種
時間戳。每一行數據,都必須有一個timestamp,Druid一定會基於時間戳來分片
維度列。用來過濾filter或者組合groupby的列,通常是stringfloatdoubleint類型
指標列。用來進行聚合計算的列,指定的聚合函數 sum、average 等

segment狀態:

is_published:如果 segment 元數據已發布到存儲的元數據中,used則為 true,此值也為 true
is_available:如果該 segment 當前可用於實時任務或Historical查詢,則為 True。
is_realtime:如果 segment 在實時任務上可用,則為 true 。對於使用實時寫入的數據源,通常會先設置成true,然后隨着 segment 的發布和移交而變成false
            實時流Task kill掉后,會轉移為is_published狀態,寫入metadata表。
is_overshadowed:如果該 segment 已發布(used設置為 true)並且被其他一些已發布的 segment 完全覆蓋,則為 true。通常,這是一個過渡狀態,處於此狀態的 segment 很快就會將其used標志自動設置為 false

索引服務:

概述:

數據導入並創建 segments 數據文件的服務

架構:

overlord 作為主節點
    負責創建task、分發task到middlemanager上運行,為task創建鎖以及跟蹤task運行狀態並反饋給用戶
middlemanager是從節點
    作為從節點,負責接收主節點分配的任務,然后為每個task啟動一個獨立的JVM進程來完成具體的任務
peon用於運行一個task
    由middlemanager啟動的一個進程用於運行一個task任務

Task類型:

index hadoop task:Hadoop索引任務,利用Hadoop集群執行MapReduce任務以完成segment數據文件的創建,適合體量比較大的segments數據文件的創建任務
index kafka task:用於Kafka數據的實時攝入,通過Kafka索引服務可以在Overlord上配置一個KafkaSupervisor,通過管理Kafka索引任務的創建和生命周期來完成 Kafka 數據的攝取
merge task:合並索引任務,將多個segments數據文件按照指定的聚合方法合並為一個segments數據文件
kill task : 銷毀索引任務,將執行時間范圍內的數據從Druid集群的深度存儲中刪除

索引及壓縮機制:

查詢時延低性能好的原因:

數據預聚合
列式存儲、數據壓縮
Bitmap 索引
mmap(內存文件映射方式)
查詢結果的中間緩存

數據預聚合:

Druid通過一個roll-up的處理,將原始數據在注入的時候就進行匯總處理
Roll-up可以壓縮我們需要保存的數據量
Druid會把選定的相同維度的數據進行聚合操作,可減少存儲的大小
Druid可以通過 queryGranularity 來控制注入數據的粒度。 最小的queryGranularity 是 millisecond(毫秒級)

位圖索引:

通過建立位圖索引,實現快速數據查找。
Bitmap 索引主要為了加速查詢時有條件過濾的場景。Druid 在生成索引文件的時候,對每個列的每個取值生成對應的 Bitmap 集合。
結構:
    索引位圖可以看作是HashMap<String, Bitmap>
    key就是維度的取值
    value就是該表中對應的行是否有該維度的值
執行過程分析:
    根據時間段定位到segment
    Appkey in ('appkey1''appkey2'and area='北京' 查到各自的bitmap,取出對應的行數,取交集或者並集

操作:

DML操作:

頁面進行,刪除datasource需要先刪或者暫停ingestion
關於diamession和metric
diamession理解為group by的字段
metric理解為sum(字段),count(字段)這種,包含了聚合函數+字段。

新建流程:

1.start

2.Connect。注意kafka topic沒有消息會卡住

3.parse data 可以查看topic的數據

4.Parse Time

可以選擇格式。auto或者指定的datetime format

5.transform

添加diamession,比如新列為 column1 + column2 拼接起來。

6.filter

7.configure Schema

Query granularity可以指定時間維度保留的粒度。如時、分、秒。

8.partition分區

根據數據量來定義

9.tune

調整task數量,讀取時間范圍等

10.publish

11.edit spec

API介紹:

數據導入:

bin/post-index-task --file quickstart/tutorial/retention-index.json --url http://localhost:8081

數據保留/丟棄:

點擊Cluster default: loadForever旁邊的小按鈕,配置dropforever

更新/覆蓋/追加數據:

bin/post-index-task --file quickstart/tutorial/updates-overwrite-index.json --url http://localhost:8081
bin/post-index-task --file quickstart/tutorial/updates-append-index.json --url http://localhost:8081
bin/post-index-task --file quickstart/tutorial/updates-append-index2.json --url http://localhost:8081

刪除數據:

先禁用指定interval的segments,標記為unused
    curl -X 'POST' -H 'Content-Type:application/json' -d '{ "interval" : "2015-09-12T18:00:00.000Z/2015-09-12T20:00:00.000Z" }' http://localhost:8081/druid/coordinator/v1/datasources/deletion-tutorial/markUnused
禁用所有的segments:
    DELETE請求 /druid/coordinator/v1/datasources/{dataSourceName}
刪除segments:
    curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/tutorial/deletion-kill.json http://localhost:8081/druid/indexer/v1/task
    請求體:
        {
            "type""kill",
            "id": <task_id>,
            "dataSource": <task_datasource>,
            "interval" : <all_segments_in_this_interval_will_die!>,
            "markAsUnused": <true|false>,       // 如果為true,會先將interval標記為unused,然后再刪除。
            "context": <task context>
        }

metadata相關表介紹:

druid_segments   存放is_published狀態的segment,其他狀態暫無記錄,可能存放於其他組件。
druid_pendingSegments   每次創建segment(初始狀態)的歷史表

生產環境排查:

1. 更改同一個ingestion的interval,會導致新的segment創建失敗,報錯為org.apache.druid.java.util.common.ISE: Could not allocate segment for row with timestamp

到overlord機器上查看log,發現報了segment conflict沖突。舊的interval貌似影響了新的interval。
解決:
    暫時將interval改回去。

2.datasource刪除失敗,即使先停掉任務

實時節點(MiddleManager):即時攝入實時數據,以及生成Segment數據文件 實時節點負責消費實時數據,實時數據首先會被直接加載進實時節點內存中的堆結構緩存區,當條件滿足時, 緩存區的數據會被沖寫到硬盤上形成一個數據塊(Segment Split),同時實時節點又會立即將新生成的數據庫加載到內存的非堆區, 因此無論是堆結構緩存區還是非堆區里的數據都能被查詢節點(Broker Node)查詢 
歷史節點(Historical Node):加載已經生成好的文件,以供數據查詢 
查詢節點(Broker Node):對外提供數據查詢服務,並同時從實時節點和歷史節點查詢數據,合並后返回給調用方 
協調節點(Coordinator Node):負責歷史節點的數據負載均衡,以及通過規則(Rule)管理數據的生命周期

原因:
    is_realtime狀態的segment不能刪除,位於historycal緩存中(未驗證)
    is_realtime的segment在一定時間或者暫停supervisor后,狀態會轉移為is_published,寫入metadata中,才能刪除。
解決方法:
    重命名datasource 或者等待空閑時段再試試。

3.如何實現刪除舊數據?

1.請求刪除相關api。
    注意realtime狀態的segment需要先暫停supervisor任務,等待所有的Segement從realtime變為publish即可。
    (暫停任務后,dataSource被disable,此時segement會短暫不可用,重新enable dataSource之后or等待一段時間自動load后,segement才變為publish狀態。)
2.使用data retention規則(失敗)
    先定義loadByInterval,再定義dropForever即可
    注意:
        realtime狀態的segment要先轉換publish才行。
        __time字段為寫入時間,而不是業務時間。這樣才能過濾舊版本的全量數據。(問題:pivot展示失敗,__time字段必選,而diamession類型沒有Date,所以不能作為Time類型。)
            overlap must have matching types (are SET/STRINGSET/TIME_RANGE)
    腳本示例:
        curl -X 'POST' -H 'Content-Type:application/json' -d '[{"type":"loadByInterval","tieredReplicants":{"_default_tier":1},"interval":"2010-01-01/2020-01-01"}]' http://10.0.0.48:18888/druid/coordinator/v1/rules/${dataSourceName}
3.每天數據一個版本,pivot展示的時候通過version_date來篩選
    注意字段類型得為數字,因為TIME類型不可選,而String會報錯:  max must have expression of type NUMBER or TIME (is STRING)
    `insert_timestamp` UInt64
    然后pivot篩選
        $main.filter($insert_timestamp==$main.max($insert_timestamp)).sum($sum_arpu)
    存在問題:
        $insert_timestamp==$main.max($insert_timestamp)pivot會加不了filter。
    解決:
        insert kafka表前,先用一個物理表保存數據,然后T-1的時候再發送相同diamession,負metric的過去抵消。
        or 
        每天定時腳本sed更新config.yaml里面寫死的字段值。(在生產上使用過)

4.supervisors或者task刪除返回500,

頁面點擊task log提示Duplicate entry 'partner-partition_2021-06-14T00:00:00.000Z_2021-06-15T00:00:00.0' for key 'PRIMARY' [statement:\"INSERT INTO druid_pendingSegments
解決:
    登錄數據庫,執行delete from druid_pendingSegments where dataSource='partner-partition';

pivot可視化:

概述:

開源版本turnilo
文檔地址:https://allegro.github.io/turnilo/configuration-cluster.html

使用:

turnilo --druid http://10.0.0.16:8082 --port 9099  --print-config --with-comments > config_v10.yaml
可以修改配置,比如漢化,diamession字段等

啟動:

turnilo --config config.yaml

停止:

kill 

只顯示部分cube:

nginx配置:

location /sources/ {
        proxy_pass http://10.0.0.16:9099/sources/;
        proxy_set_header 'x-turnilo-allow-datacubes' '*';      # 控制顯示的datasource
        proxy_set_header Host $proxy_host# 修改轉發請求頭,讓8080端口的應用可以受到真實的請求
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
    location /pivot/ {
            proxy_pass http://10.0.0.16:9099/;
            auth_basic "Please input password"#這里是驗證時的提示信息
            auth_basic_user_file /usr/local/nginx/passwd;
            proxy_set_header a a;
            proxy_set_header 'x-turnilo-allow-datacubes' '*,agent_sales_stats_v1';
            proxy_set_header Host $proxy_host# 修改轉發請求頭,讓8080端口的應用可以受到真實的請求
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }
        location /fonts/Open-Sans-regular/ {
            proxy_pass http://10.0.0.16:9099/fonts/Open-Sans-regular/;
            proxy_set_header Host $proxy_host# 修改轉發請求頭,讓8080端口的應用可以受到真實的請求
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }

config.yaml配置:

clusters:

  - name: druid
    guardDataCubes: true

支持選擇多個split:

dataCubes:

- name: globalsearch_and_launcher_pv_uv_for_olap
  maxSplits: 10

默認選擇metric:

defaultSelectedMeasures: [a,b]

metric format配置:

0.44444 0.00a 0.45

0.43 '(0.000 %)' 43.000 %

1230974 '($ 0.00 a)' $ 1.23 m

1234 0,0 1,234

參考http://numeraljs.com/#format

metric常用函數:

1.sum($field)

2.count()

3.countDistinct($field)        // druid提供count(distinct xxx),但是不准確,基於hyperlog來實現的。

4.filter(condition)

5.average($field)

生產問題:

1.如果有多個值的時候,需要multiValue: true。一個值的時候,則需要為false,否則pivot的類型有問題,顯示為null
    - name: __time
          title: 歸屬時間
          kind: time
          formula: $__time

2.數據去重問題:

        場景:字段修改,需要count總數

解決方法:

            a.可以根據binlog對於update或者delete操作,會發一條舊數據,clickhouse引擎標記sign-1。所以sum(sign)即可,druid會對metric相同的數據進行預聚合(啟用rollup)。
            b.如果不是binlog,可以考慮T-1離線更新。
            c.可以添加一層去重層,物化視圖基於這一層,insert select的時候判斷數據是否重復。
            d.如果不是嚴格要求,可以用hyper Unique聚合。

3.實現countif/count的效果

        $main.filter($statusCode == 500).sum($requests) / $main.sum($requests)

    4.metric獲取_time以外的數據,比如歷史全量數據。
        暫時沒有方法。
        暫時做法:將全量數據每個左表時刻都打進去,但只有對應記錄的狀態才為正確,否則為未標記。

5.metric不能使用concat,只能先定義diamession

    6.long類型不能搜索(只能范圍min-max),而string類型可以(背后調用contains函數)。
        所以druid定義schema的時候推薦都用string類型。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM