Parallel Query in PostgreSQL


 

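Parallelism in PostgreSQL involves three components: the process itself (the leader process), the Gather node, and the workers. With parallelism disabled, the leader handles all the data on its own. Once the planner decides that a query, or part of one, can run in parallel, it adds a Gather node above the parallel portion of the plan, making it the root of that subtree.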

 

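Query execution starts in the leader process. When a query, or part of it, runs in parallel, a Gather node and several worker processes are allocated. The blocks of the relation are divided among the workers, whose number is controlled by PostgreSQL configuration parameters. The workers coordinate through shared memory, and once a worker has done its part, its results are handed to the leader.

Workers and the leader communicate through message queues backed by shared memory. Each worker process has two queues: one for errors and one for tuples.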
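The leader/worker hand-off described above can be modelled in a few lines of Python. This is only a toy sketch: threads stand in for worker processes, `queue.Queue` stands in for the shared-memory message queues, one shared pair of queues replaces the per-worker pairs, and blocks are handed out round-robin rather than on demand. The names `worker` and `gather` and the even-number filter are illustrative assumptions, not PostgreSQL internals.

```python
import queue
import threading

def worker(blocks, tuple_queue, error_queue):
    """Scan the assigned blocks and send matching rows to the leader."""
    try:
        for block in blocks:
            for row in block:
                if row % 2 == 0:           # stand-in for the query's WHERE clause
                    tuple_queue.put(row)
    except Exception as exc:               # failures travel on a separate queue
        error_queue.put(exc)
    finally:
        tuple_queue.put(None)              # sentinel: this worker has finished

def gather(blocks, n_workers):
    """Leader side: start the workers, then drain the tuple queue."""
    tuple_queue, error_queue = queue.Queue(), queue.Queue()
    threads = [
        threading.Thread(target=worker,
                         args=(blocks[i::n_workers], tuple_queue, error_queue))
        for i in range(n_workers)
    ]
    for t in threads:
        t.start()
    rows, finished = [], 0
    while finished < n_workers:            # wait for one sentinel per worker
        item = tuple_queue.get()
        if item is None:
            finished += 1
        else:
            rows.append(item)
    for t in threads:
        t.join()
    if not error_queue.empty():            # re-raise the first worker error
        raise error_queue.get()
    return rows

result = sorted(gather([[1, 2, 3], [4, 5, 6], [7, 8]], n_workers=2))
```

Rows arrive at the leader in no particular order, which is why the example sorts them before use.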

 

 

 

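Parallel sequential scan

PostgreSQL 9.6 added support for parallel sequential scans. A sequential scan reads a table block by block, evaluating each block in turn. By its nature this kind of scan parallelizes well: the blocks of the table are divided among several worker processes, each of which scans its share sequentially.

A parallel sequential scan is faster not because the reads happen in parallel, but because the work on the data is spread across multiple CPU cores.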

abce=# explain analyze select work_hour from hh_adds_static_day where create_time <= date '20201010'-interval '10' day;
                                                           QUERY PLAN                                                           
--------------------------------------------------------------------------------------------------------------------------------
 Seq Scan on hh_adds_static_day  (cost=0.00..261864.36 rows=4241981 width=4) (actual time=0.012..1407.214 rows=4228109 loops=1)
   Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone)
   Rows Removed by Filter: 735600
 Planning Time: 0.108 ms
 Execution Time: 1585.835 ms
(5 rows)

abce=# 

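The sequential scan returns a large number of rows, and there is no aggregate function to reduce them. As a result, the query runs on a single CPU core.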

 

After adding a sum() function, the plan clearly uses two worker processes, and the query runs faster:

abce=# explain analyze select sum(work_hour) from hh_adds_static_day where create_time <= date '20201010'-interval '10' day;
                                                                        QUERY PLAN                                                                        
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=231089.60..231089.61 rows=1 width=4) (actual time=749.998..751.529 rows=1 loops=1)
   ->  Gather  (cost=231089.38..231089.59 rows=2 width=4) (actual time=749.867..751.515 rows=3 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=230089.38..230089.39 rows=1 width=4) (actual time=746.463..746.464 rows=1 loops=3)
               ->  Parallel Seq Scan on hh_adds_static_day  (cost=0.00..225670.65 rows=1767492 width=4) (actual time=0.032..489.501 rows=1409370 loops=3)
                     Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone)
                     Rows Removed by Filter: 245200
 Planning Time: 0.112 ms
 Execution Time: 751.611 ms
(10 rows)

abce=# 

 

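Parallel aggregation

Computing aggregates is an expensive database operation, and it can take a long time when a single process does all the work. PostgreSQL 9.6 added the ability to compute aggregates in parallel by splitting the input into chunks (a divide-and-conquer strategy). Several worker processes each compute part of the aggregate, and the leader then computes the final result from their partial results.

Technically, Partial Aggregate nodes are added to the plan tree, each one aggregating the rows seen by a single process. Their outputs are sent to a Finalize Aggregate node, which combines the aggregates from all the Partial Aggregate nodes. An effective parallel plan therefore has a Finalize Aggregate node at the root, with a Gather node below it whose child is the Partial Aggregate node.

The "Parallel Seq Scan" node produces the rows for the partial aggregation ("Partial Aggregate").

The "Partial Aggregate" node reduces those rows with SUM(). The "Gather" node then collects the partial sum from each worker.

The "Finalize Aggregate" node computes the final sum. If you use your own aggregate functions, do not forget to mark them as "parallel safe".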
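The partial/finalize split is easiest to see with an aggregate whose intermediate state differs from its result. The Python sketch below models an average: each participating process produces a (sum, count) state, a combine step merges states, and a final step turns the merged state into the answer. The function names and the sample chunks are illustrative assumptions. In PostgreSQL itself, a user-defined aggregate needs a combine function and a parallel-safe marking before the planner will consider it for this kind of plan.

```python
from functools import reduce

def partial_avg(rows):
    """Partial Aggregate: one process folds its rows into a (sum, count) state."""
    return (sum(rows), len(rows))

def combine(a, b):
    """Combine step: merge two partial states without rescanning any rows."""
    return (a[0] + b[0], a[1] + b[1])

def finalize(state):
    """Finalize Aggregate: turn the merged state into the final value."""
    total, count = state
    return total / count if count else None

# Three chunks of rows, one per participating process (made-up data).
chunks = [[8, 6, 7], [4, 9], [10, 5, 7]]
states = [partial_avg(chunk) for chunk in chunks]
result = finalize(reduce(combine, states, (0, 0)))
```

Averaging the three per-chunk averages directly would give a different, wrong answer, because the chunks differ in size. Carrying the count in the state is what makes the combine step correct.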

Number of workers

The number of workers can be changed dynamically:

abce=# show max_parallel_workers_per_gather;
 max_parallel_workers_per_gather 
---------------------------------
 2
(1 row)

abce=# alter system set max_parallel_workers_per_gather=4;
ALTER SYSTEM
abce=# select * from pg_reload_conf();
 pg_reload_conf 
----------------
 t
(1 row)

abce=# explain analyze select sum(work_hour) from hh_adds_static_day where create_time <= date '20201010'-interval '10' day;
                                                                       QUERY PLAN                                                                        
---------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=218981.25..218981.26 rows=1 width=4) (actual time=473.424..475.156 rows=1 loops=1)
   ->  Gather  (cost=218980.83..218981.24 rows=4 width=4) (actual time=473.314..475.144 rows=5 loops=1)
         Workers Planned: 4
         Workers Launched: 4
         ->  Partial Aggregate  (cost=217980.83..217980.84 rows=1 width=4) (actual time=468.769..468.770 rows=1 loops=5)
               ->  Parallel Seq Scan on hh_adds_static_day  (cost=0.00..215329.59 rows=1060495 width=4) (actual time=0.036..306.854 rows=845622 loops=5)
                     Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone)
                     Rows Removed by Filter: 147120
 Planning Time: 0.150 ms
 Execution Time: 475.218 ms
(10 rows)

abce=# 

We raised the number of workers from 2 to 4, and the execution time dropped from about 752 ms to about 475 ms.
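When reading these plans, keep in mind that EXPLAIN ANALYZE reports rows as an average per loop. Here loops=3 and loops=5 count the workers plus the leader, which also takes part in the scan. The short Python check below reproduces the per-loop figures from the serial plan's totals and computes the speedup between the two parallel runs. All numbers are copied from the plans above; the helper name `per_loop` is only illustrative.

```python
def per_loop(total, loops):
    """Average per loop, rounded the way the plan output shows it."""
    return round(total / loops)

total_rows = 4228109   # rows returned by the serial Seq Scan
removed = 735600       # rows removed by the filter in the serial plan

# 2 workers + leader = 3 loops, 4 workers + leader = 5 loops
two_workers = (per_loop(total_rows, 3), per_loop(removed, 3))
four_workers = (per_loop(total_rows, 5), per_loop(removed, 5))

# Same query, 2 workers versus 4 workers (execution times in ms)
speedup = 751.611 / 475.218
```

Doubling the workers gives roughly a 1.6x speedup rather than 2x, which is typical: the leader was already participating, and startup and coordination costs do not shrink as workers are added.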

 

 

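How many worker processes are used

First, the max_parallel_workers_per_gather parameter sets the maximum number of workers that a single Gather node may use.

Second, the executor takes workers from a pool whose size is limited by max_parallel_workers.

Finally, the top-level limit is max_worker_processes, which defines the total number of background worker processes.

If no worker processes can be allocated, execution falls back to a single process.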
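The three limits nest inside one another, which a small Python model makes concrete. This is a simplified sketch, not PostgreSQL's actual bookkeeping: the function name and the two "in use" arguments are assumptions made for illustration, and the example values of 8 and 8 match the documented defaults for max_parallel_workers and max_worker_processes.

```python
def workers_launched(planned, max_parallel_workers, max_worker_processes,
                     parallel_in_use=0, other_background=0):
    """Estimate how many of the planned workers can actually start.

    planned              -- workers the planner asked for (already capped
                            by max_parallel_workers_per_gather)
    parallel_in_use      -- parallel workers already running for other queries
    other_background     -- non-parallel background workers already running
    """
    free_parallel = max_parallel_workers - parallel_in_use
    free_slots = max_worker_processes - parallel_in_use - other_background
    return max(0, min(planned, free_parallel, free_slots))

idle = workers_launched(4, 8, 8)                        # nothing else running
busy = workers_launched(4, 8, 8, parallel_in_use=6)     # only 2 slots left
full = workers_launched(4, 8, 8, parallel_in_use=8)     # pool exhausted
```

The `full` case corresponds to the fallback described above: the plan still contains a Gather node, but "Workers Launched" is 0 and the leader does all the work.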

The planner may also reduce the number of workers depending on the size of the table or index. This is governed by min_parallel_table_scan_size and min_parallel_index_scan_size.

Their default settings:

abce=# show min_parallel_table_scan_size;
 min_parallel_table_scan_size 
------------------------------
 8MB
(1 row)

abce=# show min_parallel_index_scan_size ;
 min_parallel_index_scan_size 
------------------------------
 512kB
(1 row)

abce=# 

How table size maps to the number of workers:

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => floor(log(x / min_parallel_table_scan_size) / log(3)) + 1 workers

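Each time the table size triples, starting from min_parallel_(index|table)_scan_size, PostgreSQL adds one more worker. The number of workers is not cost-based.

In practice these rules do not always suit a given workload, and the worker count can be set for a specific table with alter table ... set (parallel_workers=N).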
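The tripling rule can be written as a short loop, which avoids the floating-point rounding that a logarithm introduces at exact thresholds. The Python sketch below is a model of the rule as described here, not a copy of the planner's code. The function name is illustrative, and the default cap of 2 mirrors the max_parallel_workers_per_gather value shown earlier in this article.

```python
MB = 1024 * 1024

def planned_workers(table_bytes, min_scan_bytes=8 * MB, max_per_gather=2):
    """Workers suggested by table size, capped by the per-Gather limit."""
    threshold = max(min_scan_bytes, 1)      # guard against a zero setting
    if table_bytes < threshold:
        return 0                            # too small for a parallel scan
    workers = 1
    while table_bytes >= threshold * 3:     # one more worker per tripling
        workers += 1
        threshold *= 3
    return min(workers, max_per_gather)

# Reproduce the table above with the cap lifted out of the way.
sizes = [4 * MB, 8 * MB, 23 * MB, 24 * MB, 72 * MB, 216 * MB]
uncapped = [planned_workers(s, max_per_gather=8) for s in sizes]
capped = planned_workers(72 * MB)           # per-Gather limit of 2 applies
```

Because each extra worker requires a table three times larger, the worker count grows slowly: going from 3 to 4 workers takes a 216MB table at the default 8MB threshold.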

 

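Why is parallel execution not used?

Besides the restrictions on what can run in parallel, PostgreSQL also checks the cost:

·parallel_setup_cost keeps small queries from running in parallel. It models the time spent on memory setup, process start-up, and initial communication.

·parallel_tuple_cost: communication between the leader and the workers can take a long time, in proportion to the number of tuples the workers send. This parameter models that communication cost.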
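Both parameters show up directly in the cost of the Gather node. The Python sketch below assumes the default values of 1000 for parallel_setup_cost and 0.1 for parallel_tuple_cost (the session above does not show them) and checks the result against the two-worker plan shown earlier, where the Partial Aggregate costs 230089.38..230089.39 and the Gather passes 2 rows. The function name is illustrative.

```python
def gather_cost(child_startup, child_total, rows_through_gather,
                parallel_setup_cost=1000.0, parallel_tuple_cost=0.1):
    """Startup and total cost of a Gather node above the given child."""
    startup = child_startup + parallel_setup_cost
    total = (child_total + parallel_setup_cost
             + parallel_tuple_cost * rows_through_gather)
    return startup, total

# Two-worker plan: should come out as the Gather's 231089.38..231089.59
startup, total = gather_cost(230089.38, 230089.39, rows_through_gather=2)

# Without the aggregate, about 4241981 rows would cross the Gather.
# The per-tuple charge alone exceeds the whole serial plan (261864.36).
tuple_overhead = 0.1 * 4241981
serial_total = 261864.36
```

This is why the first query in this article stayed serial: sending millions of rows to the leader costs more than scanning the table in one process, while the sum() query sends only one row per process.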

 

Nested loop joins

Since version 9.6, PostgreSQL can run a "Nested Loop" in parallel:

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN                                      
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

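The gather happens at the last stage, so the "Nested Loop Left Join" runs in parallel. "Parallel Index Only Scan" is available from version 10 and behaves much like a parallel sequential scan. The condition c_custkey = o_custkey looks up the orders for each customer row, so the inner Index Scan on orders is not itself parallel.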

 

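Hash join

Before PostgreSQL 11, every worker built its own hash table, so using more than 4 workers did not improve performance. The new implementation uses a single shared hash table, and each worker can use its work_mem to help build it.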

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN                                               
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Here every worker helps build one shared hash table.

 

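Merge join

By its nature, the merge step of a merge join is not divided among workers. If the merge join is the last stage of query execution, you can still see parallel execution in the plan below it.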

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN                                                
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using part_pkey on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

The "Merge Join" node sits above "Gather Merge", so the merge itself does not run in parallel. The "Parallel Index Scan" on part_pkey still helps, though.

 

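Partition-wise join

In PostgreSQL 11 the partition-wise join feature is disabled by default, because planning it is expensive. Tables that are partitioned in the same way can be joined partition by partition, which lets Postgres use smaller hash tables. Each per-partition join can run in parallel.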

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN                     
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN                         
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Most importantly, a partition-wise join can use parallel execution only when the partitions are large enough.

 

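Parallel append

This typically shows up in UNION ALL queries. The drawback is limited parallelism, because each worker ends up serving a single branch of the query.

Even though four workers are enabled, only two are launched.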

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN                                           
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

  

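Important variables

·work_mem

·max_parallel_workers_per_gather: the maximum number of workers the executor can assign to each Gather node

·max_worker_processes

·max_parallel_workers

Parallel query execution was introduced in version 9.6;

it is enabled by default as of version 10;

on heavily loaded OLTP systems, turning parallel execution off is recommended.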

 

