錯誤信息:
17/05/20 18:51:39 ERROR JobScheduler: Error running job streaming job 1495277499000 ms.0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
問題原因:再對RDD進行操作時引用了類的成員變量而該成員變量無法被序列化所導致的
例如如下代碼:
object Test2 extends App{
val conf = new SparkConf().setAppName("RVM").setMaster("local")
val sc = new SparkContext(conf)
val matrix = new DenseMatrix(2,2,Array(1.0,2,3,4))
new Test(sc,matrix).run()
}
class Test(scc:SparkContext,PHI:DenseMatrix) extends Serializable{
val ts = 0.1
def run(): Unit ={
val rdds = scc.parallelize(0 to 3)
val a = rdds.map(
x =>{
PHI.toArray.apply(x)*x
}
)
a.collect.foreach(println(_))
}
}
這一段代碼運行確實會報錯,而且報錯如預期一樣,最開始以為是因為DenseMatrix不能序列化導致的,結果將DenseMatrix換成了其它類型如Double等基本類型同樣會報錯,然后發現是scc(SparkContext)不能序列化導致的錯誤。
解決辦法是在不能序列化的變量前添加注釋@transient告訴編譯器該變量不需要進行序列化。網上還有其它的一些處理方法暫時未深入研究,
如果還是沒有得到解決:
可以試下如下方法:
出現“org.apache.spark.SparkException: Task not serializable"這個錯誤,一般是因為在map、filter等的參數使用了外部的變量,但是這個變量不能序列化。特別是當引用了某個類(經常是當前類)的成員函數或變量時,會導致這個類的所有成員(整個類)都需要支持序列化。解決這個問題最常用的方法有:
- 如果可以,將依賴的變量放到map、filter等的參數內部定義。這樣就可以使用不支持序列化的類;
- 如果可以,將依賴的變量獨立放到一個小的class中,讓這個class支持序列化;這樣做可以減少網絡傳輸量,提高效率;
- 如果可以,將被依賴的類中不能序列化的部分使用transient關鍵字修飾,告訴編譯器它不需要序列化。
- 將引用的類做成可序列化的。
