【sparkSQL】SparkSession的認識


在Spark1.6中我們使用的叫Hive on spark,主要是依賴hive生成spark程序,有兩個核心組件SQLcontext和HiveContext。

這是Spark 1.x 版本的語法

//set up the spark configuration and create contexts
 val sparkConf = new SparkConf().setAppName("SparkSessionZipsExample").setMaster("local")
 // your handle to SparkContext to access other context like SQLContext
 val sc = new SparkContext(sparkConf).set("spark.some.config.option", "some-value")
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

而Spark2.0中我們使用的就是sparkSQL,是后繼的全新產品,解除了對Hive的依賴。

從Spark2.0以上的版本開始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext

來實現對數據的加載、轉換、處理等工作,並且實現了SQLcontext和HiveContext的所有功能。

我們在新版本中並不需要之前那么繁瑣的創建很多對象,只需要創建一個SparkSession對象即可。

SparkSession支持從不同的數據源加載數據,並把數據轉換成DataFrame,並支持把DataFrame轉換成SQLContext自身中的表。

然后使用SQL語句來操作數據,也提供了HiveQL以及其他依賴於Hive的功能支持。

創建SparkSession

SparkSession 是 Spark SQL 的入口。

使用 Dataset 或者 Datafram 編寫 Spark SQL 應用的時候,第一個要創建的對象就是 SparkSession。

Builder 是 SparkSession 的構造器。 通過 Builder, 可以添加各種配置。

Builder 的方法如下:

Method Description
getOrCreate 獲取或者新建一個 sparkSession
enableHiveSupport 增加支持 hive Support
appName 設置 application 的名字
config 設置各種配置

你可以通過 SparkSession.builder 來創建一個 SparkSession 的實例,並通過 stop 函數來停止 SparkSession。

import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
  .appName("My Spark Application")  // optional and will be autogenerated if not specified
  .master("local[*]")               // avoid hardcoding the deployment environment
  .enableHiveSupport()              // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .getOrCreate

這樣我就就可以使用我們創建的SparkSession類型的spark對象了。

設置參數

創建SparkSession之后可以通過 spark.conf.set 來設置運行參數

//set new runtime options
 spark.conf.set("spark.sql.shuffle.partitions", 6)
 spark.conf.set("spark.executor.memory", "2g")
 //get all settings
 val configMap:Map[String, String] = spark.conf.getAll()//可以使用Scala的迭代器來讀取configMap中的數據。

讀取元數據

如果需要讀取元數據(catalog),可以通過SparkSession來獲取。

//fetch metadata data from the catalog
 spark.catalog.listDatabases.show(false)
 spark.catalog.listTables.show(false)

這里返回的都是Dataset,所以可以根據需要再使用Dataset API來讀取。

注意:catalog 和 schema 是兩個不同的概念

Catalog是目錄的意思,從數據庫方向說,相當於就是所有數據庫的集合;

Schema是模式的意思, 從數據庫方向說, 類似Catelog下的某一個數據庫;

創建Dataset和Dataframe

通過SparkSession來創建Dataset和Dataframe有多種方法。

最簡單的就是通過range()方法來創建dataset,通過createDataFrame()來創建dataframe。

 //create a Dataset using spark.range starting from 5 to 100, with increments of 5
 val numDS = spark.range(5, 100, 5)//創建dataset
 // reverse the order and display first 5 items
 numDS.orderBy(desc("id")).show(5)
 //compute descriptive stats and display them
 numDs.describe().show()
 // create a DataFrame using spark.createDataFrame from a List or Seq
 val langPercentDF = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))//創建dataframe
 //rename the columns
 val lpDF = langPercentDF.withColumnRenamed("_1", "language").withColumnRenamed("_2", "percent")
 //order the DataFrame in descending order of percentage
 lpDF.orderBy(desc("percent")).show(false)

 

讀取數據

可以用SparkSession讀取JSON、CSV、TXT和parquet表。

import spark.implicits //使RDD轉化為DataFrame以及后續SQL操作
//讀取JSON文件,生成DataFrame
val jsonFile = args(0)
val zipsDF = spark.read.json(jsonFile)

使用SparkSQL

借助SparkSession用戶可以像SQLContext一樣使用Spark SQL的全部功能。

zipsDF.createOrReplaceTempView("zips_table")//對上面的dataframe創建一個表
zipsDF.cache()//緩存表
val resultsDF = spark.sql("SELECT city, pop, state, zip FROM zips_table")//對表調用SQL語句
resultsDF.show(10)//展示結果

存儲/讀取Hive表 

下面的代碼演示了通過SparkSession來創建Hive表並進行查詢的方法。

//drop the table if exists to get around existing table error
 spark.sql("DROP TABLE IF EXISTS zips_hive_table")
 //save as a hive table
 spark.table("zips_table").write.saveAsTable("zips_hive_table")
 //make a similar query against the hive table
 val resultsHiveDF = spark.sql("SELECT city, pop, state, zip FROM zips_hive_table WHERE pop > 40000")
 resultsHiveDF.show(10)

下圖是 SparkSession 的類和方法, 這些方法包含了創建 DataSet, DataFrame, Streaming 等等。

Method Description
builder "Opens" a builder to get or create a SparkSession instance
version Returns the current version of Spark.
implicits Use import spark.implicits._ to import the implicits conversions and create Datasets from (almost arbitrary) Scala objects.
emptyDataset[T] Creates an empty Dataset[T].
range Creates a Dataset[Long].
sql Executes a SQL query (and returns a DataFrame).
udf Access to user-defined functions (UDFs).
table Creates a DataFrame from a table.
catalog Access to the catalog of the entities of structured queries
read Access to DataFrameReader to read a DataFrame from external files and storage systems.
conf Access to the current runtime configuration.
readStream Access to DataStreamReader to read streaming datasets.
streams Access to StreamingQueryManager to manage structured streaming queries.
newSession Creates a new SparkSession.
stop Stops the SparkSession.

當我們使用Spark-Shell的時候,Spark會自動幫助我們建立好了一個名字為spark的SparkSesson和一個名字為sc的SparkContext。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM