【sparkSQL】SparkSession的認識


https://www.cnblogs.com/zzhangyuhang/p/9039695.html

 https://www.jianshu.com/p/dea6a78b9dff

 

在Spark1.6中我們使用的叫Hive on spark,主要是依賴hive生成spark程序,有兩個核心組件SQLcontext和HiveContext。

這是Spark 1.x 版本的語法

1
2
3
4
5
//set up the spark configuration and create contexts
  val sparkConf =  new  SparkConf().setAppName( "SparkSessionZipsExample" ).setMaster( "local" )
  // your handle to SparkContext to access other context like SQLContext
  val sc =  new  SparkContext(sparkConf).set( "spark.some.config.option" "some-value" )
  val sqlContext =  new  org.apache.spark.sql.SQLContext(sc)

而Spark2.0中我們使用的就是sparkSQL,是后繼的全新產品,解除了對Hive的依賴。

從Spark2.0以上的版本開始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext

來實現對數據的加載、轉換、處理等工作,並且實現了SQLcontext和HiveContext的所有功能。

我們在新版本中並不需要之前那么繁瑣的創建很多對象,只需要創建一個SparkSession對象即可。

SparkSession支持從不同的數據源加載數據,並把數據轉換成DataFrame,並支持把DataFrame轉換成SQLContext自身中的表。

然后使用SQL語句來操作數據,也提供了HiveQL以及其他依賴於Hive的功能支持。

創建SparkSession

SparkSession 是 Spark SQL 的入口。

使用 Dataset 或者 Datafram 編寫 Spark SQL 應用的時候,第一個要創建的對象就是 SparkSession。

Builder 是 SparkSession 的構造器。 通過 Builder, 可以添加各種配置。

Builder 的方法如下:

Method Description
getOrCreate 獲取或者新建一個 sparkSession
enableHiveSupport 增加支持 hive Support
appName 設置 application 的名字
config 設置各種配置

你可以通過 SparkSession.builder 來創建一個 SparkSession 的實例,並通過 stop 函數來停止 SparkSession。

1
2
3
4
5
6
7
import  org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
   .appName( "My Spark Application" )   // optional and will be autogenerated if not specified
   .master( "local[*]" )                // avoid hardcoding the deployment environment
   .enableHiveSupport()               // self-explanatory, isn't it?
   .config( "spark.sql.warehouse.dir" "target/spark-warehouse" )
   .getOrCreate

這樣我就就可以使用我們創建的SparkSession類型的spark對象了。

2.在SparkSession這個類中,有builder,通過builder去構建SparkSession實例,用法如下。

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("spark://hadoopmaste:

7077").appName("test").config("spark.xxxx.conf", "some-value")

.getOrCreate()

master用於指定spark集群地址

appName用於設置app的名稱

config中以key,value的形式進行一些配置

config可以以鏈式編程的方式多次調用,每次調用可設置一組key,value配置。而且conf中還可以傳入一個關鍵字參數conf,指定外部的SparkConf配置對象getOrCreate,若存在sparksession實例直接返回,否則實例化一個sparksession返回

設置參數

創建SparkSession之后可以通過 spark.conf.set 來設置運行參數

1
2
3
4
5
//set new runtime options
  spark.conf.set( "spark.sql.shuffle.partitions" 6 )
  spark.conf.set( "spark.executor.memory" "2g" )
  //get all settings
  val configMap:Map[String, String] = spark.conf.getAll() //可以使用Scala的迭代器來讀取configMap中的數據。

讀取元數據

如果需要讀取元數據(catalog),可以通過SparkSession來獲取。

1
2
3
//fetch metadata data from the catalog
  spark.catalog.listDatabases.show( false )
  spark.catalog.listTables.show( false )

這里返回的都是Dataset,所以可以根據需要再使用Dataset API來讀取。

注意:catalog 和 schema 是兩個不同的概念

Catalog是目錄的意思,從數據庫方向說,相當於就是所有數據庫的集合;

Schema是模式的意思, 從數據庫方向說, 類似Catelog下的某一個數據庫;

創建Dataset和Dataframe

通過SparkSession來創建Dataset和Dataframe有多種方法。

最簡單的就是通過range()方法來創建dataset,通過createDataFrame()來創建dataframe。

1
2
3
4
5
6
7
8
9
10
11
12
//create a Dataset using spark.range starting from 5 to 100, with increments of 5
val numDS = spark.range( 5 100 5 ) //創建dataset
// reverse the order and display first 5 items
numDS.orderBy(desc( "id" )).show( 5 )
//compute descriptive stats and display them
numDs.describe().show()
// create a DataFrame using spark.createDataFrame from a List or Seq
val langPercentDF = spark.createDataFrame(List(( "Scala" 35 ), ( "Python" 30 ), ( "R" 15 ), ( "Java" 20 ))) //創建dataframe
//rename the columns
val lpDF = langPercentDF.withColumnRenamed( "_1" "language" ).withColumnRenamed( "_2" "percent" )
//order the DataFrame in descending order of percentage
lpDF.orderBy(desc( "percent" )).show( false )

 

讀取數據

可以用SparkSession讀取JSON、CSV、TXT和parquet表。

1
2
3
4
import  spark.implicits  //使RDD轉化為DataFrame以及后續SQL操作
//讀取JSON文件,生成DataFrame
val jsonFile = args( 0 )
val zipsDF = spark.read.json(jsonFile)

使用SparkSQL

借助SparkSession用戶可以像SQLContext一樣使用Spark SQL的全部功能。

1
2
3
4
zipsDF.createOrReplaceTempView( "zips_table" ) //對上面的dataframe創建一個表
zipsDF.cache() //緩存表
val resultsDF = spark.sql( "SELECT city, pop, state, zip FROM zips_table" ) //對表調用SQL語句
resultsDF.show( 10 ) //展示結果

存儲/讀取Hive表 

下面的代碼演示了通過SparkSession來創建Hive表並進行查詢的方法。

1
2
3
4
5
6
7
//drop the table if exists to get around existing table error
  spark.sql( "DROP TABLE IF EXISTS zips_hive_table" )
  //save as a hive table
  spark.table( "zips_table" ).write.saveAsTable( "zips_hive_table" )
  //make a similar query against the hive table
  val resultsHiveDF = spark.sql( "SELECT city, pop, state, zip FROM zips_hive_table WHERE pop > 40000" )
  resultsHiveDF.show( 10 )

下圖是 SparkSession 的類和方法, 這些方法包含了創建 DataSet, DataFrame, Streaming 等等。

Method Description
builder "Opens" a builder to get or create a SparkSession instance
version Returns the current version of Spark.
implicits Use import spark.implicits._ to import the implicits conversions and create Datasets from (almost arbitrary) Scala objects.
emptyDataset[T] Creates an empty Dataset[T].
range Creates a Dataset[Long].
sql Executes a SQL query (and returns a DataFrame).
udf Access to user-defined functions (UDFs).
table Creates a DataFrame from a table.
catalog Access to the catalog of the entities of structured queries
read Access to DataFrameReader to read a DataFrame from external files and storage systems.
conf Access to the current runtime configuration.
readStream Access to DataStreamReader to read streaming datasets.
streams Access to StreamingQueryManager to manage structured streaming queries.
newSession Creates a new SparkSession.
stop Stops the SparkSession.

當我們使用Spark-Shell的時候,Spark會自動幫助我們建立好了一個名字為spark的SparkSesson和一個名字為sc的SparkContext。

 

開發實踐

1. 讀取mysql表數據

import com.test.spark.db.ConnectionInfos; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import java.util.Arrays; public class SparkSimple01 { public static void main(String[] args) { // 創建spark會話,實質上是SQLContext和HiveContext的組合 SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate(); // 設置日志級別,默認會打印DAG,TASK執行日志,設置為WARN之后可以只關注應用相關日志 sparkSession.sparkContext().setLogLevel("WARN"); // 分區方式讀取mysql表數據 Dataset<Row> predicateSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", (String[]) Arrays.asList(" name = 'tom'", " name = 'sam' ").toArray(), ConnectionInfos.getTestUserAndPasswordProperties()); predicateSet.show(); } } 

為了確認該查詢對mysql發出的具體sql,我們先查看一下mysql執行sql日志,

#mysql 命令窗口執行以下命令打開日志記錄 SHOW VARIABLES LIKE "general_log%"; SET GLOBAL general_log = 'ON'; 
 
mysql log.png

打開Lenovo.log得到以上代碼在mysql上的執行情況:


 
分區執行sql

通過分區查詢獲取表數據的方式有以下幾個優點:

  • 利用表索引查詢提高查詢效率
  • 自定義sql條件使分區數據更加均勻,方便后面的並行計算
  • 分區並發讀取可以通過控制並發控制對mysql的查詢壓力
  • 可以讀取大數據量的mysql表

spark jdbc 讀取msyql表還有直接讀取(無法讀取大數據量表),指定字段分區讀取(分區不夠均勻)等方式,通過項目實踐總結,以上的分區讀取方式是我們目前認為對mysql最友好的方式。
分庫分表的系統也可以利用這種方式讀取各個表在內存中union所有spark view得到一張統一的內存表,在業務操作中將分庫分表透明化。如果線上數據表數據量較大的時候,在union之前就需要將spark view通過指定字段的方式查詢,避免on line ddl 在做變更時union表報錯,因為可能存在部分表已經添加新字段,部分表還未加上新字段,而union要求所有表的表結構一致,導致報錯。

2. Dataset 分區數據查看

我們都知道 Dataset 的分區是否均勻,對於結果集的並行處理效果有很重要的作用,spark Java版暫時無法查看partition分區中的數據分布,這里用java調用scala 版api方式查看,線上不推薦使用,因為這里的分區查看使用foreachPartition,多了一次action操作,並且打印出全部數據。

import org.apache.spark.sql.{Dataset, Row} /** * Created by lesly.lai on 2017/12/25. */ class SparkRddTaskInfo { def getTask(dataSet: Dataset[Row]) { val size = dataSet.rdd.partitions.length println(s"==> partition size: $size " ) import scala.collection.Iterator val showElements = (it: Iterator[Row]) => { val ns = it.toSeq import org.apache.spark.TaskContext val pid = TaskContext.get.partitionId println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}") } dataSet.foreachPartition(showElements) } } 

還是用上面讀取mysql數據的例子來演示調用,將predicateSet作為參數傳入

new SparkRddTaskInfo().getTask(predicateSet); 

控制台打印結果


 
分區結果.png

通過分區數據,我們可以看到之前的predicate 方式得到的分區數就是predicate size 大小,並且按照我們想要的數據分區方式分布數據,這對於業務數據的批處理,executor的local cache,spark job執行參數調優都很有幫助,例如調整spark.executor.cores,spark.executor.memory,GC方式等等。
這里涉及java和Scala容器轉換的問題,Scala和Java容器庫有很多相似點,例如,他們都包含迭代器、可迭代結構、集合、 映射和序列。但是他們有一個重要的區別。Scala的容器庫特別強調不可變性,因此提供了大量的新方法將一個容器變換成一個新的容器。
在Scala內部,這些轉換是通過一系列“包裝”對象完成的,這些對象會將相應的方法調用轉發至底層的容器對象。所以容器不會在Java和Scala之間拷貝來拷貝去。一個值得注意的特性是,如果你將一個Java容器轉換成其對應的Scala容器,然后再將其轉換回同樣的Java容器,最終得到的是一個和一開始完全相同的容器對象(這里的相同意味着這兩個對象實際上是指向同一片內存區域的引用,容器轉換過程中沒有任何的拷貝發生)。

3. sql 自定義函數

自定義函數,可以簡單方便的實現業務邏輯。

import com.tes.spark.db.ConnectionInfos; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; public class SparkSimple02 { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate(); sparkSession.sparkContext().setLogLevel("WARN"); Dataset<Row> originSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", ConnectionInfos.getTestUserAndPasswordProperties()); originSet.cache().createOrReplaceTempView("people"); // action操作 打印原始結果集 originSet.show(); // 注冊自定義函數 sparkSession.sqlContext().udf().register("genderUdf", gender -> { if("M".equals(gender)){ return "男"; }else if("F".equals(gender)){ return "女"; } return "未知"; }, DataTypes.StringType); // 查詢結果 Dataset<Row> peopleDs = sparkSession.sql("select job_number,name,age, genderUdf(gender) gender, dept_id, salary, create_time from people "); // action操作 打印函數處理后結果集 peopleDs.show(); } } 

執行結果:


 
image.png

在sql中用使用java代碼實現邏輯操作,這為sql的處理邏輯能力提升了好幾個層次,將函數抽取成接口實現類可以方便的管理和維護這類自定義函數類。此外,spark也支持自定義內聚函數,窗口函數等等方式,相比傳統開發實現的功能方式,使用spark sql開發效率可以明顯提高。

4. mysql 查詢連接復用

最近線上任務遇到一個獲取mysql connection blocked的問題,從spark ui的executor thread dump 可以看到blocked的棧信息,如圖:


 
connection blocked.png

查看代碼發現DBConnectionManager 調用了 spark driver注冊mysql driver 使用同步方式的代碼


 
driverRegister.png

看到這里我們很容易覺得是注冊driver 導致的blocked,其實再仔細看回報錯棧信息,我們會發現,這里的getConnection是在dataset 的foreachpartition 中調用,並且是在每次db 操作時獲取一次getConnection 操作,這意味着在該分區下有多次重復的在同步方法中注冊driver獲取連接的操作,看到這里線程blocked的原因就很明顯了,這里我們的解決方式是:
a. 在同個partition中的connection 復用進行db操作
b. 為了避免partition數據分布不均導致連接active時間過長,加上定時釋放連接再從連接池重新獲取連接操作
通過以上的連接處理,解決了blocked問題,tps也達到了4w左右。

5. executor 並發控制

我們都知道,利用spark 集群分區並行能力,可以很容易實現較高的並發處理能力,如果是並發的批處理,那並行處理的能力可以更好,但是,mysql 在面對這么高的並發的時候,是有點吃不消的,因此我們需要適當降低spark 應用的並發和上下游系統和平相處。控制spark job並發可以通過很多參數配置組合、集群資源、yarn隊列限制等方式實現,經過實踐,我們選擇以下參數實現:

#需要關閉動態內存分配,其他配置才生效 spark.dynamicAllocation.enabled = false spark.executor.instances = 2 spark.executor.cores = 2 
 
image.png

這里發現除了設置executor配置之外,還需要關閉spark的動態executor分配機制,spark 的ExecutorAllocationManager 是 一個根據工作負載動態分配和刪除 executors 的管家, ExecutorAllocationManager 維持一個動態調整的目標executors數目, 並且定期同步到資源管理者,也就是 yarn ,啟動的時候根據配置設置一個目標executors數目, spark 運行過程中會根據等待(pending)和正在運行(running)的tasks數目動態調整目標executors數目,因此需要關閉動態配置資源才能達到控制並發的效果。

除了executor是動態分配之外,Spark 1.6 之后引入的統一內存管理機制,與靜態內存管理的區別在於存儲內存和執行內存共享同一塊空間,可以動態占用對方的空閑區域,我們先看看worker中的內存規划是怎樣的:


 
worker memory schedule.png

worker 可以根據實例配置,內存配置,cores配置動態生成executor數量,每一個executor為一個jvm進程,因此executor 的內存管理是建立在jvm的內存管理之上的。從本文第一張spark on yarn圖片可以看到,yarn模式的 executor 是在yarn container 中運行,因此container的內存分配大小同樣可以控制executor的數量。
RDD 的每個 Partition 經過處理后唯一對應一個 Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID ),從上圖可以看出,開發過程中常用的分區(partition)數據是以block的方式存儲在堆內的storage內存區域的,還有為了減少網絡io而做的broadcast數據也存儲在storage區域;堆內的另一個區域內存則主要用於緩存rdd shuffle產生的中間數據;此外,worker 中的多個executor還共享同一個節點上的堆外內存,這部分內存主要存儲經序列化后的二進制數據,使用的是系統的內存,可以減少不必要的開銷以及頻繁的GC掃描和回收。

為了更好的理解executor的內存分配,我們再來看一下executor各個內存塊的參數設置:


 
executor jvm

 
off-heap.png

了解spark 內存管理的機制后,就可以根據mysql的處理能力來設置executor的並發處理能力,讓我們的spark 應用處理能力收放自如。調整executor數量還有另外一個好處,就是集群資源規划,目前我們的集群隊列是yarn fair 模式,


 
yarn fair 集群模式.png

先看看yarn fair模式,舉個例子,假設有兩個用戶A和B,他們分別擁有一個隊列。當A啟動一個job而B沒有任務時,A會獲得全部集群資源;當B啟動一個job后,A的job會繼續運行,當A的job執行完釋放資源后,不過一會兒之后兩個任務會各自獲得一半的集群資源。如果此時B再啟動第二個job並且其它job還在運行,則它將會和B的第一個job共享B這個隊列的資源,也就是B的兩個job會用於四分之一的集群資源,而A的job仍然用於集群一半的資源,結果就是資源最終在兩個用戶之間平等的共享。

在這種情況下,即使有多個隊列執行任務,fair模式容易在資源空閑時占用其他隊列資源,一旦占用時間過長,就會導致其他任務都卡住,這也是我們遇到的實際問題。如果我們在一開始能評估任務所用的資源,就可以在yarn隊列的基礎上指定應用的資源,例如executor的內存,cpu,實例個數,並行task數量等等參數來管理集群資源,這有點類似於yarn Capacity Scheduler 隊列模式,但又比它有優勢,因為spark 應用可以通過spark context的配置來動態的設置,不用在配置yarn 隊列后重啟集群,稍微靈活了一點。

除了以上提到的幾點總結,我們還遇到很多其他的疑問和實踐,例如,什么時候出現shuffle;如何比較好避開或者利用shuffle;Dataset 的cache操作會不會有性能問題,如何從spark ui中分析定位問題;spark 任務異常處理等等,暫時到這里,待續...


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM