4.pg_bulkload 數據加載使用及示例


1.pg_bulkload 概述

1.1 pg_bulkload 介紹

pg_bulkload是一種用於PostgreSQL的高速數據加載工具,相比copy命令。最大的優勢就是速度。優勢在讓我們跳過shared buffer,wal buffer。直接寫文件。pg_bulkload的direct模式就是這種思路來實現的,它還包含了數據恢復功能,即導入失敗的話,需要恢復。
pg_bulkload 旨在將大量數據加載到數據庫中。您可以選擇是否檢查數據庫約束以及在加載期間忽略多少錯誤。例如,當您將數據從另一個數據庫復制到 PostgreSQL 時,您可以跳過性能完整性檢查。另一方面,您可以在加載不干凈的數據時啟用約束檢查。

pg_bulkload 的最初目標是COPY在 PostgreSQL 中更快地替代命令,但 3.0 或更高版本具有一些 ETL 功能,例如輸入數據驗證和具有過濾功能的數據轉換。
在 3.1 版本中,pg_bulkload 可以將加載數據轉換成二進制文件,作為 pg_bulkload 的輸入文件。如果在將加載數據轉換成二進制文件時檢查加載數據是否有效,從二進制文件加載到表時可以跳過檢查。這將減少加載時間本身。同樣在 3.1 版中,並行加載比以前更有效。

pg_bulkload 加載數據時,在內部它調用PostgreSQL 的用戶定義函數 pg_bulkload() 並執行加載。pg_bulkload() 函數將在 pg_bulkload 安裝期間安裝。

1.2 性能對比

從 COPY 和 pg_bulkload 加載數據的性能對比來看,WRITER = PARALLEL 模式下的 Pg_bulkload 的數據加載效果幾乎是COPY一倍。另外沒有索引的COPY並不比有索引的COPY快。因為它必須從初始開始為表的總記錄創建索引。

另外:
1.PostgreSQL 參數 maintenance_work_mem 會影響 pg_bulkload 的性能。如果將此參數從 64 MB 更改為 1 GB,則持續時間將縮短近 15%。
2.FILTER 功能在各種操作中轉換輸入數據,但它不是免費的。實際測量顯示,SQL 函數的加載時間增加了近 240%,而 C 函數的加載時間增加了近 140%。

2.pg_bulkload架構圖

2.1 架構圖

pg_bulkload主要包括兩個模塊:reader和writer。reader負責讀取文件、解析tuple,writer負責把解析出的tuple寫入輸出源中。pg_bulkload最初的版本功能很簡單,只是加載數據。3.1版本增加了數據過濾的功能。

2.2 控制文件參數

在使用pg_bulkload時可以指定以下加載選項。控制文件可以用絕對路徑或相對路徑來指定。如果您通過相對路徑指定它,它將相對於執行 pg_bulkload 命令的當前工作目錄。如果你沒有指定控制文件,你應該通過 pg_bulkload 的命令行參數傳遞所需的選項。

部份參數說明
#TYPE = CSV | BINARY | FIXED | FUNCTION  
CSV : 從 CSV 格式的文本文件加載 , 默認為CSV 
BINARY | FIXED:從固定的二進制文件加載
FUNCTION :從函數的結果集中加載。如果使用它,INPUT 必須是調用函數的表達式。   

# WRITER | LOADER = DIRECT | BUFFERED | BINARY | PARALLEL
DIRECT :將數據直接加載到表中。繞過共享緩沖區並跳過 WAL 日志記錄,但需要自己的恢復過程。這是默認的,也是原始舊版本的模式。
BUFFERED:通過共享緩沖區將數據加載到表中。使用共享緩沖區,寫入WAL,並使用原始 PostgreSQL WAL 恢復。
BINARY :將數據轉換為二進制文件,該文件可用作要從中加載的輸入文件。創建加載輸出二進制文件所需的控制文件樣本。此示例文件創建在與二進制文件相同的目錄中,其名稱為 <binary-file-name>.ctl。
PARALLEL:與“WRITER=DIRECT”和“MULTI_PROCESS=YES”相同。如果指定了 PARALLEL,則忽略MULTI_PROCESS。如果為要加載的數據庫配置了密碼驗證,則必須設置密碼文件。

# TRUNCATE = YES | NO
         如果YES,則使用 TRUNCATE 命令從目標表中刪除所有行。如果NO,什么也不做。默認為NO。您不能同時指定“WRITER=BINARY”和 TRUNCATE。

# CHECK_CONSTRAINTS = YES | NO
        指定在加載期間是否檢查 CHECK 約束。默認為否。您不能同時指定“WRITER=BINARY”和 CHECK_CONSTRAINTS。

# PARSE_ERRORS = n
        在解析、編碼檢查、編碼轉換、FILTER 函數、CHECK 約束檢查、NOT NULL 檢查或數據類型轉換期間引發錯誤的 ingored 元組的數量。無效的輸入元組不會加載並記錄在 PARSE BADFILE 中。默認值為 0。如果解析錯誤數等於或多於該值,則提交已加載的數據並且不加載剩余的元組。0 表示不允許錯誤,-1 和 INFINITE 表示忽略所有錯誤。

# SKIP | OFFSET = n
跳過輸入行的數量。默認值為 0。您不能同時指定“TYPE=FUNCTION”和 SKIP。

# LIMIT | LOAD = n
要加載的行數。默認值為INFINITE,即將加載所有數據。即使您使用 TYPE=FUNCTION,此選項也可用。

#ENCODING = encoding
    指定輸入數據的編碼。檢查指定的編碼是否有效,如果需要,將輸入數據轉換為數據庫編碼。默認情況下,輸入數據的編碼既不驗證也不轉換。

#FILTER = [ schema_name. ] function_name [ (argtype, ... ) ]
    指定過濾函數以轉換輸入文件中的每一行。只要函數名在數據庫中是唯一的,就可以省略 argtype 的定義。如果未指定,則直接將輸入數據解析為加載目標表。另請參閱如何編寫 FILTER 函數以生成 FILTER 函數。
    不能同時指定“TYPE=FUNCTION”和 FILTER。此外,CSV 選項中的 FORCE_NOT_NULL 不能與 FILTER 選項一起使用。

2.3 pg_bulkload狀態結果

成功加載時,pg_bulkload 返回 0。當存在一些解析錯誤或重復錯誤時,即使加載本身已完成,它也會返回 3 和 WARNING 消息。請注意,跳過的行和替換的行(ON_DUPLICATE_KEEP = NEW)不被視為錯誤;退出代碼將為 0。
當出現不可持續的錯誤時,加載程序會引發 ERROR 消息。返回碼通常為 1,因為在加載數據期間數據庫服務器中發生了許多錯誤。下表顯示了 pg_bulkload 可以返回的代碼。

返回碼 描述
0 成功
1 在 PostgreSQL 中運行 SQL 時發生錯誤
2 無法連接到 PostgreSQL
3 成功,但有些數據無法加載

3.pg_bulkload 安裝

3.1 依賴包

安裝pg_bulkload  需要以下依賴包,一般在安裝好PG數據庫的環境中,已安裝好相關包,所以我們可以直接安裝就可以。
PostgreSQL devel package : postgresqlxx-devel(RHEL), postgresql-server-dev-x.x(Ubuntu)
PAM devel package : pam-devel(RHEL), libpam-devel(Ubuntu)
Readline devel or libedit devel package : readline-devel or libedit-devel(RHEL), libreadline-dev or libedit-dev(Ubuntu)
C compiler and build utility : "Development Tools" (RHEL), build-essential(Ubuntu)

3.2 安裝pg_bulkload

# 下載連接 <a href="https://github.com/ossc-db/pg_bulkload" style="text-decoration-skip-ink: none; font-family: &quot;Microsoft YaHei&quot;, Lato, &quot;PingFang SC&quot;, sans-serif;">https://github.com/ossc-db/pg_bulkload</a> 
postgres@s2ahumysqlpg01-> unzip pg_bulkload-VERSION3_1_19.zip 
postgres@s2ahumysqlpg01-> cd pg_bulkload-VERSION3_1_19
postgres@s2ahumysqlpg01-> make
postgres@s2ahumysqlpg01-> make install

# 創建擴展 ,要使用它需要建extension
postgres@s2ahumysqlpg01-> psql -h 127.0.0.1 
psql (12.4)
Type "help" for help.

lottu=# create extension pg_bulkload;
CREATE EXTENSION

#檢查已安裝插件
postgres=# \dx

4. pg_bulkload 參數

postgres@s2ahumysqlpg01-> pg_bulkload --help
pg_bulkload is a bulk data loading tool for PostgreSQL

Usage:
  Dataload: pg_bulkload [dataload options] control_file_path
  Recovery: pg_bulkload -r [-D DATADIR]

Dataload options:
  -i, --input=INPUT                INPUT path or function
  -O, --output=OUTPUT        OUTPUT path or table
  -l, --logfile=LOGFILE           LOGFILE path
  -P, --parse-badfile=*          PARSE_BADFILE path
  -u, --duplicate-badfile=*    DUPLICATE_BADFILE path
  -o, --option="key=val"        additional option

Recovery options:
  -r, --recovery            execute recovery
  -D, --pgdata=DATADIR      database directory

Connection options:
  -d, --dbname=DBNAME         database to connect
  -h, --host=HOSTNAME           database server host or socket directory
  -p, --port=PORT                      database server port
  -U, --username=USERNAME   user name to connect as
  -w, --no-password                   never prompt for password
  -W, --password                        force password prompt

Generic options:
  -e, --echo                     echo queries
  -E, --elevel=LEVEL        set output message level
  --help                       show this help, then exit
  --version                 output version information, then exit

Read the website for details. <http://github.com/ossc-db/pg_bulkload>
Report bugs to <http://github.com/ossc-db/pg_bulkload/issues>.

5. pg_bulkload 使用

5.1 初始化數據

postgres@s2ahumysqlpg01-> psql -h 127.0.0.1
psql (12.4)
Type "help" for help.

postgres=# create database testdb ;
CREATE DATABASE
postgres=# \c testdb
You are now connected to database "testdb" as user "postgres".
testdb=# 
testdb=# create table tb_asher (id int,name text);
CREATE TABLE
testdb=# \d 
          List of relations
 Schema |   Name   | Type  |  Owner   
--------+----------+-------+----------
 public | tb_asher | table | postgres
(1 row)

testdb=# create extension pg_bulkload;  #如果連接指定到單個庫時,需要創建擴展以生成 pgbulkload.pg_bulkload() 函數
CREATE EXTENSION

test=# quit

# 模擬CSV 文件 
postgres@s2ahumysqlpg01-> seq 100000| awk '{print $0"|asher"}' > bulk_asher.txt
postgres@s2ahumysqlpg01-> more   bulk_asher.txt
1|asher
2|asher
3|asher
4|asher
5|asher
6|asher

5.2 加載到指定表

# 將bulk_asher.txt里的數據加載到testdb 庫下的 tb_asher表中
postgres@s2ahumysqlpg01->  pg_bulkload -i /home/postgres/bulk_asher.txt -O tb_asher  -l /home/postgres/tb_asher_output.log -P /home/postgres/tb_asher_bad.txt  -o "TYPE=CSV" -o "DELIMITER=|" -d testdb -U postgres  -h 127.0.0.1
NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
        0 Rows skipped.
        100000 Rows successfully loaded.
        0 Rows not loaded due to parse errors.
        0 Rows not loaded due to duplicate errors.
        0 Rows replaced with new rows.

# 查看導入日志:
postgres@s2ahumysqlpg01-> cat /home/postgres/tb_asher_output.log 

pg_bulkload 3.1.19 on 2022-02-20 16:32:28.642071+08

INPUT = /home/postgres/bulk_asher.txt
PARSE_BADFILE = /home/postgres/tb_asher_bad.txt
LOGFILE = /home/postgres/tb_asher_output.log
LIMIT = INFINITE
PARSE_ERRORS = 0
CHECK_CONSTRAINTS = NO
TYPE = CSV
SKIP = 0
DELIMITER = |
QUOTE = "\""
ESCAPE = "\""
NULL = 
OUTPUT = public.tb_asher
MULTI_PROCESS = NO
VERBOSE = NO
WRITER = DIRECT
DUPLICATE_BADFILE = /u01/postgresql/data_bak/pg_bulkload/20220220163228_testdb_public_tb_asher.dup.csv
DUPLICATE_ERRORS = 0
ON_DUPLICATE_KEEP = NEW
TRUNCATE = NO


  0 Rows skipped.
  100000 Rows successfully loaded.
  0 Rows not loaded due to parse errors.
  0 Rows not loaded due to duplicate errors.
  0 Rows replaced with new rows.

Run began on 2022-02-20 16:32:28.642071+08
Run ended on 2022-02-20 16:32:28.743741+08

CPU 0.02s/0.04u sec elapsed 0.10 sec

5.3 先清空在加載

#增加了 -o "TRUNCATE=YES" 參數
postgres@s2ahumysqlpg01-> pg_bulkload -i /home/postgres/bulk_asher.txt -O tb_asher  -l /home/postgres/tb_asher_output.log -P /home/postgres/tb_asher_bad.txt  -o "TYPE=CSV" -o "DELIMITER=|" -o "TRUNCATE=YES"  -d testdb -U postgres -h 127.0.0.1
4NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
        0 Rows skipped.
        100000 Rows successfully loaded.
        0 Rows not loaded due to parse errors.
        0 Rows not loaded due to duplicate errors.
        0 Rows replaced with new rows.

# 數據查詢 
postgres@s2ahumysqlpg01-> psql -h127.0.0.1 -d testdb  -c "select count(1) from tb_asher ;"
 count  
--------
 100000
(1 row)

5.4 使用控制文件

# 新建控制文件 ,可以根據之前加載時,產生的日志文件tb_asher_output.log來更改,去掉里面沒有值的參數  NULL = 
vi  asher.ctl 
INPUT = /home/postgres/bulk_asher.txt
PARSE_BADFILE = /home/postgres/tb_asher_bad.txt
LOGFILE = /home/postgres/tb_asher_output.log
LIMIT = INFINITE
PARSE_ERRORS = 0
CHECK_CONSTRAINTS = NO
TYPE = CSV
SKIP = 0
DELIMITER = |
QUOTE = "\""
ESCAPE = "\""
OUTPUT = public.tb_asher
MULTI_PROCESS = NO
VERBOSE = NO
WRITER = DIRECT
DUPLICATE_BADFILE = /u01/postgresql/data_bak/pg_bulkload/20220220164822_testdb_public_tb_asher.dup.csv
DUPLICATE_ERRORS = 0
ON_DUPLICATE_KEEP = NEW
TRUNCATE = YES

# 使用控制文件來加載
postgres@s2ahumysqlpg01-> pg_bulkload  /home/postgres/asher.ctl -d testdb -U postgres -h 127.0.0.1
NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
        0 Rows skipped.
        100000 Rows successfully loaded.
        0 Rows not loaded due to parse errors.
        0 Rows not loaded due to duplicate errors.
        0 Rows replaced with new rows.

# 數據查詢 
postgres@s2ahumysqlpg01-> psql -h127.0.0.1 -d testdb  -c "select count(1) from tb_asher ;"
 count  
--------
 100000
(1 row)

5.5 強制寫wal 日志

# pg_bulkload 默認是跳過buffer 直接寫文件 ,但時如果有復制 ,或者需要基本wal日志恢復時沒有wal日志是不行的,這是我們可以強制讓其寫wal日志 ,只需要加載 -o "WRITER=BUFFERED" 參數就可以了
 pg_bulkload -i /home/postgres/bulk_asher.txt -O tb_asher  -l /home/postgres/tb_asher_output.log -P /home/postgres/tb_asher_bad.txt  -o "TYPE=CSV" -o "DELIMITER=|" -o "TRUNCATE=YES" -o "WRITER=BUFFERED"  -d testdb -U postgres -h 127.0.0.1

NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
        0 Rows skipped.
        100000 Rows successfully loaded.
        0 Rows not loaded due to parse errors.
        0 Rows not loaded due to duplicate errors.
        0 Rows replaced with new rows.

6.其它

6.1 注意事項

如果您使用直接加載模式(WRITER=DIRECT 或 PARALLEL),您必須注意以下事項:

1.當 MULTI_PROCESS=YES 並且需要密碼才能從 localhost 連接到數據庫進行加載時,即使在提示中正確輸入密碼,身份驗證也會失敗。為避免這種情況,請配置以下任一項。
    a.在 UNIX 中,將以下行添加到 pg_hba.conf。
    b.在 .pgpass 文件中指定密碼  
      127.0.0.1:*:foo:foopass
    c.不要使用“WRITER=PARALLE”
2.PITR/Replication :由於繞過了 WAL,PITR 的歸檔恢復不可用。這並不意味着它可以在沒有加載表數據的情況下完成 PITR。
                   如果您想使用 PITR,請在通過 pg_bulkload 加載后對數據庫進行完整備份。如果您使用流式復制,則需要根據 pg_bulkload 之后的備份集重新創建備用數據庫。
3.不得刪除在 $PGDATA/pg_bulkload目錄中找到的加載狀態文件 (*.loadstatus)。在 pg_bulkload 崩潰恢復中需要這個文件。
4.盡量不要使用 " kill -9" 終止 pg_bulkload 命令。如果您這樣做了,您必須調用 postgresql 腳本來執行 pg_bulkload 恢復並重新啟動 PostgreSQL 以繼續。
5.默認情況下,在數據加載期間僅強制執行唯一約束和非空約束。您可以設置“CHECK_CONSTRAINTS=YES”來檢查 CHECK 約束。無法檢查外鍵約束。用戶有責任提供有效的數據集。
6. maintenance_work_mem會影響 pg_bulkload 的性能。如果將此參數從 64 MB 更改為 1 GB,則持續時間將縮短近 15%。

6.2 編輯過濾器函數

編寫 FILTER 函數時有一些注意事項和警告:

1.輸入文件中的記錄被一一傳遞給 FILTER 函數。
2.當 FILTER 函數發生錯誤時,傳遞的記錄不會加載並寫入 PARSE BADFILE。
3.FILTER 函數必須返回記錄類型或某種復合類型。此外,實際記錄類型必須與目標表定義匹配。
4.如果 FILTER 函數返回 NULL,則加載所有列中都有 NULL 的記錄。
5.支持具有默認參數的函數。如果輸入數據的列數少於函數的參數,則將使用默認值。
6.不支持 VARIADIC 函數。
7.不支持 SETOF 函數。
8.不支持具有泛型類型(any、anyelement 等)的函數。
9.FILTER 函數可以用任何語言實現。SQL、C、PL 都可以,但你應該盡可能快地編寫函數,因為它們會被多次調用。
10.您只能指定 FILTER 或 FORCE_NOT_NULL 選項之一。如果您需要該功能,請重新實現與 FORCE_NOT_NULL 兼容的 FILTER 功能。
# Filter 示例:
CREATE FUNCTION sample_filter(integer, text, text, real DEFAULT 0.05) RETURNS record
    AS $$ SELECT $1 * $4, upper($3) $$
    LANGUAGE SQL;

7.參考連接

https://github.com/ossc-db/pg_bulkload
http://ossc-db.github.io/pg_bulkload/index.html
http://ossc-db.github.io/pg_bulkload/pg_bulkload.html






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM