在講解 createOrReplaceTempView 和createGlobalTempView的區別前,先了解下Spark Application 和 Spark Session區別
Spark Application
Spark Application 使用:
- 針對單個批處理作業
- 多個job通過session交互式
- 不斷滿足請求的,長期存在的server
- 一個Spark job 可以包含多個map和reduce
- Spark Application 可以包含多個session實例
Spark Session
SparkSession與Spark應用程序相關聯:
- session 是兩個或更多實體之間的交互媒介
- 在Spark 2.0中,你可以使用SparkSession創建
- 可以在不創建SparkConf,SparkContext或SQLContext的情況下創建SparkSession(它們封裝在SparkSession中)
createOrReplaceTempView使用
createOrReplaceTempView:創建臨時視圖,此視圖的生命周期與用於創建此數據集的[SparkSession]相關聯。
createGlobalTempView:創建全局臨時視圖,此時圖的生命周期與Spark Application綁定。
df.createOrReplaceTempView("tempViewName") df.createGlobalTempView("tempViewName")
createOrReplaceTempView(): 創建或替換本地臨時視圖。
此視圖的生命周期依賴於SparkSession類,如果想drop此視圖可采用dropTempView刪除
spark.catalog.dropTempView("tempViewName")
或者 stop() 來停掉 session
self.ss = SparkSession(sc)
...
self.ss.stop()
createGlobalTempView使用
createGlobalTempView():創建全局臨時視圖。
這種視圖的生命周期取決於spark application本身。如果想drop此視圖可采用dropGlobalTempView刪除
spark.catalog.dropGlobalTempView("tempViewName")
或者stop() 將停止
ss = SparkContext(conf=conf, ......)
...
ss.stop()
注:Spark 2.1.0版本中引入了Global temporary views 。
當您希望在不同sessions 之間共享數據並保持活動直到application結束時,此功能非常有用。
為了說明createTempView和createGlobalTempView的用法,展現實例如下:
object NewSessionApp { def main(args: Array[String]): Unit = { val logFile = "data/README.md" // Should be some file on your system val spark = SparkSession. builder. appName("Simple Application"). master("local"). getOrCreate() val logData = spark.read.textFile(logFile).cache() logData.createGlobalTempView("logdata") spark.range(1).createTempView("foo") // within the same session the foo table exists println("""spark.catalog.tableExists("foo") = """ + spark.catalog.tableExists("foo")) //spark.catalog.tableExists("foo") = true // for a new session the foo table does not exists val newSpark = spark.newSession println("""newSpark.catalog.tableExists("foo") = """ + newSpark.catalog.tableExists("foo")) //newSpark.catalog.tableExists("foo") = false //both session can access the logdata table spark.sql("SELECT * FROM global_temp.logdata").show() newSpark.sql("SELECT * FROM global_temp.logdata").show() spark.stop() } }