1.1 經典14問
1.2 問題前提
2.1 Spark序列化出現情況
2.2 Spark序列化問題解決
3.1 Spark線程安全出現情況
3.2 Spark線程安全問題解決
正文
一,必備知識
1.1 經典14問
1.SparkContext哪一端生成的? Driver端 2.DAG是在哪一端被構建的? Driver端 3.RDD是在哪一端生成的? Driver端 4.廣播變量是在哪一端調用的方法進行廣播的? Driver端 5.要廣播的數據應該在哪一端先創建好再廣播呢? Driver端 6.調用RDD的算子(Transformation和Action)是在哪一端調用的 Driver端 7.RDD在調用Transformation和Action時需要傳入一個函數,函數是在哪一端聲明和傳入的? Driver端 8.RDD在調用Transformation和Action時需要傳入函數,請問傳入的函數是在哪一端執行了函數的業務邏輯? Executor中的Task執行的 9.自定義的分區器這個類是在哪一端實例化的? Driver端 10.分區器中的getParitition方法在哪一端調用的呢? Executor中的Task中調用的 11.Task是在哪一端生成的呢? Driver端 12.DAG是在哪一端構建好的並被切分成一到多個State的 Driver端 13.DAG是哪個類完成的切分Stage的功能? DAGScheduler 14.DAGScheduler將切分好的Stage以什么樣的形式給TaskScheduler TaskSet
1.2 需求前提
在上面的12問的7-8問中,函數的申明和調用分別在Driver和Execute中進行,這其中就會牽扯到序列化問題和線程安全問題。接下來會對其進行解釋。
二,序列化問題
2.1 Spark序列化出現情況
工具類:
package cn.edu360.spark05 // 隨意定義一工具類 class MyUtil { def get(msg: String): String ={ msg+"aaa" } }
Spark實現類:
package cn.edu360.spark05 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SequenceTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]") var sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/") val words = lines.flatMap(_.split(" ")) // 對類進行實例化 val util = new MyUtil // 調用實例的方法 val value: RDD[String] = words.map(word => util.get(word)) value.collect() sc.stop() } }
報錯信息如下:
上述報錯信息就說明是MyUtil實例的序列化問題。該實例是在Driver端創建,通過網絡發送到Worker的Executer端。但是這個實例並為序列化,所以會報這些錯誤。
2.2 Spark序列化問題解決
解決方案一:實現序列化接口
package cn.edu360.spark05 // 繼承Serializable class MyUtil extends Serializable { def get(msg: String): String ={ msg+"aaa" } }
弊端:需要自己實現序列化接口,相對麻煩
解決方案二:不實現序列化接口,在Executer進行MyUtil內進行實例化
package cn.edu360.spark05 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SequenceTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]") var sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/") val words = lines.flatMap(_.split(" ")) val value: RDD[String] = words.map(word => { // 在這里進行實例化,這里的操作是在Executer中 val util = new MyUtil util.get(word) }) val result: Array[String] = value.collect() print(result.toBuffer) sc.stop() } }
弊端:每一次調用都需要創建一個新的實例,浪費資源,浪費內存。
解決方案三:采用單例模式
MyUtil類:
package cn.edu360.spark05 // 將class 改為 object的單例模式 object MyUtil { def get(msg: String): String ={ msg+"aaa" } }
Spark實現類:
package cn.edu360.spark05 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SequenceTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]") var sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/") val words = lines.flatMap(_.split(" ")) val value: RDD[String] = words.map(word => { // 調用方法 MyUtil.get(word) }) val result: Array[String] = value.collect() print(result.toBuffer) sc.stop() } }
三,線程安全問題
3.1 Spark線程安全出現情況、
有共享成員變量:
1. 工具類使用object,說明工具類是單例的,有線程安全問題。在函數內部使用,是在Executer中被初始化,一個Executer中有一個實例,所以 就出現了線程安全問題。
2. 工具類使用Class,說明是多例的,沒有線程安全問題。每個task都會持有一份工具類的實例。
沒有共享成員變量:
1. 工具類Object,沒有線程安全問題
2. 工具類使用class,實現序列化即可
3.2 Spark線程安全問題解決
工具類優先使用object,但盡可能不使用成員變量,若實在有這方面的需求,可以定義類的類型,或者把成員變量變成線程安全的成員變量,例如加鎖等。