1、java: 找不到符號
map(o->o._2)處提示找不到符號
SparkSession spark = SparkSession.builder().appName(appName).getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); JavaRDD<ObjectArrayWritable> rdd = jsc.newAPIHadoopFile(dir, SequenceFileInputFormat.class, ObjectArrayWritable.class, ObjectArrayWritable.class, hdConf).map(o->o._2);
使用sequenceFile代替newAPIHadoopFile
或者將原先代碼分為兩行
JavaPairRDD<ObjectArrayWritable,ObjectArrayWritable> pairRDD = jsc.newAPIHadoopFile(dir, SequenceFileInputFormat.class, ObjectArrayWritable.class, ObjectArrayWritable.class, hdConf); JavaRDD<ObjectArrayWritable> rdd = pairRDD.map(o->o._2);
2、java.lang.ClassCastException: java.util.Arrays$ArrayList cannot be cast to scala.collection.Seq
原因:在創建Row時突發奇想把java的List當做元素放入
List<Object> list = new ArrayList<>(); Row r = RowFactory.create(list);
然后在shuffle后取出時報錯
Row r = ... List<Object> list = r.getList(0);
看了下scala的代碼發現,雖然放入時是按照java類型放入的;但是使用getList取出時是按照scala的seq序列取出,所以導致類型轉換異常
根據源碼看到getAs方法,將getList替換為getAs,這次沒有報ClassCastException;但是出現另外一個錯誤!
Caused by: java.lang.UnsupportedOperationException
原因是因為我獲取到List后是沒有具體子類實現的,所以在調用addAll時,最后使用了AbstractList的add方法
創建新的list,將getAs的list遍歷放入新的list。
3、spark Container killed on request. Exit code is 143
很大可能是由於物理內存達到限制,導致container被kill掉報錯。
粗暴簡單的解決方式,增加executor內存大小
spark.executor.memory 4g
再增加內存后依然不行,查看application的log日志發現(yarn logs -applicationId …)
ERROR RetryingBlockFetcher: Exception while beginning fetch of 5 outstanding blocks (after 2 retries) java.io.IOException: Failed to connect to 主機:端口
原來是某個executor掛了,某個exetutor想要fetch數據(應該是shuffle read),但那個有數據的executor掛了,導致fetch失敗
shuffle分為shuffle write和shuffle read兩部分。
shuffle write的分區數由上一階段的RDD分區數控制,shuffle read的分區數則是由Spark提供的一些參數控制。
shuffle write可以簡單理解為類似於saveAsLocalDiskFile的操作,將計算的中間結果按某種規則臨時放到各個executor所在的本地磁盤上。
如果shuffle read的量很大,那么將會導致一個task需要處理的數據非常大,從而導致JVM crash以及取shuffle數據失敗,最后executor也丟失了,看到Failed to connect to host的錯誤(executor lost)或者造成長時間的gc。
解決方案:
(a) 減少shuffle數據和操作 思考是否可以使用map side join或是broadcast join來規避shuffle的產生。 將不必要的數據在shuffle前進行過濾,比如原始數據有20個字段,只要選取需要的字段進行處理即可,將會減少一定的shuffle數據。
(b) 控制分區數 對於SparkSQL和DataFrame的join,group by等操作 通過spark.sql.shuffle.partitions控制分區數,默認為200,根據shuffle的量以及計算的復雜度提高這個值。 對於Rdd的join,groupBy,reduceByKey等操作 通過spark.default.parallelism控制shuffle read與reduce處理的分區數,默認為運行任務的core的總數(mesos細粒度模式為8個,local模式為本地的core總數),官方建議為設置成運行任務的core的2-3倍。
(c)提高executor的內存 通過spark.executor.memory適當提高executor的memory值。
(d)增加並行task的數目 通過增加並行task的數目,從而減小每個task的數據量。(spark.default.parallelism)
(e)查看是否存在數據傾斜的問題 是否存在某個key數據特別大導致傾斜?如果存在可以單獨處理或者考慮改變數據分區規則。