1、內存優化
1.1、RDD
RDD默認cache僅使用內存
可以看到使用默認cache時,四個分區只在內存中緩存了3個分區,4.4G的數據
使用kryo序列化+MEMORY_ONLY_SER
可以看到緩存了四個分區的全部數據,且只緩存了1445.8M
所以這兩種緩存方式如何選擇,官網建議
也就是說集群資源足夠使用默認cache,資源緊張使用kryo序列化+MEMORY_ONLY_SER
1.2、DataFrame與DataSet
DataSet不使用Java和Kryo序列化,它使用特殊的編碼器序列化
使用默認cache,保存在內存和磁盤
同樣多的數據也是全部緩存,只使用了內存612.3M
使用序列化緩存時比使用默認緩存還多緩存了30M,共646.2M
df和ds直接使用默認cache即可
2、小文件過多問題
2.1、RDD中並行度設置
spark.default.parallelism
For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:
Local mode: number of cores on the local machine
Mesos fine grained mode: 8
Others: total number of cores on all executor nodes or 2, whichever is larger
Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.
2.2、spark sql
Spark SQL can cache tables using an in-memory columnar format by calling sqlContext.cacheTable("tableName") or dataFrame.cache(). Then Spark SQL will scan only required columns and will automatically tune compression to minimize memory usage and GC pressure. You can call sqlContext.uncacheTable("tableName") to remove the table from memory.
建議如果下面無其它任務,緩存可以不釋放,有其它任務要釋放
算子方式
result.unpersist
spark.sql.shuffle.partitions Configures the number of partitions to use when shuffling data for joins or aggregations.
默認200
如不減少分區,join后hadoop上會有200個小文件
前三個stage為讀文件(控制不了),后兩個stage為join並行度,為200
1、使用coalesce算子縮小分區,不能大於原有分區數值
2、如果數值小於vcore,有些vcore就不會工作,速度會慢
如壓縮成1,並行度就是1只有一個vcore在工作,不會shuffle,如果數據量很大且參數很小,可能會產生oom
可以看到將分區減少到20,hadoop上只有20個文件
3、合理利用cpu資源
未優化時任務可以看到最后200個任務沒有平均分到每台機器上,壓力全在hadoop103上,如果數據量很少,hadoop103上有些vcore可能沒數據在空轉,沒有合理利用cpu資源
將·spark.sql.shuffle.partitions設置為總vcore的2到3倍可以達到最優效果
不添加縮小分區coalesce可以看到有36個任務
任務分配也很平均,達到優化效果,避免空轉情況,合理利用cpu資源,任務時間縮短到2.5分鍾
4、廣播join
將小表聚合到driver端,分發到每個executor,規避shuffle,避免此stage
只適合小表join大表
正常大表join大表走SortMergeJoin
小於等於10M,自動進行廣播join
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB)
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.
4.1 API
禁用掉廣播join ,設置參數為-1
可以看到只剩下一個36task的join stage,多出來一步broadcast exchange,
變成BroadcastHashJoin 耗時變成2分鍾
4.2 參數
單位不能是M,10485760
默認10M,實際生產可調大參數,如改成20MB,可以避免小表join大表時數據傾斜
set("spark.sql.autoBroadcastJoinThreshold","20971520")
5、數據傾斜
並不只是500萬,因為一個分區里不只一個key,包含多個key,如分區1 101 102 分區2 103 104
本質將相同key的數據聚集到一個task
5.1 解決數據傾斜錯誤方法
5.2 解決數據傾斜
1、廣播join
2、打散大表,擴容小表 能解決,但可能更加耗時,因為小表數據量增加了
除非情況非常嚴重,結果出不來
拿打散后的courseId進行join
可以看到dataframe.map后變成dataset
循環里面為i+"_"+courseid,寫錯了
可以看到已經得到優化
但是時間由變成50秒
和3中只縮小分區34秒時間增加,雖然3中有數據傾斜
6、SMB join
排序時間優化,數據量大的時候效果很明顯
spark中支持分桶必須用saveastable,insertinto不支持分桶
先拿兩張分桶表做join
分桶后task數就和分桶數一樣
7、使用堆外內存
3.0之前
3.0之后
修改內存測試
max(2G*0.1 384)
2G+2G+384>4G
實際申請4個G,申請會大於4個G
堆外內存使用
堆內堆外會互相借用
什么情況下使用堆外內存