spark(9)spark程序的序列化問題及解決方法


spark程序的序列化問題

transformation操作為什么需要序列化

spark是分布式執行引擎,其核心抽象是彈性分布式數據集RDD,其代表了分布在不同節點的數據。Spark的計算是在executor上分布式執行的,所以用戶執行RDD的map,flatMap,reduceByKey等transformation 操作時可能有如下執行過程:

  1. 代碼中對象在driver本地序列化
  2. 對象序列化后傳輸到遠程executor節點
  3. 遠程executor節點反序列化對象
  4. 最終遠程節點執行

這些操作要序列化的原因:

我們知道,transformation這些算子都是要傳入參數的,而且很多的參數都是函數,類似於閉包,閉包可簡單理解成“定義在一個函數內部的函數”。

假如說作為算子參數的函數是:x=>(x,外部定義的對象或變量等),外部定義的對象或變量等是在driver端創建的,那么如果作為算子參數的函數要使用外部的東西,就要從driver端拉取外部對象等過來到當前executor,從而使用。

因此,對象在執行中需要序列化通過網絡傳輸,則必須經過序列化過程。

image-20200416162320980

spark的任務序列化異常原因

報錯的可能原因

在編寫spark程序中,由於在map,foreachPartition等算子內部使用了外部定義的變量和函數,從而引發Task未序列化問題

然而spark算子在計算過程中使用外部變量在許多情形下確實在所難免,比如在filter算子根據外部指定的條件進行過濾,map根據相應的配置進行變換。

經常會出現“org.apache.spark.SparkException: Task not serializable”這個錯誤,出現這個錯誤的原因可能是:

  1. 這些算子使用了外部的變量,但是這個變量不能序列化。
  2. 當前類使用了“extends Serializable”聲明支持序列化,但是由於某些字段不支持序列化,仍然會導致整個類序列化時出現問題,最終導致出現Task未序列化問題。

示例1:

數據庫連接定義在了foreachPartition算子外部,當算子內部要使用該連接時,就出現了序列化錯誤。這是因為這個數據庫連接是在driver端構建的,而數據庫連接沒有實現序列化,無法傳輸到不同機器的executor,就報錯了。

image-20200416163718819

示例2:

看下圖,serialDemo是在object外部定義的類,雖然serialDemo extends Serializable實現序列化,但是,因為該類里面的數據庫連接是conne是不支持序列化的,導致序列化不成功。雖然引用的是name變量,但還是報錯了。

如果函數中使用了該類對象的成員變量,該類除了要實現序列化之外,所有的成員變量必須要實現序列化

image-20200416164933460

spark中解決序列化問題的辦法

  1. 如果函數中使用了該類對象,該類要實現序列化,序列化方法:class xxx extends Serializable{}
  2. 如果函數中使用了該類對象的成員變量,該類除了要實現序列化之外,所有的成員變量必須要實現序列化
  3. 對於不能序列化的成員變量使用“@transient”標注,告訴編譯器不需要序列化
  4. 也可將依賴的變量獨立放到一個小的class中,讓這個class支持序列化,這樣做可以減少網絡傳輸量,提高效率。
  5. 可以把對象的創建直接在該函數中構建這樣避免需要序列化

因此,遵從這些方法,將上面示例2中的代碼改成如下就可以運行成功了:

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}

class serialDemo extends Serializable {
  val name:String="krystal"
  @transient
  val conne = DriverManager.getConnection("jdbc:mysql://node03:3306/demo1", "root", "123456")
}
object Data2MysqlForeachPartition {
  def main(args: Array[String]): Unit = {
    val sparkkconf = new SparkConf().setAppName("ForeachMysql").setMaster("local[2]")
    val sc = new SparkContext(sparkkconf)
    val rdd1=sc.parallelize(1 to 10)
    val sd=new serialDemo()
    val rdd2=rdd1.map(x=>(x,sd.name))
    rdd2.foreach(println)
  }
}

運行結果為:

(6,krystal)
(1,krystal)
(2,krystal)
(3,krystal)
(4,krystal)
(5,krystal)
(7,krystal)
(8,krystal)
(9,krystal)
(10,krystal)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM