Builder Pattern in Practice 2 (Spark/SparkSession)


1. Purpose

  To become familiar with, and gain a deeper understanding of, the role of a Builder by working through the actual SparkSession code.

  See also: Builder Pattern in Practice 1

 

2. Creating a SparkSession

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Standard creation
val sparkSession = SparkSession.builder()
  .appName("Common")
  .master("local[*]")  // "local[0]" would be rejected: the local thread count must be positive
  .getOrCreate()

// Via a SparkConf
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Recommender")
val sparkSession2 = SparkSession.builder().config(sparkConf).getOrCreate()

// With Hive support
val sparkSession = SparkSession.builder()
  .appName("HiveSql")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()
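
However it is created, the session is then used like any other object and should be stopped when the job finishes. A minimal usage sketch (the trivial range query is just for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("QuickStart")
  .master("local[*]")
  .getOrCreate()

// Run a trivial query to confirm the session works
spark.range(0, 5).show()

spark.stop()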

 

3. Creating from Configuration Files

One benefit of the Builder pattern is that construction is delegated to the Builder class, which separates passing configuration from the object being created. This makes it convenient to drive object creation from configuration files: the code below reads external Hadoop/Hive configuration files and creates a SparkSession.

import org.apache.hadoop.conf.Configuration

val sparkBuild = SparkSession
  .builder()
  .appName("HiveSql")
  .master("local[*]")
  .enableHiveSupport()

// Load the external configuration files from the classpath
val configuration = new Configuration()
configuration.addResource("spark/hive/core-site.xml")
configuration.addResource("spark/hive/hdfs-site.xml")
configuration.addResource("spark/hive/hive-site.xml")

// Copy every key-value pair into the builder before creating the session
val iterator = configuration.iterator()
while (iterator.hasNext) {
  val next = iterator.next()
  sparkBuild.config(next.getKey, next.getValue)
}
val sparkSession = sparkBuild.getOrCreate()
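
The same approach works with a plain properties file. A minimal sketch, assuming a hypothetical spark.properties resource on the classpath whose keys are Spark configuration names:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession

val props = new Properties()
// Hypothetical resource: any key=value file of Spark settings would do
props.load(getClass.getResourceAsStream("/spark.properties"))

val builder = SparkSession.builder().appName("FromProperties").master("local[*]")
props.asScala.foreach { case (k, v) => builder.config(k, v) }
val sparkSession = builder.getOrCreate()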

 

4. Role Breakdown

In Builder-pattern terms, the roles in the source code below are: SparkSession is the product (note that its constructor is private), SparkSession.Builder is the concrete builder that accumulates options, and the SparkSession companion object is the entry point that hands out builders via builder(). The sketch below mirrors this structure.
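
To make the roles concrete, here is a minimal, self-contained sketch of the same structure (illustrative only, not Spark code):

// Product: the constructor is private, mirroring SparkSession
class Connection private (val options: Map[String, String]) {
  override def toString: String = s"Connection($options)"
}

object Connection {
  // Concrete builder: accumulates options, then builds the product
  class Builder {
    private val options = scala.collection.mutable.HashMap[String, String]()

    def config(key: String, value: String): Builder = {
      options += key -> value
      this
    }

    def build(): Connection = new Connection(options.toMap)
  }

  // Entry point, mirroring SparkSession.builder()
  def builder(): Builder = new Builder
}

// Client code drives the builder with a fluent chain
val conn = Connection.builder()
  .config("host", "localhost")
  .config("port", "5432")
  .build()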

5. Source Code of SparkSession's Builder Class

// The constructor is made private here, so instances can only be created by the Builder class or the newSession method

@InterfaceStability.Stable
class SparkSession private(
    @transient val sparkContext: SparkContext,
    @transient private val existingSharedState: Option[SharedState],
    @transient private val parentSessionState: Option[SessionState],
    @transient private[sql] val extensions: SparkSessionExtensions)
  extends Serializable with Closeable with Logging { self =>

  private[sql] def this(sc: SparkContext) {
    this(sc, None, None, new SparkSessionExtensions)
  }

  sparkContext.assertNotStopped()

  
  def version: String = SPARK_VERSION
//.....

  def newSession(): SparkSession = {
    new SparkSession(sparkContext, Some(sharedState), parentSessionState = None, extensions)
  }

//....
}
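
Note that newSession() shares the SparkContext and the shared state, but passes parentSessionState = None, so each session gets a fresh SessionState and SQL settings made on one session do not leak into another. A quick illustration:

val spark = SparkSession.builder().master("local[*]").appName("Demo").getOrCreate()
val spark2 = spark.newSession()

spark.conf.set("spark.sql.shuffle.partitions", "4")
// spark2 has its own SessionState, so it still reports the default value
println(spark2.conf.get("spark.sql.shuffle.partitions"))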

 

The code of the Builder inner class:

@InterfaceStability.Stable
object SparkSession {
  @InterfaceStability.Stable
  class Builder extends Logging {

    private[this] val options = new scala.collection.mutable.HashMap[String, String]

    private[this] val extensions = new SparkSessionExtensions

    private[this] var userSuppliedContext: Option[SparkContext] = None

    private[spark] def sparkContext(sparkContext: SparkContext): Builder = synchronized {
      userSuppliedContext = Option(sparkContext)
      this
    }

   
    def appName(name: String): Builder = config("spark.app.name", name)

    def config(key: String, value: String): Builder = synchronized {
      options += key -> value
      this
    }

    def config(key: String, value: Long): Builder = synchronized {
      options += key -> value.toString
      this
    }

    def config(key: String, value: Double): Builder = synchronized {
      options += key -> value.toString
      this
    }

  
    def config(key: String, value: Boolean): Builder = synchronized {
      options += key -> value.toString
      this
    }

    def config(conf: SparkConf): Builder = synchronized {
      conf.getAll.foreach { case (k, v) => options += k -> v }
      this
    }

    def master(master: String): Builder = config("spark.master", master)

    def enableHiveSupport(): Builder = synchronized {
      if (hiveClassesArePresent) {
        config(CATALOG_IMPLEMENTATION.key, "hive")
      } else {
        throw new IllegalArgumentException(
          "Unable to instantiate SparkSession with Hive support because " +
            "Hive classes are not found.")
      }
    }

    def withExtensions(f: SparkSessionExtensions => Unit): Builder = {
      f(extensions)
      this
    }

    def getOrCreate(): SparkSession = synchronized {
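      // 1. Reuse the session already active on the current thread, if its SparkContext is still alive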
      var session = activeThreadSession.get()
      if ((session ne null) && !session.sparkContext.isStopped) {
        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
        if (options.nonEmpty) {
          logWarning("Using an existing SparkSession; some configuration may not take effect.")
        }
        return session
      }

   
      SparkSession.synchronized {
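        // 2. Fall back to the JVM-wide default session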

        session = defaultSession.get()
        if ((session ne null) && !session.sparkContext.isStopped) {
          options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
          if (options.nonEmpty) {
            logWarning("Using an existing SparkSession; some configuration may not take effect.")
          }
          return session
        }

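        // 3. No usable session: build (or reuse) a SparkContext from the accumulated options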
        val sparkContext = userSuppliedContext.getOrElse {
          // set app name if not given
          val randomAppName = java.util.UUID.randomUUID().toString
          val sparkConf = new SparkConf()
          options.foreach { case (k, v) => sparkConf.set(k, v) }
          if (!sparkConf.contains("spark.app.name")) {
            sparkConf.setAppName(randomAppName)
          }
          val sc = SparkContext.getOrCreate(sparkConf)
         
          options.foreach { case (k, v) => sc.conf.set(k, v) }
          if (!sc.conf.contains("spark.app.name")) {
            sc.conf.setAppName(randomAppName)
          }
          sc
        }

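        // Apply any session extensions configured via StaticSQLConf.SPARK_SESSION_EXTENSIONS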
        val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
        if (extensionConfOption.isDefined) {
          val extensionConfClassName = extensionConfOption.get
          try {
            val extensionConfClass = Utils.classForName(extensionConfClassName)
            val extensionConf = extensionConfClass.newInstance()
              .asInstanceOf[SparkSessionExtensions => Unit]
            extensionConf(extensions)
          } catch {
           
            case e @ (_: ClassCastException |
                      _: ClassNotFoundException |
                      _: NoClassDefFoundError) =>
              logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
          }
        }

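        // 4. Create a brand-new SparkSession, apply the options, and register it as the default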
        session = new SparkSession(sparkContext, None, None, extensions)
        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
        defaultSession.set(session)

     
        sparkContext.addSparkListener(new SparkListener {
          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
            defaultSession.set(null)
            sqlListener.set(null)
          }
        })
      }

      return session
    }
  }

 
  def builder(): Builder = new Builder
  //......
}

6. Summary

As shown above, SparkSession combines the SparkSession class, the SparkSession companion object, and the Builder inner class to expose a very convenient API, so a SparkSession object can be created flexibly from parameters and configuration files.

The code above is from ..\spark-2.2.0\sql\core\src\main\scala\org\apache\spark\sql\SparkSession.scala; the Spark sources can be downloaded from the Apache archive at the address below.

https://archive.apache.org/dist/spark/

