Spark 學習(六) Spark 的線程安全和序列化問題


一,必備知識

  1.1 經典14問

  1.2 問題前提

二,序列化問題

  2.1 Spark序列化出現情況

  2.2 Spark序列化問題解決

三,線程安全問題

  3.1 Spark線程安全出現情況

  3.2 Spark線程安全問題解決

 

 

 

 

正文

一,必備知識

  1.1 經典14問

1.SparkContext哪一端生成的?
    Driver端

2.DAG是在哪一端被構建的?
    Driver端
    
3.RDD是在哪一端生成的?
    Driver端

4.廣播變量是在哪一端調用的方法進行廣播的?
    Driver端

5.要廣播的數據應該在哪一端先創建好再廣播呢? 
    Driver端

6.調用RDD的算子(Transformation和Action)是在哪一端調用的
    Driver端
    
7.RDD在調用Transformation和Action時需要傳入一個函數,函數是在哪一端聲明和傳入的?
    Driver端

8.RDD在調用Transformation和Action時需要傳入函數,請問傳入的函數是在哪一端執行了函數的業務邏輯?
    Executor中的Task執行的

9.自定義的分區器這個類是在哪一端實例化的?
    Driver端

10.分區器中的getParitition方法在哪一端調用的呢?
    Executor中的Task中調用的

11.Task是在哪一端生成的呢? 
    Driver端

12.DAG是在哪一端構建好的並被切分成一到多個State的
    Driver端

13.DAG是哪個類完成的切分Stage的功能?
    DAGScheduler
    
14.DAGScheduler將切分好的Stage以什么樣的形式給TaskScheduler
    TaskSet

  1.2 需求前提

  在上面的12問的7-8問中,函數的申明和調用分別在Driver和Execute中進行,這其中就會牽扯到序列化問題和線程安全問題。接下來會對其進行解釋。

二,序列化問題

  2.1 Spark序列化出現情況

  工具類:

package cn.edu360.spark05

// 隨意定義一工具類
class MyUtil {
    def get(msg: String): String ={
        msg+"aaa"
    }
}

  Spark實現類:

package cn.edu360.spark05
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SequenceTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]")
        var sc = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/")
        val words = lines.flatMap(_.split(" "))
        // 對類進行實例化
        val util = new MyUtil
        // 調用實例的方法
        val value: RDD[String] = words.map(word => util.get(word))
        value.collect()
        sc.stop()
    }
}

  報錯信息如下:

  

  

  上述報錯信息就說明是MyUtil實例的序列化問題。該實例是在Driver端創建,通過網絡發送到Worker的Executer端。但是這個實例並為序列化,所以會報這些錯誤。

  2.2 Spark序列化問題解決

  解決方案一:實現序列化接口

package cn.edu360.spark05

// 繼承Serializable
class MyUtil extends Serializable {
    def get(msg: String): String ={
        msg+"aaa"
    }
}

  弊端:需要自己實現序列化接口,相對麻煩

  解決方案二:不實現序列化接口,在Executer進行MyUtil內進行實例化

package cn.edu360.spark05
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SequenceTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]")
        var sc = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/")
        val words = lines.flatMap(_.split(" "))
        val value: RDD[String] = words.map(word => {
            // 在這里進行實例化,這里的操作是在Executer中
            val util = new MyUtil
            util.get(word)
        })
        val result: Array[String] = value.collect()
        print(result.toBuffer)
        sc.stop()
    }
}

  弊端:每一次調用都需要創建一個新的實例,浪費資源,浪費內存。

  解決方案三:采用單例模式

  MyUtil類:

package cn.edu360.spark05

// 將class 改為 object的單例模式
object MyUtil {
    def get(msg: String): String ={
        msg+"aaa"
    }
}

  Spark實現類:

package cn.edu360.spark05
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SequenceTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]")
        var sc = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/")
        val words = lines.flatMap(_.split(" "))
        val value: RDD[String] = words.map(word => {
            // 調用方法
            MyUtil.get(word)
        })
        val result: Array[String] = value.collect()
        print(result.toBuffer)
        sc.stop()
    }
}

三,線程安全問題

  3.1 Spark線程安全出現情況、

  有共享成員變量:

    1. 工具類使用object,說明工具類是單例的,有線程安全問題。在函數內部使用,是在Executer中被初始化,一個Executer中有一個實例,所以 就出現了線程安全問題。

    2. 工具類使用Class,說明是多例的,沒有線程安全問題。每個task都會持有一份工具類的實例。

  沒有共享成員變量:

    1. 工具類Object,沒有線程安全問題

    2. 工具類使用class,實現序列化即可

  3.2 Spark線程安全問題解決

    工具類優先使用object,但盡可能不使用成員變量,若實在有這方面的需求,可以定義類的類型,或者把成員變量變成線程安全的成員變量,例如加鎖等。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM