跑spark程序的時候,公司服務器需要排隊等資源,參考一些設置,之前不知道,跑的很慢,懂得設置之后簡直直接起飛。
簡單粗暴上設置代碼:
1 def conf(self): 2 conf = super(TbtestStatisBase, self).conf 3 conf.update({ 4 'spark.shuffle.service.enabled': 'true', 5 'spark.dynamicAllocation.enabled': 'false', 6 'spark.dynamicAllocation.initialExecutors': 50, 7 'spark.dynamicAllocation.minExecutors': 1, 8 'spark.dynamicAllocation.maxExecutors': 125, 9 'spark.sql.parquet.compression.codec': 'snappy', 10 'spark.yarn.executor.memoryOverhead': 4096, 11 "spark.speculation": 'true', 12 'spark.kryoserializer.buffer.max': '512m', 13 })
一小部分設置。簡單解析一下:
1、spark.shuffle.service.enabled。用來設置是否開啟動態分配。開啟了動態分配的Application在申請資源的時候默認會擁有更高的優先級
2、spark.dynamicAllocation.initialExecutors (默認下是3)
spark.dynamicAllocation.minExecutors (默認下是0)
spark.dynamicAllocation.maxExecutors (默認下是30)
Executor應該是所謂資源單位,自己理解為越多執行越快嘛,如果是Yarn的話,就是Containers,一個道理
3、spark.yarn.executor.memoryOverhead 是設置堆外內存大小,和 executor_memory 做個對比:
ExecutorMemory為JVM進程的JAVA堆區域。
MemoryOverhead是JVM進程中除Java堆以外占用的空間大小,包括方法區(永久代)、Java虛擬機棧、本地方法棧、JVM進程本身所用的內存、直接內存(Direct Memory)等。
兩者關系:如果用於存儲RDD的空間不足,先存儲的RDD的分區會被后存儲的覆蓋。當需要使用丟失分區的數據時,丟失的數據會被重新計算。ExecutorMemory + MemoryOverhead之和(JVM進程總內存)
我只是簡單理解堆外內存為一個備用區域吧,還不知道具體什么作用。有遇到內存不夠報錯的情況,然后調大了MemoryOverhead。
4、理論上:非動態分配情況下,我們必須要等到有100個Executor才能運行Application,並且這100個會一直被占用到程序結束,即便只有一個任務運行了很長時間。動態分配情況下,當有10個Executor的時候,我們的Application就開始運行了,並且我們后續可以繼續申請資源,最多申請到100個Executor,當我們有空閑資源的時候,我們可以被釋放資源到最少只保留10個Executor,當需要的時候我們有更高的優先級從YARN那兒拿到資源。
但是!
5、用了之后簡直起飛。。公司服務器好像根本不存在動態這回事,總是只給幾個executor,雖然開始是很快,但執行過程很慢,所以我放棄了,不動態調整了,直接設置死 num_executors = 90。。。然后:
發現一般資源還是挺富裕的嘛,寫了90個 也不用等很久。為什么動態一直不肯分給我。。。好多資源,瞬間2個小時縮短為20分鍾。