DorisDB提供了多種導入方式,用戶可以根據數據量大小、導入頻率等要求選擇最適合自己業務需求的導入方式。
數據導入:
- 1、離線數據導入:如果數據源是Hive/HDFS,推薦采用 Broker Load 導入, 如果數據表很多導入比較麻煩可以考慮使用Hive外表直連查詢,性能會比Broker load導入效果差,但是可以避免數據搬遷,如果單表的數據量特別大,或者需要做全局數據字典來精確去重可以考慮 Spark Load 導入。
- 2、實時數據導入:日志數據和業務數據庫的binlog同步到Kafka以后,優先推薦通過 Routine load 導入DorisDB,如果導入過程中有復雜的多表關聯和ETL預處理可以使用Flink處理以后用 stream load 寫入DorisDB
- 3、程序寫入DorisDB:推薦使用Stream Load
- 4、Mysql數據導入:推薦使用Mysql外表,insert into new_table select * from external_table 的方式導入
- 5、DorisDB內部導入:可以在DorisDB內部使用 insert into tablename select 的方式導入,可以跟外部調度器配合實現簡單的ETL處理
- 6、其他數據源導入:推薦使用 DataX 導入
外部表:
- DorisDB支持以外部表的形式,接入其他數據源。
- 外部表指的是保存在其他數據源中的數據表。
- 目前DorisDB已支持的第三方數據源包括 MySQL、HDFS、ElasticSearch,Hive。
重點目錄:
- 1.1 Broker Load
- 1.2 Spark Load
- 1.3 Stream Load
- 1.4 Routine Load
- 2.1 MySQL外部表
- 2.2 ElasticSearch外部表
1.1 Broker Load
在Broker Load模式下,通過部署的 Broker 程序,DorisDB可讀取對應數據源(如HDFS, S3、阿里雲 OSS、騰訊 COS)上的數據,利用自身的計算資源對數據進行預處理和導入。這是一種異步的導入方式,用戶需要通過MySQL協議創建導入,並通過查看導入命令檢查導入結果。
1、名詞解釋
- Broker:Broker 為一個獨立的無狀態進程,封裝了文件系統接口,為 DorisDB 提供讀取遠端存儲系統中文件的能力。
- Plan:導入執行計划,BE會執行導入執行計划將數據導入到DorisDB系統中。
2、語法:
-
LOAD LABEL db_name.label_name (data_desc, ...) WITH BROKER broker_name broker_properties [PROPERTIES (key1=value1, ... )] data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY column_separator ] [FORMAT AS file_type] [(col1, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] broker_properties: (key2=value2, ...)
說明:
1)、Label:導入任務的標識。每個導入任務,都有一個數據庫內部唯一的Label。Label是用戶在導入命令中自定義的名稱。
- 通過這個Label,用戶可以查看對應導入任務的執行情況,並且Label可以用來防止用戶導入相同的數據。
- 當導入任務狀態為FINISHED時,對應的Label就不能再次使用了。
- 當 Label 對應的導入任務狀態為CANCELLED時,可以再次使用該Label提交導入作業。
2)、data_desc:每組 data_desc表述了本次導入涉及到的數據源地址,ETL 函數,目標表及分區等信息。
示例 :參見:https://www.kancloud.cn/dorisdb/dorisdb/2146000
1.2 Spark Load
Spark Load 通過外部的 Spark 資源實現對導入數據的預處理,提高 DorisDB 大數據量的導入性能並且節省 Doris 集群的計算資源。主要用於初次遷移、大數據量導入 DorisDB 的場景(數據量可到TB級別)
1、基本原理
Spark Load 任務的執行主要分為以下幾個階段:
- 1、用戶向 FE 提交 Spark Load 任務;
- 2、FE 調度提交 ETL 任務到 Spark 集群執行。
- 3、Spark 集群執行 ETL 完成對導入數據的預處理。包括全局字典構建(BITMAP類型)、分區、排序、聚合等。
- 4、ETL 任務完成后,FE 獲取預處理過的每個分片的數據路徑,並調度相關的 BE 執行 Push 任務。
- 5、BE 通過 Broker 讀取數據,轉化為 DorisDB 存儲格式。
- 6、FE 調度生效版本,完成導入任務。
2、預處理流程:
- 1、從數據源讀取數據,上游數據源可以是HDFS文件,也可以是Hive表。
- 2、對讀取到的數據完成字段映射、表達式計算,並根據分區信息生成分桶字段bucket_id。
- 3、根據DorisDB表的Rollup元數據生成RollupTree。
- 4、遍歷RollupTree,進行分層的聚合操作,下一個層級的Rollup可以由上一個層的Rollup計算得來。
- 5、每次完成聚合計算后,會對數據根據bucket_id進行分桶然后寫入HDFS中。
- 6、后續Broker會拉取HDFS中的文件然后導入DorisDB BE節點中。
3、基本操作參見:https://www.kancloud.cn/dorisdb/dorisdb/2146001
1.3 Stream Load
Stream Load 是一種同步的導入方式,用戶通過發送 HTTP 請求將本地文件或數據流導入到 DorisDB 中。Stream Load 同步執行導入並返回導入結果。用戶可直接通過請求的返回值判斷導入是否成功。
1、主要流程:
說明:
- Stream Load 中,用戶通過HTTP協議提交導入命令。
- 如果提交到FE節點,則FE節點會通過HTTP redirect指令將請求轉發給某一個BE節點,用戶也可以直接提交導入命令給某一指定BE節點。
- 該BE節點作為Coordinator節點,將數據按表schema划分並分發數據到相關的BE節點。
- 導入的最終結果由 Coordinator節點返回給用戶。
2、基本操作參見:https://www.kancloud.cn/dorisdb/dorisdb/2146002
1.4 Routine Load
Routine Load 是一種例行導入方式,DorisDB通過這種方式支持從Kafka持續不斷的導入數據,並且支持通過SQL控制導入任務的暫停、重啟、停止
1、基本原理
導入流程說明:
- 1、用戶通過支持MySQL協議的客戶端向 FE 提交一個Kafka導入任務。
- 2、FE將一個導入任務拆分成若干個Task,每個Task負責導入指定的一部分數據。
- 3、每個Task被分配到指定的 BE 上執行。在 BE 上,一個 Task 被視為一個普通的導入任務,通過 Stream Load 的導入機制進行導入。
- 4、BE導入完成后,向 FE 匯報。
- 5、FE 根據匯報結果,繼續生成后續新的 Task,或者對失敗的 Task 進行重試。
- 6、FE 會不斷的產生新的 Task,來完成數據不間斷的導入。
2、基本操作參見:https://www.kancloud.cn/dorisdb/dorisdb/2146003
2.1 MySQL外部表
星型模型中,數據一般划分為維度表和事實表。維度表數據量少,但會涉及UPDATE操作。目前DorisDB中還不直接支持UPDATE操作(可以通過Unique數據模型實現),在一些場景下,可以把維度表存儲在MySQL中,查詢時直接讀取維度表。
在使用MySQL的數據之前,需在DorisDB創建外部表,與之相映射。DorisDB中創建MySQL外部表時需要指定MySQL的相關連接信息,如下圖。
2.2 ElasticSearch外部表
DorisDB與ElasticSearch都是目前流行的分析系統,DorisDB強於大規模分布式計算,ElasticSearch擅長全文檢索。DorisDB支持ElasticSearch訪問的目的,就在於將這兩種能力結合,提供更完善的一個OLAP解決方案。
1、建表示例
2、謂詞下推
DorisDB支持對ElasticSearch表進行謂詞下推,把過濾條件推給ElasticSearch進行執行,讓執行盡量靠近存儲,提高查詢性能。目前支持哪個下推的算子如下表。
3、詳細說明參見:https://www.kancloud.cn/dorisdb/dorisdb/2146013
參考資料: