Broadcast Timeout 300

```
org.apache.spark.SparkException: Could not execute broadcast in 300 secs.
You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
```
An STT job got blocked; checking the logs showed that two tables had hit a broadcast timeout while being broadcast.
SQL logic:

```sql
create or replace temporary view table_tmp
as
select
/*+ BROADCAST(A_tmp) */
A_tmp.c1,
A_tmp.c2,
A_tmp.c3,
...
from source_table A_tmp
join gdw_table.other_table1 B_tmp on ...
join gdw_table.other_table2 C_tmp on ...
group by 1, 2, 3, 4, 5, 6, ..., 11
```
Checked the row counts:

- A_tmp: 238 rows.
- B_tmp: 1,834,203,873 rows.
- C_tmp: 289,371,375 rows.
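With only 238 rows, A_tmp is the obvious broadcast candidate, and the chosen join strategy can be checked before the job runs. A minimal Scala sketch, using the table names from the query above; the join key `c1` is an assumption, substitute the real join condition:

```scala
// Sketch: inspect the physical plan to see which join strategy was picked.
// Look for "BroadcastHashJoin" vs. "SortMergeJoin" in the printed plan.
val a = spark.table("source_table")
val b = spark.table("gdw_table.other_table1")

a.join(b, Seq("c1")).explain() // "c1" is a placeholder join key
```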
| Property Name | Default | Meaning |
|---|---|---|
| spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | The maximum number of bytes to pack into a single partition when reading files. |
| spark.sql.files.openCostInBytes | 4194304 (4 MB) | The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over-estimate; partitions with small files will then be faster than partitions with bigger files (which are scheduled first). |
| spark.sql.broadcastTimeout | 300 | Timeout in seconds for the broadcast wait time in broadcast joins. |
| spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE has been run. |
| spark.sql.shuffle.partitions | 200 | Configures the number of partitions to use when shuffling data for joins or aggregations. |
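These properties are session-scoped and can be changed at runtime without restarting the application. A short sketch, with values mirroring the defaults in the table above:

```scala
// Sketch: reading and overriding session-scoped SQL properties at runtime.
spark.conf.set("spark.sql.broadcastTimeout", "300")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760") // 10 MB
println(spark.conf.get("spark.sql.shuffle.partitions"))            // "200" by default
```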
Root cause:
Most likely, the job runs at around 1 AM US time, when a large share of cluster resources is occupied. Under that resource contention, the broadcast could not finish and respond within the configured timeout.
Solutions:
- Increase the default timeout: set spark.sql.broadcastTimeout = 600.
- Call persist() on the small table; persisting it can turn the BroadcastHashJoin into a ShuffleHashJoin (see the sketch after this list).
- Disable broadcast joins: set spark.sql.autoBroadcastJoinThreshold = -1.
- Rerun the job.
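A minimal Scala sketch of the first three options, assuming the small side is source_table as in the query above; the cached view name is made up for illustration:

```scala
import org.apache.spark.storage.StorageLevel

// Option 1: give the broadcast more time (value is in seconds).
spark.conf.set("spark.sql.broadcastTimeout", "600")

// Option 3: or disable automatic broadcast joins entirely.
// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Option 2: persist the small side and materialize it, then join against
// the cached view; this is the persist() workaround described above.
val aTmp = spark.table("source_table").persist(StorageLevel.MEMORY_AND_DISK)
aTmp.count()                                          // materialize the cache
aTmp.createOrReplaceTempView("source_table_cached")   // hypothetical view name
```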
The job was rerun at a different time and completed successfully.