1. Purpose
Become familiar with, and gain a deeper understanding of, the Builder pattern through real SparkSession code.
2. Creating a SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Standard creation ("local[*]" uses all available cores; local mode needs at least one thread)
val sparkSession = SparkSession.builder()
  .appName("Common")
  .master("local[*]")
  .getOrCreate()

// Creation from an existing SparkConf
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Recommender")
SparkSession.builder().config(sparkConf).getOrCreate()

// Creation with Hive support
val sparkSession = SparkSession.builder()
  .appName("HiveSql")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()
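Whichever variant is used, the resulting session behaves the same way. As a quick sanity check, a minimal sketch (the app name "SanityCheck" is just illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SanityCheck").master("local[*]").getOrCreate()
// Build a tiny DataFrame to confirm the session is usable.
spark.range(5).toDF("id").show()
spark.stop()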
3. Creating a SparkSession from configuration files
One benefit of the Builder pattern is that it hands responsibility for creating an object over to a Builder class, which isolates the passing of configuration from the object being created. That makes it easy to drive object creation from configuration files; the code below reads external configuration files to create a SparkSession.
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

val sparkBuild = SparkSession
  .builder()
  .appName("HiveSql")
  .master("local[*]")
  .enableHiveSupport()

// Load Hadoop/Hive settings from XML files on the classpath.
val configuration = new Configuration()
configuration.addResource("spark/hive/core-site.xml")
configuration.addResource("spark/hive/hdfs-site.xml")
configuration.addResource("spark/hive/hive-site.xml")

// Copy every key/value pair from the Configuration into the builder.
val iterator = configuration.iterator()
while (iterator.hasNext) {
  val next = iterator.next()
  sparkBuild.config(next.getKey, next.getValue)
}
sparkBuild.getOrCreate()
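The same idea works with any configuration source, since config(key, value) accepts arbitrary pairs. As a variant, here is a minimal sketch that drives the builder from a plain Java properties file (the file name spark.properties and its contents are hypothetical):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession

// Load key/value pairs such as "spark.sql.shuffle.partitions=8" from the classpath.
val props = new Properties()
props.load(getClass.getResourceAsStream("/spark.properties"))

val builder = SparkSession.builder().appName("FromProperties").master("local[*]")
props.asScala.foreach { case (k, v) => builder.config(k, v) }
val spark = builder.getOrCreate()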
4. Role breakdown
Mapped onto the classic Builder pattern, the roles are: SparkSession is the product, and its private constructor guarantees it can only be assembled by its builder (or by newSession()); SparkSession.Builder is the concrete builder, accumulating options and performing the final assembly in getOrCreate(); and the SparkSession companion object is the client-facing entry point that hands out builders via builder().
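To make those roles concrete, here is a minimal, self-contained sketch of the same shape (the Connection and Builder names here are invented for illustration and are not part of Spark):

// Product: the constructor is private, so instances can only come from the builder.
class Connection private (val host: String, val port: Int, val useTls: Boolean)

object Connection {
  // Entry point, mirroring SparkSession.builder()
  def builder(): Builder = new Builder

  // Concrete builder, mirroring SparkSession.Builder
  class Builder {
    private var _host: String = "localhost"
    private var _port: Int = 8080
    private var _tls: Boolean = false

    // Each setter returns `this`, enabling the fluent chaining of appName/master/config.
    def host(h: String): Builder = { _host = h; this }
    def port(p: Int): Builder = { _port = p; this }
    def enableTls(): Builder = { _tls = true; this }

    // Final assembly step, mirroring getOrCreate() (minus the session-caching logic).
    def build(): Connection = new Connection(_host, _port, _tls)
  }
}

val conn = Connection.builder().host("example.com").port(443).enableTls().build()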
5. Source code of SparkSession's Builder class
First, the SparkSession class itself:

// The constructor is private: a SparkSession can only be created by the
// Builder class or by the newSession() method.
@InterfaceStability.Stable
class SparkSession private(
    @transient val sparkContext: SparkContext,
    @transient private val existingSharedState: Option[SharedState],
    @transient private val parentSessionState: Option[SessionState],
    @transient private[sql] val extensions: SparkSessionExtensions)
  extends Serializable with Closeable with Logging { self =>

  private[sql] def this(sc: SparkContext) {
    this(sc, None, None, new SparkSessionExtensions)
  }

  sparkContext.assertNotStopped()

  def version: String = SPARK_VERSION

  //.....

  def newSession(): SparkSession = {
    new SparkSession(sparkContext, Some(sharedState), parentSessionState = None, extensions)
  }

  //....
}
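The newSession() method deserves a quick demonstration: the new session shares the parent's SparkContext (and SharedState), but because parentSessionState is None it gets a fresh SessionState, so SQL settings stay isolated between the two sessions. A minimal sketch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("NewSessionDemo").master("local[*]").getOrCreate()
val sibling = spark.newSession()

// Both sessions run on the same SparkContext...
assert(spark.sparkContext eq sibling.sparkContext)

// ...but each has its own SessionState, so SQL settings do not leak across.
spark.conf.set("spark.sql.shuffle.partitions", "10")
sibling.conf.set("spark.sql.shuffle.partitions", "42")
assert(spark.conf.get("spark.sql.shuffle.partitions") == "10")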
Next, the code of the Builder inner class (inside the SparkSession companion object):
@InterfaceStability.Stable
object SparkSession {

  @InterfaceStability.Stable
  class Builder extends Logging {

    private[this] val options = new scala.collection.mutable.HashMap[String, String]

    private[this] val extensions = new SparkSessionExtensions

    private[this] var userSuppliedContext: Option[SparkContext] = None

    private[spark] def sparkContext(sparkContext: SparkContext): Builder = synchronized {
      userSuppliedContext = Option(sparkContext)
      this
    }

    def appName(name: String): Builder = config("spark.app.name", name)

    def config(key: String, value: String): Builder = synchronized {
      options += key -> value
      this
    }

    def config(key: String, value: Long): Builder = synchronized {
      options += key -> value.toString
      this
    }

    def config(key: String, value: Double): Builder = synchronized {
      options += key -> value.toString
      this
    }

    def config(key: String, value: Boolean): Builder = synchronized {
      options += key -> value.toString
      this
    }

    def config(conf: SparkConf): Builder = synchronized {
      conf.getAll.foreach { case (k, v) => options += k -> v }
      this
    }

    def master(master: String): Builder = config("spark.master", master)

    def enableHiveSupport(): Builder = synchronized {
      if (hiveClassesArePresent) {
        config(CATALOG_IMPLEMENTATION.key, "hive")
      } else {
        throw new IllegalArgumentException(
          "Unable to instantiate SparkSession with Hive support because " +
            "Hive classes are not found.")
      }
    }

    def withExtensions(f: SparkSessionExtensions => Unit): Builder = {
      f(extensions)
      this
    }

    def getOrCreate(): SparkSession = synchronized {
      // Reuse the current thread's active session if it exists and is still alive.
      var session = activeThreadSession.get()
      if ((session ne null) && !session.sparkContext.isStopped) {
        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
        if (options.nonEmpty) {
          logWarning("Using an existing SparkSession; some configuration may not take effect.")
        }
        return session
      }

      // Global synchronization, so the default session is only set once.
      SparkSession.synchronized {
        // No active session on this thread; fall back to the global default session.
        session = defaultSession.get()
        if ((session ne null) && !session.sparkContext.isStopped) {
          options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
          if (options.nonEmpty) {
            logWarning("Using an existing SparkSession; some configuration may not take effect.")
          }
          return session
        }

        // Neither exists: create a new session, reusing or creating a SparkContext.
        val sparkContext = userSuppliedContext.getOrElse {
          // set app name if not given
          val randomAppName = java.util.UUID.randomUUID().toString
          val sparkConf = new SparkConf()
          options.foreach { case (k, v) => sparkConf.set(k, v) }
          if (!sparkConf.contains("spark.app.name")) {
            sparkConf.setAppName(randomAppName)
          }
          val sc = SparkContext.getOrCreate(sparkConf)
          options.foreach { case (k, v) => sc.conf.set(k, v) }
          if (!sc.conf.contains("spark.app.name")) {
            sc.conf.setAppName(randomAppName)
          }
          sc
        }

        // Initialize extensions if the user has configured an extensions class.
        val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
        if (extensionConfOption.isDefined) {
          val extensionConfClassName = extensionConfOption.get
          try {
            val extensionConfClass = Utils.classForName(extensionConfClassName)
            val extensionConf = extensionConfClass.newInstance()
              .asInstanceOf[SparkSessionExtensions => Unit]
            extensionConf(extensions)
          } catch {
            case e @ (_: ClassCastException |
                      _: ClassNotFoundException |
                      _: NoClassDefFoundError) =>
              logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
          }
        }

        session = new SparkSession(sparkContext, None, None, extensions)
        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
        defaultSession.set(session)

        // Clear the default session when the application ends.
        sparkContext.addSparkListener(new SparkListener {
          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
            defaultSession.set(null)
            sqlListener.set(null)
          }
        })
      }

      return session
    }
  }

  def builder(): Builder = new Builder

  //......
}
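The control flow of getOrCreate() above explains a behaviour that often surprises newcomers: it first reuses the current thread's active session, then the global default session, and only builds a new SparkSession (and, if needed, a new SparkContext) when neither exists. A minimal sketch of the consequence:

import org.apache.spark.sql.SparkSession

val first = SparkSession.builder().master("local[*]").appName("first").getOrCreate()

// The second builder finds the existing default session and returns it.
// "second" only updates the session's SQL conf (with a logged warning);
// the running SparkContext keeps the app name "first".
val second = SparkSession.builder().appName("second").getOrCreate()
assert(first eq second)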
6. Summary
As the code above shows, SparkSession combines the SparkSession class, the SparkSession companion object, and the Builder inner class into a very convenient API, so a SparkSession can be created flexibly from parameters or from configuration files.
The code above is from ..\spark-2.2.0\sql\core\src\main\scala\org\apache\spark\sql\SparkSession.scala; the Spark source can be downloaded from the official Spark website (link below).