Spark 2.1.1
1. Reproducing the Problem
Example of the problematic code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object MethodPositionTest {
  val sparkConf = new SparkConf().setAppName("MethodPositionTest")
  val sc = new SparkContext(sparkConf)
  val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

  def main(args: Array[String]): Unit = {
    val cnt = spark.sql("select * from test_table").rdd
      .map(item => mapFun(item.getString(0)))
      .count()
    println(cnt)
  }

  def mapFun(str: String): String = "p:" + str
}
When the following three lines are placed outside main, the job fails on the Executors with the stack trace below:
val sparkConf = new SparkConf().setAppName("MethodPositionTest")
val sc = new SparkContext(sparkConf)
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
Caused by: java.lang.ExceptionInInitializerError
at app.package.AppClass$$anonfun$1.apply(AppClass.scala:208)
at org.apache.spark.sql.execution.MapElementsExec$$anonfun$8$$anonfun$apply$1.apply(objects.scala:237)
at org.apache.spark.sql.execution.MapElementsExec$$anonfun$8$$anonfun$apply$1.apply(objects.scala:237)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
at app.package.AppClass$.<clinit>(AppClass.scala)
2. Analysis
MethodPositionTest defines an anonymous function (anonfun) that is passed to RDD.map, which means it executes on the Executors. That anonymous function calls the method mapFun, and the call triggers class initialization: MethodPositionTest$.<clinit>. The initializer runs the Spark setup code that sits outside main, so a SparkConf and a SparkContext get created on the Executor. This should never happen: a Spark application has exactly one SparkContext, and it belongs on the Driver, not on an Executor. Once it does happen, the constructor below throws:
In the constructor of org.apache.spark.SparkContext:

try {
  _conf = config.clone()
  _conf.validateSettings()

  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  // ...
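To see the check in isolation, here is a minimal sketch (the object name is made up; the rest mirrors the snippet above) that hits the same exception when launched directly, without spark-submit injecting spark.master:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo; run directly (e.g. from an IDE), not via spark-submit,
// so nothing sets spark.master through system properties
object NoMasterDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NoMasterDemo") // no setMaster
    // Throws org.apache.spark.SparkException:
    // "A master URL must be set in your configuration"
    val sc = new SparkContext(conf)
  }
}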
Question 1: Why is the missing master URL not an error on the Driver, yet an error on the Executor?
The application builds its SparkConf like this:
val sparkConf = new SparkConf().setAppName("MethodPositionTest")
Only the AppName is set; no master URL is configured. The reason this still works on the Driver lies in how Spark Submit launches the application (see https://www.cnblogs.com/barneywill/p/9820684.html for the full walkthrough).
When the Driver starts, SparkSubmit collects all parameters, from the command line, configuration files, and environment variables, and writes them into the JVM system properties:
for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
SparkConf in turn loads, by default, every system property whose key starts with "spark.":
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value, silent)
}
So the SparkConf built on the Driver contains all parameters, including spark.master, while a SparkConf constructed on an Executor has none of them.
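A minimal sketch of this mechanism (the demo object is hypothetical, not from the post): a spark.* system property set before SparkConf is constructed, which is what SparkSubmit does on the Driver, shows up in the conf automatically:

import org.apache.spark.SparkConf

object SparkConfPropsDemo {
  def main(args: Array[String]): Unit = {
    // Simulate what SparkSubmit does on the Driver side
    System.setProperty("spark.master", "local[2]")

    // SparkConf's default constructor loads all spark.* system properties
    val conf = new SparkConf().setAppName("SparkConfPropsDemo")
    println(conf.get("spark.master")) // prints: local[2]
  }
}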
Question 2: With the Spark initialization code outside main, why does the job sometimes fail and sometimes run fine?
The concrete cases are as follows (a contrast sketch follows the list):
1) If the transformation inside main (the map in the example) calls no function outside the closure, it works.
2) If the transformation calls a function defined inside main, it fails.
3) If the transformation calls a function defined outside main, it fails.
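The three cases side by side as one sketch (the object name, table name, and local function are assumed for illustration; the layout with initialization outside main comes from the example above):

import org.apache.spark.sql.SparkSession

object MethodPositionCases {
  // Spark initialization outside main, as in the problematic example
  val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

  def mapFun(str: String): String = "p:" + str // defined outside main

  def main(args: Array[String]): Unit = {
    val rdd = spark.sql("select * from test_table").rdd

    // Case 1: the closure is self-contained; the generated anonfun class
    // never touches MethodPositionCases$ on the Executor -- works
    rdd.map(item => "p:" + item.getString(0)).count()

    def localFun(str: String): String = "p:" + str // defined inside main

    // Case 2: scalac lifts localFun into a method of MethodPositionCases$,
    // so the Executor has to initialize that class -- fails
    rdd.map(item => localFun(item.getString(0))).count()

    // Case 3: mapFun is a method of MethodPositionCases$; calling it from
    // the closure triggers the static initializer on the Executor -- fails
    rdd.map(item => mapFun(item.getString(0))).count()
  }
}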
Decompiling the bytecode shows why. The decompiled code of MethodPositionTest looks like this:
public final class MethodPositionTest$ {
    public static final MethodPositionTest$ MODULE$ = this;
    private final SparkConf sparkConf = (new SparkConf()).setAppName("MethodPositionTest");
    private final SparkContext sc = new SparkContext(sparkConf());
    private final SparkSession spark;

    public SparkConf sparkConf() { return sparkConf; }
    public SparkContext sc() { return sc; }
    public SparkSession spark() { return spark; }

    public String mapFun(String str) {
        return (new StringBuilder()).append("p:").append(str).toString();
    }

    public void main(String args[]) {
        long cnt = spark().sql("select * from test_table").rdd().map(new Serializable() {
            public static final long serialVersionUID = 0L;

            public final String apply(Row item) {
                return MethodPositionTest$.MODULE$.mapFun(item.getString(0));
            }

            public final volatile Object apply(Object v1) {
                return apply((Row) v1);
            }
        }, ClassTag$.MODULE$.apply(java/lang/String)).count();
        Predef$.MODULE$.println(BoxesRunTime.boxToLong(cnt));
    }

    private MethodPositionTest$() {
        spark = SparkSession$.MODULE$.builder().enableHiveSupport().getOrCreate();
    }

    static {
        new MethodPositionTest$();
    }
}
The anonymous inner class calls MethodPositionTest$.MODULE$.mapFun, so executing it on an Executor forces class MethodPositionTest$ to load and run its static initializer, which triggers the error. A function defined inside main behaves the same way, since scalac lifts it into a method of MethodPositionTest$ as well; that is why case 2 above also fails.
In summary: do not put Spark initialization code outside main; it is an easy way to run into this kind of problem.
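For completeness, a sketch of the recommended layout (same logic as the example, with only the object name changed): with all Spark initialization inside main, loading the object's class on an Executor runs an initializer that does nothing dangerous:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object MethodPositionTestFixed {
  def mapFun(str: String): String = "p:" + str

  def main(args: Array[String]): Unit = {
    // All Spark initialization inside main: it only ever runs on the Driver
    val sparkConf = new SparkConf().setAppName("MethodPositionTestFixed")
    val spark = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    val cnt = spark.sql("select * from test_table").rdd
      .map(item => mapFun(item.getString(0)))
      .count()
    println(cnt)
  }
}

The closure still forces MethodPositionTestFixed$ to load on the Executor when it calls mapFun, but the initializer no longer creates a SparkConf or SparkContext, so the job runs cleanly.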