#!/bin/bash #隊列名 根據yarn的隊列提交 realtime_queue=root #提交的任務名 my_job_name="OrderQZ" spark-shell --master yarn --deploy-mode client \ --queue $realtime_queue \ #總的executors數 根據數據量與自己的集群資源來分配 --num-executors 35 \ #每個executor的核數 --executor-cores 5 \ #每個executor的內存 --executor-memory 19G \ #diver 端jvm日志配置 --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \ --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \ #序列化 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ #數據本地化;一般會默認3s,重試5次的去分配,一旦超時失敗,將會選擇一個比上一個本地級別差的級別再一次分配,如果發生了數據傳輸,那么task首先通過blockmanager獲取數據,如果本地沒有數據,則通過getRemote方法從數據所在節點的blockmanager獲取數據並返回至task所在節點 --conf spark.locality.wait=5 \ #失敗重試次數 --conf spark.task.maxFailures=8 \ # 是否開啟在webui殺死進程 --conf spark.ui.killEnabled=false \ #SparkContext 啟動時記錄有效 SparkConf信息 --conf spark.logConf=true \ #driver的堆外內存 內存對象分配在Java虛擬機的堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機),這樣做的結果就是能夠在一定程度上減少垃圾回收對應用程序造成的影響。使用未公開的Unsafe和NIO包下ByteBuffer來創建堆外內存 --conf spark.yarn.driver.memoryOverhead=512 \ --conf spark.yarn.executor.memoryOverhead=5480 #提交申請的最大嘗試次數, 小於等於YARN配置中的全局最大嘗試次數。 --conf spark.yarn.maxAppAttempts=4 \ #定義AM故障跟蹤的有效時間間隔。如果AM至少在定義的時間間隔內運行,則AM故障計數將被重置。如果未配置,此功能未啟用。 --conf spark.yarn.am.attemptFailuresValidityInterval=1h \ --conf spark.yarn.executor.failuresValidityInterval=1h \ #動態資源分配 --conf spark.shuffle.service.enabled=true \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ #推測執行 --conf spark.speculation=true \ --conf spark.speculation.quantile=0.9 \ #shffule task數目 --conf spark.sql.shuffle.partitions=178 \ #,當它設置為true時,Spark SQL會把每條查詢的語句在運行時編譯為java的二進制代碼。這有什么作用呢?它可以提高大型查詢的性能,但是如果進行小規模的查詢的時候反而會變慢,就是說直接用查詢反而比將它編譯成為java的二進制代碼快。所以在優化這個選項的時候要視情況而定。 --conf spark.sql.codegen=true \ #默認值為false 它的作用是自動對內存中的列式存儲進行壓縮 --conf spark.sql.inMemoryColumnarStorage.compressed=true \ # join實現主要有3種,即BroadcastHashJoinExec、ShuffledHashJoinExec和SortMergeJoinExec,優先級為 #1 如果canBroadcast,則BroadcastHashJoinExec; #2 如果spark.sql.join.preferSortMergeJoin=false,則ShuffledHashJoinExec; #3 否則為SortMergeJoinExec; --conf spark.sql.join.preferSortMergeJoin=true \ # Spark底層shuffle的傳輸方式是使用netty傳輸,netty在進行網絡傳輸的過程會申請堆外內存(netty是零拷貝),所以使用了堆外內存。 --conf spark.reducer.maxSizeInFlight=96M /** spark.reducer.maxSizeInFlight默認值:48m 參數說明:該參數用於設置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數據。 調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如96m),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。 錯誤:reduce oom reduce task去map拉數據,reduce 一邊拉數據一邊聚合 reduce段有一塊聚合內存(executor memory * 0.2) 解決辦法:1、增加reduce 聚合的內存的比例 設置spark.shuffle.memoryFraction 2、 增加executor memory的大小 --executor-memory 5G 3、減少reduce task每次拉取的數據量 設置spark.reducer.maxSizeInFlight 24m */
