記一次--------sparkSQL程序local模式運行不起來,增加參數配置spark.locality.wait


問題: 
    跑本地模式 一直卡在下圖最下面日志部分30分鍾不動
查看運行日志一直卡在 箭頭處不動,沒有任何報錯。
因為處理邏輯只是簡單的sparksql兩個表left join,  union, having等簡單的函數操作。
測試環境 數據僅有3w條。
 
雖然將程序打包到集群,但還是跑的local模式, 下面是腳本配置  
#!/bin/bash
#jdk
export JAVA_HOME=/usr/java/jdk1.8.0_162
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
#hadoop
export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH
 
 
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native
 
 
#spark
#export SPARK_HOME=/opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0
#export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
 
 
realtime_queue=root
my_job_name="userhierarchy"
main_class="com.df.App"
 
 
/opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0/bin/spark-submit --master local[2] \
--name ${my_job_name} \
--class ${main_class} \
--driver-memory 2g     \
--executor-memory 2g     \
--executor-cores 8   \
--queue ${realtime_queue} \
/opt/cdh/submit/userhierarchy/user_hierarchy-1.0-SNAPSHOT.jar
 
 
 
 
#--driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8787" \
 
 

 

 
1.首先將數據量減少到5000 可以正常執行,測試1w條 又不行。
考慮到會不會因為產生笛卡爾積。 因為5000條產生笛卡爾積是5000*5000  ,而3w * 3w 的數據量也還是不小。
故對所有使用join的SQL都進行排查, 發現沒有笛卡爾積。
然后嘗試使用IN 來代替 left join 。  還是不行。
然后通過使用where代替having。  
因為 having在SQL中的執行在最后,需要對表進行分組 排序完之后在做having的條件篩選,而 where是先進行條件篩選之后在對剩余數據進行處理。效率會比having要高。  但是程序還是卡在上圖位置不動。
 
2.轉變思路對 參數進行優化,查閱資料發現一個參數
spark.locality.wait     數據本地化等待時間
為什么會有這個參數呢?
    spark在driver上 ,對application的每個state的task分配之前,會先計算出每個task要計算的是哪個分片數據(RDD上的某個partition),spark分配的task的算法,優先希望每個task簽好分配到它所要計算的數據的節點上,這樣就盡可能的避免了網絡間數據傳輸。
    但實際上,有時候 ,task並沒有分配到它所要計算的數據的節點上,因為有可能那個節點的計算資源和計算能力滿了,因為task處理數據是需要計算資源的,所以通常來說spark會等待一段時間,看是否能將task分配到它要處理數據所在節點上,這個等待時長默為3s(3s不是絕對的,針對不同的本地化級別可以設置不同等待時長)。如果超過等待時長,無法計算等待,就會選擇一個性能比較差的本地化級別,比如:task分配到距離它所要處理數據節點比較近的一個節點上,然后傳輸數據進行計算。
    而對於我們來說最好是,task正好分配到它要處理數據所在節點上,這樣直接從本地executor對應的blockManager中獲取數據,純內存傳出數據,或帶有部分磁盤IO。
 
本地化級別:
    本地化就是指task被分配要處理部分數據,task和它要處理的數據可能會在不同的節點位置,根據這種位置關系又5種不同的本地化級別:
 
1.PROCESS_LOCAL:進程本地化,計算數據的task由某個executor執行,數據也就在這個executor對應的BlockManager。這種本地化級別 性能最好
 
2.NODE_LOCAL:節點本地化。第一種情況,數據作為HDFS block數據塊就在節點上, 而task節點是在某個executor上運行;第二種情況,task和它要處理的數據,在同一節點的不同executor上,數據需要在進程之間傳輸
 
3.NO_PREF: 對於task來說,數據在哪里獲取都一樣,無好壞之分
 
4.RACK_LOCAL:機架本地化,task和它要處理的數據在同一機架的不同節點上, 數據需要通過網絡在節點之間傳輸
 
5.ANY:task和它要處理的數據可能在集群的任何地方,而且不在同一機架上(RACK),數據要跨機架傳輸,性能最差。
 
何時調節該參數呢?
在本地模式下跑程序觀察spark作業的日志,查看starting task........ PROCESS_LOCAL NODE_LOCAL 觀察大多數task的數據本地化級別,如果大多數是NODE_LOCAL / ANY 那么可以調節,觀察時間是否有縮短,反復幾次 尋找最優的時間。
 
 添加配置后的腳本
#!/bin/bash
#jdk
export JAVA_HOME=/usr/java/jdk1.8.0_162
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
#hadoop
export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH

export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native

#spark
#export SPARK_HOME=/opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0
#export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

realtime_queue=root
my_job_name="userhierarchy"
main_class="com.df.App"

 /opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0/bin/spark-submit --master local[3] \
--name ${my_job_name} \
--class ${main_class} \
--driver-memory 3g     \
--executor-memory 3g     \
--executor-cores 5   \
--conf  spark.executor.memoryOverhead=2048M \
--conf spark.locality.wait=10 \ --queue ${realtime_queue} \
/opt/cdh/submit/userhierarchy/user_hierarchy-1.0-SNAPSHOT.jar order_info_test

 

 
 
 
 
 
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM