問題:
跑本地模式 一直卡在下圖最下面日志部分30分鍾不動
查看運行日志一直卡在 箭頭處不動,沒有任何報錯。
因為處理邏輯只是簡單的sparksql兩個表left join, union, having等簡單的函數操作。
測試環境 數據僅有3w條。

雖然將程序打包到集群,但還是跑的local模式, 下面是腳本配置
#!/bin/bash #jdk export JAVA_HOME=/usr/java/jdk1.8.0_162 export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin #hadoop export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24 export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native #spark #export SPARK_HOME=/opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0 #export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin realtime_queue=root my_job_name="userhierarchy" main_class="com.df.App" /opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0/bin/spark-submit --master local[2] \ --name ${my_job_name} \ --class ${main_class} \ --driver-memory 2g \ --executor-memory 2g \ --executor-cores 8 \ --queue ${realtime_queue} \ /opt/cdh/submit/userhierarchy/user_hierarchy-1.0-SNAPSHOT.jar #--driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8787" \
1.首先將數據量減少到5000 可以正常執行,測試1w條 又不行。
考慮到會不會因為產生笛卡爾積。 因為5000條產生笛卡爾積是5000*5000 ,而3w * 3w 的數據量也還是不小。
故對所有使用join的SQL都進行排查, 發現沒有笛卡爾積。
然后嘗試使用IN 來代替 left join 。 還是不行。
然后通過使用where代替having。
因為 having在SQL中的執行在最后,需要對表進行分組 排序完之后在做having的條件篩選,而 where是先進行條件篩選之后在對剩余數據進行處理。效率會比having要高。 但是程序還是卡在上圖位置不動。
2.轉變思路對 參數進行優化,查閱資料發現一個參數
spark.locality.wait 數據本地化等待時間
為什么會有這個參數呢?
spark在driver上 ,對application的每個state的task分配之前,會先計算出每個task要計算的是哪個分片數據(RDD上的某個partition),spark分配的task的算法,優先希望每個task簽好分配到它所要計算的數據的節點上,這樣就盡可能的避免了網絡間數據傳輸。
但實際上,有時候 ,task並沒有分配到它所要計算的數據的節點上,因為有可能那個節點的計算資源和計算能力滿了,因為task處理數據是需要計算資源的,所以通常來說spark會等待一段時間,看是否能將task分配到它要處理數據所在節點上,這個等待時長默為3s(3s不是絕對的,針對不同的本地化級別可以設置不同等待時長)。如果超過等待時長,無法計算等待,就會選擇一個性能比較差的本地化級別,比如:task分配到距離它所要處理數據節點比較近的一個節點上,然后傳輸數據進行計算。
而對於我們來說最好是,task正好分配到它要處理數據所在節點上,這樣直接從本地executor對應的blockManager中獲取數據,純內存傳出數據,或帶有部分磁盤IO。
本地化級別:
本地化就是指task被分配要處理部分數據,task和它要處理的數據可能會在不同的節點位置,根據這種位置關系又5種不同的本地化級別:
1.PROCESS_LOCAL:進程本地化,計算數據的task由某個executor執行,數據也就在這個executor對應的BlockManager。這種本地化級別 性能最好
2.NODE_LOCAL:節點本地化。第一種情況,數據作為HDFS block數據塊就在節點上, 而task節點是在某個executor上運行;第二種情況,task和它要處理的數據,在同一節點的不同executor上,數據需要在進程之間傳輸
3.NO_PREF: 對於task來說,數據在哪里獲取都一樣,無好壞之分
4.RACK_LOCAL:機架本地化,task和它要處理的數據在同一機架的不同節點上, 數據需要通過網絡在節點之間傳輸
5.ANY:task和它要處理的數據可能在集群的任何地方,而且不在同一機架上(RACK),數據要跨機架傳輸,性能最差。
何時調節該參數呢?
在本地模式下跑程序觀察spark作業的日志,查看starting task........ PROCESS_LOCAL NODE_LOCAL 觀察大多數task的數據本地化級別,如果大多數是NODE_LOCAL / ANY 那么可以調節,觀察時間是否有縮短,反復幾次 尋找最優的時間。

添加配置后的腳本
#!/bin/bash #jdk export JAVA_HOME=/usr/java/jdk1.8.0_162 export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin #hadoop export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24 export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native #spark #export SPARK_HOME=/opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0 #export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin realtime_queue=root my_job_name="userhierarchy" main_class="com.df.App" /opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0/bin/spark-submit --master local[3] \ --name ${my_job_name} \ --class ${main_class} \ --driver-memory 3g \ --executor-memory 3g \ --executor-cores 5 \ --conf spark.executor.memoryOverhead=2048M \ --conf spark.locality.wait=10 \ --queue ${realtime_queue} \ /opt/cdh/submit/userhierarchy/user_hierarchy-1.0-SNAPSHOT.jar order_info_test