一:SparkSql異常處理
將類轉換為DF
實際開發過程中有很多需要將一個數字或者匯聚出來的數據轉換為DF的需求
這時候可以將數字或者數據轉換成一個類,將類轉換為DF
val data = scala.collection.mutable.MutableList[Data]()
data.+=(Data("a","b"))
import sparkSession.implicits._
data.toDF().show(100)
讀JSON文件異常處理
val sparkSession= SparkSession.builder().master("local").getOrCreate()
var df2 = sparkSession.emptyDataFrame
try {
df2 = sparkSession.read.json("/JAVA/data/")
} catch {
case e: Exception => {
println("error info")
}
}
df2.show(100)
讀CSV文件異常處理
val sparkSession= SparkSession.builder().master("local").getOrCreate()
var df2 = sparkSession.emptyDataFrame
try {
df2 = sparkSession.read.option("sep", "|").csv("/JAVA/data/")
.toDF("name","sex")
} catch {
case e: Exception => {
println("error info")
}
}
df2.show(100)
讀TEXT文件異常處理。
個人理解CSV和TEXT一樣,直接csv即可。還有一個原因是TEXT需要手動的去切分字符串作為一個列,使用起來太不方便了。還不如直接使用CSV
寫文件異常
val sparkSession= SparkSession.builder().master("local").getOrCreate()
var df = sparkSession.emptyDataFrame
df = sparkSession.read.option("sep", "|").csv("/JAVA/data")
. toDF("name","sex")
df.write.mode(SaveMode.Overwrite).option("sep", "|").csv("/JAVA/data1")
SaveMode.Overwrite:覆蓋式寫文件,沒有文件夾會創建文件夾
SaveMode.Append:添加式寫文件,沒有文件夾會報錯,建議使用SaveMode.Overwrite
數據異常填充
進行真正開發的時候,經常join導致有一些空值(NULL),有時候產品需要將空值轉換為一些特殊處理值:
val sparkSession= SparkSession.builder().master("local").getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)
val nameRDD = javasc.parallelize(Arrays.asList(
"{'name':'','age':''}",
"{'name':'sunliu','age':'19','vip':'true'}"));
val namedf = sparkSession.read.json(nameRDD)
namedf.na.fill(Map("name"->"zhangsan","age"->"18","vip"->"false")).show(100)//第一個數據不是空值,是空字符串
age | name | vip |
false | ||
19 | wangwu | true |
19 | wangwu | true |
二:SparkSql優化
緩存
Spark中當一個Rdd多次使用的時候就需要進行緩存。緩存將大大的提高代碼運行效率。
val sparkSession= SparkSession.builder().master("local").getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)
val nameRDD = javasc.parallelize(Arrays.asList(
"{'name':'','age':''}",
"{'name':'sunliu','age':'19','vip':'true'}"));
val namedf = sparkSession.read.json(nameRDD)
namedf.persist(StorageLevel.MEMORY_AND_DISK_SER)
個人建議使用MEMORY_AND_DISK_SER,因為內存還是比較珍貴的,磁盤雖然慢但是大。
盡量不要使用MEMORY_AND_DISK_SER_2,這種后面有一個_2的,因為這是備份兩個,一般情況下是不需要備份兩個的。備份多了浪費內存。
Join策略
Spark有三種join的策略:broadcast join、Shuffle Hash Join、BroadcastHashJoin
broadcastHash join(大表和極小表):
當大表join小表的時候:將小表進行廣播到各個節點。
優點:不用進行數據shuffle,每個節點進行自己節點上數據的計算
缺點:將一個表的數據全部加載到主節點,對主節點的壓力較大。
參數:廣播的默認大小是10M可以適當將大小調整。 sparkSession.sql("set spark.sql.autoBroadcastJoinThreshold=134217728")
Shuffle Hash Join(大表和小表)
兩個表進行重新分區之后,進行兩個分區的數據遍歷。
優點:分區之后數據更小了,就全部加載到內存遍歷就行了
缺點:相對於broadcastHash join來說還是有一次shuffle
SortMergeJoin(大表和小表)
兩個表進行重新分區之后,進行兩個分區的數據遍歷,個人感覺分區前和Shuffle Hash Join沒什么區別。
缺點:分區之后數據還不能全部加載到內存,需要進行排序。將相同key的加載到內存。
執行計划
val sparkSession= SparkSession.builder().master("local").getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)
val nameRDD = javasc.parallelize(Arrays.asList("{'name':'wangwu','age':'18','vip':'t'}"));
val namedf = sparkSession.read.json(nameRDD)
namedf.explain()//顯示執行計划
上線提交命令示例
spark-submit
--class class
--master yarn
--executor-memory 6g //最大值取決於yarn.scheduler.maximum-allocation-mb
--driver-memory 4g //driver內存
--num-executors 4 //executors個數
--executor-cores 6 //執行的核數
--deploy-mode cluster //必須配置,默認是單節點模式
--conf spark.driver.maxResultSize=6g
Jar.jar
//executor-memory 和executor-cores的比例,應該和集群內存核數比例相同.例如集群1000G內存200核.那executor-memory除executor-cores應該是5
Apache中文文檔
http://spark.apachecn.org/#/docs/7?id=spark-sql-dataframes-and-datasets-guide