1.pg_bulkload 概述
1.1 pg_bulkload 介紹
pg_bulkload是一種用於PostgreSQL的高速數據加載工具,相比copy命令。最大的優勢就是速度。優勢在讓我們跳過shared buffer,wal buffer。直接寫文件。pg_bulkload的direct模式就是這種思路來實現的,它還包含了數據恢復功能,即導入失敗的話,需要恢復。
pg_bulkload 旨在將大量數據加載到數據庫中。您可以選擇是否檢查數據庫約束以及在加載期間忽略多少錯誤。例如,當您將數據從另一個數據庫復制到 PostgreSQL 時,您可以跳過性能完整性檢查。另一方面,您可以在加載不干凈的數據時啟用約束檢查。
pg_bulkload 的最初目標是COPY在 PostgreSQL 中更快地替代命令,但 3.0 或更高版本具有一些 ETL 功能,例如輸入數據驗證和具有過濾功能的數據轉換。
在 3.1 版本中,pg_bulkload 可以將加載數據轉換成二進制文件,作為 pg_bulkload 的輸入文件。如果在將加載數據轉換成二進制文件時檢查加載數據是否有效,從二進制文件加載到表時可以跳過檢查。這將減少加載時間本身。同樣在 3.1 版中,並行加載比以前更有效。
pg_bulkload 加載數據時,在內部它調用PostgreSQL 的用戶定義函數 pg_bulkload() 並執行加載。pg_bulkload() 函數將在 pg_bulkload 安裝期間安裝。
1.2 性能對比
從 COPY 和 pg_bulkload 加載數據的性能對比來看,WRITER = PARALLEL 模式下的 Pg_bulkload 的數據加載效果幾乎是COPY一倍。另外沒有索引的COPY並不比有索引的COPY快。因為它必須從初始開始為表的總記錄創建索引。
另外:
1.PostgreSQL 參數 maintenance_work_mem 會影響 pg_bulkload 的性能。如果將此參數從 64 MB 更改為 1 GB,則持續時間將縮短近 15%。
2.FILTER 功能在各種操作中轉換輸入數據,但它不是免費的。實際測量顯示,SQL 函數的加載時間增加了近 240%,而 C 函數的加載時間增加了近 140%。
2.pg_bulkload架構圖
2.1 架構圖
pg_bulkload主要包括兩個模塊:reader和writer。reader負責讀取文件、解析tuple,writer負責把解析出的tuple寫入輸出源中。pg_bulkload最初的版本功能很簡單,只是加載數據。3.1版本增加了數據過濾的功能。
2.2 控制文件參數
在使用pg_bulkload時可以指定以下加載選項。控制文件可以用絕對路徑或相對路徑來指定。如果您通過相對路徑指定它,它將相對於執行 pg_bulkload 命令的當前工作目錄。如果你沒有指定控制文件,你應該通過 pg_bulkload 的命令行參數傳遞所需的選項。
部份參數說明
#TYPE = CSV | BINARY | FIXED | FUNCTION
CSV : 從 CSV 格式的文本文件加載 , 默認為CSV
BINARY | FIXED:從固定的二進制文件加載
FUNCTION :從函數的結果集中加載。如果使用它,INPUT 必須是調用函數的表達式。
# WRITER | LOADER = DIRECT | BUFFERED | BINARY | PARALLEL
DIRECT :將數據直接加載到表中。繞過共享緩沖區並跳過 WAL 日志記錄,但需要自己的恢復過程。這是默認的,也是原始舊版本的模式。
BUFFERED:通過共享緩沖區將數據加載到表中。使用共享緩沖區,寫入WAL,並使用原始 PostgreSQL WAL 恢復。
BINARY :將數據轉換為二進制文件,該文件可用作要從中加載的輸入文件。創建加載輸出二進制文件所需的控制文件樣本。此示例文件創建在與二進制文件相同的目錄中,其名稱為 <binary-file-name>.ctl。
PARALLEL:與“WRITER=DIRECT”和“MULTI_PROCESS=YES”相同。如果指定了 PARALLEL,則忽略MULTI_PROCESS。如果為要加載的數據庫配置了密碼驗證,則必須設置密碼文件。
# TRUNCATE = YES | NO
如果YES,則使用 TRUNCATE 命令從目標表中刪除所有行。如果NO,什么也不做。默認為NO。您不能同時指定“WRITER=BINARY”和 TRUNCATE。
# CHECK_CONSTRAINTS = YES | NO
指定在加載期間是否檢查 CHECK 約束。默認為否。您不能同時指定“WRITER=BINARY”和 CHECK_CONSTRAINTS。
# PARSE_ERRORS = n
在解析、編碼檢查、編碼轉換、FILTER 函數、CHECK 約束檢查、NOT NULL 檢查或數據類型轉換期間引發錯誤的 ingored 元組的數量。無效的輸入元組不會加載並記錄在 PARSE BADFILE 中。默認值為 0。如果解析錯誤數等於或多於該值,則提交已加載的數據並且不加載剩余的元組。0 表示不允許錯誤,-1 和 INFINITE 表示忽略所有錯誤。
# SKIP | OFFSET = n
跳過輸入行的數量。默認值為 0。您不能同時指定“TYPE=FUNCTION”和 SKIP。
# LIMIT | LOAD = n
要加載的行數。默認值為INFINITE,即將加載所有數據。即使您使用 TYPE=FUNCTION,此選項也可用。
#ENCODING = encoding
指定輸入數據的編碼。檢查指定的編碼是否有效,如果需要,將輸入數據轉換為數據庫編碼。默認情況下,輸入數據的編碼既不驗證也不轉換。
#FILTER = [ schema_name. ] function_name [ (argtype, ... ) ]
指定過濾函數以轉換輸入文件中的每一行。只要函數名在數據庫中是唯一的,就可以省略 argtype 的定義。如果未指定,則直接將輸入數據解析為加載目標表。另請參閱如何編寫 FILTER 函數以生成 FILTER 函數。
不能同時指定“TYPE=FUNCTION”和 FILTER。此外,CSV 選項中的 FORCE_NOT_NULL 不能與 FILTER 選項一起使用。
2.3 pg_bulkload狀態結果
成功加載時,pg_bulkload 返回 0。當存在一些解析錯誤或重復錯誤時,即使加載本身已完成,它也會返回 3 和 WARNING 消息。請注意,跳過的行和替換的行(ON_DUPLICATE_KEEP = NEW)不被視為錯誤;退出代碼將為 0。
當出現不可持續的錯誤時,加載程序會引發 ERROR 消息。返回碼通常為 1,因為在加載數據期間數據庫服務器中發生了許多錯誤。下表顯示了 pg_bulkload 可以返回的代碼。
返回碼 | 描述 |
---|---|
0 | 成功 |
1 | 在 PostgreSQL 中運行 SQL 時發生錯誤 |
2 | 無法連接到 PostgreSQL |
3 | 成功,但有些數據無法加載 |
3.pg_bulkload 安裝
3.1 依賴包
安裝pg_bulkload 需要以下依賴包,一般在安裝好PG數據庫的環境中,已安裝好相關包,所以我們可以直接安裝就可以。
PostgreSQL devel package : postgresqlxx-devel(RHEL), postgresql-server-dev-x.x(Ubuntu)
PAM devel package : pam-devel(RHEL), libpam-devel(Ubuntu)
Readline devel or libedit devel package : readline-devel or libedit-devel(RHEL), libreadline-dev or libedit-dev(Ubuntu)
C compiler and build utility : "Development Tools" (RHEL), build-essential(Ubuntu)
3.2 安裝pg_bulkload
# 下載連接 <a href="https://github.com/ossc-db/pg_bulkload" style="text-decoration-skip-ink: none; font-family: "Microsoft YaHei", Lato, "PingFang SC", sans-serif;">https://github.com/ossc-db/pg_bulkload</a>
postgres@s2ahumysqlpg01-> unzip pg_bulkload-VERSION3_1_19.zip
postgres@s2ahumysqlpg01-> cd pg_bulkload-VERSION3_1_19
postgres@s2ahumysqlpg01-> make
postgres@s2ahumysqlpg01-> make install
# 創建擴展 ,要使用它需要建extension
postgres@s2ahumysqlpg01-> psql -h 127.0.0.1
psql (12.4)
Type "help" for help.
lottu=# create extension pg_bulkload;
CREATE EXTENSION
#檢查已安裝插件
postgres=# \dx
4. pg_bulkload 參數
postgres@s2ahumysqlpg01-> pg_bulkload --help
pg_bulkload is a bulk data loading tool for PostgreSQL
Usage:
Dataload: pg_bulkload [dataload options] control_file_path
Recovery: pg_bulkload -r [-D DATADIR]
Dataload options:
-i, --input=INPUT INPUT path or function
-O, --output=OUTPUT OUTPUT path or table
-l, --logfile=LOGFILE LOGFILE path
-P, --parse-badfile=* PARSE_BADFILE path
-u, --duplicate-badfile=* DUPLICATE_BADFILE path
-o, --option="key=val" additional option
Recovery options:
-r, --recovery execute recovery
-D, --pgdata=DATADIR database directory
Connection options:
-d, --dbname=DBNAME database to connect
-h, --host=HOSTNAME database server host or socket directory
-p, --port=PORT database server port
-U, --username=USERNAME user name to connect as
-w, --no-password never prompt for password
-W, --password force password prompt
Generic options:
-e, --echo echo queries
-E, --elevel=LEVEL set output message level
--help show this help, then exit
--version output version information, then exit
Read the website for details. <http://github.com/ossc-db/pg_bulkload>
Report bugs to <http://github.com/ossc-db/pg_bulkload/issues>.
5. pg_bulkload 使用
5.1 初始化數據
postgres@s2ahumysqlpg01-> psql -h 127.0.0.1
psql (12.4)
Type "help" for help.
postgres=# create database testdb ;
CREATE DATABASE
postgres=# \c testdb
You are now connected to database "testdb" as user "postgres".
testdb=#
testdb=# create table tb_asher (id int,name text);
CREATE TABLE
testdb=# \d
List of relations
Schema | Name | Type | Owner
--------+----------+-------+----------
public | tb_asher | table | postgres
(1 row)
testdb=# create extension pg_bulkload; #如果連接指定到單個庫時,需要創建擴展以生成 pgbulkload.pg_bulkload() 函數
CREATE EXTENSION
test=# quit
# 模擬CSV 文件
postgres@s2ahumysqlpg01-> seq 100000| awk '{print $0"|asher"}' > bulk_asher.txt
postgres@s2ahumysqlpg01-> more bulk_asher.txt
1|asher
2|asher
3|asher
4|asher
5|asher
6|asher
5.2 加載到指定表
# 將bulk_asher.txt里的數據加載到testdb 庫下的 tb_asher表中
postgres@s2ahumysqlpg01-> pg_bulkload -i /home/postgres/bulk_asher.txt -O tb_asher -l /home/postgres/tb_asher_output.log -P /home/postgres/tb_asher_bad.txt -o "TYPE=CSV" -o "DELIMITER=|" -d testdb -U postgres -h 127.0.0.1
NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
0 Rows skipped.
100000 Rows successfully loaded.
0 Rows not loaded due to parse errors.
0 Rows not loaded due to duplicate errors.
0 Rows replaced with new rows.
# 查看導入日志:
postgres@s2ahumysqlpg01-> cat /home/postgres/tb_asher_output.log
pg_bulkload 3.1.19 on 2022-02-20 16:32:28.642071+08
INPUT = /home/postgres/bulk_asher.txt
PARSE_BADFILE = /home/postgres/tb_asher_bad.txt
LOGFILE = /home/postgres/tb_asher_output.log
LIMIT = INFINITE
PARSE_ERRORS = 0
CHECK_CONSTRAINTS = NO
TYPE = CSV
SKIP = 0
DELIMITER = |
QUOTE = "\""
ESCAPE = "\""
NULL =
OUTPUT = public.tb_asher
MULTI_PROCESS = NO
VERBOSE = NO
WRITER = DIRECT
DUPLICATE_BADFILE = /u01/postgresql/data_bak/pg_bulkload/20220220163228_testdb_public_tb_asher.dup.csv
DUPLICATE_ERRORS = 0
ON_DUPLICATE_KEEP = NEW
TRUNCATE = NO
0 Rows skipped.
100000 Rows successfully loaded.
0 Rows not loaded due to parse errors.
0 Rows not loaded due to duplicate errors.
0 Rows replaced with new rows.
Run began on 2022-02-20 16:32:28.642071+08
Run ended on 2022-02-20 16:32:28.743741+08
CPU 0.02s/0.04u sec elapsed 0.10 sec
5.3 先清空在加載
#增加了 -o "TRUNCATE=YES" 參數
postgres@s2ahumysqlpg01-> pg_bulkload -i /home/postgres/bulk_asher.txt -O tb_asher -l /home/postgres/tb_asher_output.log -P /home/postgres/tb_asher_bad.txt -o "TYPE=CSV" -o "DELIMITER=|" -o "TRUNCATE=YES" -d testdb -U postgres -h 127.0.0.1
4NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
0 Rows skipped.
100000 Rows successfully loaded.
0 Rows not loaded due to parse errors.
0 Rows not loaded due to duplicate errors.
0 Rows replaced with new rows.
# 數據查詢
postgres@s2ahumysqlpg01-> psql -h127.0.0.1 -d testdb -c "select count(1) from tb_asher ;"
count
--------
100000
(1 row)
5.4 使用控制文件
# 新建控制文件 ,可以根據之前加載時,產生的日志文件tb_asher_output.log來更改,去掉里面沒有值的參數 NULL =
vi asher.ctl
INPUT = /home/postgres/bulk_asher.txt
PARSE_BADFILE = /home/postgres/tb_asher_bad.txt
LOGFILE = /home/postgres/tb_asher_output.log
LIMIT = INFINITE
PARSE_ERRORS = 0
CHECK_CONSTRAINTS = NO
TYPE = CSV
SKIP = 0
DELIMITER = |
QUOTE = "\""
ESCAPE = "\""
OUTPUT = public.tb_asher
MULTI_PROCESS = NO
VERBOSE = NO
WRITER = DIRECT
DUPLICATE_BADFILE = /u01/postgresql/data_bak/pg_bulkload/20220220164822_testdb_public_tb_asher.dup.csv
DUPLICATE_ERRORS = 0
ON_DUPLICATE_KEEP = NEW
TRUNCATE = YES
# 使用控制文件來加載
postgres@s2ahumysqlpg01-> pg_bulkload /home/postgres/asher.ctl -d testdb -U postgres -h 127.0.0.1
NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
0 Rows skipped.
100000 Rows successfully loaded.
0 Rows not loaded due to parse errors.
0 Rows not loaded due to duplicate errors.
0 Rows replaced with new rows.
# 數據查詢
postgres@s2ahumysqlpg01-> psql -h127.0.0.1 -d testdb -c "select count(1) from tb_asher ;"
count
--------
100000
(1 row)
5.5 強制寫wal 日志
# pg_bulkload 默認是跳過buffer 直接寫文件 ,但時如果有復制 ,或者需要基本wal日志恢復時沒有wal日志是不行的,這是我們可以強制讓其寫wal日志 ,只需要加載 -o "WRITER=BUFFERED" 參數就可以了
pg_bulkload -i /home/postgres/bulk_asher.txt -O tb_asher -l /home/postgres/tb_asher_output.log -P /home/postgres/tb_asher_bad.txt -o "TYPE=CSV" -o "DELIMITER=|" -o "TRUNCATE=YES" -o "WRITER=BUFFERED" -d testdb -U postgres -h 127.0.0.1
NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
0 Rows skipped.
100000 Rows successfully loaded.
0 Rows not loaded due to parse errors.
0 Rows not loaded due to duplicate errors.
0 Rows replaced with new rows.
6.其它
6.1 注意事項
如果您使用直接加載模式(WRITER=DIRECT 或 PARALLEL),您必須注意以下事項:
1.當 MULTI_PROCESS=YES 並且需要密碼才能從 localhost 連接到數據庫進行加載時,即使在提示中正確輸入密碼,身份驗證也會失敗。為避免這種情況,請配置以下任一項。
a.在 UNIX 中,將以下行添加到 pg_hba.conf。
b.在 .pgpass 文件中指定密碼
127.0.0.1:*:foo:foopass
c.不要使用“WRITER=PARALLE”
2.PITR/Replication :由於繞過了 WAL,PITR 的歸檔恢復不可用。這並不意味着它可以在沒有加載表數據的情況下完成 PITR。
如果您想使用 PITR,請在通過 pg_bulkload 加載后對數據庫進行完整備份。如果您使用流式復制,則需要根據 pg_bulkload 之后的備份集重新創建備用數據庫。
3.不得刪除在 $PGDATA/pg_bulkload目錄中找到的加載狀態文件 (*.loadstatus)。在 pg_bulkload 崩潰恢復中需要這個文件。
4.盡量不要使用 " kill -9" 終止 pg_bulkload 命令。如果您這樣做了,您必須調用 postgresql 腳本來執行 pg_bulkload 恢復並重新啟動 PostgreSQL 以繼續。
5.默認情況下,在數據加載期間僅強制執行唯一約束和非空約束。您可以設置“CHECK_CONSTRAINTS=YES”來檢查 CHECK 約束。無法檢查外鍵約束。用戶有責任提供有效的數據集。
6. maintenance_work_mem會影響 pg_bulkload 的性能。如果將此參數從 64 MB 更改為 1 GB,則持續時間將縮短近 15%。
6.2 編輯過濾器函數
編寫 FILTER 函數時有一些注意事項和警告:
1.輸入文件中的記錄被一一傳遞給 FILTER 函數。
2.當 FILTER 函數發生錯誤時,傳遞的記錄不會加載並寫入 PARSE BADFILE。
3.FILTER 函數必須返回記錄類型或某種復合類型。此外,實際記錄類型必須與目標表定義匹配。
4.如果 FILTER 函數返回 NULL,則加載所有列中都有 NULL 的記錄。
5.支持具有默認參數的函數。如果輸入數據的列數少於函數的參數,則將使用默認值。
6.不支持 VARIADIC 函數。
7.不支持 SETOF 函數。
8.不支持具有泛型類型(any、anyelement 等)的函數。
9.FILTER 函數可以用任何語言實現。SQL、C、PL 都可以,但你應該盡可能快地編寫函數,因為它們會被多次調用。
10.您只能指定 FILTER 或 FORCE_NOT_NULL 選項之一。如果您需要該功能,請重新實現與 FORCE_NOT_NULL 兼容的 FILTER 功能。
# Filter 示例:
CREATE FUNCTION sample_filter(integer, text, text, real DEFAULT 0.05) RETURNS record
AS $$ SELECT $1 * $4, upper($3) $$
LANGUAGE SQL;
7.參考連接
https://github.com/ossc-db/pg_bulkload
http://ossc-db.github.io/pg_bulkload/index.html
http://ossc-db.github.io/pg_bulkload/pg_bulkload.html