spark程序的序列化問題
transformation操作為什么需要序列化
spark是分布式執行引擎,其核心抽象是彈性分布式數據集RDD,其代表了分布在不同節點的數據。Spark的計算是在executor上分布式執行的,所以用戶執行RDD的map,flatMap,reduceByKey等transformation 操作時可能有如下執行過程:
- 代碼中對象在driver本地序列化
- 對象序列化后傳輸到遠程executor節點
- 遠程executor節點反序列化對象
- 最終遠程節點執行
這些操作要序列化的原因:
我們知道,transformation這些算子都是要傳入參數的,而且很多的參數都是函數,類似於閉包,閉包可簡單理解成“定義在一個函數內部的函數”。
假如說作為算子參數的函數是:x=>(x,外部定義的對象或變量等),外部定義的對象或變量等是在driver端創建的,那么如果作為算子參數的函數要使用外部的東西,就要從driver端拉取外部對象等過來到當前executor,從而使用。
因此,對象在執行中需要序列化通過網絡傳輸,則必須經過序列化過程。
spark的任務序列化異常原因
報錯的可能原因
在編寫spark程序中,由於在map,foreachPartition等算子內部使用了外部定義的變量和函數,從而引發Task未序列化問題。
然而spark算子在計算過程中使用外部變量在許多情形下確實在所難免,比如在filter算子根據外部指定的條件進行過濾,map根據相應的配置進行變換。
經常會出現“org.apache.spark.SparkException: Task not serializable”這個錯誤,出現這個錯誤的原因可能是:
- 這些算子使用了外部的變量,但是這個變量不能序列化。
- 當前類使用了“extends Serializable”聲明支持序列化,但是由於某些字段不支持序列化,仍然會導致整個類序列化時出現問題,最終導致出現Task未序列化問題。
示例1:
數據庫連接定義在了foreachPartition算子外部,當算子內部要使用該連接時,就出現了序列化錯誤。這是因為這個數據庫連接是在driver端構建的,而數據庫連接沒有實現序列化,無法傳輸到不同機器的executor,就報錯了。
示例2:
看下圖,serialDemo是在object外部定義的類,雖然serialDemo extends Serializable實現序列化,但是,因為該類里面的數據庫連接是conne是不支持序列化的,導致序列化不成功。雖然引用的是name變量,但還是報錯了。
如果函數中使用了該類對象的成員變量,該類除了要實現序列化之外,所有的成員變量必須要實現序列化。
spark中解決序列化問題的辦法
- 如果函數中使用了該類對象,該類要實現序列化,序列化方法:class xxx extends Serializable{}
- 如果函數中使用了該類對象的成員變量,該類除了要實現序列化之外,所有的成員變量必須要實現序列化
- 對於不能序列化的成員變量使用“@transient”標注,告訴編譯器不需要序列化
- 也可將依賴的變量獨立放到一個小的class中,讓這個class支持序列化,這樣做可以減少網絡傳輸量,提高效率。
- 可以把對象的創建直接在該函數中構建這樣避免需要序列化
因此,遵從這些方法,將上面示例2中的代碼改成如下就可以運行成功了:
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}
class serialDemo extends Serializable {
val name:String="krystal"
@transient
val conne = DriverManager.getConnection("jdbc:mysql://node03:3306/demo1", "root", "123456")
}
object Data2MysqlForeachPartition {
def main(args: Array[String]): Unit = {
val sparkkconf = new SparkConf().setAppName("ForeachMysql").setMaster("local[2]")
val sc = new SparkContext(sparkkconf)
val rdd1=sc.parallelize(1 to 10)
val sd=new serialDemo()
val rdd2=rdd1.map(x=>(x,sd.name))
rdd2.foreach(println)
}
}
運行結果為:
(6,krystal)
(1,krystal)
(2,krystal)
(3,krystal)
(4,krystal)
(5,krystal)
(7,krystal)
(8,krystal)
(9,krystal)
(10,krystal)