sparksql系列(五) SparkSql異常處理,優化,及查看執行計划


有了上面四篇文章,再加上一些異常處理、優化,開發基本就沒什么問題了。下面我們開始:

一:SparkSql異常處理

將類轉換為DF

實際開發過程中有很多需要將一個數字或者匯聚出來的數據轉換為DF的需求

這時候可以將數字或者數據轉換成一個類,將類轉換為DF

val data = scala.collection.mutable.MutableList[Data]()
data.+=(Data("a","b"))
import sparkSession.implicits._
data.toDF().show(100)

讀JSON文件異常處理

    val sparkSession= SparkSession.builder().master("local").getOrCreate()

    var df2 = sparkSession.emptyDataFrame
    try {
      df2 = sparkSession.read.json("/JAVA/data/")
    } catch {
      case e: Exception => {
        println("error info")
      }
    }
    df2.show(100)

讀CSV文件異常處理

    val sparkSession= SparkSession.builder().master("local").getOrCreate()

    var df2 = sparkSession.emptyDataFrame
    try {
      df2 = sparkSession.read.option("sep", "|").csv("/JAVA/data/")
        .toDF("name","sex")
    } catch {
      case e: Exception => {
        println("error info")
      }
    }
    df2.show(100)

讀TEXT文件異常處理。

    個人理解CSV和TEXT一樣,直接csv即可。還有一個原因是TEXT需要手動的去切分字符串作為一個列,使用起來太不方便了。還不如直接使用CSV

寫文件異常

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    var df = sparkSession.emptyDataFrame
    df = sparkSession.read.option("sep", "|").csv("/JAVA/data")
.      toDF("name","sex")
    df.write.mode(SaveMode.Overwrite).option("sep", "|").csv("/JAVA/data1")

 

    SaveMode.Overwrite:覆蓋式寫文件,沒有文件夾會創建文件夾

    SaveMode.Append:添加式寫文件,沒有文件夾會報錯,建議使用SaveMode.Overwrite

數據異常填充

    進行真正開發的時候,經常join導致有一些空值(NULL),有時候產品需要將空值轉換為一些特殊處理值:

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD = javasc.parallelize(Arrays.asList(
      "{'name':'','age':''}",
      "{'name':'sunliu','age':'19','vip':'true'}"));
    val namedf = sparkSession.read.json(nameRDD)

    namedf.na.fill(Map("name"->"zhangsan","age"->"18","vip"->"false")).show(100)//第一個數據不是空值,是空字符串

age name vip
    false
19 wangwu true
19 wangwu true

二:SparkSql優化

緩存

                     Spark中當一個Rdd多次使用的時候就需要進行緩存。緩存將大大的提高代碼運行效率。

      val sparkSession= SparkSession.builder().master("local").getOrCreate()
      val javasc = new JavaSparkContext(sparkSession.sparkContext)

      val nameRDD = javasc.parallelize(Arrays.asList(
        "{'name':'','age':''}",
        "{'name':'sunliu','age':'19','vip':'true'}"));
      val namedf = sparkSession.read.json(nameRDD)
      namedf.persist(StorageLevel.MEMORY_AND_DISK_SER)     

      個人建議使用MEMORY_AND_DISK_SER,因為內存還是比較珍貴的,磁盤雖然慢但是大。

      盡量不要使用MEMORY_AND_DISK_SER_2,這種后面有一個_2的,因為這是備份兩個,一般情況下是不需要備份兩個的。備份多了浪費內存。

Join策略

    Spark有三種join的策略:broadcast join、Shuffle Hash Join、BroadcastHashJoin

    broadcastHash join(大表和極小表):

      當大表join小表的時候:將小表進行廣播到各個節點。

      優點:不用進行數據shuffle,每個節點進行自己節點上數據的計算

      缺點:將一個表的數據全部加載到主節點,對主節點的壓力較大。

      參數:廣播的默認大小是10M可以適當將大小調整。 sparkSession.sql("set spark.sql.autoBroadcastJoinThreshold=134217728")

    Shuffle Hash Join(大表和小表)

      兩個表進行重新分區之后,進行兩個分區的數據遍歷。

      優點:分區之后數據更小了,就全部加載到內存遍歷就行了

      缺點:相對於broadcastHash join來說還是有一次shuffle

    SortMergeJoin(大表和小表)

      兩個表進行重新分區之后,進行兩個分區的數據遍歷,個人感覺分區前和Shuffle Hash Join沒什么區別。

      缺點:分區之后數據還不能全部加載到內存,需要進行排序。將相同key的加載到內存。

執行計划

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD = javasc.parallelize(Arrays.asList("{'name':'wangwu','age':'18','vip':'t'}"));
    val namedf = sparkSession.read.json(nameRDD)

    namedf.explain()//顯示執行計划

上線提交命令示例

    spark-submit
    --class class
    --master yarn                                      
    --executor-memory 6g                     //最大值取決於yarn.scheduler.maximum-allocation-mb
    --driver-memory 4g                           //driver內存
    --num-executors 4                            //executors個數
    --executor-cores 6                            //執行的核數
    --deploy-mode cluster                      //必須配置,默認是單節點模式
    --conf spark.driver.maxResultSize=6g
    Jar.jar

               //executor-memory 和executor-cores的比例,應該和集群內存核數比例相同.例如集群1000G內存200核.那executor-memory除executor-cores應該是5

 

 

 

Apache中文文檔

                http://spark.apachecn.org/#/docs/7?id=spark-sql-dataframes-and-datasets-guide

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM