Doris導數


概述

Doris現在支持Broker load/routine load/stream load/mini batch load等多種導入方式。
spark load主要用於解決初次遷移,大量數據遷移doris的場景,用於提升數據導入的速度。

導入方式

所有導入方式都支持 csv 數據格式。其中 Broker load 還支持 parquet 和 orc 數據格式。

  • Broker load
    通過 Broker 進程訪問並讀取外部數據源(如 HDFS)導入到 Doris。用戶通過 Mysql 協議提交導入作業后,異步執行。通過 SHOW LOAD 命令查看導入結果。
  • Stream load
    用戶通過 HTTP 協議提交請求並攜帶原始數據創建導入。主要用於快速將本地文件或數據流中的數據導入到 Doris。導入命令同步返回導入結果。
  • Insert
    類似 MySQL 中的 Insert 語句,Doris 提供 INSERT INTO tbl SELECT ...; 的方式從 Doris 的表中讀取數據並導入到另一張表。或者通過 INSERT INTO tbl VALUES(...); 插入單條數據。
  • Multi load
    用戶通過 HTTP 協議提交多個導入作業。Multi Load 可以保證多個導入作業的原子生效。
  • Routine load
    用戶通過 MySQL 協議提交例行導入作業,生成一個常駐線程,不間斷的從數據源(如 Kafka)中讀取數據並導入到 Doris 中。

導入執行流程:

Label 和 原子性:
Doris 對所有導入操作保證原子性,要么全部成功要么全部失敗,不會出現只導數一部分的情況。每一個導數作業會用戶指定或者系統自動生成一個label。label在一個database中唯一,導入成功之后就不能再重復使用了,導入失敗則可以重復使用。

同步和異步:
Doris 目前的導入方式分為兩類,同步和異步。
同步:
用戶直接根據創建任務的返回結果判定是否成功導入。 同步類型的導入方式有: Stream load,Insert。
具體操作步驟如下:

  1. 用戶(外部系統)創建導入任務。
  2. Doris 返回導入結果。
  3. 用戶(外部系統)判斷導入結果,如果失敗可以再次提交導入任務。

異步:
導入任務會被異步執行,用戶在創建成功后,需要通過輪詢的方式發送查看命令查看導入作業的狀態。如果創建失敗,則可以根據失敗信息,判斷是否需要再次創建。
異步的話就是不知道最終結果,需要輪訓獲取執行結果,類比肯德基點餐,每次拿小紙條去詢問是否做餐完畢。
異步類型的導入方式有:Broker load,Multi load。

  1. 用戶(外部系統)創建導入任務。
  2. Doris 返回導入創建結果。
  3. 用戶(外部系統)判斷導入創建結果,成功則進入4,失敗回到重試創建導入,回到1。
  4. 用戶(外部系統)輪詢查看導入任務,直到狀態變為 FINISHED 或 CANCELLED。

至於重試次數不應該無限制重試,外部系統在有限次數重試並失敗后,保留失敗信息(記錄日志,存入MySQL),大部分多次重試均失敗問題都是使用方法問題或數據本身問題。

內存限制:
既然涉及到導數就必須限制每個任務的內存使用,這個和CK 相比是優點, CK 只有全局控制內存,沒有單個任務級別控制內存使用。
一個導入任務會分布到多個BE上面執行, 內存參數限制的是每個BE節點的內存限制,而不是整個集群的內存限制。
較小的內存限制可能會影響導入效率,因為導入流程可能會因為內存達到上限而頻繁的將內存中的數據寫回磁盤。而過大的內存限制可能導致當導入並發較高時,系統OOM。所以,需要根據需求,合理的設置導入的內存限制。

最佳實踐:

  1. 選擇合適的導入方式:根據數據源所在位置選擇導入方式。例如:如果原始數據存放在 HDFS 上,則使用 Broker load 導入。
  2. 確定導入方式的協議:如果選擇了 Broker load 導入方式,則外部系統需要能使用 MySQL 協議定期提交和查看導入作業。
  3. 確定導入方式的類型:導入方式為同步或異步。比如 Broker load 為異步導入方式,則外部系統在提交創建導入后,必須調用查看導入命令,根據查看導入命令的結果來判斷導入是否成功。
  4. 制定 Label 生成策略:Label 生成策略需滿足,每一批次數據唯一且固定的原則。這樣 Doris 就可以保證 At-Most-Once。
  5. 程序自身保證 At-Least-Once:外部系統需要保證自身的 At-Least-Once,這樣就可以保證導入流程的 Exactly-Once。

通用系統配置
FE 配置
以下配置屬於 FE 的系統配置,可以通過修改 FE 的配置文件 fe.conf 來修改配置。

  • max_load_timeout_second 和 min_load_timeout_second

這兩個配置含義為:最大的導入超時時間,最小的導入超時時間,以秒為單位。默認的最大超時時間為3天, 默認的最小超時時間為1秒。用戶自定義的導入超時時間不可超過這個范圍。該參數通用於所有的導入方式。

  • desired_max_waiting_jobs

在等待隊列中的導入任務個數最大值,默認為100。當在 FE 中處於 PENDING 狀態(也就是等待執行的)導入個數超過該值,新的導入請求則會被拒絕。

此配置僅對異步執行的導入有效,當異步執行的導入等待個數超過默認值,則后續的創建導入請求會被拒絕。

  • max_running_txn_num_per_db

這個配置的含義是說,每個 Database 中正在運行的導入最大個數(不區分導入類型,統一計數)。默認的最大導入並發為 100。當當前 Database 正在運行的導入個數超過最大值時,后續的導入不會被執行。如果是同步導入作業,則導入會被拒絕。如果是異步導入作業。則作業會在隊列中等待。

BE 配置
以下配置屬於 BE 的系統配置,可以通過修改 BE 的配置文件 be.conf 來修改配置。

  • push_write_mbytes_per_sec

BE 上單個 Tablet 的寫入速度限制。默認是 10,即 10MB/s。通常 BE 對單個 Tablet 的最大寫入速度,根據 Schema 以及系統的不同,大約在 10-30MB/s 之間。可以適當調整這個參數來控制導入速度。

  • write_buffer_size

導入數據在 BE 上會先寫入一個 memtable,memtable 達到閾值后才會寫回磁盤。默認大小是 100MB。過小的閾值可能導致 BE 上存在大量的小文件。可以適當提高這個閾值減少文件數量。但過大的閾值可能導致 RPC 超時,見下面的配置說明。

  • tablet_writer_rpc_timeout_sec

導入過程中,發送一個 Batch(1024行)的 RPC 超時時間。默認 600 秒。因為該 RPC 可能涉及多個 memtable 的寫盤操作,所以可能會因為寫盤導致 RPC 超時,可以適當調整這個超時時間來減少超時錯誤(如 send batch fail 錯誤)。同時,如果調大 write_buffer_size 配置,也需要適當調大這個參數。

  • streaming_load_rpc_max_alive_time_sec

在導入過程中,Doris 會為每一個 Tablet 開啟一個 Writer,用於接收數據並寫入。這個參數指定了 Writer 的等待超時時間。如果在這個時間內,Writer 沒有收到任何數據,則 Writer 會被自動銷毀。當系統處理速度較慢時,Writer 可能長時間接收不到下一批數據,導致導入報錯:TabletWriter add batch with unknown id。此時可適當增大這個配置。默認為 600 秒。

  • load_process_max_memory_limit_bytes 和 load_process_max_memory_limit_percent

這兩個參數,限制了單個 Backend 上,可用於導入任務的內存上限。分別是最大內存和最大內存百分比。load_process_max_memory_limit_percent 默認為 80,表示對 Backend 總內存限制的百分比(總內存限制 mem_limit 默認為 80%,表示對物理內存的百分比)。即假設物理內存為 M,則默認導入內存限制為 M * 80% * 80%。

  • load_process_max_memory_limit_bytes 默認為 100GB。系統會在兩個參數中取較小者,作為最終的 Backend 導入內存使用上限。

  • label_keep_max_second

設置導入任務記錄保留時間。已經完成的( FINISHED or CANCELLED )導入任務記錄會保留在 Doris 系統中一段時間,時間由此參數決定。參數默認值時間為3天。該參數通用與所有類型的導入任務。

批量刪除

使用delete 語句的方式刪除時,每執行一次delete 都會生成一個新的數據版本,如果頻繁刪除會嚴重影響查詢性能,並且在使用delete 方式刪除時,是通過生成一個空的rowset來記錄刪除條件實現,每次讀取都要對刪除跳條件進行過濾,同樣在條件較多時會對性能造成影響。
所以批量刪除解決的是傳統delete語句,刪除性能很差,具體為啥普通 delete性能很差, 根本原因是delete操作會生成多個版本,並且Doris采取, 記錄刪除條件的方式,然后在查詢時過濾刪除條件實現,所以普通 delete性能很差。
數據導入有三種合並方式:

  1. APPEND: 數據全部追加到現有數據中
  2. DELETE: 刪除所有與導入數據key 列值相同的行
  3. MERGE: 根據 DELETE ON 的決定 APPEND 還是 DELETE

原理:
批量刪除只能使用到 unique模型, 本質上還是通過邏輯刪除實現
通過增加 DELETE_SIGN 隱藏列,該列是一個bool類型,replace 的隱藏列,當標記為刪除時將該值標記為 true,比如剛開始這些KEY 對應的這個字段為 false , 當發生delete的時候, 該字段會變更為 true
具體使用手段如下, select * 時把該字段屏蔽掉,並且自動加上 DELETE_SIGN != true 過濾掉已經刪除的數據

具體涉及到的流程如下:

  • 導入 將隱藏列設置為 delete 語句
  • 讀取 所有存在隱藏列的加上 DELETE_SIGN != true
  • Cumulative Compaction 將隱藏列看作正常的列處理,Compaction邏輯沒有變化
  • Base Compaction 將標記為刪除的行的刪掉,以減少數據占用的空間

stream load:
stream load 的寫法在在header 中的 columns 字段增加一個設置刪除標記列的字段, 示例 -H "columns: k1, k2, label_c3" -H "merge_type: [MERGE|APPEND|DELETE]" -H "delete: label_c3=1"
stream load 導入是順序執行的, 其他導入不是順序執行,其他的若要支持 merge, 則必須是保證順序執行,這樣才能保證 delete 不會把后面新插入的數據,誤刪除掉;其他的導數方式需要結合 load sequence 使用。

broker load:
在PROPERTIES 處設置刪除標記列的字段

LOAD LABEL db1.label1
(
    [MERGE|APPEND|DELETE] DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1,tmp_c2, label_c3)
    SET
    (
        id=tmp_c2,
        name=tmp_c1,
    )
    [DELETE ON label=true]

)
WITH BROKER 'broker'
(
    "username"="user",
    "password"="pass"
)
PROPERTIES
(
    "timeout" = "3600"
    
);

routine load

啟動批量刪除支持
批量刪除只支持 unique引擎, 並且必須指定詳細的 unique key

  1. 通過在fe 配置文件中增加enable_batch_delete_by_default=true 重啟fe 后新建表的都支持批量刪除,此選項默認為false
  2. 對於沒有更改上述fe 配置或對於以存在的不支持批量刪除功能的表,可以使用如下語句: ALTER TABLE tablename ENABLE FEATURE "BATCH_DELETE" 來啟用批量刪除。本操作本質上是一個schema change 操作,操作立即返回,可以通過show alter table column 來確認操作是否完成。
    如何確定已經生效 , SET show_hidden_columns=true
    desc tablename 如下出現了 DORIS_DELETE_SIGN 列則說明是支持批量刪除的

使用示例:

  1. 正常導入數據:
    curl --location-trusted -u 'root:xxxx' -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: APPEND" -T ./table1_data http://172.26.xx.143:8030/api/rtdw_bm/table1_unique/_stream_load
    merge_type: APPEND 是默認行為 可以不設置

  2. 將與導入數據key 相同的數據全部刪除, 以下本質就是 和 table1_data 文件中的數據做對比, 把按照 UNIQUE KEY 一樣的數據置為邏輯刪除,也就是 DORIS_DELETE_SIGN = 1
    curl --location-trusted -u 'root:xxxx' -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: DELETE" -T ./table1_data http://172.26.xxx.143:8030/api/rtdw_bm/table1_unique/_stream_load
    執行完畢效果如下:

  3. 將導入數據中與site_id=1 的行的key列相同的行, 主要行為是 merge 也就是根據 delete: 指定的條件進行邏輯刪除,其他的新增插入
    curl --location-trusted -u 'root:xxxxx' -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: MERGE" -H "delete: siteid=1" -T ./table1_data http://172.26.xxx.143:8030/api/rtdw_bm/table1_unique/_stream_load

Broker Load

Broker load 是一個異步的導入方式,支持的數據源取決於 Broker 進程支持的數據源。
用戶需要通過 MySQL 協議 創建 Broker load 導入,並通過查看導入命令檢查導入結果。

適用場景:
1、 源數據在 Broker 可以訪問的存儲系統中,如 HDFS。
2、 數據量在 幾十到百GB 級別。對於大數據量場景時間較長,異步方式可以提高用戶體驗度。
原理:

創建導入:

LOAD LABEL db_name.label_name 
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]

* data_desc:

    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY separator ]
    [(col1, ...)]
    [PRECEDING FILTER predicate]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]

* broker_properties: 

    (key1=value1, ...)
LOAD LABEL db1.label1
(
    DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1,tmp_c2)
    SET
    (
        id=tmp_c2,
        name=tmp_c1
    ),
    DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2")
    INTO TABLE tbl2
    COLUMNS TERMINATED BY ","
    (col1, col2)
    where col1 > 1
)
WITH BROKER 'broker'
(
    "username"="user",
    "password"="pass"
)
PROPERTIES
(
    "timeout" = "3600"
);

注意事項:
1、 多張表導入也是支持的, 保證多個表導入的原子性
2、 timeout 導入超時時間,由於是異步執行,當超時時作業狀態會被改為 CANCELLED,默認超時時間是 4小時
3、 max_filter_ratio 可以設置容忍率 ,容忍部分數據失敗
4、merge_type 數據的合並類型,一共支持三種類型APPEND、DELETE、MERGE 其中,APPEND是默認值,表示這批數據全部需要追加到現有數據中,DELETE 表示刪除與這批數據key相同的所有行,MERGE 語義 需要與delete 條件聯合使用,表示滿足delete 條件的數據按照DELETE 語義處理其余的按照APPEND 語義處理

查看導入結果 :
show load order by createtime desc limit 1\G

取消導入:
HELP CANCEL LOAD

性能分析
可以在提交 LOAD 作業前,先執行 set is_report_success=true 打開會話變量 ,打開后可以在 Doris 管理界面的 querys中查看 profile信息

實操:
1, 創建一個Doris表

CREATE TABLE `t10_unique` (
`name` varchar(32) NULL DEFAULT "" COMMENT "",
  `age1` int(11) NULL DEFAULT "10" COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`name`, `age1`)
COMMENT "t10_unique"
DISTRIBUTED BY HASH(`name`) BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "V2"
);
  1. 創建一個load任務
    注意 HDFS 必須要定義到具體的文件不能是目錄
    (name,age) 此處指定的是 hive中的字段, 后續的 set可以設置對字段進行函數轉換
    此處有的 broker名字 Broker_Doris 必須每個真實集群的broker的名字。Broker 為一個獨立的無狀態進程。封裝了文件系統接口,提供 Doris 讀取遠端存儲系統中文件的能力。
    set 屬性設置: 代表獲取在parquet或orc中以(tmp_c1, tmp_c2)為列名的列,映射到doris表中的(id, name)列。如果沒有設置set, 則以column中的列作為映射。
    也就是沒有指定 set 則 以 (tmp_c1,tmp_c2) 名字賦值,沒對應上的 設置默認值
    有 SET 的話,則以 set中的賦值條件為准。
    (tmp_c1,tmp_c2)
    SET
    (
    id=tmp_c2,
    name=tmp_c1
    )

LOAD LABEL rtdw_bm.xxxxxx5
(
DATA INFILE("hdfs://172.24.28.65:9000/user/hive/warehouse/fff.db/t10/*")
INTO TABLE t10_unique
COLUMNS TERMINATED BY "|"
(name,age)
SET
(
name=name,
age1=age
)
where name = 'a'
)
WITH BROKER 'Broker_Doris'
("username"="yyy", "password"="yyy") PROPERTIES
(
"timeout" = "3600"
);

效果圖:

select  * from t10_unique;

驗證導入結果:
show load order by createtime desc limit 1\G

Routine Load

例行導入(Routine Load)功能為用戶提供了一種自動從指定數據源進行數據導入的功能。

Stream load

基本原理

curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

Header 中支持屬性見下面的 ‘導入任務參數’ 說明
格式為: -H "key1:value1"
curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/stream_load
導入任務參數
label : 導入任務的標識。
max_filter_ratio:導入任務的最大容忍率,默認為0容忍,取值范圍是0~1。
where : 導入任務指定的過濾條件。
partition:待導入表的 Partition 信息,
columns: 待導入數據的函數變換配置,目前 Stream load 支持的函數變換方法包含列的順序變化以及表達式變換,其中表達式變換的方法與查詢語句的一致。
列順序變換例子:原始數據有兩列,目前表也有兩列(c1,c2)但是原始文件的第一列對應的是目標表的c2列, 而原始文件的第二列對應的是目標表的c1列,則寫法如下:
columns: c2,c1
表達式變換例子:原始文件有兩列,目標表也有兩列(c1,c2)但是原始文件的兩列均需要經過函數變換才能對應目標表的兩列,則寫法如下:
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)
其中 tmp
*是一個占位符,代表的是原始文件中的兩個原始列。
exec_mem_limit: 導入內存限制。默認為 2GB,單位為字節。
strict_mode: strict mode 模式的意思是:對於導入過程中的列類型轉換進行嚴格過濾。
應用場景:
使用 Stream load 的最合適場景就是原始文件在內存中,或者在磁盤中。其次,由於 Stream load 是一種同步的導入方式,所以用戶如果希望用同步方式獲取導入結果,也可以使用這種導入。
curl --location-trusted -u user:password -T /home/store_sales -H "label:abc" http://abc.com:8000/api/bj_sales/store_sales/_stream_load


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM