前幾節介紹了下常用的函數和常踩的坑以及如何打包程序,現在來說下如何調參優化。當我們開發完一個項目,測試完成后,就要提交到服務器上運行,但運行不穩定,老是拋出如下異常,這就很納悶了呀,明明測試上沒問題,咋一到線上就出bug了呢!別急,我們來看下這bug到底怎么回事~
一、錯誤分析
1、參數設置及異常信息
18/10/08 16:23:51 WARN TransportChannelHandler: Exception in connection from /10.200.2.95:40888 java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:745) 18/10/08 16:23:51 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.200.2.95:40888 is closed
2、分析原因
運行的程序其實邏輯上比較簡單,只是從hive表里讀取的數據量很大,差不多60+G,並且需要將某些hive表讀取到dirver節點上,用來獲取每個executor上某些數據的映射值,所以driver設定的資源較大。運行時拋出的異常信息,從網上查了下原因大致是服務器的並發連接數超過了其承載量,服務器會將其中一些連接Down掉,這也就是說在運行spark程序時,過多的申請資源並發執行。那應該怎樣去合理設置參數才能最大化提升並發的性能呢?這些參數又分別代表什么?
二、常見參數及含義
常用參數 |
含義及建議 |
spark.executor.memory (--executor-memory) |
含義:每個執行器進程分配的內存 建議:該設置需要和下面的executor-num數一起考慮,一般設置為4G~8G如果和其他人共用隊列,為防止獨占資源建議memory*num <= 資源隊列最大總內存*(1/2~1/3) |
spark.executor.num (--executor-num) |
含義:設置用來執行spark作業的執行器進程的個數 建議:一般設置為50~100個,設置太少會導致無法充分利用資源,太多又導致大部分任務分配不到充足的資源 |
spark.executor.cores (--executor-cores) |
含義:每個執行器進程的cpu core數量,決定了執行器進程並發執行task線程的個數,一個core同一時間只能執行一個task線程。如果core數量越多,完成自己task越快。 建議:一般設置2~4個,需要和executor-num一起考慮,num*core<=隊列總core*(1/2~1/3) |
spark.driver.memory (--driver-memory) |
含義:設置驅動進程的內存 建議:driver一般不設置,默認的就可以,不過如果你在程序中需要使用collect算子拉取rdd到驅動節點上,那就得設置相應的內存大小(大於幾十k建議使用廣播變量) |
spark.default.parallelism | 含義:設置每個stage的默認任務數量 建議:官方建議設置為 executor-num*executor-cores的2~3倍 |
spark.storage.memoryFraction | 含義:默認Executor 60%的內存,可以用來保存持久化的RDD數據 建議:spark作業中,如果有較多rdd需要持久化,該參數可適當提高一些,保證持久化的數據存儲在內存中,避免被寫入磁盤影響運行速度。如果shuffle類操作較多,可調低該參數。並且如果由於頻繁的垃圾回收導致運行緩慢,證明執行task的內存不夠用,建議調低該參數。 |
spark.shuffle.memoryFraction | 含義:設置shuffle過程中一個task拉取到上個stage的task的輸出后,進行聚合操作時能夠使用的Executor內存的比例 建議:shuffle操作在進行聚合時,如果發現使用的內存超出了這個20%的限制,那么多余的數據就會溢寫到磁盤文件中去,此時就會極大地降低性能。結合上一個參數調整。 |
spark.speculation | 含義:設為true時開啟task預測執行機制。當出現較慢的任務時,這種機制會在另外的節點嘗試執行該任務的一個副本。 建議:true,打開此選項會減少大規模集群中個別較慢的任務帶來的影響。 |
spark.storage.blockManagerTimeoutIntervalMs | 含義:內部通過超時機制追蹤執行器進程是否存活的閾值。 建議:對於會引發長時間垃圾回收(GC)暫停的作業,需要把這個值調到100秒(100000)以防止失敗。 |
spark.sql.shuffle.partitions | 含義:配置join或者聚合操作shuffle數據時分區的數量,默認200 建議:同spark.default.parallelism |
三、實踐
通過適當調整以上講到的幾個參數,降低spark.default.parallelism的同時又設置了spark.sql.shuffle.partitions、spark.speculation、spark.storage.blockManagerTimeoutIntervalMs三個參數。由於項目中頻繁的讀取hive表數據,並進行連接操作,所以在shuffle階段增大了partitions。對於woker傾斜,設置spark.speculation=true,把預測不樂觀的節點去掉來保證程序可穩定運行,通過這幾個參數的調整這樣並大大減少了運行時間。