spark遇到的問題(持續更新)


1.DataFrame使用unionAll算子

java.util.concurrent.ExecutionException: org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the left table has 44 columns and the right has 45;

問題描述:unionAll需要兩個DataFrame擁有一樣的列,如果連接兩個表,則需要兩個表字段數量一致。如果不一致,則可以采取一些處理方法:加載的時候加載的字段一致。該算子本人經常使用,但是還是踩了坑:如下,我需要將一個天表數據按照一個小時為粒度取出,分開進行hiveSql運算,合並接着運算,最終得到一個DF,但是由於經過hiveContext.sql之后的字段有了改變,所以,這個坑還是無意外的踩了。

解決方法:可以對每個小時的數據進行取出,得到df1(原始數據集),進行hiveContext.sql操作,得到df2,此時df1與df2字段個數不同,將df2存進DataFrame數組dfs,然后依次將24小時存進數組dfs。此時dfs一共擁有24個df2字段數一直的DataFrame。

此時,可遍歷依次將dfs[index]的數據集unionAll操作,sql操作,得到新的DataFrame。

此方法可以解決大的DataFrame計算完,無法進行存儲的缺陷。(為什么這么說,如果一次性加載一天的數據,數據量大概為3000W條,字段數為12個,通過計算,主要是hiveContext.sql計算,得到480W數據量,此時DataFrame好像只有count操作能夠執行,其它的都執行不了。也不能進行forEachPartition操作,這是因為JVM內存限制,可能跟設置無關,因為我需要將計算結果入庫,所以必須要實現DataFrame進行forEachPartition)。代碼做了一點優化,將每小時的數據合並成每六小時。

為什么一定要這么實現dataFrame入庫(mysql)?

本人嘗試:①使用原生的dataframe.jdbc,結果:入庫慢,一般地10萬數據,幾十個字段,使用forEachPartition批量入,大概10s,使用官方jdbc需要3~4min。最重要的是也無法對我實際中產生的數據入庫。(一條都沒入庫,處於計算完待入庫,轉換過程中歇菜~)

②原來的入庫方式是dataFrame.toJavaRdd().forEachPartition(),對小數據量能入,對計算量大數據量大的不能入,觀察sparkUI發現,卡在.toJavaRdd算子上,所以想改造dataFrame直接入庫,dataFrame.forEachPartition形式。實現scala.Function1類即可。結果:與DataFrame.toJavaRdd一樣入不了庫,通過查看源碼發現,基本所有的dataFrame算子都會轉換成rdd的形式,此例子的也是。

③為什么dataFrame可以count卻不能進行其他算子操作,想去模仿count的實現(scala實現),發現是空指針。。

④大的dataFrame轉換成小的dataFrame,嘗試dataFrame.randomSplit操作,將分割成若干個小df,結果表明,任何想要操作大dataFrame的行為都是不可行的!

⑤所以,只能將小的dataFrame計算完,與其他小的dataFrame.unionAll的方式,進行處理。我們此處是數據量經過處理一定會變小,1+1<2的形式,經過實踐,此方法可行,就是時間很慢,計算需要7s,計算完到入庫完,需要7~8min.

 

原因:DataFrame不是不能夠對大數據量的數據表進行操作,而是需要提高加載並行度,將數據分布到各個執行器上,hiveContext.read().jdbc算是轉換算子,會惰性執行,可以將加載后的數據進行緩存提高速度。

 2.說到forEachPartition入庫操作,就得順帶一提序列化陷阱。

使用forEachPartition對每個分區的數據進行存儲操作,傳入forEachPartition匿名類的所有參數對象都必須序列化,dataSource是一定傳不了的,還想着少建立與數據庫連接,此時為空指針,需要在匿名類內創建。還有日志也是打印不了的,可以傳遞些字符串或者自定義序列化過的bean等。

 

3.提高Spark並行度

①設置參數:

spark.default.parallelism=60 (應該為總executor-cores的2~3倍,官方推薦)
spark.sql.shuffle.partitions = 60

只有在shuffle操作之后生效;

②如果數據在HDFS上,增大block;

③如果數據是在關系數據庫上,增加加載並行度,應該是有四種方式增大並行度;

④RDD.repartition,給RDD重新設置partition的數量;

⑤reduceByKey的算子指定partition的數量;
val rdd2 = rdd1.reduceByKey(_+_,10)

⑥val rdd3 = rdd1.join(rdd2) rdd3里面partiiton的數量是由父RDD中最多的partition數量來決定,因此使用join算子的時候,增加父RDD中partition的數量。

參考博客:https://www.cnblogs.com/haozhengfei/p/e19171de913caf91228d9b432d0eeefb.html

4.hdfs錯誤 

Caused by: java.io.IOException: Filesystem closed

原因:多個datanode在getFileSystem過程中,由於Configuration一樣,會得到同一個FileSystem。如果有一個datanode在使用完關閉連接,其它的datanode在訪問就會出現上述異常
解決辦法
在hdfs core-site.xml里把fs.hdfs.impl.disable.cache設置為true

5.dataFrame.write().mode("overwrite").text(hdfspath);

org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 6 columns.;

dataFrame只能保存一列數據,如果仍需要保存多列數據,則將多列數據合並成一列,再保存;

或者轉換為RDD再保存:

dataFrame.toJavaRDD().saveAsTextFile(hdfsPath);

 6.DataFrame超過200列(>200columns)無法緩存,超過8117列無法計算。

來源:https://issues.apache.org/jira/browse/SPARK-16664


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM