1. Purpose
To become familiar with, and gain a deeper understanding of, the role of Builder through real SparkSession code.
2. Creating a SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Standard creation (note: local[0] would fail, local mode needs at least one thread)
val sparkSession = SparkSession.builder()
  .appName("Common")
  .master("local[*]")
  .getOrCreate()

// Creation from an existing SparkConf
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Recommender")
val sparkFromConf = SparkSession.builder().config(sparkConf).getOrCreate()

// Creation with Hive support
val hiveSession = SparkSession.builder()
  .appName("HiveSql")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()
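Whichever variant is chosen, the resulting session is the entry point for all DataFrame and SQL work. A minimal, self-contained usage sketch (the input file people.json is hypothetical):

import org.apache.spark.sql.SparkSession

object QuickStart {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("QuickStart")
      .master("local[*]")
      .getOrCreate()

    val df = spark.read.json("people.json") // hypothetical input file
    df.printSchema()

    spark.stop() // releases the underlying SparkContext
  }
}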
3. Creating a SparkSession from configuration files
One benefit of the Builder pattern is that responsibility for creating an object is handed to the Builder class, which decouples passing the configuration from the object being created. This makes it easy to drive object creation from configuration files; the code below reads external configuration files to create a SparkSession.
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

val sparkBuild = SparkSession
  .builder()
  .appName("HiveSql")
  .master("local[*]")
  .enableHiveSupport()

// Load the external Hadoop/Hive configuration files from the classpath
val configuration = new Configuration()
configuration.addResource("spark/hive/core-site.xml")
configuration.addResource("spark/hive/hdfs-site.xml")
configuration.addResource("spark/hive/hive-site.xml")

// Copy every entry into the builder before creating the session
val iterator = configuration.iterator()
while (iterator.hasNext) {
  val next = iterator.next()
  sparkBuild.config(next.getKey, next.getValue)
}
val sparkSession = sparkBuild.getOrCreate()
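The same decoupling works with any configuration source, not just Hadoop XML files. As a sketch, the builder could equally be fed from a plain Java properties file; the file name spark.properties and its contents here are assumptions for illustration:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession

// Hypothetical file on the classpath with key=value pairs,
// e.g. spark.sql.shuffle.partitions=8
val props = new Properties()
props.load(getClass.getResourceAsStream("/spark.properties"))

val builder = SparkSession.builder().appName("FromProps").master("local[*]")
props.asScala.foreach { case (k, v) => builder.config(k, v) }
val sparkSession = builder.getOrCreate()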
4. Role division
Mapped onto the classic Builder pattern, the roles are: SparkSession is the product, whose constructor is private; SparkSession.Builder is the builder, which collects configuration through fluent methods such as appName, master, and config; and the SparkSession companion object is the entry point, exposing builder() to obtain a Builder and getOrCreate() to produce (or reuse) the product.
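Stripped of the Spark specifics, these roles can be reduced to a minimal, self-contained sketch (all names here are illustrative, not Spark code):

// Product: the constructor is private, so callers must go through the Builder.
class Connection private (val host: String, val port: Int)

object Connection {
  // Builder: accumulates configuration via fluent setters, then builds the product.
  class Builder {
    private var host: String = "localhost"
    private var port: Int = 8080
    def withHost(h: String): Builder = { host = h; this } // return this to allow chaining
    def withPort(p: Int): Builder = { port = p; this }
    def build(): Connection = new Connection(host, port)
  }
  // Entry point, analogous to SparkSession.builder()
  def builder(): Builder = new Builder
}

// Usage mirrors SparkSession.builder().appName(...).master(...).getOrCreate()
val conn = Connection.builder().withHost("example.com").withPort(9000).build()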
5. The source code of SparkSession and its Builder class
// The constructor is private: a SparkSession can only be created through the
// Builder class or the newSession() method.
@InterfaceStability.Stable
class SparkSession private(
    @transient val sparkContext: SparkContext,
    @transient private val existingSharedState: Option[SharedState],
    @transient private val parentSessionState: Option[SessionState],
    @transient private[sql] val extensions: SparkSessionExtensions)
  extends Serializable with Closeable with Logging { self =>

  private[sql] def this(sc: SparkContext) {
    this(sc, None, None, new SparkSessionExtensions)
  }

  sparkContext.assertNotStopped()

  def version: String = SPARK_VERSION

  //.....

  def newSession(): SparkSession = {
    // Shares sharedState (and thus the SparkContext) but starts a fresh SessionState
    new SparkSession(sparkContext, Some(sharedState), parentSessionState = None, extensions)
  }

  //....
}
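As the constructor arguments show, newSession() passes Some(sharedState) but parentSessionState = None: the new session shares the SparkContext and SharedState (catalog, cached data) with the original, yet gets its own independent session-scoped state. A minimal sketch of what that means in practice:

val spark = SparkSession.builder().master("local[*]").appName("Demo").getOrCreate()
val other = spark.newSession()

// Both sessions run on the same SparkContext...
assert(spark.sparkContext eq other.sparkContext)

// ...but session-scoped SQL configuration is isolated.
spark.conf.set("spark.sql.shuffle.partitions", "4")
// other.conf.get("spark.sql.shuffle.partitions") still returns the default value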
The code of the Builder class, defined inside the SparkSession companion object:
@InterfaceStability.Stable
object SparkSession {

  @InterfaceStability.Stable
  class Builder extends Logging {

    private[this] val options = new scala.collection.mutable.HashMap[String, String]

    private[this] val extensions = new SparkSessionExtensions

    private[this] var userSuppliedContext: Option[SparkContext] = None

    private[spark] def sparkContext(sparkContext: SparkContext): Builder = synchronized {
      userSuppliedContext = Option(sparkContext)
      this
    }

    def appName(name: String): Builder = config("spark.app.name", name)

    def config(key: String, value: String): Builder = synchronized {
      options += key -> value
      this
    }

    def config(key: String, value: Long): Builder = synchronized {
      options += key -> value.toString
      this
    }

    def config(key: String, value: Double): Builder = synchronized {
      options += key -> value.toString
      this
    }

    def config(key: String, value: Boolean): Builder = synchronized {
      options += key -> value.toString
      this
    }

    def config(conf: SparkConf): Builder = synchronized {
      conf.getAll.foreach { case (k, v) => options += k -> v }
      this
    }

    def master(master: String): Builder = config("spark.master", master)

    def enableHiveSupport(): Builder = synchronized {
      if (hiveClassesArePresent) {
        config(CATALOG_IMPLEMENTATION.key, "hive")
      } else {
        throw new IllegalArgumentException(
          "Unable to instantiate SparkSession with Hive support because " +
            "Hive classes are not found.")
      }
    }

    def withExtensions(f: SparkSessionExtensions => Unit): Builder = {
      f(extensions)
      this
    }

    def getOrCreate(): SparkSession = synchronized {
      // First, return the current thread's active session if it is still usable.
      var session = activeThreadSession.get()
      if ((session ne null) && !session.sparkContext.isStopped) {
        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
        if (options.nonEmpty) {
          logWarning("Using an existing SparkSession; some configuration may not take effect.")
        }
        return session
      }

      // Otherwise fall back to the global default session.
      SparkSession.synchronized {
        session = defaultSession.get()
        if ((session ne null) && !session.sparkContext.isStopped) {
          options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
          if (options.nonEmpty) {
            logWarning("Using an existing SparkSession; some configuration may not take effect.")
          }
          return session
        }

        // No usable session: build (or reuse) a SparkContext from the collected options.
        val sparkContext = userSuppliedContext.getOrElse {
          // set app name if not given
          val randomAppName = java.util.UUID.randomUUID().toString
          val sparkConf = new SparkConf()
          options.foreach { case (k, v) => sparkConf.set(k, v) }
          if (!sparkConf.contains("spark.app.name")) {
            sparkConf.setAppName(randomAppName)
          }
          val sc = SparkContext.getOrCreate(sparkConf)
          options.foreach { case (k, v) => sc.conf.set(k, v) }
          if (!sc.conf.contains("spark.app.name")) {
            sc.conf.setAppName(randomAppName)
          }
          sc
        }

        // Load session extensions configured via spark.sql.extensions, if any.
        val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
        if (extensionConfOption.isDefined) {
          val extensionConfClassName = extensionConfOption.get
          try {
            val extensionConfClass = Utils.classForName(extensionConfClassName)
            val extensionConf = extensionConfClass.newInstance()
              .asInstanceOf[SparkSessionExtensions => Unit]
            extensionConf(extensions)
          } catch {
            case e @ (_: ClassCastException |
                      _: ClassNotFoundException |
                      _: NoClassDefFoundError) =>
              logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
          }
        }

        session = new SparkSession(sparkContext, None, None, extensions)
        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
        defaultSession.set(session)

        // Clear the default session when the application ends.
        sparkContext.addSparkListener(new SparkListener {
          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
            defaultSession.set(null)
            sqlListener.set(null)
          }
        })
      }

      return session
    }
  }

  def builder(): Builder = new Builder

  //......
}
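Note the order of the checks in getOrCreate(): the thread-local active session is tried first, then the global default session, and a new session is built only if neither is usable. When an existing session is returned, the builder's options are applied to its SQL conf, but anything that lives in the already-running SparkContext (such as spark.master) can no longer change, hence the warning. A small sketch of this behavior:

val first = SparkSession.builder().master("local[*]").appName("First").getOrCreate()

// This second call returns the SAME session: the new option is copied into the
// existing session's SQL conf, and the "some configuration may not take effect"
// warning from the source above is logged.
val second = SparkSession.builder()
  .config("spark.sql.shuffle.partitions", "8")
  .getOrCreate()

assert(first eq second)
assert(second.conf.get("spark.sql.shuffle.partitions") == "8")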
6. Summary
As the source above shows, through the SparkSession class, the SparkSession companion object, and the Builder inner class, Spark exposes a very convenient interface for creating SparkSession objects flexibly from parameters and configuration files.
The code above comes from ..\spark-2.2.0\sql\core\src\main\scala\org\apache\spark\sql\SparkSession.scala; the Spark source can be downloaded from the official Spark website (link below).