https://www.cnblogs.com/zzhangyuhang/p/9039695.html
https://www.jianshu.com/p/dea6a78b9dff
在Spark1.6中我們使用的叫Hive on spark,主要是依賴hive生成spark程序,有兩個核心組件SQLcontext和HiveContext。
這是Spark 1.x 版本的語法
1
2
3
4
5
|
//set up the spark configuration and create contexts
val sparkConf =
new
SparkConf().setAppName(
"SparkSessionZipsExample"
).setMaster(
"local"
)
// your handle to SparkContext to access other context like SQLContext
val sc =
new
SparkContext(sparkConf).set(
"spark.some.config.option"
,
"some-value"
)
val sqlContext =
new
org.apache.spark.sql.SQLContext(sc)
|
而Spark2.0中我們使用的就是sparkSQL,是后繼的全新產品,解除了對Hive的依賴。
從Spark2.0以上的版本開始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext
來實現對數據的加載、轉換、處理等工作,並且實現了SQLcontext和HiveContext的所有功能。
我們在新版本中並不需要之前那么繁瑣的創建很多對象,只需要創建一個SparkSession對象即可。
SparkSession支持從不同的數據源加載數據,並把數據轉換成DataFrame,並支持把DataFrame轉換成SQLContext自身中的表。
然后使用SQL語句來操作數據,也提供了HiveQL以及其他依賴於Hive的功能支持。
創建SparkSession
SparkSession 是 Spark SQL 的入口。
使用 Dataset 或者 Datafram 編寫 Spark SQL 應用的時候,第一個要創建的對象就是 SparkSession。
Builder 是 SparkSession 的構造器。 通過 Builder, 可以添加各種配置。
Builder 的方法如下:
Method | Description |
---|---|
getOrCreate | 獲取或者新建一個 sparkSession |
enableHiveSupport | 增加支持 hive Support |
appName | 設置 application 的名字 |
config | 設置各種配置 |
你可以通過 SparkSession.builder 來創建一個 SparkSession 的實例,並通過 stop 函數來停止 SparkSession。
1
2
3
4
5
6
7
|
import
org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
.appName(
"My Spark Application"
)
// optional and will be autogenerated if not specified
.master(
"local[*]"
)
// avoid hardcoding the deployment environment
.enableHiveSupport()
// self-explanatory, isn't it?
.config(
"spark.sql.warehouse.dir"
,
"target/spark-warehouse"
)
.getOrCreate
|
這樣我就就可以使用我們創建的SparkSession類型的spark對象了。
2.在SparkSession這個類中,有builder,通過builder去構建SparkSession實例,用法如下。
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("spark://hadoopmaste:
7077").appName("test").config("spark.xxxx.conf", "some-value")
.getOrCreate()
master用於指定spark集群地址
appName用於設置app的名稱
config中以key,value的形式進行一些配置
config可以以鏈式編程的方式多次調用,每次調用可設置一組key,value配置。而且conf中還可以傳入一個關鍵字參數conf,指定外部的SparkConf配置對象getOrCreate,若存在sparksession實例直接返回,否則實例化一個sparksession返回
設置參數
創建SparkSession之后可以通過 spark.conf.set 來設置運行參數
1
2
3
4
5
|
//set new runtime options
spark.conf.set(
"spark.sql.shuffle.partitions"
,
6
)
spark.conf.set(
"spark.executor.memory"
,
"2g"
)
//get all settings
val configMap:Map[String, String] = spark.conf.getAll()
//可以使用Scala的迭代器來讀取configMap中的數據。
|
讀取元數據
如果需要讀取元數據(catalog),可以通過SparkSession來獲取。
1
2
3
|
//fetch metadata data from the catalog
spark.catalog.listDatabases.show(
false
)
spark.catalog.listTables.show(
false
)
|
這里返回的都是Dataset,所以可以根據需要再使用Dataset API來讀取。
注意:catalog 和 schema 是兩個不同的概念
Catalog是目錄的意思,從數據庫方向說,相當於就是所有數據庫的集合;
Schema是模式的意思, 從數據庫方向說, 類似Catelog下的某一個數據庫;
創建Dataset和Dataframe
通過SparkSession來創建Dataset和Dataframe有多種方法。
最簡單的就是通過range()方法來創建dataset,通過createDataFrame()來創建dataframe。
1
2
3
4
5
6
7
8
9
10
11
12
|
//create a Dataset using spark.range starting from 5 to 100, with increments of 5
val numDS = spark.range(
5
,
100
,
5
)
//創建dataset
// reverse the order and display first 5 items
numDS.orderBy(desc(
"id"
)).show(
5
)
//compute descriptive stats and display them
numDs.describe().show()
// create a DataFrame using spark.createDataFrame from a List or Seq
val langPercentDF = spark.createDataFrame(List((
"Scala"
,
35
), (
"Python"
,
30
), (
"R"
,
15
), (
"Java"
,
20
)))
//創建dataframe
//rename the columns
val lpDF = langPercentDF.withColumnRenamed(
"_1"
,
"language"
).withColumnRenamed(
"_2"
,
"percent"
)
//order the DataFrame in descending order of percentage
lpDF.orderBy(desc(
"percent"
)).show(
false
)
|
讀取數據
可以用SparkSession讀取JSON、CSV、TXT和parquet表。
1
2
3
4
|
import
spark.implicits
//使RDD轉化為DataFrame以及后續SQL操作
//讀取JSON文件,生成DataFrame
val jsonFile = args(
0
)
val zipsDF = spark.read.json(jsonFile)
|
使用SparkSQL
借助SparkSession用戶可以像SQLContext一樣使用Spark SQL的全部功能。
1
2
3
4
|
zipsDF.createOrReplaceTempView(
"zips_table"
)
//對上面的dataframe創建一個表
zipsDF.cache()
//緩存表
val resultsDF = spark.sql(
"SELECT city, pop, state, zip FROM zips_table"
)
//對表調用SQL語句
resultsDF.show(
10
)
//展示結果
|
存儲/讀取Hive表
下面的代碼演示了通過SparkSession來創建Hive表並進行查詢的方法。
1
2
3
4
5
6
7
|
//drop the table if exists to get around existing table error
spark.sql(
"DROP TABLE IF EXISTS zips_hive_table"
)
//save as a hive table
spark.table(
"zips_table"
).write.saveAsTable(
"zips_hive_table"
)
//make a similar query against the hive table
val resultsHiveDF = spark.sql(
"SELECT city, pop, state, zip FROM zips_hive_table WHERE pop > 40000"
)
resultsHiveDF.show(
10
)
|
下圖是 SparkSession 的類和方法, 這些方法包含了創建 DataSet, DataFrame, Streaming 等等。
Method | Description |
---|---|
builder | "Opens" a builder to get or create a SparkSession instance |
version | Returns the current version of Spark. |
implicits | Use import spark.implicits._ to import the implicits conversions and create Datasets from (almost arbitrary) Scala objects. |
emptyDataset[T] | Creates an empty Dataset[T]. |
range | Creates a Dataset[Long]. |
sql | Executes a SQL query (and returns a DataFrame). |
udf | Access to user-defined functions (UDFs). |
table | Creates a DataFrame from a table. |
catalog | Access to the catalog of the entities of structured queries |
read | Access to DataFrameReader to read a DataFrame from external files and storage systems. |
conf | Access to the current runtime configuration. |
readStream | Access to DataStreamReader to read streaming datasets. |
streams | Access to StreamingQueryManager to manage structured streaming queries. |
newSession | Creates a new SparkSession. |
stop | Stops the SparkSession. |
當我們使用Spark-Shell的時候,Spark會自動幫助我們建立好了一個名字為spark的SparkSesson和一個名字為sc的SparkContext。
開發實踐
1. 讀取mysql表數據
import com.test.spark.db.ConnectionInfos; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import java.util.Arrays; public class SparkSimple01 { public static void main(String[] args) { // 創建spark會話,實質上是SQLContext和HiveContext的組合 SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate(); // 設置日志級別,默認會打印DAG,TASK執行日志,設置為WARN之后可以只關注應用相關日志 sparkSession.sparkContext().setLogLevel("WARN"); // 分區方式讀取mysql表數據 Dataset<Row> predicateSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", (String[]) Arrays.asList(" name = 'tom'", " name = 'sam' ").toArray(), ConnectionInfos.getTestUserAndPasswordProperties()); predicateSet.show(); } }
為了確認該查詢對mysql發出的具體sql,我們先查看一下mysql執行sql日志,
#mysql 命令窗口執行以下命令打開日志記錄 SHOW VARIABLES LIKE "general_log%"; SET GLOBAL general_log = 'ON';

打開Lenovo.log得到以上代碼在mysql上的執行情況:

通過分區查詢獲取表數據的方式有以下幾個優點:
- 利用表索引查詢提高查詢效率
- 自定義sql條件使分區數據更加均勻,方便后面的並行計算
- 分區並發讀取可以通過控制並發控制對mysql的查詢壓力
- 可以讀取大數據量的mysql表
spark jdbc 讀取msyql表還有直接讀取(無法讀取大數據量表),指定字段分區讀取(分區不夠均勻)等方式,通過項目實踐總結,以上的分區讀取方式是我們目前認為對mysql最友好的方式。
分庫分表的系統也可以利用這種方式讀取各個表在內存中union所有spark view得到一張統一的內存表,在業務操作中將分庫分表透明化。如果線上數據表數據量較大的時候,在union之前就需要將spark view通過指定字段的方式查詢,避免on line ddl 在做變更時union表報錯,因為可能存在部分表已經添加新字段,部分表還未加上新字段,而union要求所有表的表結構一致,導致報錯。
2. Dataset 分區數據查看
我們都知道 Dataset 的分區是否均勻,對於結果集的並行處理效果有很重要的作用,spark Java版暫時無法查看partition分區中的數據分布,這里用java調用scala 版api方式查看,線上不推薦使用,因為這里的分區查看使用foreachPartition,多了一次action操作,並且打印出全部數據。
import org.apache.spark.sql.{Dataset, Row} /** * Created by lesly.lai on 2017/12/25. */ class SparkRddTaskInfo { def getTask(dataSet: Dataset[Row]) { val size = dataSet.rdd.partitions.length println(s"==> partition size: $size " ) import scala.collection.Iterator val showElements = (it: Iterator[Row]) => { val ns = it.toSeq import org.apache.spark.TaskContext val pid = TaskContext.get.partitionId println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}") } dataSet.foreachPartition(showElements) } }
還是用上面讀取mysql數據的例子來演示調用,將predicateSet作為參數傳入
new SparkRddTaskInfo().getTask(predicateSet);
控制台打印結果

通過分區數據,我們可以看到之前的predicate 方式得到的分區數就是predicate size 大小,並且按照我們想要的數據分區方式分布數據,這對於業務數據的批處理,executor的local cache,spark job執行參數調優都很有幫助,例如調整spark.executor.cores,spark.executor.memory,GC方式等等。
這里涉及java和Scala容器轉換的問題,Scala和Java容器庫有很多相似點,例如,他們都包含迭代器、可迭代結構、集合、 映射和序列。但是他們有一個重要的區別。Scala的容器庫特別強調不可變性,因此提供了大量的新方法將一個容器變換成一個新的容器。
在Scala內部,這些轉換是通過一系列“包裝”對象完成的,這些對象會將相應的方法調用轉發至底層的容器對象。所以容器不會在Java和Scala之間拷貝來拷貝去。一個值得注意的特性是,如果你將一個Java容器轉換成其對應的Scala容器,然后再將其轉換回同樣的Java容器,最終得到的是一個和一開始完全相同的容器對象(這里的相同意味着這兩個對象實際上是指向同一片內存區域的引用,容器轉換過程中沒有任何的拷貝發生)。
3. sql 自定義函數
自定義函數,可以簡單方便的實現業務邏輯。
import com.tes.spark.db.ConnectionInfos; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; public class SparkSimple02 { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate(); sparkSession.sparkContext().setLogLevel("WARN"); Dataset<Row> originSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", ConnectionInfos.getTestUserAndPasswordProperties()); originSet.cache().createOrReplaceTempView("people"); // action操作 打印原始結果集 originSet.show(); // 注冊自定義函數 sparkSession.sqlContext().udf().register("genderUdf", gender -> { if("M".equals(gender)){ return "男"; }else if("F".equals(gender)){ return "女"; } return "未知"; }, DataTypes.StringType); // 查詢結果 Dataset<Row> peopleDs = sparkSession.sql("select job_number,name,age, genderUdf(gender) gender, dept_id, salary, create_time from people "); // action操作 打印函數處理后結果集 peopleDs.show(); } }
執行結果:

在sql中用使用java代碼實現邏輯操作,這為sql的處理邏輯能力提升了好幾個層次,將函數抽取成接口實現類可以方便的管理和維護這類自定義函數類。此外,spark也支持自定義內聚函數,窗口函數等等方式,相比傳統開發實現的功能方式,使用spark sql開發效率可以明顯提高。
4. mysql 查詢連接復用
最近線上任務遇到一個獲取mysql connection blocked的問題,從spark ui的executor thread dump 可以看到blocked的棧信息,如圖:

查看代碼發現DBConnectionManager 調用了 spark driver注冊mysql driver 使用同步方式的代碼

看到這里我們很容易覺得是注冊driver 導致的blocked,其實再仔細看回報錯棧信息,我們會發現,這里的getConnection是在dataset 的foreachpartition 中調用,並且是在每次db 操作時獲取一次getConnection 操作,這意味着在該分區下有多次重復的在同步方法中注冊driver獲取連接的操作,看到這里線程blocked的原因就很明顯了,這里我們的解決方式是:
a. 在同個partition中的connection 復用進行db操作
b. 為了避免partition數據分布不均導致連接active時間過長,加上定時釋放連接再從連接池重新獲取連接操作
通過以上的連接處理,解決了blocked問題,tps也達到了4w左右。
5. executor 並發控制
我們都知道,利用spark 集群分區並行能力,可以很容易實現較高的並發處理能力,如果是並發的批處理,那並行處理的能力可以更好,但是,mysql 在面對這么高的並發的時候,是有點吃不消的,因此我們需要適當降低spark 應用的並發和上下游系統和平相處。控制spark job並發可以通過很多參數配置組合、集群資源、yarn隊列限制等方式實現,經過實踐,我們選擇以下參數實現:
#需要關閉動態內存分配,其他配置才生效 spark.dynamicAllocation.enabled = false spark.executor.instances = 2 spark.executor.cores = 2

這里發現除了設置executor配置之外,還需要關閉spark的動態executor分配機制,spark 的ExecutorAllocationManager 是 一個根據工作負載動態分配和刪除 executors 的管家, ExecutorAllocationManager 維持一個動態調整的目標executors數目, 並且定期同步到資源管理者,也就是 yarn ,啟動的時候根據配置設置一個目標executors數目, spark 運行過程中會根據等待(pending)和正在運行(running)的tasks數目動態調整目標executors數目,因此需要關閉動態配置資源才能達到控制並發的效果。
除了executor是動態分配之外,Spark 1.6 之后引入的統一內存管理機制,與靜態內存管理的區別在於存儲內存和執行內存共享同一塊空間,可以動態占用對方的空閑區域,我們先看看worker中的內存規划是怎樣的:

worker 可以根據實例配置,內存配置,cores配置動態生成executor數量,每一個executor為一個jvm進程,因此executor 的內存管理是建立在jvm的內存管理之上的。從本文第一張spark on yarn圖片可以看到,yarn模式的 executor 是在yarn container 中運行,因此container的內存分配大小同樣可以控制executor的數量。
RDD 的每個 Partition 經過處理后唯一對應一個 Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID ),從上圖可以看出,開發過程中常用的分區(partition)數據是以block的方式存儲在堆內的storage內存區域的,還有為了減少網絡io而做的broadcast數據也存儲在storage區域;堆內的另一個區域內存則主要用於緩存rdd shuffle產生的中間數據;此外,worker 中的多個executor還共享同一個節點上的堆外內存,這部分內存主要存儲經序列化后的二進制數據,使用的是系統的內存,可以減少不必要的開銷以及頻繁的GC掃描和回收。
為了更好的理解executor的內存分配,我們再來看一下executor各個內存塊的參數設置:


了解spark 內存管理的機制后,就可以根據mysql的處理能力來設置executor的並發處理能力,讓我們的spark 應用處理能力收放自如。調整executor數量還有另外一個好處,就是集群資源規划,目前我們的集群隊列是yarn fair 模式,

先看看yarn fair模式,舉個例子,假設有兩個用戶A和B,他們分別擁有一個隊列。當A啟動一個job而B沒有任務時,A會獲得全部集群資源;當B啟動一個job后,A的job會繼續運行,當A的job執行完釋放資源后,不過一會兒之后兩個任務會各自獲得一半的集群資源。如果此時B再啟動第二個job並且其它job還在運行,則它將會和B的第一個job共享B這個隊列的資源,也就是B的兩個job會用於四分之一的集群資源,而A的job仍然用於集群一半的資源,結果就是資源最終在兩個用戶之間平等的共享。
在這種情況下,即使有多個隊列執行任務,fair模式容易在資源空閑時占用其他隊列資源,一旦占用時間過長,就會導致其他任務都卡住,這也是我們遇到的實際問題。如果我們在一開始能評估任務所用的資源,就可以在yarn隊列的基礎上指定應用的資源,例如executor的內存,cpu,實例個數,並行task數量等等參數來管理集群資源,這有點類似於yarn Capacity Scheduler 隊列模式,但又比它有優勢,因為spark 應用可以通過spark context的配置來動態的設置,不用在配置yarn 隊列后重啟集群,稍微靈活了一點。
除了以上提到的幾點總結,我們還遇到很多其他的疑問和實踐,例如,什么時候出現shuffle;如何比較好避開或者利用shuffle;Dataset 的cache操作會不會有性能問題,如何從spark ui中分析定位問題;spark 任務異常處理等等,暫時到這里,待續...