大數據篇:Spark


大數據篇:Spark

  • Spark是什么

Spark是一個快速(基於內存),通用,可擴展的計算引擎,采用Scala語言編寫。2009年誕生於UC Berkeley(加州大學伯克利分校,CAL的AMP實驗室),2010年開源,2013年6月進入Apach孵化器,2014年成為Apach頂級項目,目前有1000+個活躍者。就是說用Spark就對了。

Spark支持Scala,Java,R,Python語言,並提供了幾十種(目前80+種)高性能的算法,這些如果讓我們自己來做,幾乎不可能。

Spark得到眾多公司支持,如:阿里、騰訊、京東、攜程、百度、優酷、土豆、IBM、Cloudera、Hortonworks等。

  • 如果沒有Spark

解決MapReduce慢的問題而誕生,官網解釋比同樣的MapReduce任務快100倍!

spark.apache.org

1 內置模塊

機器學習(MLlib),圖計算(GraphicX),實時處理(SparkStreaming),SQL解析(SparkSql)

1.1 集群資源管理

Spark設計為可以高效的在一個計算節點到數千個計算節點之間伸縮計算,為了實現這樣的要求,同時獲得最大靈活性,Spark支持在各種集群資源管理器上運行,目前支持的3種如下:(上圖中下三個)

  1. Hadoop YARN(國內幾乎都用)
  2. Apach Mesos(國外使用較多)
  3. Standalone(Spark自帶的資源調度器,需要在集群中的每台節點上配置Spark)

1.2 Spark Core

實現了Spark的基本功能,包含任務調度、內存管理、錯誤恢復、與存儲系統交互等模塊。其中還包含了對彈性分布式數據集(RDD:Resilient Distributed DataSet)的API定義

1.3 Spark SQL

是Spark用來操作結構化數據的程序包,通過Spark SQL 我們可以使用SQL或者HQL來查詢數據。且支持多種數據源:Hive、Parquet、Json等

1.4 Spark Streaming

是Spark提供的對實時數據進行流式計算的組件

1.5 Spark MLlib

提供常見的機器學習功能和程序庫,包括分類、回歸、聚類、協同過濾等。還提供了模型評估、數據導入等額外的支持功能。

2 運行模式

2.1 核心概念介紹

  • Master

    • Spark特有的資源調度系統Leader,掌控整個集群資源信息,類似於Yarn框架中的ResourceManager
    • 監聽Worker,看Worker是否正常工作
    • Master對Worker、Application等的管理(接收Worker的注冊並管理所有的Worker,接收Client提交的Application,調度等待Application並向Worker提交)
  • Worker

    • Spark特有的資源調度Slave,有多個,每個Slave掌管着所有節點的資源信息,類似Yarn框架中的NodeManager
    • 通過RegisterWorker注冊到Master
    • 定時發送心跳給Master
    • 根據Master發送的Application配置進程環境,並啟動ExecutorBackend(執行Task所需的進程)
  • Driver

    • Spark的驅動器,是執行開發程序中的main方法的線程
    • 負責開發人員編寫SparkContext、RDD,以及進行RDD操作的代碼執行,如果使用Spark Shell,那么啟動時后台自啟動了一個Spark驅動器,預加載一個叫做sc的SparkContext對象,如果驅動器終止,那么Spark應用也就結束了。
    • 4大主要職責:
      • 將用戶程序轉化為作業(Job)
      • 在Executor之間調度任務(Task)
      • 跟蹤Executor的執行情況
      • 通過UI展示查詢運行情況
  • Excutor

    • Spark Executor是一個工作節點,負責在Spark作業中運行任務,任務間相互獨立。Spark應用啟動時,Executor節點被同時啟動,並且始終伴隨着整個Spark應用的生命周期而存在,如果有Executor節點發生了故障或崩潰,Spark應用也可以繼續執行,會將出錯節點上的任務調度到其他Executor節點上繼續運行
    • 兩個核心功能:
      • 負責運行組成Spark應用的任務,並將結果返回給驅動器(Driver)
      • 它通過自身塊管理器(BlockManager)為用戶程序中要求緩存的RDD提供內存式存儲。RDD是直接存在Executor進程內的,因此任務可以在運行時充分利用緩存數據加速運算。
  • RDDs

    • Resilient Distributed DataSet:彈性分布式數據集
    • 一旦擁有SparkContext對象,就可以用它來創建RDD
  • 通用流程圖

2.2 WordCount案例

  • Spark Shell方式
#創建word.txt文件
vim word.txt
#--->
hadoop hello spark
spark word
hello hadoop spark
#---<
#上傳HDFS集群
hadoop dfs -put word.txt /
#鏈接客戶端
spark-shell

sc.textFile("/word.txt").flatMap(line => line.split(' ')).map((_,1)).reduceByKey(_ + _).collect

每個Spark應用程序都包含一個驅動程序,驅動程序負責把並行操作發布到集群上,驅動程序包含Spark應用中的主函數,定義了分布式數據集以應用在集群中,在前面的wordcount案例中,spark-shell就是我們的驅動程序,所以我們鍵入我們任何想要的操作,然后由它負責發布,驅動程序通過SparkContext對象來訪問Spark,SparkContext對象相當於一個到Spark集群的鏈接

2.3 Job划分和調度

  • Application應用
    • 一個SparkContext就是一個Application
  • Job作業:
    • 一個行動算子(Action)就是一個Job
  • Stage階段:
    • 一次寬依賴(一次shuffle)就是一個Stage,划分是從后往前划分
  • Task任務:
    • 一個核心就是一個Task,體現任務的並行度,常常根據核心數的1.5倍進行設置

  • 使用WordCount案例分析

一個行動算子collect(),一個job

一次寬依賴shuffle算子reduceByKey(),切分成2個Stage階段

Stage階段,默認文件被切分成2份,所以有2個task

Stage階段0

Stage階段1

2.4 Shuffle洗牌

2.4.1 ShuffleMapStage And ResultStage

  • 在划分stage時,最后一個stage稱為FinalStage,本質上是一個ResultStage對象,前面所有的stage被稱為ShuffleMapStage

  • ShuffleMapStage 的結束伴隨着shuffle文件寫磁盤

  • ResultStage對應代碼中的action算子,即將一個函數應用在RDD的各個Partition(分區)的數據集上,意味着一個Job運行結束

2.4.2 HashShuffle

  • 未優化HashShuffle流程圖:目前已經沒有了

如上圖,最終結果會有12個小文件

  • 優化后HashShuffle流程圖

如上圖,最終結果會有6個小文件,比未優化前少了一半

2.4.3 SortShuffle

該模式下,數據會先寫入一個數據結果,reduceByKey寫入Map,一邊通過Map局部聚合,一邊寫入內存,

Join算子寫入ArrayList直接寫入內存中,然后需要判斷是否達到閥值,如果達到就會將內存數據寫入磁盤,釋放內存資源

2.4.4 Bypass SortShuffle

  • Bypass SortShuffle運行機制觸發條件
    • shuffle map task 數量小於 spark.shuffle.sort.bypassMargeThreshold參數的值,默認為200
    • 不是聚合類的shuffle算子

2.5 Submit語法

spark-submit \
--class <main-calss> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
...  #其他 options
<application-jar> \
[application-arguments]
  • --class:應用啟動類全類名(如:org.apache.spark.examples.SparkPi)
  • --master:指定master地址,默認本機Local(本地一般使用Local[*],集群一般使用yarn)
  • --deploy-mode:是否發布到驅動worker節點(參數:cluster),或者作為一個本地客戶端(參數:client),默認本地client
  • --conf:任意Spark配置屬性,格式key=value,如包含空格,可以加引號"key=value"
  • application-jar:打包好的應用程序jar,包含依賴,這個URL在集群中全局課件,如HDFS上的jar->hdfs://path;如linux上的jar->file://path 且所有節點路徑都需要包含這個jar
  • application-arguments:給main()方法傳參數
  • --executor-memory 1G:指定每個executor可用內存為1G
  • --total-executor-cores 6:指定所有executor使用的cpu核數為6個
  • --executor-cores 2:表示每個executor使用的cpu的核數2個

2.6 Local模式

Local模式就是在一台計算機上運行Spark,通常用於開發中。(單機)

  • Submit提交方式
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[*] \
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.2.0.jar 100

2.7 Standalone模式

構建一個由 Master + Slave 構成的Spark集群,Spark運行在集群中,只依賴Spark,不依賴別的組件(如:Yarn)。(獨立的Spark集群)

#鏈接客戶端
spark-shell --master spark://cdh01.cm:7337

參考wordCount案例

  • Standalone-Client流程圖

  • Standalone-Cluster流程圖

2.8 Yarn模式

Spark客戶端可以直接連接Yarn,不需要構建Spark集群。

有yarn-client和yarn-cluster兩種模式,主要區別在:Driver程序的運行節點不同。

yarn-client:Driver程序運行在客戶端,適用於交互、調試,希望立即看見APP輸出

yarn-cluster:Driver程序運行在由ResourceManager啟動的ApplicationMaster上,適用於生產環境

  • Yarn-Client流程圖

  • Yarn-Cluster流程圖

  • 客戶端模式:Driver是在Client端,日志結果可以直接在后台看見
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.2.0.jar 100
  • 集群模式:Driver是在NodeManager端,日志結果需要通過監控日志查看
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.2.0.jar 100    

3 使用IDEA開發Spark

  • pom.xml
    <dependencies>
        <!-- scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.12</version>
        </dependency>
        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Spark On Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Hbase On Spark-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>2.1.0-cdh6.2.0</version>
        </dependency>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Spark Streaming Kafka-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-tools</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-examples</artifactId>
            <version>2.1.0</version>
        </dependency>

        <!--mysql依賴的jar包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

    </dependencies>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <build>
        <plugins>
            <!-- 在maven項目中既有java又有scala代碼時配置 maven-scala-plugin 插件打包時可以將兩類代碼一起打包 -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <!-- MAVEN 編譯使用的JDK版本 -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase><!--綁定到package生命周期階段-->
                        <goals>
                            <goal>single</goal><!--只運行一次-->
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!--<finalName></finalName>&lt;!&ndash;主類入口&ndash;&gt;-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
  • WorkCount案例

    1. 在resources文件夾下,新建word.csv文件
    hello,spark
    hello,scala,hadoop
    hello,hdfs
    hello,spark,hadoop
    hello
    
    1. WorkCount.scala
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WorkCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WorkCount").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val tuples: Array[(String, Int)] = sc.textFile(ClassLoader.getSystemResource("word.csv").getPath)
          .flatMap(_.split(","))
          .map((_, 1))
          .reduceByKey(_ + _)
          .collect()
        tuples.foreach(println)
      }
    }
    

    結果:

    (scala,1)
    (hello,5)
    (spark,2)
    (hadoop,2)
    (hdfs,1)

4 Spark Core

4.1 什么是RDD

Resilient Distributed DataSet:彈性分布式數據集,是Spark中最基本數據抽象,可以理解為數據集合。

在代碼中是一個抽象類,它代表一個彈性的、不可變的、可分區,里面的元素可並行計算的集合。

4.2 RDD的五個主要特性

  1. 分區性
    • 多個分區,分區可以看成是數據集的基本組成單位
    • 對於RDD來說,每個分區都會被一個計算任務處理,並決定了並行計算的粒度。
    • 用戶可以在創建RDD時,指定RDD的分區數,如果沒有指定,那么采用默認值(程序所分配到的CPU Coure的數目)
    • 每個分配的儲存是由BlockManager實現的,每個分區都會被邏輯映射成BlockManager的一個Block,而這個Block會被一個Task負責計算。
  2. 計算每個分區的函數
    • Spark中RDD的計算是以分區為單位的,每個RDD都會實現compute函數以達到這個目的
  3. 依賴性
    • RDD的每次轉換都會生成一個新的RDD,所以RDD之間會形成類似於流水線一樣的前后依賴關系,在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。
  4. 對儲存鍵值對的RDD,還有一個可選的分區器
    • 只有對key-value的RDD,才會有Partitioner,非key-value的RDD的Rartitioner的值是None
    • Partitioner不但決定了RDD的分區數量,也決定了parent RDD Shuffle輸出時的分區數量
    • 默認是HashPartitioner,還有RangePartition,自定義分區
  5. 儲存每個分區優先位置的列表(本地計算性)
    • 比如對於一個HDFS文件來說,這個列表保存的就是每個Partition所在文件快的位置,按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的儲存位置。

4.3 Transformation和Action算子

在Spark中,Transformation算子(也稱轉換算子),在沒有Action算子(也稱行動算子)去觸發的時候,是不會執行的,可以理解為懶算子,而Action算子可以理解為觸發算子,常用Action算子如下:

  • redece:通過函數聚集RDD的所有元素,先聚合分區內的數據,在聚合分區間的數據(預聚合)
  • collect:以數組的形式返回RDD中的所有元素,所有數據都會被拉到Driver端,內存開銷很大,所以慎用
  • count:返回RDD中元素個數
  • take:返回RDD中前N個元素組成的數組
  • first:返回RDD中的第一個元素,類似於tack(1)
  • takeOrdered:返回排序后的前N個元素,默認升序,數據也會拉到Driver端
  • aggregate:分區內聚合后,在分區間聚合
  • fold:aggregate簡化操作,如果分區內和分區間算法一樣,則可以使用
  • saveAsTextFile:將數據集的元素以textFile的形式保存到HDFS文件系統或者其他文件系統,對每個元素,Spark都會調用toString方法轉換為文本
  • saveAsSequenceFile:將數據集的元素以Hadoop SquenceFile的形式保存到指定目錄下,可以是HDFS或者其他文件系統
  • saveAsObjectFile:將RDD中的元素序列化成對象,儲存到文件中
  • countByKey:針對k-v類型RDD,返回一個Map(Key,count),可以用來查看數據是否傾斜
  • foreach:針對RDD中的每一個元素都執行一次函數,每個函數實在Executor上執行的

常用Transformation算子如下:

  • map:輸入變換函數應用於RDD中所有元素,轉換其類型
  • mapPartitions:輸入變換函數應用於每個分區中所有元素
  • mapPartitionsWithIndex:輸入變換函數應用於每個分區中所有元素,帶有分區號
  • filter:過濾算子
  • flatMap:扁平化算子
  • sample:抽樣算子
  • union:並集算子
  • intersection:交集算子
  • distinct:去重算子
  • groupByKey:根據Key分組算子
  • reduceByKey:根據Key聚合算子
  • aggregateByKey:根據Key聚合算子
  • sortByKey:根據Key排序算子
  • join:鏈接算子
  • coalesce:壓縮分區算子
  • repartition:重分區算子

4.4 RDD的創建

4.4.1 從集合中創建

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WorkCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * 通過parallelize方法傳入序列得到RDD
      * 傳入分區數為1,結果為1	2	3	4	5	6	7	8	9	10
      * 傳入分區數大於1,結果順序不定,因為數據被打散在2個分區里
      * */
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 1)
    rdd.foreach(x => print(x + "\t"))
  }
}

4.4.2 從外部儲存創建RDD

  • 讀取textFile

WordCount案例介紹了此種用法

  • 讀取Json文件

在idea中,resources目錄下創建word.json文件

{"name": "zhangsa"}
{"name": "lisi", "age": 30}
{"name": "wangwu"}
["aa","bb"]
object Demo0 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("json").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[String] = sc.textFile(this.getClass().getClassLoader.getResource("word.json").getPath)
    val rdd2: RDD[Option[Any]] = rdd1.map(JSON.parseFull(_))
    rdd2.foreach(println)
    /**
      * Some(Map(name -> zhangsa))
      * Some(Map(name -> wangwu))
      * Some(List(aa, bb))
      * Some(Map(name -> lisi, age -> 30.0))
      * */
  }
}
  • 讀取Object對象文件
object Demo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("object").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
//    rdd1.saveAsObjectFile("hdfs://cdh01.cm/test")

    val rdd2: RDD[Nothing] = sc.objectFile("hdfs://cdh01.cm/test")
    rdd2.foreach(println)

    /**
      * 2
      * 5
      * 1
      * 4
      * 3
      * */
  }
}

4.4.3 從其他RDD轉換得到新的RDD

  • 根據RDD的數據類型的不同,整體分為2種RDD:Value類型,Key-Value類型(二維元組)

map()返回一個新的RDD,該RDD是由原RDD的每個元素經過函數轉換后的值組成,主要作用就是轉換結構。(不存在shuffle)

  • 案例一:
object Demo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("map").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * map算子,一共有多少元素就會執行多少次,和分區數無關
      **/
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 1)
    val mapRdd: RDD[Int] = rdd.map(x => {
      println("執行") //一共被執行10次
      x * 2
    })
    val result: Array[Int] = mapRdd.collect()
    result.foreach(x => print(x + "\t")) //2	4	6	8	10	12	14	16	18	20
  }
}
  • 案例二:
object demo3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapPartitions").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * mapPartitions算子,一個分區內處理,幾個分區就執行幾次,優於map函數
      **/
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 2)
    val mapRdd: RDD[Int] = rdd.mapPartitions(it => {
      println("執行") //分區2次,共打印2次
      it.map(x => x * 2)
    })
    val result: Array[Int] = mapRdd.collect()
    result.foreach(x => print(x + "\t")) //2	4	6	8	10	12	14	16	18	20
  }
}
  • 案例三:
object Demo4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * mapPartitionsWithIndex算子,一個分區內處理,幾個分區就執行幾次,返回帶有分區號的結果集
      **/
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 2)
    val value: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, it) => it.map((index, _)))
    val result: Array[(Int, Int)] = value.collect()
    result.foreach(x => print(x + "\t")) //(0,1)	(0,2)	(0,3)	(0,4)	(0,5)	(1,6)	(1,7)	(1,8)	(1,9)	(1,10)
  }
}

4.5 flatMap

扁平化(不存在shuffle)

object Demo5 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("flatMap").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
    val map_result: RDD[String] = rdd.map(ele => ele._1 + ele._2)
    val flatMap_result: RDD[Char] = rdd.flatMap(ele => ele._1 + ele._2)
    
    /**
      * C3
      * A1
      * B2
      **/
    map_result.foreach(println)

    /**
      * B
      * A
      * C
      * 1
      * 2
      * 3
      **/
    flatMap_result.foreach(println)
  }
}

4.6 glom

將每一個分區的元素合並成一個數組,形成新的RDD類型:RDD[Array[T]] (不存在shuffle)

object Demo6 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("glom").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10), 3)
    val result: RDD[Array[Int]] = rdd.glom()

    /**
      * 1,2,3
      * 7,8,9,10
      * 4,5,6
      * */
    result.foreach(x=>{
      println(x.toList.mkString(","))
    })

  }
}

4.7 groupBy

根據條件函數分組(存在shuffle)

object Demo7 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("groupBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10))
    val result1: RDD[(Int, Iterable[Int])] = rdd.groupBy(x => x % 2)
    val result2: RDD[(Boolean, Iterable[Int])] = rdd.groupBy(x => x % 2 == 0)

    /**
      * (0,CompactBuffer(2, 4, 6, 8, 10))
      * (1,CompactBuffer(1, 3, 5, 7, 9))
      **/
    result1.foreach(println)

    /**
      * (true,CompactBuffer(2, 4, 6, 8, 10))
      * (false,CompactBuffer(1, 3, 5, 7, 9))
      **/
    result2.foreach(println)
  }
}

4.8 filter

過濾(不存在shuffle)

object Demo8 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("filter").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10))
    val result: RDD[Int] = rdd.filter(x => x % 2 == 0)
    result.foreach(x => print(x + "\t"))  //6	10	8	4	2	
  }
}

4.9 sample

sample(withReplacement,fraction,seed)抽樣,常用在解決定位大key問題

  • 以指定的隨機種子隨機抽樣出比例為fraction的數據(抽取到的數量是size*fraction),注意:得到的結果並不能保證准確的比例,也就是說fraction只決定了這個數被選中的比率,並不是從數據中抽出多少百分比的數據,決定的不是個數,而是比率。
  • withReplacement表示抽出的數據是否放回,true為有放回抽樣,flase為無放回抽樣,放回表示數據有可能會被重復抽取到,false則不可能重復抽取到,如果為false則fraction必須在[0,1]內,是true則大於0即可。
  • seed用於指定隨機數生成器種子,一般默認的,或者傳入當前的時間戳,(如果傳入定值,每次取出結果一樣)
object Demo9 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sample").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10))

    /**
      * 不放回抽樣
      * 從結果中可以看出,抽出結果沒有重復
      * */
    val result1: RDD[Int] = rdd.sample(false,0.5)
    result1.foreach(println)
    /**
      * 放回抽樣
      * 從結果中可以看出,抽出結果有重復
      * */
    val result2: RDD[Int] = rdd.sample(true,2)
    result2.foreach(println)
  }
}

4.10 distinct

distinct([numTasks])去重,參數表示任務數量,默認值和分區數保持一致(不存在shuffle)

object Demo10 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("distinct").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(Array(1,2,3,4,2,3,4,3,4,5))
    val result: RDD[Int] = rdd.distinct(2)
    result.foreach(println)
  }
}

4.11 coalesce

coalesce(numPatitions)縮減,縮減分區到指定數量,用於大數據集過濾后,提高小數據集的執行效率,只能減不能加。(不存在shuffle)

object Demo11 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("coalesce").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10),5)
    println(rdd.partitions.length)  //5
    val result: RDD[Int] = rdd.coalesce(2)
    println(result.partitions.length)  //2
  }
}

4.12 repartition

repartition(numPatitions)更改分區,更改分區到指定數量,可加可減,但是減少還是使用coalesce,將這個理解為增加。(存在shuffle)

object Demo12 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("repartition").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10),2)
    println(rdd.partitions.length)  //2
    val result: RDD[Int] = rdd.repartition(5)
    println(result.partitions.length)  //5
  }
}

4.13 sortBy

排序(存在shuffle)

object Demo13 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sortBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(Array(4, 2, 3, 1, 5), 1)
    val result1: RDD[Int] = rdd.sortBy(x => x, false)
    result1.foreach(x => print(x + "\t"))  //5	4	3	2	1
    val result2: RDD[Int] = rdd.sortBy(x => x, true)
    result2.foreach(x => print(x + "\t"))  //1	2	3	4	5
  }
}

4.14 RDD與RDD互交

  • 並集:union
  • 差集:subtract
  • 交集:intersection
  • 笛卡爾積:cartesian
  • 拉鏈:zip
object Demo14 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD AND RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[Int] = sc.parallelize(1.to(5))
    val rdd2: RDD[Int] = sc.parallelize(3.to(8))

    //並集
    rdd1.union(rdd2).collect().foreach(x => print(x + "\t"))  //1	2	3	4	5	3	4	5	6	7	8
    //差集
    rdd1.subtract(rdd2).collect().foreach(x => print(x + "\t")) //1	2
    //交集
    rdd1.intersection(rdd2).collect().foreach(x => print(x + "\t")) //3	4	5\
    //笛卡爾積
    /*(1,3)	(1,4)	(1,5)	(1,6)	(1,7)	(1,8)
      (2,3)	(2,4)	(2,5)	(2,6)	(2,7)	(2,8)
      (3,3)	(3,4)	(3,5) (3,6) (3,7)	(3,8)
      (4,3)	(4,4)	(4,5)	(4,6)	(4,7)	(4,8)
      (5,3)	(5,4)	(5,5)	(5,6)	(5,7)	(5,8)*/
    rdd1.cartesian(rdd2).collect().foreach(x => print(x + "\t"))
    //拉鏈:必須保證RDD分區元素數量相同
    val rdd3: RDD[Int] = sc.parallelize(1.to(5))
    val rdd4: RDD[Int] = sc.parallelize(2.to(6))
    rdd3.zip(rdd4).collect().foreach(x => print(x + "\t"))  //(1,2)	(2,3)	(3,4)	(4,5)	(5,6)
  }
}

4.15 k-v類型 partitionBy

大多數Spark算子都可以用在任意類型的RDD上,但是有一些比較特殊的操作只能用在key-value類型的RDD上

使用HashPartitioner分區器

object Demo15 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    println(rdd2.partitions.length) //1
    println(rdd2.partitioner) //None
    val rdd3: RDD[(String, Int)] = rdd2.partitionBy(new HashPartitioner(2))
    println(rdd3.partitions.length) //2
    println(rdd3.partitioner) //Some(org.apache.spark.HashPartitioner@2)
    val result: RDD[(Int, (String, Int))] = rdd3.mapPartitionsWithIndex((index, it) => {
      it.map(x => (index, (x._1, x._2)))
    })
    result.foreach(println)

    /**
      * (1,(spark,1))
      * (0,(hello,1))
      * (0,(hadooop,1))
      * (0,(hello,1))
      **/
  }
}

自定義分區器

object Demo16 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    println(rdd2.partitions.length) //1
    println(rdd2.partitioner) //None

    val rdd3: RDD[(String, Int)] = rdd2.partitionBy(new MyPatitioner(2))
    println(rdd3.partitions.length) //2
    println(rdd3.partitioner) //Some(com.test.sparkcore.MyPatitioner@769a58e5)
    val result: RDD[(Int, (String, Int))] = rdd3.mapPartitionsWithIndex((index, it) => {
      it.map(x => (index, (x._1, x._2)))
    })
    result.foreach(println)

    /**
      * (0,(hadooop,1))
      * (1,(hello,1))
      * (0,(spark,1))
      * (1,(hello,1))
      **/
  }
}

class MyPatitioner(num: Int) extends Partitioner {
  override def numPartitions: Int = num

  override def getPartition(key: Any): Int = {
    System.identityHashCode(key) % num.abs
  }
}

4.16 k-v類型 reduceByKey

reduceByKey(V , V)=>V 根據key進行聚合,在shuffle之前會有combine(預聚合)操作

object Demo17 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("reduceByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.reduceByKey(_ + _)
    result.foreach(x => print(x + "\t"))  //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.17 k-v類型 groupByKey

根據key進行分組,直接shuffle

object Demo18 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("groupByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Iterable[Int])] = rdd2.groupByKey()
    result.foreach(x => print(x + "\t"))  //(spark,CompactBuffer(1))	(hadooop,CompactBuffer(1))	(hello,CompactBuffer(1, 1))
    result.map(x=>(x._1,x._2.size)).foreach(x => print(x + "\t")) 	//(spark,1)	(hadooop,1)	(hello,2)      
  }
}

4.18 k-v類型 aggrateByKey

aggrateByKey(zero : U)(( U , V )=>U , (U , U)=>U)

基於Key分組然后去聚合的操作,耗費資源太多,這時可以使用reduceByKey或aggrateByKey算子去提高性能

aggrateByKey分區內聚合,后在進行shuffle聚合。

object Demo19 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("aggregateByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.aggregateByKey(0)(_ + _, _ + _)
    result.foreach(x => print(x + "\t")) //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.19 k-v類型 foldByKey

foldByKey(zero : V)((V , V)=>V) 折疊計算,沒有aggrateByKey靈活,如果分區內和分區外聚合計算不一樣,則不行

object Demo20 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("foldByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.foldByKey(0)(_+_)
    result.foreach(x => print(x + "\t")) //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.20 k-v類型 combineByKey

combineByKey(V=>U,(U , V)=>U , (U , U)=>U) 根據Key組合計算

object Demo21 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("combineByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.combineByKey(v => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2)
    result.foreach(x => print(x + "\t"))  //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.21 k-v類型 sortByKey

根據Key排序

object Demo22 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sortByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("ahello", "bhadooop", "chello", "dspark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    rdd2.sortByKey(false).foreach(x => print(x + "\t")) //(dspark,1)	(chello,1)	(bhadooop,1)	(ahello,1)
  }
}

4.22 k-v類型 mapValues

只對value操作的map轉換操作

object Demo23 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapValues").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    rdd2.mapValues(x => x + 1).foreach(x => print(x + "\t")) //(hello,2)	(hadooop,2)	(hello,2)	(spark,2)
  }
}

4.23 k-v類型 join

object Demo24 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("join").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[(String, Int)] = sc.parallelize(Array(("a",10),("b",10),("a",20),("d",10)))
    val rdd2: RDD[(String, Int)] = sc.parallelize(Array(("a",30),("b",20),("c",10)))
    //內連接 (a,(10,30))	(b,(10,20))	(a,(20,30))
    rdd1.join(rdd2).foreach(x => print(x + "\t"))

    //左鏈接(b,(10,Some(20)))	(d,(10,None))	(a,(10,Some(30)))	(a,(20,Some(30)))
    rdd1.leftOuterJoin(rdd2).foreach(x => print(x + "\t"))

    //右鏈接(c,(None,10))	(a,(Some(10),30))	(b,(Some(10),20))	(a,(Some(20),30))
    rdd1.rightOuterJoin(rdd2).foreach(x => print(x + "\t"))
    
    //全鏈接(b,(Some(10),Some(20)))	(c,(None,Some(10)))	(d,(Some(10),None))	(a,(Some(10),Some(30)))	(a,(Some(20),Some(30)))
    rdd1.fullOuterJoin(rdd2).foreach(x => print(x + "\t"))
  }
}

4.24 k-v類型 cogroup

根據Key聚合RDD

object Demo25 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("cogroup").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[(String, Int)] = sc.parallelize(Array(("a",10),("b",10),("a",20),("d",10)))
    val rdd2: RDD[(String, Int)] = sc.parallelize(Array(("a",30),("b",20),("c",10)))

    /**
      * (c,(CompactBuffer(),CompactBuffer(10)))
      * (b,(CompactBuffer(10),CompactBuffer(20)))
      * (a,(CompactBuffer(10, 20),CompactBuffer(30)))
      * (d,(CompactBuffer(10),CompactBuffer()))
      */
    rdd1.cogroup(rdd2).foreach(println)
  }
}

4.25 keyo序列化

在分布式應用中,經常會進行IO操作,傳遞對象,而網絡傳輸過程中就必須要序列化。

Java序列化可以序列化任何類,比較靈活,但是相當慢,並且序列化后對象的提交也比較大。

Spark出於性能考慮,在2.0以后,開始支持kryo序列化機制,速度是Serializable的10倍以上,當RDD在Shuffle數據的時候,簡單數據類型,簡單數據類型數組,字符串類型已經使用kryo來序列化。

object Demo26 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("keyo")
      .setMaster("local[*]")
      //替換默認序列化機制
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //注冊需要使用的kryo序列化自定義類
      .registerKryoClasses(Array(classOf[MySearcher]))

    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hadoop yarn", "hadoop hdfs", "c"))
    val rdd2: RDD[String] = MySearcher("hadoop").getMathcRddByQuery(rdd1)
    rdd2.foreach(println)
  }
}

case class MySearcher(val query: String) {
  def getMathcRddByQuery(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }
}

4.26 依賴

  • 窄依賴:(不會shuffle)

    • 如果RDD2由RDD1計算得到,則RDD2就是子RDD,RDD1就是父RDD
    • 如果依賴關系在設計的時候就可以確定,而不需要考慮父RDD分區中的記錄,並且父RDD中的每個分區最多只有一個子分區,這就叫窄依賴
    • 父RDD的每個分區中的數據最多被一個子RDD的分區使用
  • 寬依賴:(會shuffle)

    • 寬依賴往往對應着shuffle操作,需要在運行過程中將同一個父RDD的分區傳入到不同的子RDD分區中。
    • 對於寬依賴,重算的父RDD分區對應多個子RDD分區,這樣實際上父RDD 中只有一部分的數據是被用於恢復這個丟失的子RDD分區的,另一部分對應子RDD的其它未丟失分區,這就造成了多余的計算;
    • 寬依賴中子RDD分區通常來自多個父RDD分區,極端情況下,所有的父RDD分區都要進行重新計算。

4.27 持久化

object Demo27 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("cache").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("a", "b", "c"))
    val rdd2: RDD[String] = rdd1.flatMap(x => {
      println("執行flatMap操作")
      x.split("")
    })
    val rdd3: RDD[(String, Int)] = rdd2.map((_, 1))

    /** 持久化到內存 */
    //rdd3.cache() //持久化到內存
    /**
      * 持久化到磁盤
      * DISK_ONLY:持久化到磁盤
      * DISK_ONLY_2:持久化到磁盤並且存一個副本(2個文件)
      * MEMORY_ONLY:持久化到內存
      * MEMORY_ONLY_2:持久化到內存並且存一個副本(2個文件)
      * MEMORY_ONLY_SER:持久化到內存,並且序列化
      * MEMORY_ONLY_SER_2:持久化到內存,並且序列化,還要存一個副本(2個文件)
      * MEMORY_AND_DISK:持久化到內存和磁盤
      * MEMORY_AND_DISK_2:持久化到內存和磁盤並且存一個副本(2個文件)
      * MEMORY_AND_DISK_SER:持久化到內存和磁盤,並且序列化
      * MEMORY_AND_DISK_SER_2:持久化到內存和磁盤,並且序列化,還要存一個副本(2個文件)
      * OFF_HEAP:持久化在堆外內存中,Spark自己管理的內存
      * */
    rdd3.persist(StorageLevel.DISK_ONLY) //持久化到磁盤

    rdd3.collect.foreach(x => print(x + "\t"))
    println("------------")
    //輸出語句不會執行
    rdd3.collect.foreach(x => print(x + "\t"))
  }
}

4.28 checkpoint

持久化只是將數據保存在BlockManager中,而RDD的Lineage是不變的,但是checkpoint執行完后,RDD已經沒有之前所謂的依賴了,而只是一個強行為其設定的checkpointRDD,RDD的Lineage改變了。

持久化的數據丟失可能性更大,磁盤、內存都有可能會存在數據丟失情況。但是checkpoint的數據通常是儲存在如HDFS等容錯、高可用的文件系統,數據丟失可能性較小。

默認情況下,如果某個RDD沒有持久化,但是設置了checkpoint Job想要將RDD的數據寫入文件系統,需要全部重新計算一次,再將計算出來的RDD數據checkpoint到文件系統,所以,建議對checkpoint的RDD使用十九畫,這樣RDD只需要計算一次就可以了。

object Demo28 {
  def main(args: Array[String]): Unit = {
    //設置當前用戶
    System.setProperty("HADOOP_USER_NAME", "Heaton")
    val conf = new SparkConf().setAppName("checkpoint").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //設置checkpoint目錄
    sc.setCheckpointDir("hdfs://cdh01.cm:8020/test")
    val rdd1: RDD[String] = sc.parallelize(Array("abc"))
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))

    /**
      * 標記RDD2的checkpoint
      * RDD2會被保存到文件中,並且會切斷到父RDD的引用,該持久化操作,必須在job運行之前調用
      * 如果不進行持久化操作,那么在保存到文件的時候需要重新計算
      **/
    rdd2.cache()
    rdd2.collect.foreach(x => print(x + "\t"))
    rdd2.collect.foreach(x => print(x + "\t"))
  }
}

4.29 累加器

4.29.1 累加器問題拋出

object Demo29 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Accumulator").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    var a = 1
    rdd1.foreach(x => {
      a += 1
      println("rdd:  "+a)
    })
    println("-----")
    println("main:  "+a)

    /**
      * rdd:  2
      * rdd:  2
      * rdd:  3
      * rdd:  3
      * rdd:  4
      * -----
      * main:  1
      * */
  }
}

從上面可以看出,2個問題:

  1. 變量是在RDD分區中進行累加,並且2個RDD分區中的變量不同
  2. 最后並沒有main方法中的變量值改變

考慮到main方法中的a變量是在Driver端,而RDD分區又是在Excutor端進行計算,所以只是拿了一個Driver端的鏡像,而且不同步回Driver端

在實際開發中,我們需要進行這種累加,這時就用到了累加器

4.29.2 累加器案例

Spark提供了一些常用累加器,主要針對值類型

object Demo30 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Accumulator").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    val acc: util.LongAccumulator = sc.longAccumulator("acc")
    rdd1.foreach(x => {
      acc.add(1)
      println("rdd:  "+acc.value)
    })
    println("-----")
    println("main:  "+acc.count)

    /**
      * rdd:  1
      * rdd:  1
      * rdd:  2
      * rdd:  2
      * rdd:  3
      * -----
      * main:  5
      * */
  }
}

如上代碼,我們發現累加器是分區內先累加,再分區間累加

4.29.3 自定義累加器

  • 案例一:自定義Int累加器
object Demo31 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Accumulator").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    val acc = new MyAccumulator
    //注冊累加器
    sc.register(acc)

    rdd1.foreach(x => {
      acc.add(1)
      println("rdd:  " + acc.value)
    })
    println("-----")
    println("main:  " + acc.value)

    /**
      * rdd:  1
      * rdd:  1
      * rdd:  2
      * rdd:  3
      * rdd:  2
      * -----
      * main:  5
      **/
  }
}

class MyAccumulator extends AccumulatorV2[Int, Int] {
  var sum: Int = 0

  //判斷累加的值是不是空
  override def isZero: Boolean = sum == 0

  //如何把累加器copy到Executor
  override def copy(): AccumulatorV2[Int, Int] = {
    val accumulator = new MyAccumulator
    accumulator.sum = sum
    accumulator
  }

  //重置值
  override def reset(): Unit = {
    sum = 0
  }

  //分區內的累加
  override def add(v: Int): Unit = {
    sum += v
  }

  //分區間的累加,累加器最終的值
  override def merge(other: AccumulatorV2[Int, Int]): Unit = {
    other match {
      case o: MyAccumulator => this.sum += o.sum
      case _ =>
    }
  }

  override def value: Int = this.sum
}
  • 案例二:自定義map平均值累加器
object Demo32 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    val acc = new MyAccumulator
    //注冊累加器
    sc.register(acc)

    rdd1.foreach(x => {
      acc.add(x)
    })
    println("main:  " + acc.value)

    /**main:  Map(sum -> 15.0, count -> 17.0, avg -> 0.8823529411764706) */
  }
}

class MyAccumulator extends AccumulatorV2[Int, Map[String, Double]] {
  var map: Map[String, Double] = Map[String, Double]()

  //判斷累加的值是不是空
  override def isZero: Boolean = map.isEmpty

  //如何把累加器copy到Executor
  override def copy(): AccumulatorV2[Int, Map[String, Double]] = {
    val accumulator = new MyAccumulator
    accumulator.map ++= map
    accumulator
  }

  //重置值
  override def reset(): Unit = {
    map = Map[String, Double]()
  }

  //分區內的累加
  override def add(v: Int): Unit = {
    map += "sum" -> (map.getOrElse("sum", 0d) + v)
    map += "count" -> (map.getOrElse("sum", 0d) + 1)
  }

  //分區間的累加,累加器最終的值
  override def merge(other: AccumulatorV2[Int, Map[String, Double]]): Unit = {
    other match {
      case o: MyAccumulator =>
        this.map += "sum" -> (map.getOrElse("sum", 0d) + o.map.getOrElse("sum", 0d))
        this.map += "count" -> (map.getOrElse("count", 0d) + o.map.getOrElse("count", 0d))
      case _ =>
    }
  }

  override def value: Map[String, Double] = {
    map += "avg" -> map.getOrElse("sum", 0d) / map.getOrElse("count", 1d)
    map
  }
}

4.30 廣播變量

廣播變量在每個節點上保存一個只讀的變量的緩存,而不用給每個task來傳送一個copy

object Demo33 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[String] = sc.parallelize(Array("a", "b"))
    val broadArr: Broadcast[Array[Int]] = sc.broadcast(Array(1, 2))
    rdd.foreach(x => {
      val value: Array[Int] = broadArr.value
      println(value.toList)
    })
    /**
      * List(1, 2)
      * List(1, 2)
      * */
  }
}

5 Spark SQL

Spark SQL是Spark用於結構化數據處理的Spark模塊。如:Mysql,Hbase,Hive

Spark SQL將SQL轉換成RDD,然后提交到集群執行,執行效率非常快,而且使只會寫SQL的同學可以直接開發

Spark SQL提供了2個編程抽象,等同於Spark Core中的RDD,分別是:DataFrame,DataSet

5.1 DataFrame

與RDD類似,DataFrame是一個分布式的數據容器

DataFrame更像是傳統數據庫的二維表格,除了數據以外,還記錄了數據的結構信息(Schema)

與Hive類似,DataFrame也支持嵌套數據類型(Struct、Array、Map)

  • 底層架構

  • Predicate Pushdown 機制

5.2 DataSet

DataSet是DataFrame的一個擴展,是SparkSQL1.6后新增的數據抽象,API友好

scala樣例類支持非常好,用樣例類在DataSet中定義數據結構信息,樣例類中每個屬性的沒成直接映射到DataSet中的字段名稱。

DataFrame是DataSet的特例,DataFrame=DataSet[Row],可以通過as方法將DataFrame轉換成DataSet,Row是一個類型,可以是Person、Animal,所有的表結構信息都用Row來表示

DataFrame只知道字段,不知道字段類型,而DataSet不僅知道字段,還知道類型。

DataSet具有強類型的數據集合,需要提供對應的類型信息。

5.3 SparkSession

從Spark2.0開始,SparkSession是Spark新的查詢起始點,其內部封裝了SparkContext,所以計算實際上是由SparkContext完成

5.4 DataFrame編程

5.4.1 解析Json數據

  • 讀取Json文件

在idea中,resources目錄下創建student.json文件

{"id":1,"name": "zhangsa", "age": 10}
{"id":2,"name": "lisi", "age": 20}
{"id":3,"name": "wangwu", "age": 30}
{"id":4,"name": "zhaoliu", "age": 12}
{"id":5,"name": "hahaqi", "age": 24}
{"id":6,"name": "xixiba", "age": 33}
object SparkSQLDemo1 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder().appName("demo1").master("local[*]") getOrCreate()
    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)
    frame.show(100)
    /**
      * +---+---+-------+
      * |age| id|   name|
      * +---+---+-------+
      * | 10|  1|zhangsa|
      * | 20|  2|   lisi|
      * | 30|  3| wangwu|
      * | 12|  4|zhaoliu|
      * | 24|  5| hahaqi|
      * | 33|  6| xixiba|
      * +---+---+-------+
      */
    println(frame.schema)
    /**
      * StructType(StructField(age,LongType,true), StructField(id,LongType,true), StructField(name,StringType,true))
      */
  }
}

5.4.2 TempView

  • 在使用sql查詢之前需要注冊臨時視圖
    • createTempView():注冊視圖,當前Session有效
    • createOrReplaceTempView():注冊視圖,當前Session有效,如果已經存在,那么替換
    • createGlobalTempView():注冊全局視圖,在所有Session中生效
    • createOrReplaceGlobalTempView():注冊全局視圖,在所有Session中生效,如果已經存在,那么替換

使用全局視圖,需要在表名前添加global_tmp,如student表,寫法為:global_tmp.student

object SparkSQLDemo2 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder().appName("demo2").master("local[*]") getOrCreate()
    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)

    frame.createOrReplaceTempView("student")
    val result: DataFrame = spark.sql("select * from student where age >= 20")
    result.show()
    /**
      * +---+---+------+
      * |age| id|  name|
      * +---+---+------+
      * | 20|  2|  lisi|
      * | 30|  3|wangwu|
      * | 24|  5|hahaqi|
      * | 33|  6|xixiba|
      * +---+---+------+
      */
  }
}

5.5 DataSet編程

  • DataSet簡單使用
object SparkSQLDemo3 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder().appName("demo3").master("local[*]") getOrCreate()
    import spark.implicits._
    val sRDD: Dataset[Student] = Seq(Student(1,"zhangsan",15),Student(2,"lisi",16)).toDS
    sRDD.foreach(s=>{
      println(s.name+":"+s.age)
    })
    /**
      * zhangsan:15
      * lisi:16
      * */
  }
}

case class Student(id: Long, name: String, age: Long)

5.6 DataSet和DataFrame和RDD互相轉換

涉及到RDD,DataFrame,DataSet之間操作時,需要隱式轉換導入: import spark.implicits._ 這里的spark不是報名,而是代表了SparkSession的那個對象名,所以必須先創建SparkSession對象在導入

RDD轉DF:toDF

RDD轉DS:toDS

DF轉RDD:rdd

DS轉RDD:rdd

DS轉DF:toDF

DF轉DS:as

  • 創建student.csv文件
1,zhangsa,10
2,lisi,20
3,wangwu,30
object SparkSQLDemo4 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder().appName("demo4").master("local[*]") getOrCreate()
    import spark.implicits._

    val rdd: RDD[String] = spark.sparkContext.textFile(this.getClass.getClassLoader.getResource("student.csv").getPath)
    val studentRDD: RDD[Student] = rdd.map(x => {
      val arr: Array[String] = x.split(",")
      Student(arr(0).toLong, arr(1), arr(2).toLong)
    })
    /** 1. RDD轉DF
      * +---+-------+---+
      * | id|   name|age|
      * +---+-------+---+
      * |  1|zhangsa| 10|
      * |  2|   lisi| 20|
      * |  3| wangwu| 30|
      * +---+-------+---+
      * */
    val df1: DataFrame = studentRDD.toDF()
    df1.show()
    /** 2. RDD轉DS
      * +---+-------+---+
      * | id|   name|age|
      * +---+-------+---+
      * |  1|zhangsa| 10|
      * |  2|   lisi| 20|
      * |  3| wangwu| 30|
      * +---+-------+---+
      * */
    val ds1: Dataset[Student] = studentRDD.toDS()
    ds1.show()

    /** 3. DF轉RDD
      * List([1,zhangsa,10], [2,lisi,20], [3,wangwu,30])
      * */
    val rdd1: RDD[Row] = df1.rdd
    println(rdd1.collect.toList)

    /** 4. DS轉RDD
      * List(Student(1,zhangsa,10), Student(2,lisi,20), Student(3,wangwu,30))
      * */
    val rdd2: RDD[Student] = ds1.rdd
    println(rdd2.collect.toList)

    /** 5. DS轉DF
      * +---+-------+---+
      * | id|   name|age|
      * +---+-------+---+
      * |  1|zhangsa| 10|
      * |  2|   lisi| 20|
      * |  3| wangwu| 30|
      * +---+-------+---+
      * */
    val df2: DataFrame = ds1.toDF()
    df2.show()

    /** 6. DF轉DS
      * +---+-------+---+
      * | id|   name|age|
      * +---+-------+---+
      * |  1|zhangsa| 10|
      * |  2|   lisi| 20|
      * |  3| wangwu| 30|
      * +---+-------+---+
      * */
    val ds2: Dataset[Student] = df2.as[Student]
    ds2.show()
  }
}

case class Student(id: Long, name: String, age: Long)

5.7 UDF函數:一對一

object SparkSQLDemo5 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder().appName("demo5").master("local[*]") getOrCreate()
    //注冊函數 
    val toUpper: UserDefinedFunction = spark.udf.register("toUpper", (s: String) => s.toUpperCase)

    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)
    frame.createOrReplaceTempView("student")

    val result: DataFrame = spark.sql("select id,toUpper(name),age from student where age >= 20")
    result.show()

    /**
      * +---+-----------------+---+
      * | id|UDF:toUpper(name)|age|
      * +---+-----------------+---+
      * |  2|             LISI| 20|
      * |  3|           WANGWU| 30|
      * |  5|           HAHAQI| 24|
      * |  6|           XIXIBA| 33|
      * +---+-----------------+---+
      **/
  }
}

5.8 UDAF函數:多對一

object SparkSQLDemo6 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder().appName("demo6").master("local[*]") getOrCreate()
    //注冊函數
    spark.udf.register("MyAvg", new MyAvg)

    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)
    frame.createOrReplaceTempView("student")
    frame.printSchema()
    val result: DataFrame = spark.sql("select sum(age),count(1),MyAvg(age) from student")
    result.show()

    /**
      * +--------+--------+----------+
      * |sum(age)|count(1)|myavg(age)|
      * +--------+--------+----------+
      * |     129|       6|      21.5|
      * +--------+--------+----------+
      * */
  }
}

class MyAvg extends UserDefinedAggregateFunction {
  //輸入數據類型
  override def inputSchema: StructType = StructType(StructField("input", LongType) :: Nil)

  //緩沖區中值的類型
  override def bufferSchema: StructType = StructType(StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)

  //最終輸出數據類型
  override def dataType: DataType = DoubleType

  //輸入和輸出之間的確定性,一般都是true
  override def deterministic: Boolean = true

  //緩沖區中值的初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    //sum
    buffer(0) = 0.0d
    //count
    buffer(1) = 0L
  }

  //分區內聚合
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    //如果值不為空
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getDouble(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }

  //分區間聚合
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    //如果值不為空
    if (!buffer2.isNullAt(0)) {
      buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
      buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }
  }

  //最終輸出的值
  override def evaluate(buffer: Row): Any = {
    new DecimalFormat(".00").format(buffer.getDouble(0) / buffer.getLong(1)).toDouble
  }
}

5.9 UDTF函數:一對多

需要使用Hive的UDTF

import java.util.ArrayList
import org.apache.hadoop.hive.ql.exec.{UDFArgumentException, UDFArgumentLengthException}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSQLDemo7 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder()
      .appName("demo7")
      .master("local[*]")
      .enableHiveSupport() //啟用hive
      .getOrCreate()

    import spark.implicits._

    //注冊utdf算子,這里無法使用sparkSession.udf.register(),注意包全路徑
    spark.sql("CREATE TEMPORARY FUNCTION MySplit as 'com.xx.xx.MySplit'")


    val frame: DataFrame = spark.sparkContext.parallelize(Array("a,b,c,d")).toDF("word")
    frame.createOrReplaceTempView("test")
    val result: DataFrame = spark.sql("select MySplit(word,',') from test")
    result.show()

    /**
      * +----+
      * |col1|
      * +----+
      * |   a|
      * |   b|
      * |   c|
      * |   d|
      * +----+
      */
  }
}

class MySplit extends GenericUDTF {

  override def initialize(args: Array[ObjectInspector]): StructObjectInspector = {
    if (args.length != 2) {
      throw new UDFArgumentLengthException("UserDefinedUDTF takes only two argument")
    }
    if (args(0).getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentException("UserDefinedUDTF takes string as a parameter")
    }

    //列名,會被用戶傳遞的覆蓋
    val fieldNames: ArrayList[String] = new ArrayList[String]()
    fieldNames.add("col1")

    //返回列以什么格式輸出,這里是string,添加幾個就是幾個列,和上面的名字個數對應個數。
    var fieldOIs: ArrayList[ObjectInspector] = new ArrayList[ObjectInspector]()
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)

    ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)
  }

  override def process(objects: Array[AnyRef]): Unit = {
    //獲取數據
    val data: String = objects(0).toString
    //獲取分隔符
    val splitKey: String = objects(1).toString()
    //切分數據
    val words: Array[String] = data.split(splitKey)

    //遍歷寫出
    words.foreach(x => {
      //將數據放入集合
      var tmp: Array[String] = new Array[String](1)
      tmp(0) = x
      //寫出數據到緩沖區
      forward(tmp)
    })
  }

  override def close(): Unit = {
    //沒有流操作
  }
}

5.10 讀取Json數據拓展

  • 讀取嵌套json數據
{"name":"zhangsan","score":100,"infos":{"age":30,"gender":"man"}},
{"name":"lisi","score":66,"infos":{"age":28,"gender":"feman"}},
{"name":"wangwu","score":77,"infos":{"age":15,"gender":"feman"}}
object SparkSQLDemo8 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder()
      .appName("demo8")
      .master("local[*]")
      .getOrCreate()

    //讀取嵌套的json文件
    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)
    frame.createOrReplaceTempView("infosView")
    spark.sql("select name,infos.age,score,infos.gender from infosView").show(100)

    /**
      * +--------+---+-----+------+
      * |    name|age|score|gender|
      * +--------+---+-----+------+
      * |zhangsan| 30|  100|   man|
      * |    lisi| 28|   66| feman|
      * |  wangwu| 15|   77| feman|
      * +--------+---+-----+------+
      **/
  }
}
  • 讀取嵌套jsonArray數據
{"name":"zhangsan","age":18,"scores":[{"yuwen":98,"shuxue":90,"yingyu":100,"xueqi":1},{"yuwen":77,"shuxue":33,"yingyu":55,"xueqi":2}]},
{"name":"lisi","age":19,"scores":[{"yuwen":58,"shuxue":50,"yingyu":78,"xueqi":1},{"yuwen":66,"shuxue":88,"yingyu":66,"xueqi":2}]},
{"name":"wangwu","age":17,"scores":[{"yuwen":18,"shuxue":90,"yingyu":45,"xueqi":1},{"yuwen":88,"shuxue":77,"yingyu":44,"xueqi":2}]},
{"name":"zhaoliu","age":20,"scores":[{"yuwen":68,"shuxue":23,"yingyu":63,"xueqi":1},{"yuwen":44,"shuxue":55,"yingyu":77,"xueqi":2}]},
{"name":"tianqi","age":22,"scores":[{"yuwen":88,"shuxue":91,"yingyu":41,"xueqi":1},{"yuwen":55,"shuxue":66,"yingyu":88,"xueqi":2}]}
object SparkSQLDemo8 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder()
      .appName("demo8")
      .master("local[*]")
      .getOrCreate()

    //讀取嵌套的json文件
    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)
    frame.createOrReplaceTempView("infosView")
    spark.sql("select name,age,explode(scores) from infosView")
    //不折疊顯示
    frame.show(false)

    /**
      * +---+--------+-----------------------------------+
      * |age|name    |scores                             |
      * +---+--------+-----------------------------------+
      * |18 |zhangsan|[[90, 1, 100, 98], [33, 2, 55, 77]]|
      * |19 |lisi    |[[50, 1, 78, 58], [88, 2, 66, 66]] |
      * |17 |wangwu  |[[90, 1, 45, 18], [77, 2, 44, 88]] |
      * |20 |zhaoliu |[[23, 1, 63, 68], [55, 2, 77, 44]] |
      * |22 |tianqi  |[[91, 1, 41, 88], [66, 2, 88, 55]] |
      * +---+--------+-----------------------------------+
      */
  }
}

5.11 讀取Mysql數據

  • 使用Mysql
create database spark;
use spark;
create table person(id varchar(12),name varchar(12),age int(10));
insert into person values('1','zhangsan',18),('2','lisi',19),('3','wangwu',20);
object SparkSQLDemo9 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder()
      .appName("demo9")
      .master("local[*]")
      .getOrCreate()

    val frame: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "root")
      .option("dbtable", "person")
      .load()

    frame.show()

    /**
      * +---+--------+---+
      * | id|    name|age|
      * +---+--------+---+
      * |  1|zhangsan| 18|
      * |  2|    lisi| 19|
      * |  3|  wangwu| 20|
      * +---+--------+---+
      */
  }
}

5.12 讀取Hive數據

  • 使用Hive
//創建數據庫
CREATE DATABASE dwd
//創建表
CREATE EXTERNAL TABLE `dwd.student`(
  `ID` bigint COMMENT '',
  `CreatedBy` string COMMENT '創建人',
  `CreatedTime` string COMMENT '創建時間',
  `UpdatedBy`  string COMMENT '更新人',
  `UpdatedTime` string COMMENT '更新時間',
  `Version` int COMMENT '版本號',
  `name` string COMMENT '姓名'
  ) COMMENT '學生表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/test/student/'
tblproperties ("parquet.compression"="snappy")
//添加數據
INSERT INTO TABLE dwd.student partition(dt='2020-04-05') VALUES(1,"heaton","2020-04-05","","","1","zhangsan") 
INSERT INTO TABLE dwd.student partition(dt='2020-04-06') VALUES(2,"heaton","2020-04-06","","","1","lisi") 
  • 將服務端配置hive-site.xml,放入resources路徑
object SparkSQLDemo10 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder()
      .appName("demo10")
      .master("local[*]")
      .enableHiveSupport() //啟用hive
      .getOrCreate()

    spark.sql("select * from dwd.student").show()
    
    /**
      * +---+---------+-----------+---------+-----------+-------+--------+----------+
      * | id|createdby|createdtime|updatedby|updatedtime|version|    name|        dt|
      * +---+---------+-----------+---------+-----------+-------+--------+----------+
      * |  1|   heaton| 2020-04-05|         |           |      1|zhangsan|2020-04-05|
      * |  2|   heaton| 2020-04-06|         |           |      1|    lisi|2020-04-06|
      * +---+---------+-----------+---------+-----------+-------+--------+----------+
      */
  }
}

5.13 讀取Hbase數據

object SparkSQLDemo11 {
  def main(args: Array[String]): Unit = {
    //創建sparksession
    val spark = SparkSession.builder()
      .appName("demo11")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    val hconf: Configuration = HBaseConfiguration.create
    hconf.set(HConstants.ZOOKEEPER_QUORUM, "cdh01.cm,cdh02.cm,cdh03.cm")
    hconf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    //一定要創建這個hbaseContext, 因為后面寫入時會用到它,不然空指針
    val hBaseContext = new HBaseContext(spark.sparkContext, hconf)

    //構建DataSet
    val ds1: Dataset[HBaseRecord] = spark.sparkContext.parallelize(1.to(256)).map(i => new HBaseRecord(i, "Hbase")).toDS()

    //定義映射的catalog
    val catalog: String = "{" +
      "       \"table\":{\"namespace\":\"default\", \"name\":\"test1\"}," +
      "       \"rowkey\":\"key\"," +
      "       \"columns\":{" +
      "         \"f0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
      "         \"f1\":{\"cf\":\"cf1\", \"col\":\"f1\", \"type\":\"boolean\"}," +
      "         \"f2\":{\"cf\":\"cf2\", \"col\":\"f2\", \"type\":\"double\"}," +
      "         \"f3\":{\"cf\":\"cf3\", \"col\":\"f3\", \"type\":\"float\"}," +
      "         \"f4\":{\"cf\":\"cf4\", \"col\":\"f4\", \"type\":\"int\"}," +
      "         \"f5\":{\"cf\":\"cf5\", \"col\":\"f4\", \"type\":\"bigint\"}," +
      "         \"f6\":{\"cf\":\"cf6\", \"col\":\"f6\", \"type\":\"smallint\"}," +
      "         \"f7\":{\"cf\":\"cf7\", \"col\":\"f7\", \"type\":\"string\"}," +
      "         \"f8\":{\"cf\":\"cf8\", \"col\":\"f8\", \"type\":\"tinyint\"}" +
      "       }" +
      "     }"

    //數據寫入Hbase
    ds1.write
      .format("org.apache.hadoop.hbase.spark")
      .option(HBaseTableCatalog.tableCatalog, catalog)
      .option(HBaseTableCatalog.newTable, 5)
      .mode(SaveMode.Overwrite) //寫入5個分區
      .save()

    //讀取Hbase數據
    val ds2: DataFrame = spark.read
      .format("org.apache.hadoop.hbase.spark")
      .option(HBaseTableCatalog.tableCatalog, catalog)
      .load()
    ds2.show(10)

    /**
      * +------------+-----+---+---+------------+-----+-----+---+---+
      * |          f7|   f1| f4| f6|          f0|   f3|   f2| f5| f8|
      * +------------+-----+---+---+------------+-----+-----+---+---+
      * |String:Hbase| true|100|100|row100:Hbase|100.0|100.0|100|100|
      * |String:Hbase|false|101|101|row101:Hbase|101.0|101.0|101|101|
      * |String:Hbase| true|102|102|row102:Hbase|102.0|102.0|102|102|
      * |String:Hbase|false|103|103|row103:Hbase|103.0|103.0|103|103|
      * |String:Hbase| true|104|104|row104:Hbase|104.0|104.0|104|104|
      * |String:Hbase|false|105|105|row105:Hbase|105.0|105.0|105|105|
      * |String:Hbase| true|106|106|row106:Hbase|106.0|106.0|106|106|
      * |String:Hbase|false|107|107|row107:Hbase|107.0|107.0|107|107|
      * |String:Hbase| true|108|108|row108:Hbase|108.0|108.0|108|108|
      * |String:Hbase|false|109|109|row109:Hbase|109.0|109.0|109|109|
      * +------------+-----+---+---+------------+-----+-----+---+---+
      */
  }
}

case class HBaseRecord(f0: String, f1: Boolean, f2: Double, f3: Float, f4: Int, f5: Long, f6: Short, f7: String, f8: Byte) {
  def this(i: Int, s: String) {
    this(s"row$i:$s", i % 2 == 0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s"String:$s", i.toByte)
  }
}

6 Spark Streaming

Spark Streaming是Spark核心API擴展,用於構建彈性、高吞吐、容錯的在線數據流的流式處理程序

數據來源有多種:Kafla、Flume、TCP等

Spark Streaming中提供的高級抽象:Discretized stream,DStream表示一個連續的數據流,可以由來自數據源的輸入數據流來創建,也可以通過在其他DStream上轉換得到,一個DStream是由一個RDD序列來表示的,對DStream的操作都會轉換成對其里面的RDD的操作

  • 執行流程

Receiver task 是 7*24h 一直在執行,一直接收數據,將接收到的數據保存到 batch 中,假設 batch interval 為 5s,
那么把接收到的數據每隔 5s 切割到一個 batch,因為 batch 是沒有分布式計算的特性的,而 RDD 有,
所以把 batch 封裝到 RDD 中,又把 RDD 封裝到DStream 中進行計算,在第 5s 的時候,計算前 5s 的數據,
假設計算 5s 的數據只需要 3s,那么第 5-8s 一邊計算任務,一邊接收數據,第 9-11s 只是接收數據,然后在第 10s 的時
候,循環上面的操作。如果 job 執行時間大於 batch interval,那么未執行的數據會越攢越多,最終導致 Spark集群崩潰。

注意:Receiver (接收器)在新版本中已經去除了。

6.1 端口監聽案例

object SparkStreamingDemo1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo1").setMaster("local[*]")
    //創建一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))

    //監控cdh01.cm上11111端口
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("cdh01.cm", 11111)
    val words: DStream[(String, Int)] = lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    //行動算子打印
    words.print()
    
    //啟動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 監聽服務器,間隔10秒發送數據測試如下
nc -lk 11111

6.2 對接Kafka

生產中這種是最常用的方式

object SparkStreamingDemo2 {
  def main(args: Array[String]): Unit = {
    val brokers = "cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092"
    val topic = "bigdata"
    val cgroup = "test"
    val params: Map[String, Object] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> cgroup,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val conf: SparkConf = new SparkConf().setAppName("demo2").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[*]")
    //創建一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))

    //Streaming對接kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List(topic), params)
    )

    kafkaDStream.print

    //啟動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 使用Kafka
kafka-console-producer --broker-list cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092 --topic bigdata

ConsumerRecord(topic = bigdata, partition = 0, offset = 13, CreateTime = 1587194334601, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = a)
ConsumerRecord(topic = bigdata, partition = 0, offset = 14, CreateTime = 1587194335215, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = b)
ConsumerRecord(topic = bigdata, partition = 0, offset = 15, CreateTime = 1587194335975, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = c)
ConsumerRecord(topic = bigdata, partition = 0, offset = 16, CreateTime = 1587194336887, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = d)
ConsumerRecord(topic = bigdata, partition = 0, offset = 17, CreateTime = 1587194337912, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = e)

6.3 Checkpoint

Spark的一種持久化方式,並不推薦

這種方式很容易做到,但是有以下的缺點:
多次輸出,結果必須滿足冪等性
事務性不可選
如果代碼變更不能從Checkpoint恢復,不過你可以同時運行新任務和舊任務,因為輸出結果具有等冪性

object SparkStreamingDemo3 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo3").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[*]")
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", createSSC)

    //啟動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }

  def createSSC() : StreamingContext = {
    val brokers = "cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092"
    val topic = "bigdata"
    val cgroup = "test"
    val params: Map[String, Object] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> cgroup,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val conf: SparkConf = new SparkConf().setAppName("demo2").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[*]")
    //創建一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))

    //設置檢查點
    ssc.checkpoint("./ck")

    //Streaming對接kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List(topic), params)
    )
    kafkaDStream.print

    ssc
  }
}
  • 使用Kafka
kafka-console-producer --broker-list cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092 --topic bigdata

ConsumerRecord(topic = bigdata, partition = 0, offset = 18, CreateTime = 1587195534875, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1)
ConsumerRecord(topic = bigdata, partition = 0, offset = 19, CreateTime = 1587195535127, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 2)
ConsumerRecord(topic = bigdata, partition = 0, offset = 20, CreateTime = 1587195535439, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 3)
ConsumerRecord(topic = bigdata, partition = 0, offset = 21, CreateTime = 1587195535903, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 4)

  • 將程序關閉,在Kafka中繼續寫入數據,在啟動程序

ConsumerRecord(topic = bigdata, partition = 0, offset = 22, CreateTime = 1587195646015, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 5)
ConsumerRecord(topic = bigdata, partition = 0, offset = 23, CreateTime = 1587195646639, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 6)
ConsumerRecord(topic = bigdata, partition = 0, offset = 24, CreateTime = 1587195647207, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 7)
ConsumerRecord(topic = bigdata, partition = 0, offset = 25, CreateTime = 1587195647647, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 8)

6.4 轉換算子

Transformation 含義
map(func) 通過函數func傳遞源DStream的每個元素,返回一個新的DStream。
flatMap(func) 類似於map,但是每個輸入項可以映射到0或多個輸出項。
filter(func) 通過只選擇func返回true的源DStream的記錄來返回一個新的DStream。
repartition(numPartitions) 重分區,通過創建或多或少的分區來更改此DStream中的並行度級別。
union(otherStream) 返回一個新的DStream,它包含源DStream和其他DStream中的元素的聯合。
count() 通過計算源DStream的每個RDD中的元素數量,返回一個新的單元素RDD DStream。
reduce(func) 使用func函數(函數接受兩個參數並返回一個參數)聚合源DStream的每個RDD中的元素,從而返回單元素RDDs的新DStream。這個函數應該是結合律和交換律的,這樣才能並行計算。
countByValue() 當對K類型的元素的DStream調用時,返回一個新的(K, Long)對的DStream,其中每個鍵的值是它在源DStream的每個RDD中的頻率。
reduceByKey(func, [numTasks]) 當對(K, V)對的DStream調用時,返回一個新的(K, V)對的DStream,其中每個鍵的值使用給定的reduce函數進行聚合。注意:默認情況下,這將使用Spark的默認並行任務數量(本地模式為2,在集群模式下,該數量由config屬性Spark .default.parallelism決定)來進行分組。我們可以傳遞一個可選的numTasks參數來設置不同數量的任務。
join(otherStream, [numTasks]) 當調用兩個(K, V)和(K, W)對的DStream時,返回一個新的(K, (V, W))對的DStream,其中包含每個Key的所有元素對。
cogroup(otherStream, [numTasks]) 當調用(K, V)和(K, W)對的DStream時,返回一個新的(K, Seq[V], Seq[W])元組DStream。
transform(func) 通過將RDD-to-RDD函數應用於源DStream的每個RDD,返回一個新的DStream。它可以用於應用DStream API中沒有公開的任何RDD操作。例如將數據流中的每個批處理與另一個數據集連接的功能並不直接在DStream API中公開。但是你可以很容易地使用transform來實現這一點。這帶來了非常強大的可能性。例如,可以通過將輸入數據流與預先計算的垃圾信息(也可能是使用Spark生成的)結合起來進行實時數據清理
updateStateByKey(func) 返回一個新的“state”DStream,其中每個Key的狀態通過將給定的函數應用於Key的前一個狀態和Key的新值來更新。這可以用於維護每個Key的任意狀態數據。要使用它,您需要執行兩個步驟:(1).定義狀態——狀態可以是任意數據類型;(2).定義狀態更新函數——用函數指定如何使用輸入流中的前一個狀態和新值更新狀態。
object SparkStreamingDemo4 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo4").setMaster("local[*]")
    //創建一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))

    //監控cdh01.cm上11111端口
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("cdh01.cm", 11111)

    //轉換成RDD操作
    val words: DStream[(String, Int)] = lines.transform(rdd => {
      rdd.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    })

    //行動算子打印
    words.print()

    //啟動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }
}

  • 監聽服務器,間隔10秒發送數據測試如下
nc -lk 11111

6.5 行動算子

Output Operation 含義
print() 在運行流應用程序的驅動程序節點上打印DStream中每批數據的前10個元素。這對於開發和調試非常有用。這在Python API中稱為pprint()。
saveAsTextFiles(prefix, [suffix]) 將此DStream的內容保存為文本文件。每個批處理間隔的文件名是根據前綴和后綴生成的:“prefix- time_in_ms [.suffix]”。
saveAsObjectFiles(prefix, [suffix]) 將此DStream的內容保存為序列化Java對象的sequencefile。每個批處理間隔的文件名是根據前綴和后綴生成的:“prefix- time_in_ms [.suffix]”。這在Python API中是不可用的。
saveAsHadoopFiles(prefix, [suffix]) 將這個DStream的內容保存為Hadoop文件。每個批處理間隔的文件名是根據前綴和后綴生成的:“prefix- time_in_ms [.suffix]”。這在Python API中是不可用的。
foreachRDD(func) 對流生成的每個RDD應用函數func的最通用輸出操作符。這個函數應該將每個RDD中的數據推送到外部系統,例如將RDD保存到文件中,或者通過網絡將其寫入數據庫。請注意,函數func是在運行流應用程序的驅動程序進程中執行的,其中通常會有RDD操作,這將強制流RDDs的計算。在func中創建遠程連接時可以使用foreachPartition 替換foreach操作以降低系統的總體吞吐量

6.6 有狀態轉換

使用updateStateByKey配合檢查點,可以做到從頭開始保存數據。

object SparkStreamingDemo5 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo5").setMaster("local[*]")
    //創建一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
    //使用updateStateByKey必須設置檢查點
    ssc.checkpoint("./ck")

    //監控cdh01.cm上11111端口
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("cdh01.cm", 11111)

    def f(seq: Seq[Int], opt: Option[Int]): Some[Int] = {
      Some(seq.sum + opt.getOrElse(0)
      )
    }

    //使用updateStateByKey,根據Key保存前面接收序列里的數據為一個序列
    val words: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(f)

    //行動算子打印
    words.print()

    //啟動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 監聽服務器,間隔10秒發送數據測試如下
nc -lk 11111

(aa,1)
(dd,1)
(bb,1)
(cc,1)

  • 間隔10秒后

(aa,3)
(dd,1)
(bb,1)
(cc,3)

6.7 窗口函數

窗口計算,允許你在滑動的數據窗口上應用轉換。

每當窗口滑過源DStream時,屬於該窗口的源RDDs就被組合起來並對其進行操作,從而生成窗口化DStream的RDDs。

上圖中操作應用於最后3個時間單位的數據,並以2個時間單位進行移動。這表明任何窗口操作都需要指定兩個參數:

窗口長度(windowLength)——窗口的持續時間

滑動間隔(slideInterval)——執行窗口操作的間隔

這兩個參數必須是批處理間隔的倍數

Transformation 含義
window(windowLength, slideInterval) 返回一個新的DStream,它是基於源DStream的窗口批次計算的。
countByWindow(windowLength, slideInterval) 返回流中元素的滑動窗口計數。
reduceByWindow(func, windowLength, slideInterval) 返回一個新的單元素流,該流是使用func在滑動間隔上聚合流中的元素創建的。這個函數應該是結合律和交換律的,這樣才能並行地正確計算。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 當對(K, V)對的DStream調用時,返回一個新的(K, V)對的DStream,其中每個Key的值使用給定的reduce函數func在滑動窗口中分批聚合。注意:默認情況下,這將使用Spark的默認並行任務數量(本地模式為2,在集群模式下,該數量由config屬性Spark .default.parallelism決定)來進行分組。您可以傳遞一個可選的numTasks參數來設置不同數量的任務。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 上面reduceByKeyAndWindow()的一個更有效的版本,其中每個窗口的reduce值是使用前一個窗口的reduce值增量計算的。這是通過減少進入滑動窗口的新數據和“反向減少”離開窗口的舊數據來實現的。例如,在窗口滑動時“添加”和“減去”鍵的計數。但是,它只適用於“可逆約簡函數”,即具有相應“逆約簡”函數的約簡函數(取invFunc參數)。與reduceByKeyAndWindow類似,reduce任務的數量可以通過一個可選參數進行配置。注意,必須啟用checkpoint才能使用此操作。
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 當對(K, V)對的DStream調用時,返回一個新的(K, Long)對的DStream,其中每個Key的值是它在滑動窗口中的頻率。與reduceByKeyAndWindow類似,reduce任務的數量可以通過一個可選參數進行配置。
object SparkStreamingDemo6 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo6").setMaster("local[*]")
    //創建一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))

    //監控cdh01.cm上11111端口
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("cdh01.cm", 11111)
    //    Duration.of(10,TimeUnit.SECONDS)

    //使用窗口,比封裝數據時間多一倍,意思是相當於包含兩個窗口,滑動間隔為一個窗口
    val words: DStream[(String, Int)] = lines.window(Seconds(10), Seconds(5)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    //行動算子打印
    words.print()

    //啟動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 監聽服務器,間隔10秒發送數據測試如下
nc -lk 11111

每間隔5秒輸如一行,結果集如下

(aa,1)
(bb,1)
(cc,1)


(aa,2)
(bb,2)
(cc,1)


(aa,2)
(bb,1)
(cc,1)

  • 圖解

7 Spark內存管理

7.1 堆內和堆外內存

作為一個JVM進程,Executor的內存管理建立在JVM的內存管理之上,Spark對JVM的堆內(On-head)空間進行了更為詳細的分配,以充分利用內存。同時,Spark引入了堆外(Off-head)內存,使之可以直接在工作節點的系統內存中開辟空間,進一步優化了內存的使用。堆內內存受到JVM統一管理,堆外內存式直接向操作系統進行內存的申請和釋放。

  • 堆內內存

堆內內存的大小,由Spark應用程序啟動時的 executor-memoryspark.executor.memory參數配置,Executor內運行的並發任務共享JVM堆內內存,這些任務在緩存RDD數據和廣播(Broadcast)數據時占用的內存被規划為儲存(Storage)內存,而這些任務在執行Shuffle時占用的內存被規划委執行(Executor)內存,剩余的部分不做特殊規划,那些Spark內部的對象實例,或者用戶定義的Spark應用程序中的對象實例,均占用剩余的空間,不同的管理模式下,這三部分占用的空間大小各部相同

Spark對堆內內存的管理是一種邏輯上的“規划式”管理,因為對象實際占用內存的申請和釋放都是由JVM完成,Spark只能在申請后和釋放前記錄這些內存

申請內存流程如下:

  1. Spark記錄該對象釋放的內存,刪除該對象的引用
  2. 等待JVM的垃圾回收機制釋放該對象占用的堆內內存

JVM的對象可以序列化的方式儲存,序列化的過程是將對象轉換成為二進制字節流,本質上可以理解為將非連續空間的鏈式儲存轉化為連續空間或塊式儲存,在訪問時則需要進行序列化的逆過程--反序列化,將字節流轉化成對象,序列化的方式可以節省存儲空間,但增加了內存的讀取時候的計算開銷

對於 Spark 中序列化的對象,由於是字節流的形式,其占用的內存大小可直接計算,而對於非序列化的對象,其占用的內存是通過周期性地采樣近似估算而得,即並不是每次新增的數據項都會計算一次占用的內存大小,這種方法降低了時間開銷但是有可能誤差較大,導致某一時刻的實際內存有可能遠遠超出預期[2]。此外,在被 Spark 標記為釋放的對象實例,很有可能在實際上並沒有被 JVM 回收,導致實際可用的內存小於 Spark 記錄的可用內存。所以 Spark 並不能准確記錄實際可用的堆內內存,從而也就無法完全避免內存溢出(OOM, Out of Memory)的異常。

雖然不能精准控制堆內內存的申請和釋放,但 Spark 通過對存儲內存和執行內存各自獨立的規划管理,可以決定是否要在存儲內存里緩存新的 RDD,以及是否為新的任務分配執行內存,在一定程度上可以提升內存的利用率,減少異常的出現。

  • 堆外內存

為了進一步優化內存的使用以及提高 Shuffle 時排序的效率,Spark 引入了堆外(Off-heap)內存,使之可以直接在工作節點的系統內存中開辟空間,存儲經過序列化的二進制數據。利用 JDK Unsafe API(從 Spark 2.0 開始,在管理堆外的存儲內存時不再基於 Tachyon,而是與堆外的執行內存一樣,基於 JDK Unsafe API 實現[3]),Spark 可以直接操作系統堆外內存,減少了不必要的內存開銷,以及頻繁的 GC 掃描和回收,提升了處理性能。堆外內存可以被精確地申請和釋放,而且序列化的數據占用的空間可以被精確計算,所以相比堆內內存來說降低了管理的難度,也降低了誤差。

在默認情況下堆外內存並不啟用,可通過配置 spark.memory.offHeap.enabled 參數啟用,並由 spark.memory.offHeap.size 參數設定堆外空間的大小。除了沒有 other 空間,堆外內存與堆內內存的划分方式相同,所有運行中的並發任務共享存儲內存和執行內存。

7.2 內存空間管理

7.2.1 靜態內存管理

在 Spark 最初采用的靜態內存管理機制下,存儲內存、執行內存和其他內存的大小在 Spark 應用程序運行期間均為固定的,但用戶可以應用程序啟動前進行配置

  • 靜態內存管理圖-堆內

可用的堆內內存的大小需要按照下面的方式計算

  1. 可用的存儲內存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
  2. 可用的執行內存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

其中 systemMaxMemory 取決於當前 JVM 堆內內存的大小,最后可用的執行內存或者存儲內存要在此基礎上與各自的 memoryFraction 參數和 safetyFraction 參數相乘得出。上述計算公式中的兩個 safetyFraction 參數,其意義在於在邏輯上預留出 1-safetyFraction 這么一塊保險區域,降低因實際內存超出當前預設范圍而導致 OOM 的風險(上文提到,對於非序列化對象的內存采樣估算會產生誤差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規划,在具體使用時 Spark 並沒有區別對待,和“其它內存”一樣交給了 JVM 去管理。

堆外的空間分配較為簡單,只有存儲內存和執行內存,如圖所示。可用的執行內存和存儲內存占用的空間大小直接由參數 spark.memory.storageFraction 決定,由於堆外內存占用的空間可以被精確計算,所以無需再設定保險區域。

  • 靜態內存管理圖- 堆外

靜態內存管理機制實現起來較為簡單,但如果用戶不熟悉 Spark 的存儲機制,或沒有根據具體的數據規模和計算任務或做相應的配置,很容易造成"一半海水,一半火焰"的局面,即存儲內存和執行內存中的一方剩余大量的空間,而另一方卻早早被占滿,不得不淘汰或移出舊的內容以存儲新的內容。由於新的內存管理機制的出現,這種方式目前已經很少有開發者使用,出於兼容舊版本的應用程序的目的,Spark 仍然保留了它的實現。

7.2.2 統一內存管理

  • 動態占用機制圖

憑借統一內存管理機制,Spark 在一定程度上提高了堆內和堆外內存資源的利用率,降低了開發者維護 Spark 內存的難度,但並不意味着開發者可以高枕無憂。譬如,所以如果存儲內存的空間太大或者說緩存的數據過多,反而會導致頻繁的全量垃圾回收,降低任務執行時的性能,因為緩存的 RDD 數據通常都是長期駐留內存的 。所以要想充分發揮 Spark 的性能,需要開發者進一步了解存儲內存和執行內存各自的管理方式和實現原理。

7.2.3 存儲內存管理

  • RDD 的持久化機制

彈性分布式數據集(RDD)作為 Spark 最根本的數據抽象,是只讀的分區記錄(Partition)的集合,只能基於在穩定物理存儲中的數據集上創建,或者在其他已有的 RDD 上執行轉換(Transformation)操作產生一個新的 RDD。轉換后的 RDD 與原始的 RDD 之間產生的依賴關系,構成了血統(Lineage)。憑借血統,Spark 保證了每一個 RDD 都可以被重新恢復。但 RDD 的所有轉換都是惰性的,即只有當一個返回結果給 Driver 的行動(Action)發生時,Spark 才會創建任務讀取 RDD,然后真正觸發轉換的執行。

Task 在啟動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,如果沒有則需要檢查 Checkpoint 或按照血統重新計算。所以如果一個 RDD 上要執行多次行動,可以在第一次行動中使用 persist 或 cache 方法,在內存或磁盤中持久化或緩存這個 RDD,從而在后面的行動時提升計算速度。事實上,cache 方法是使用默認的 MEMORY_ONLY 的存儲級別將 RDD 持久化到內存,故緩存是一種特殊的持久化。 堆內和堆外存儲內存的設計,便可以對緩存 RDD 時使用的內存做統一的規划和管 理 (存儲內存的其他應用場景,如緩存 broadcast 數據,暫時不在本文的討論范圍之內)。

RDD 的持久化由 Spark 的 Storage 模塊 [7] 負責,實現了 RDD 與物理存儲的解耦合。Storage 模塊負責管理 Spark 在計算過程中產生的數據,將那些在內存或磁盤、在本地或遠程存取數據的功能封裝了起來。在具體實現時 Driver 端和 Executor 端的 Storage 模塊構成了主從式的架構,即 Driver 端的 BlockManager 為 Master,Executor 端的 BlockManager 為 Slave。Storage 模塊在邏輯上以 Block 為基本存儲單位,RDD 的每個 Partition 經過處理后唯一對應一個 Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID )。Master 負責整個 Spark 應用程序的 Block 的元數據信息的管理和維護,而 Slave 需要將 Block 的更新等狀態上報到 Master,同時接收 Master 的命令,例如新增或刪除一個 RDD。

  • Storage 模塊示意圖

在對 RDD 持久化時,Spark 規定的存儲級別如下

  • DISK_ONLY:持久化到磁盤
  • DISK_ONLY_2:持久化到磁盤並且存一個副本(2個文件)
  • MEMORY_ONLY:持久化到內存
  • MEMORY_ONLY_2:持久化到內存並且存一個副本(2個文件)
  • MEMORY_ONLY_SER:持久化到內存,並且序列化
  • MEMORY_ONLY_SER_2:持久化到內存,並且序列化,還要存一個副本(2個文件)
  • MEMORY_AND_DISK:持久化到內存和磁盤
  • MEMORY_AND_DISK_2:持久化到內存和磁盤並且存一個副本(2個文件)
  • MEMORY_AND_DISK_SER:持久化到內存和磁盤,並且序列化
  • MEMORY_AND_DISK_SER_2:持久化到內存和磁盤,並且序列化,還要存一個副本(2個文件)
  • OFF_HEAP:持久化在堆外內存中,Spark自己管理的內存

通過對數據結構的分析,可以看出存儲級別從三個維度定義了 RDD 的 Partition(同時也就是 Block)的存儲方式:

  • 存儲位置:磁盤/堆內內存/堆外內存。如 MEMORY_AND_DISK 是同時在磁盤和堆內內存上存儲,實現了冗余備份。OFF_HEAP 則是只在堆外內存存儲,目前選擇堆外內存時不能同時存儲到其他位置。
  • 存儲形式:Block 緩存到存儲內存后,是否為非序列化的形式。如 MEMORY_ONLY 是非序列化方式存儲,OFF_HEAP 是序列化方式存儲。
  • 副本數量:大於 1 時需要遠程冗余備份到其他節點。如 DISK_ONLY_2 需要遠程備份 1 個副本。
  • RDD 緩存的過程

RDD 在緩存到存儲內存之前,Partition 中的數據一般以迭代器(Iterator)的數據結構來訪問,這是 Scala 語言中一種遍歷數據集合的方法。通過 Iterator 可以獲取分區中每一條序列化或者非序列化的數據項(Record),這些 Record 的對象實例在邏輯上占用了 JVM 堆內內存的 other 部分的空間,同一 Partition 的不同 Record 的空間並不連續。

RDD 在緩存到存儲內存之后,Partition 被轉換成 Block,Record 在堆內或堆外存儲內存中占用一塊連續的空間。將Partition由不連續的存儲空間轉換為連續存儲空間的過程,Spark稱之為"展開"(Unroll)。Block 有序列化和非序列化兩種存儲格式,具體以哪種方式取決於該 RDD 的存儲級別。非序列化的 Block 以一種 DeserializedMemoryEntry 的數據結構定義,用一個數組存儲所有的對象實例,序列化的 Block 則以 SerializedMemoryEntry的數據結構定義,用字節緩沖區(ByteBuffer)來存儲二進制數據。每個 Executor 的 Storage 模塊用一個鏈式 Map 結構(LinkedHashMap)來管理堆內和堆外存儲內存中所有的 Block 對象的實例[6],對這個 LinkedHashMap 新增和刪除間接記錄了內存的申請和釋放。

因為不能保證存儲空間可以一次容納 Iterator 中的所有數據,當前的計算任務在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時占位,空間不足則 Unroll 失敗,空間足夠時可以繼續進行。對於序列化的 Partition,其所需的 Unroll 空間可以直接累加計算,一次申請。而非序列化的 Partition 則要在遍歷 Record 的過程中依次申請,即每讀取一條 Record,采樣估算其所需的 Unroll 空間並進行申請,空間不足時可以中斷,釋放已占用的 Unroll 空間。如果最終 Unroll 成功,當前 Partition 所占用的 Unroll 空間被轉換為正常的緩存 RDD 的存儲空間。

  • Spark Unroll 示意圖

在靜態內存管理時,Spark 在存儲內存中專門划分了一塊 Unroll 空間,其大小是固定的,統一內存管理時則沒有對 Unroll 空間進行特別區分,當存儲空間不足時會根據動態占用機制進行處理。

  • 淘汰和落盤

由於同一個 Executor 的所有的計算任務共享有限的存儲內存空間,當有新的 Block 需要緩存但是剩余空間不足且無法動態占用時,就要對 LinkedHashMap 中的舊 Block 進行淘汰(Eviction),而被淘汰的 Block 如果其存儲級別中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),否則直接刪除該 Block。

存儲內存的淘汰規則為:

  • 被淘汰的舊 Block 要與新 Block 的 MemoryMode 相同,即同屬於堆外或堆內內存
  • 新舊 Block 不能屬於同一個 RDD,避免循環淘汰
  • 舊 Block 所屬 RDD 不能處於被讀狀態,避免引發一致性問題
  • 遍歷 LinkedHashMap 中 Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新 Block 所需的空間。其中 LRU 是 LinkedHashMap 的特性。

落盤的流程則比較簡單,如果其存儲級別符合_useDisk 為 true 的條件,再根據其_deserialized 判斷是否是非序列化的形式,若是則對其進行序列化,最后將數據存儲到磁盤,在 Storage 模塊中更新其信息。

7.2.4 執行內存管理

  • 多任務間內存分配

Executor 內運行的任務同樣共享執行內存,Spark 用一個 HashMap 結構保存了任務到內存耗費的映射。每個任務可占用的執行內存大小的范圍為 1/2N ~ 1/N,其中 N 為當前 Executor 內正在運行的任務的個數。每個任務在啟動之時,要向 MemoryManager 請求申請最少為 1/2N 的執行內存,如果不能被滿足要求則該任務被阻塞,直到有其他任務釋放了足夠的執行內存,該任務才可以被喚醒

  • Shuffle 的內存占用

執行內存主要用來存儲任務在執行 Shuffle 時占用的內存,Shuffle 是按照一定規則對 RDD 數據重新分區的過程,我們來看 Shuffle 的 Write 和 Read 兩階段對執行內存的使用:

Shuffle Write

1、若在 map 端選擇普通的排序方式,會采用 ExternalSorter 進行外排,在內存中存儲數據時主要占用堆內執行空間。

2、若在 map 端選擇 Tungsten 的排序方式,則采用 ShuffleExternalSorter 直接對以序列化形式存儲的數據排序,在內存中存儲數據時可以占用堆外或堆內執行空間,取決於用戶是否開啟了堆外內存以及堆外執行內存是否足夠。

Shuffle Read

1、在對 reduce 端的數據進行聚合時,要將數據交給 Aggregator 處理,在內存中存儲數據時占用堆內執行空間。

2、如果需要進行最終結果排序,則要將再次將數據交給 ExternalSorter 處理,占用堆內執行空間。

在 ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的哈希表在堆內執行內存中存儲數據,但在 Shuffle 過程中所有數據並不能都保存到該哈希表中,當這個哈希表占用的內存會進行周期性地采樣估算,當其大到一定程度,無法再從 MemoryManager 申請到新的執行內存時,Spark 就會將其全部內容存儲到磁盤文件中,這個過程被稱為溢存(Spill),溢存到磁盤的文件最后會被歸並(Merge)。

Shuffle Write 階段中用到的 Tungsten 是 Databricks 公司提出的對 Spark 優化內存和 CPU 使用的計划,解決了一些 JVM 在性能上的限制和弊端。Spark 會根據 Shuffle 的情況來自動選擇是否采用 Tungsten 排序。Tungsten 采用的頁式內存管理機制建立在 MemoryManager 之上,即 Tungsten 對執行內存的使用進行了一步的抽象,這樣在 Shuffle 過程中無需關心數據具體存儲在堆內還是堆外。每個內存頁用一個 MemoryBlock 來定義,並用 Object obj 和 long offset 這兩個變量統一標識一個內存頁在系統內存中的地址。堆內的 MemoryBlock 是以 long 型數組的形式分配的內存,其 obj 的值為是這個數組的對象引用,offset 是 long 型數組的在 JVM 中的初始偏移地址,兩者配合使用可以定位這個數組在堆內的絕對地址;堆外的 MemoryBlock 是直接申請到的內存塊,其 obj 為 null,offset 是這個內存塊在系統內存中的 64 位絕對地址。Spark 用 MemoryBlock 巧妙地將堆內和堆外內存頁統一抽象封裝,並用頁表(pageTable)管理每個 Task 申請到的內存頁。

Tungsten 頁式管理下的所有內存用 64 位的邏輯地址表示,由頁號和頁內偏移量組成:

  • 頁號:占 13 位,唯一標識一個內存頁,Spark 在申請內存頁之前要先申請空閑頁號。
  • 頁內偏移量:占 51 位,是在使用內存頁存儲數據時,數據在頁內的偏移地址。

有了統一的尋址方式,Spark 可以用 64 位邏輯地址的指針定位到堆內或堆外的內存,整個 Shuffle Write 排序的過程只需要對指針進行排序,並且無需反序列化,整個過程非常高效,對於內存訪問效率和 CPU 使用效率帶來了明顯的提升。

Spark 的存儲內存和執行內存有着截然不同的管理方式:對於存儲內存來說,Spark 用一個 LinkedHashMap 來集中管理所有的 Block,Block 由需要緩存的 RDD 的 Partition 轉化而成;而對於執行內存,Spark 用 AppendOnlyMap 來存儲 Shuffle 過程中的數據,在 Tungsten 排序中甚至抽象成為頁式內存管理,開辟了全新的 JVM 內存管理機制。

8 常規性能調優

8.1 最優資源配置

Spark性能調優的第一步,就是為任務分配更多的資源,在一定范圍內,增加資源的分配與性能的提升是成正比的,實現了最優的資源配置后,在此基礎上再考慮進行后面論述的性能調優策略。

  • 資源的分配在使用腳本提交Spark任務時進行指定,標准的Spark任務提交腳本如下:
spark-submit \
 
--class com.xxx.spark.TestSpark \
 
--num-executors 80 \
 
--driver-memory 6g \
 
--executor-memory 6g \
 
--executor-cores 3 \
 
/usr/opt/modules/spark/jar/spark.jar 
名稱 說明
--num-executors 配置Executor的數量
--driver-memory 配置Driver內存(影響不大)
--executor-memory 配置每個Executor的內存大小
--executor-cores 配置每個Executor的CPU core數量
  • 調節原則:盡量將任務分配的資源調節到可以使用的資源的最大限度。

  • 對於具體資源的分配,我們分別討論Spark的兩種Cluster運行模式:

    • 第一種是Spark Standalone模式,你在提交任務前,一定知道或者可以從運維部門獲取到你可以使用的資源情況,在編寫submit腳本的時候,就根據可用的資源情況進行資源的分配,比如說集群有15台機器,每台機器為8G內存,2個CPU core,那么就指定15個Executor,每個Executor分配8G內存,2個CPU core。
    • 第二種是Spark Yarn模式,由於Yarn使用資源隊列進行資源的分配和調度,在表寫submit腳本的時候,就根據Spark作業要提交到的資源隊列,進行資源的分配,比如資源隊列有400G內存,100個CPU core,那么指定50個Executor,每個Executor分配8G內存,2個CPU core。
  • 各項資源進行了調節后,得到的性能提升如下表

名稱 解析
增加Executor個數 在資源允許的情況下,增加Executor的個數可以提高執行task的並行度。
比如有4個Executor,每個Executor有2個CPU core,那么可以並行執行8個task,
如果將Executor的個數增加到8個(資源允許的情況下),那么可以並行執行16個task,此時的並行能力提升了一倍。
增加每個Executor的CPU core個數 在資源允許的情況下,增加每個Executor的Cpu core個數,可以提高執行task的並行度。
比如有4個Executor,每個Executor有2個CPU core,那么可以並行執行8個task,
如果將每個Executor的CPU core個數增加到4個(資源允許的情況下),
那么可以並行執行16個task,此時的並行能力提升了一倍
增加每個Executor的內存量 在資源允許的情況下,增加每個Executor的內存量以后,對性能的提升有三點:
可以緩存更多的數據(即對RDD進行cache),寫入磁盤的數據相應減少,
甚至可以不寫入磁盤,減少了可能的磁盤IO;
可以為shuffle操作提供更多內存,即有更多空間來存放reduce端拉取的數據,
寫入磁盤的數據相應減少,甚至可以不寫入磁盤,減少了可能的磁盤IO;
可以為task的執行提供更多內存,在task的執行過程中可能創建很多對象,
內存較小時會引發頻繁的GC,增加內存后,可以避免頻繁的GC,提升整體性能。
  • 生產環境Spark submit腳本配置
spark-submit \
 
--class com.xxx.spark.WordCount \
 
--num-executors 80 \
 
--driver-memory 6g \
 
--executor-memory 6g \
 
--executor-cores 3 \
 
--master yarn-cluster \
 
--queue root.default \
 
--conf spark.yarn.executor.memoryOverhead=2048 \
 
--conf spark.core.connection.ack.wait.timeout=300 \
 
/usr/local/spark/spark.jar

參數配置參考值:

--num-executors:50~100

--driver-memory:1G~5G

--executor-memory:6G~10G

--executor-cores:3

--master:實際生產環境一定使用yarn-cluster

8.2 RDD優化

8.2.1 RDD復用

  • 在對RDD進行算子時,要避免相同的算子和計算邏輯之下對RDD進行重復的計算

  • 對上圖中的RDD計算架構進行修改,得到如圖所示的優化結果

8.2.2 RDD持久化

在Spark中,當多次對同一個RDD執行算子操作時,每一次都會對這個RDD以之前的父RDD重新計算一次,這種情況是必須要避免的,對同一個RDD的重復計算是對資源的極大浪費,因此,必須對多次使用的RDD進行持久化,通過持久化將公共RDD的數據緩存到內存/磁盤中,之后對於公共RDD的計算都會從內存/磁盤中直接獲取RDD數據。

  • 對於RDD的持久化,有兩點需要說明:
    1. RDD的持久化是可以進行序列化的,當內存無法將RDD的數據完整的進行存放的時候,可以考慮使用序列化的方式減小數據體積,將數據完整存儲在內存中。
    2. 如果對於數據的可靠性要求很高,並且內存充足,可以使用副本機制,對RDD數據進行持久化。當持久化啟用了復本機制時,對於持久化的每個數據單元都存儲一個副本,放在其他節點上面,由此實現數據的容錯,一旦一個副本數據丟失,不需要重新計算,還可以使用另外一個副本。

8.2.3 RDD盡可能早的filter操作

  • 獲取到初始RDD后,應該考慮盡早地過濾掉不需要的數據,進而減少對內存的占用,從而提升Spark作業的運行效率。

8.3 並行度調節

Spark作業中的並行度指各個stage的task的數量。

如果並行度設置不合理而導致並行度過低,會導致資源的極大浪費,例如,20個Executor,每個Executor分配3個CPU core,而Spark作業有40個task,這樣每個Executor分配到的task個數是2個,這就使得每個Executor有一個CPU core空閑,導致資源的浪費。

理想的並行度設置,應該是讓並行度與資源相匹配,簡單來說就是在資源允許的前提下,並行度要設置的盡可能大,達到可以充分利用集群資源。合理的設置並行度,可以提升整個Spark作業的性能和運行速度。

Spark官方推薦,task數量應該設置為Spark作業總CPU core數量的2~3倍。之所以沒有推薦task數量與CPU core總數相等,是因為task的執行時間不同,有的task執行速度快而有的task執行速度慢,如果task數量與CPU core總數相等,那么執行快的task執行完成后,會出現CPU core空閑的情況。如果task數量設置為CPU core總數的2~3倍,那么一個task執行完畢后,CPU core會立刻執行下一個task,降低了資源的浪費,同時提升了Spark作業運行的效率。

  • Spark作業並行度的設置如下:
val conf = new SparkConf().set("spark.default.parallelism", "500")

8.4 廣播大變量

默認情況下,task中的算子中如果使用了外部的變量,每個task都會獲取一份變量的復本,這就造成了內存的極大消耗。一方面,如果后續對RDD進行持久化,可能就無法將RDD數據存入內存,只能寫入磁盤,磁盤IO將會嚴重消耗性能;另一方面,task在創建對象的時候,也許會發現堆內存無法存放新創建的對象,這就會導致頻繁的GC,GC會導致工作線程停止,進而導致Spark暫停工作一段時間,嚴重影響Spark性能。

假設當前任務配置了20個Executor,指定500個task,有一個20M的變量被所有task共用,此時會在500個task中產生500個副本,耗費集群10G的內存,如果使用了廣播變量, 那么每個Executor保存一個副本,一共消耗400M內存,內存消耗減少了5倍。

廣播變量在每個Executor保存一個副本,此Executor的所有task共用此廣播變量,這讓變量產生的副本數量大大減少。

在初始階段,廣播變量只在Driver中有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中嘗試獲取變量,如果本地沒有,BlockManager就會從Driver或者其他節點的BlockManager上遠程拉取變量的復本,並由本地的BlockManager進行管理;之后此Executor的所有task都會直接從本地的BlockManager中獲取變量。

8.5 Kryo序列化

默認情況下,Spark使用Java的序列化機制。Java的序列化機制使用方便,不需要額外的配置,在算子中使用的變量實現Serializable接口即可,但是,Java序列化機制的效率不高,序列化速度慢並且序列化后的數據所占用的空間依然較大。

Kryo序列化機制比Java序列化機制性能提高10倍左右,Spark之所以沒有默認使用Kryo作為序列化類庫,是因為它不支持所有對象的序列化,同時Kryo需要用戶在使用前注冊需要序列化的類型,不夠方便,但從Spark 2.0.0版本開始,簡單類型、簡單類型數組、字符串類型的Shuffling RDDs 已經默認使用Kryo序列化方式了。

  • 自定義類的Kryo序列化注冊方式的實例代碼如下
public class MyKryoRegistrator implements KryoRegistrator
{
  @Override
  public void registerClasses(Kryo kryo)
  {
    kryo.register(StartupReportLogs.class);
  }
}
  • Kryo序列化機制配置代碼如下
//創建SparkConf對象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化庫,如果要使用Java序列化庫,需要把該行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
//在Kryo序列化庫中注冊自定義的類集合,如果要使用Java序列化庫,需要把該行屏蔽掉
conf.set("spark.kryo.registrator", "com.xxx.MyKryoRegistrator"); 

8.6 調節本地化等待時長

Spark作業運行過程中,Driver會對每一個stage的task進行分配。根據Spark的task分配算法,Spark希望task能夠運行在它要計算的數據算在的節點(數據本地化思想),這樣就可以避免數據的網絡傳輸。通常來說,task可能不會被分配到它處理的數據所在的節點,因為這些節點可用的資源可能已經用盡,此時,Spark會等待一段時間,默認3s,如果等待指定時間后仍然無法在指定節點運行,那么會自動降級,嘗試將task分配到比較差的本地化級別所對應的節點上,比如將task分配到離它要計算的數據比較近的一個節點,然后進行計算,如果當前級別仍然不行,那么繼續降級。

當task要處理的數據不在task所在節點上時,會發生數據的傳輸。task會通過所在節點的BlockManager獲取數據,BlockManager發現數據不在本地時,戶通過網絡傳輸組件從數據所在節點的BlockManager處獲取數據。

網絡傳輸數據的情況是我們不願意看到的,大量的網絡傳輸會嚴重影響性能,因此,我們希望通過調節本地化等待時長,如果在等待時長這段時間內,目標節點處理完成了一部分task,那么當前的task將有機會得到執行,這樣就能夠改善Spark作業的整體性能。

  • Spark本地化等級
名稱 解析
PROCESS_LOCAL 進程本地化,task和數據在同一個Executor中,性能最好。
NODE_LOCAL 節點本地化,task和數據在同一個節點中,但是task和數據不在同一個Executor中,數據需要在進程間進行傳輸。
RACK_LOCAL 機架本地化,task和數據在同一個機架的兩個節點上,數據需要通過網絡在節點之間進行傳輸。
NO_PREF 對於task來說,從哪里獲取都一樣,沒有好壞之分。
ANY task和數據可以在集群的任何地方,而且不在一個機架中,性能最差。

在Spark項目開發階段,可以使用client模式對程序進行測試,此時,可以在本地看到比較全的日志信息,日志信息中有明確的task數據本地化的級別,如果大部分都是PROCESS_LOCAL,那么就無需進行調節,但是如果發現很多的級別都是NODE_LOCAL、ANY,那么需要對本地化的等待時長進行調節,通過延長本地化等待時長,看看task的本地化級別有沒有提升,並觀察Spark作業的運行時間有沒有縮短。

注意,過猶不及,不要將本地化等待時長延長地過長,導致因為大量的等待時長,使得Spark作業的運行時間反而增加了。

  • 本地化等待市場設置如下:
val conf = new SparkConf().set("spark.locality.wait", "6")

9 算子調優

9.1 mapPartitions

普通的map算子對RDD中的每一個元素進行操作,而mapPartitions算子對RDD中每一個分區進行操作。如果是普通的map算子,假設一個partition有1萬條數據,那么map算子中的function要執行1萬次,也就是對每個元素進行操作。

如果是mapPartition算子,由於一個task處理一個RDD的partition,那么一個task只會執行一次function,function一次接收所有的partition數據,效率比較高。

比如,當要把RDD中的所有數據通過JDBC寫入數據,如果使用map算子,那么需要對RDD中的每一個元素都創建一個數據庫連接,這樣對資源的消耗很大,如果使用mapPartitions算子,那么針對一個分區的數據,只需要建立一個數據庫連接。

mapPartitions算子也存在一些缺點:對於普通的map操作,一次處理一條數據,如果在處理了2000條數據后內存不足,那么可以將已經處理完的2000條數據從內存中垃圾回收掉;但是如果使用mapPartitions算子,但數據量非常大時,function一次處理一個分區的數據,如果一旦內存不足,此時無法回收內存,就可能會OOM,即內存溢出。

因此,mapPartitions算子適用於數據量不是特別大的時候,此時使用mapPartitions算子對性能的提升效果還是不錯的。(當數據量很大的時候,一旦使用mapPartitions算子,就會直接OOM)

在項目中,應該首先估算一下RDD的數據量、每個partition的數據量,以及分配給每個Executor的內存資源,如果資源允許,可以考慮使用mapPartitions算子代替map。

9.2 foreachPartition優化數據庫操作

在生產環境中,通常使用foreachPartition算子來完成數據庫的寫入,通過foreachPartition算子的特性,可以優化寫數據庫的性能。

如果使用foreach算子完成數據庫的操作,由於foreach算子是遍歷RDD的每條數據,因此,每條數據都會建立一個數據庫連接,這是對資源的極大浪費,因此,對於寫數據庫操作,我們應當使用foreachPartition算子。

與mapPartitions算子非常相似,foreachPartition是將RDD的每個分區作為遍歷對象,一次處理一個分區的數據,也就是說,如果涉及數據庫的相關操作,一個分區的數據只需要創建一次數據庫連接

  • 使用了foreachPartition算子后,可以獲得以下的性能提升:
    1. 對於我們寫的function函數,一次處理一整個分區的數據;
    2. 對於一個分區內的數據,創建唯一的數據庫連接;
    3. 只需要向數據庫發送一次SQL語句和多組參數;

在生產環境中,全部都會使用foreachPartition算子完成數據庫操作。foreachPartition算子存在一個問題,與mapPartitions算子類似,如果一個分區的數據量特別大,可能會造成OOM,即內存溢出。

9.3 filter與coalesce的配合使用

在Spark任務中我們經常會使用filter算子完成RDD中數據的過濾,在任務初始階段,從各個分區中加載到的數據量是相近的,但是一旦進過filter過濾后,每個分區的數據量有可能會存在較大差異

  • 如上圖我們可以發現兩個問題:

    1. 每個partition的數據量變小了,如果還按照之前與partition相等的task個數去處理當前數據,有點浪費task的計算資源;
    2. 每個partition的數據量不一樣,會導致后面的每個task處理每個partition數據的時候,每個task要處理的數據量不同,這很有可能導致數據傾斜問題。
  • 如圖,第二個分區的數據過濾后只剩100條,而第三個分區的數據過濾后剩下800條,在相同的處理邏輯下,第二個分區對應的task處理的數據量與第三個分區對應的task處理的數據量差距達到了8倍,這也會導致運行速度可能存在數倍的差距,這也就是數據傾斜問題。

  • 針對上述的兩個問題,我們分別進行分析:

    1. 針對第一個問題,既然分區的數據量變小了,我們希望可以對分區數據進行重新分配,比如將原來4個分區的數據轉化到2個分區中,這樣只需要用后面的兩個task進行處理即可,避免了資源的浪費。
    2. 針對第二個問題,解決方法和第一個問題的解決方法非常相似,對分區數據重新分配,讓每個partition中的數據量差不多,這就避免了數據傾斜問題。
  • 那么具體應該如何實現上面的解決思路?我們需要coalesce算子。

repartition與coalesce都可以用來進行重分區,其中repartition只是coalesce接口中shuffle為true的簡易實現,coalesce默認情況下不進行shuffle,但是可以通過參數進行設置。

  • 假設我們希望將原本的分區個數A通過重新分區變為B,那么有以下幾種情況:
  1. A > B(多數分區合並為少數分區)

① A與B相差值不大

此時使用coalesce即可,無需shuffle過程。

② A與B相差值很大

此時可以使用coalesce並且不啟用shuffle過程,但是會導致合並過程性能低下,所以推薦設置coalesce的第二個參數為true,即啟動shuffle過程。

  1. A < B(少數分區分解為多數分區)

此時使用repartition即可,如果使用coalesce需要將shuffle設置為true,否則coalesce無效。

我們可以在filter操作之后,使用coalesce算子針對每個partition的數據量各不相同的情況,壓縮partition的數量,而且讓每個partition的數據量盡量均勻緊湊,以便於后面的task進行計算操作,在某種程度上能夠在一定程度上提升性能。

注意:local模式是進程內模擬集群運行,已經對並行度和分區數量有了一定的內部優化,因此不用去設置並行度和分區數量。

9.4 repartition解決SparkSQL低並行度問題

在常規性能調優中我們講解了並行度的調節策略,但是,並行度的設置對於Spark SQL是不生效的,用戶設置的並行度只對於Spark SQL以外的所有Spark的stage生效。

Spark SQL的並行度不允許用戶自己指定,Spark SQL自己會默認根據hive表對應的HDFS文件的split個數自動設置Spark SQL所在的那個stage的並行度,用戶自己通spark.default.parallelism參數指定的並行度,只會在沒Spark SQL的stage中生效。

由於Spark SQL所在stage的並行度無法手動設置,如果數據量較大,並且此stage中后續的transformation操作有着復雜的業務邏輯,而Spark SQL自動設置的task數量很少,這就意味着每個task要處理為數不少的數據量,然后還要執行非常復雜的處理邏輯,這就可能表現為第一個有Spark SQL的stage速度很慢,而后續的沒有Spark SQL的stage運行速度非常快。

為了解決Spark SQL無法設置並行度和task數量的問題,我們可以使用repartition算子

Spark SQL這一步的並行度和task數量肯定是沒有辦法去改變了,但是,對於Spark SQL查詢出來的RDD,立即使用repartition算子,去重新進行分區,這樣可以重新分區為多個partition,從repartition之后的RDD操作,由於不再設計Spark SQL,因此stage的並行度就會等於你手動設置的值,這樣就避免了Spark SQL所在的stage只能用少量的task去處理大量數據並執行復雜的算法邏輯。使用repartition算子的前后對比如下圖

9.5 reduceByKey本地聚合

reduceByKey相較於普通的shuffle操作一個顯著的特點就是會進行map端的本地聚合,map端會先對本地的數據進行combine操作,然后將數據寫入給下個stage的每個task創建的文件中,也就是在map端,對每一個key對應的value,執行reduceByKey算子函數。reduceByKey算子的執行過程如下圖

  • 使用reduceByKey對性能的提升如下:

    1. 本地聚合后,在map端的數據量變少,減少了磁盤IO,也減少了對磁盤空間的占用;
    2. 本地聚合后,下一個stage拉取的數據量變少,減少了網絡傳輸的數據量;
    3. 本地聚合后,在reduce端進行數據緩存的內存占用減少;
    4. 本地聚合后,在reduce端進行聚合的數據量減少。
  • 基於reduceByKey的本地聚合特征,我們應該考慮使用reduceByKey代替其他的shuffle算子,例如groupByKey。reduceByKey與groupByKey的運行原理如下圖

groupByKey不會進行map端的聚合,而是將所有map端的數據shuffle到reduce端,然后在reduce端進行數據的聚合操作。由於reduceByKey有map端聚合的特性,使得網絡傳輸的數據量減小,因此效率要明顯高於groupByKey。

10 Shuffle調優

10.1 調節map端緩沖區大小

在Spark任務運行過程中,如果shuffle的map端處理的數據量比較大,但是map端緩沖的大小是固定的,可能會出現map端緩沖數據頻繁spill溢寫到磁盤文件中的情況,使得性能非常低下,通過調節map端緩沖的大小,可以避免頻繁的磁盤IO操作,進而提升Spark任務的整體性能。

map端緩沖的默認配置是32KB,如果每個task處理640KB的數據,那么會發生640/32 = 20次溢寫,如果每個task處理64000KB的數據,機會發生64000/32=2000此溢寫,這對於性能的影響是非常嚴重的。

  • map端緩沖的配置方法
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")

10.2 調節reduce端拉取數據緩沖區大小

Spark Shuffle過程中,shuffle reduce task的buffer緩沖區大小決定了reduce task每次能夠緩沖的數據量,也就是每次能夠拉取的數據量,如果內存資源較為充足,適當增加拉取數據緩沖區的大小,可以減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。

  • reduce端數據拉取緩沖區的大小可以通過spark.reducer.maxSizeInFlight參數進行設置,默認為48MB,該參數的設置方法
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")

10.3 調節reduce端拉取數據重試次數

Spark Shuffle過程中,reduce task拉取屬於自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試。對於那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由於JVM的full gc或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。

  • reduce端拉取數據重試次數可以通過spark.shuffle.io.maxRetries參數進行設置,該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗,默認為3,該參數的設置方法
val conf = new SparkConf().set("spark.shuffle.io.maxRetries", "6")

10.4 調節reduce端拉取數據等待時間

Spark Shuffle過程中,reduce task拉取屬於自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試,在一次失敗后,會等待一定的時間間隔再進行重試,可以通過加大間隔時長(比如60s),以增加shuffle操作的穩定性。

  • reduce端拉取數據等待間隔可以通過spark.shuffle.io.retryWait參數進行設置,默認值為5s,該參數的設置方法
val conf = new SparkConf().set("spark.shuffle.io.retryWait", "60s")

10.5 調節SortShuffle排序操作閥值

對於SortShuffleManager,如果shuffle reduce task的數量小於某一閾值則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫數據,但是最后會將每個task產生的所有臨時磁盤文件都合並成一個文件,並會創建單獨的索引文件。

當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數調大一些,大於shuffle read task的數量,那么此時map-side就不會進行排序了,減少了排序的性能開銷,但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高。

SortShuffleManager排序操作閾值的設置可以通過spark.shuffle.sort. bypassMergeThreshold這一參數進行設置,默認值為200

  • 該參數的設置方法如下:
val conf = new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "400")

11 JVM調優

11.1 降低cache操作的內存占比

11.1.1 靜態內存管理機制

根據Spark靜態內存管理機制,堆內存被划分為了兩塊,Storage和Execution。Storage主要用於緩存RDD數據和broadcast數據,Execution主要用於緩存在shuffle過程中產生的中間數據,Storage占系統內存的60%,Execution占系統內存的20%,並且兩者完全獨立。

在一般情況下,Storage的內存都提供給了cache操作,但是如果在某些情況下cache操作內存不是很緊張,而task的算子中創建的對象很多,Execution內存又相對較小,這回導致頻繁的minor gc,甚至於頻繁的full gc,進而導致Spark頻繁的停止工作,性能影響會很大。

在Spark UI中可以查看每個stage的運行情況,包括每個task的運行時間、gc時間等等,如果發現gc太頻繁,時間太長,就可以考慮調節Storage的內存占比,讓task執行算子函數式,有更多的內存可以使用。

Storage內存區域可以通過spark.storage.memoryFraction參數進行指定,默認為0.6,即60%,可以逐級向下遞減,如代碼清單所示:

  • 內存占比設置
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.4")
  • 統一內存管理機制

根據Spark統一內存管理機制,堆內存被划分為了兩塊,Storage和Execution。Storage主要用於緩存數據,Execution主要用於緩存在shuffle過程中產生的中間數據,兩者所組成的內存部分稱為統一內存,Storage和Execution各占統一內存的50%,由於動態占用機制的實現,shuffle過程需要的內存過大時,會自動占用Storage的內存區域,因此無需手動進行調節。

11.1.2 調節Executor堆外內存

Executor的堆外內存主要用於程序的共享庫、Perm Space、 線程Stack和一些Memory mapping等, 或者類C方式allocate object。

有時,如果你的Spark作業處理的數據量非常大,達到幾億的數據量,此時運行Spark作業會時不時地報錯,例如shuffle output file cannot find,executor lost,task lost,out of memory等,這可能是Executor的堆外內存不太夠用,導致Executor在運行的過程中內存溢出。

stage的task在運行的時候,可能要從一些Executor中去拉取shuffle map output文件,但是Executor可能已經由於內存溢出掛掉了,其關聯的BlockManager也沒有了,這就可能會報出shuffle output file cannot find,executor lost,task lost,out of memory等錯誤,此時,就可以考慮調節一下Executor的堆外內存,也就可以避免報錯,與此同時,堆外內存調節的比較大的時候,對於性能來講,也會帶來一定的提升。

默認情況下,Executor堆外內存上限大概為300多MB,在實際的生產環境下,對海量數據進行處理的時候,這里都會出現問題,導致Spark作業反復崩潰,無法運行,此時就會去調節這個參數,到至少1G,甚至於2G、4G。

  • Executor堆外內存的配置需要在spark-submit腳本里配置
--conf spark.yarn.executor.memoryOverhead=2048

11.1.3 調節鏈接等待時長

在Spark作業運行過程中,Executor優先從自己本地關聯的BlockManager中獲取某份數據,如果本地BlockManager沒有的話,會通過TransferService遠程連接其他節點上Executor的BlockManager來獲取數據。

如果task在運行過程中創建大量對象或者創建的對象較大,會占用大量的內存,這回導致頻繁的垃圾回收,但是垃圾回收會導致工作現場全部停止,也就是說,垃圾回收一旦執行,Spark的Executor進程就會停止工作,無法提供相應,此時,由於沒有響應,無法建立網絡連接,會導致網絡連接超時。

在生產環境下,有時會遇到file not found、file lost這類錯誤,在這種情況下,很有可能是Executor的BlockManager在拉取數據的時候,無法建立連接,然后超過默認的連接等待時長60s后,宣告數據拉取失敗,如果反復嘗試都拉取不到數據,可能會導致Spark作業的崩潰。這種情況也可能會導致DAGScheduler反復提交幾次stage,TaskScheduler返回提交幾次task,大大延長了我們的Spark作業的運行時間。

調節連接等待時長后,通常可以避免部分的XX文件拉取失敗、XX文件lost等報錯。

  • 此時,可以考慮調節連接的超時時長,連接等待時長需要在spark-submit腳本中進行設置,設置方式如下
--conf spark.core.connection.ack.wait.timeout=300

12 故障排除

12.1 控制reduce端緩沖大小以及避免OOM

在Shuffle過程,reduce端task並不是等到map端task將其數據全部寫入磁盤后再去拉取,而是map端寫一點數據,reduce端task就會拉取一小部分數據,然后立即進行后面的聚合、算子函數的使用等操作。

reduce端task能夠拉取多少數據,由reduce拉取數據的緩沖區buffer來決定,因為拉取過來的數據都是先放在buffer中,然后再進行后續的處理,buffer的默認大小為48MB。

reduce端task會一邊拉取一邊計算,不一定每次都會拉滿48MB的數據,可能大多數時候拉取一部分數據就處理掉了。

雖然說增大reduce端緩沖區大小可以減少拉取次數,提升Shuffle性能,但是有時map端的數據量非常大,寫出的速度非常快,此時reduce端的所有task在拉取的時候,有可能全部達到自己緩沖的最大極限值,即48MB,此時,再加上reduce端執行的聚合函數的代碼,可能會創建大量的對象,這可難會導致內存溢出,即OOM。

如果一旦出現reduce端內存溢出的問題,我們可以考慮減小reduce端拉取數據緩沖區的大小,例如減少為12MB。

在實際生產環境中是出現過這種問題的,這是典型的以性能換執行的原理。reduce端拉取數據的緩沖區減小,不容易導致OOM,但是相應的,reudce端的拉取次數增加,造成更多的網絡傳輸開銷,造成性能的下降。

​ 注意,要保證任務能夠運行,再考慮性能的優化。

12.2 JVM GC導致的shuffle文件拉取失敗

在Spark作業中,有時會出現shuffle file not found的錯誤,這是非常常見的一個報錯,有時出現這種錯誤以后,選擇重新執行一遍,就不再報出這種錯誤。

出現上述問題可能的原因是Shuffle操作中,后面stage的task想要去上一個stage的task所在的Executor拉取數據,結果對方正在執行GC,執行GC會導致Executor內所有的工作現場全部停止,比如BlockManager、基於netty的網絡通信等,這就會導致后面的task拉取數據拉取了半天都沒有拉取到,就會報出shuffle file not found的錯誤,而第二次再次執行就不會再出現這種錯誤。

可以通過調整reduce端拉取數據重試次數和reduce端拉取數據時間間隔這兩個參數來對Shuffle性能進行調整,增大參數值,使得reduce端拉取數據的重試次數增加,並且每次失敗后等待的時間間隔加長。

  • JVM GC導致的shuffle文件拉取失敗
val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "60")
  .set("spark.shuffle.io.retryWait", "60s")

12.3 解決各種序列化導致的報錯

當Spark作業在運行過程中報錯,而且報錯信息中含有Serializable等類似詞匯,那么可能是序列化問題導致的報錯。

  • 序列化問題要注意以下三點:
    1. 作為RDD的元素類型的自定義類,必須是可以序列化的;
    2. 算子函數里可以使用的外部的自定義變量,必須是可以序列化的;
    3. 不可以在RDD的元素類型、算子函數里使用第三方的不支持序列化的類型,例如Connection。

12.4 解決算子函數返回NULL導致的問題

在一些算子函數里,需要我們有一個返回值,但是在一些情況下我們不希望有返回值,此時我們如果直接返回NULL,會報錯,例如Scala.Math(NULL)異常。

  • 如果你遇到某些情況,不希望有返回值,那么可以通過下述方式解決:
    1. 返回特殊值,不返回NULL,例如“-1”;
    2. 在通過算子獲取到了一個RDD之后,可以對這個RDD執行filter操作,進行數據過濾,將數值為-1的數據給過濾掉;
    3. 在使用完filter算子后,繼續調用coalesce算子進行優化。

12.5 解決YARN-CLIENT模式導致的網卡流量激增問題

  • YARN-client模式的運行原理圖

在YARN-client模式下,Driver啟動在本地機器上,而Driver負責所有的任務調度,需要與YARN集群上的多個Executor進行頻繁的通信

假設有100個Executor, 1000個task,那么每個Executor分配到10個task,之后,Driver要頻繁地跟Executor上運行的1000個task進行通信,通信數據非常多,並且通信品類特別高。這就導致有可能在Spark任務運行過程中,由於頻繁大量的網絡通訊,本地機器的網卡流量會激增。

注意,YARN-client模式只會在測試環境中使用,而之所以使用YARN-client模式,是由於可以看到詳細全面的log信息,通過查看log,可以鎖定程序中存在的問題,避免在生產環境下發生故障。

在生產環境下,使用的一定是YARN-cluster模式。在YARN-cluster模式下,就不會造成本地機器網卡流量激增問題,如果YARN-cluster模式下存在網絡通信的問題,需要運維團隊進行解決。

12.6 解決YARN-CLUSTER模式的JVM棧內存溢出無法執行問題

  • YARN-cluster模式的運行原理圖

當Spark作業中包含SparkSQL的內容時,可能會碰到YARN-client模式下可以運行,但是YARN-cluster模式下無法提交運行(報出OOM錯誤)的情況。

YARN-client模式下,Driver是運行在本地機器上的,Spark使用的JVM的PermGen的配置,是本地機器上的spark-class文件,JVM永久代的大小是128MB,這個是沒有問題的,但是在YARN-cluster模式下,Driver運行在YARN集群的某個節點上,使用的是沒有經過配置的默認設置,PermGen永久代大小為82MB。

SparkSQL的內部要進行很復雜的SQL的語義解析、語法樹轉換等等,非常復雜,如果sql語句本身就非常復雜,那么很有可能會導致性能的損耗和內存的占用,特別是對PermGen的占用會比較大。

所以,此時如果PermGen的占用好過了82MB,但是又小於128MB,就會出現YARN-client模式下可以運行,YARN-cluster模式下無法運行的情況。

  • 解決上述問題的方法時增加PermGen的容量,需要在spark-submit腳本中對相關參數進行設置,設置方法下
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

通過上述方法就設置了Driver永久代的大小,默認為128MB,最大256MB,這樣就可以避免上面所說的問題。

12.7 解決SparkSQL導致的JVM棧內存溢出

當SparkSQL的sql語句有成百上千的or關鍵字時,就可能會出現Driver端的JVM棧內存溢出。

JVM棧內存溢出基本上就是由於調用的方法層級過多,產生了大量的,非常深的,超出了JVM棧深度限制的遞歸。(我們猜測SparkSQL有大量or語句的時候,在解析SQL時,例如轉換為語法樹或者進行執行計划的生成的時候,對於or的處理是遞歸,or非常多時,會發生大量的遞歸)

此時,建議將一條sql語句拆分為多條sql語句來執行,每條sql語句盡量保證100個以內的子句。根據實際的生產環境試驗,一條sql語句的or關鍵字控制在100個以內,通常不會導致JVM棧內存溢出。

12.8 持久化與checkpoint的使用

Spark持久化在大部分情況下是沒有問題的,但是有時數據可能會丟失,如果數據一旦丟失,就需要對丟失的數據重新進行計算,計算完后再緩存和使用,為了避免數據的丟失,可以選擇對這個RDD進行checkpoint,也就是將數據持久化一份到容錯的文件系統上(比如HDFS)。

一個RDD緩存並checkpoint后,如果一旦發現緩存丟失,就會優先查看checkpoint數據存不存在,如果有,就會使用checkpoint數據,而不用重新計算。也即是說,checkpoint可以視為cache的保障機制,如果cache失敗,就使用checkpoint的數據。

使用checkpoint的優點在於提高了Spark作業的可靠性,一旦緩存出現問題,不需要重新計算數據,缺點在於,checkpoint時需要將數據寫入HDFS等文件系統,對性能的消耗較大。

13 數據傾斜

參考:https://www.cnblogs.com/xiaodf/p/6055803.html#21

有的時候,我們可能會遇到大數據計算中一個最棘手的問題——數據傾斜,此時Spark作業的性能會比期望差很多。數據傾斜調優,就是使用各種技術方案解決不同類型的數據傾斜問題,以保證Spark作業的性能。

13.1 數據傾斜發生時的現象

  • 絕大多數task執行得都非常快,但個別task執行極慢。比如,總共有1000個task,997個task都在1分鍾之內執行完了,但是剩余兩三個task卻要一兩個小時。這種情況很常見。
  • 原本能夠正常執行的Spark作業,某天突然報出OOM(內存溢出)異常,觀察異常棧,是我們寫的業務代碼造成的。這種情況比較少見。

13.2 數據傾斜發生的原理

數據傾斜的原理很簡單:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的數據量特別大的話,就會發生數據傾斜。比如大部分key對應10條數據,但是個別key卻對應了100萬條數據,那么大部分task可能就只會分配到10條數據,然后1秒鍾就運行完了;但是個別task可能分配到了100萬數據,要運行一兩個小時。因此,整個Spark作業的運行進度是由運行時間最長的那個task決定的。

因此出現數據傾斜的時候,Spark作業看起來會運行得非常緩慢,甚至可能因為某個task處理的數據量過大導致內存溢出。

下圖就是一個很清晰的例子:hello這個key,在三個節點上對應了總共7條數據,這些數據都會被拉取到同一個task中進行處理;而world和you這兩個key分別才對應1條數據,所以另外兩個task只要分別處理1條數據即可。此時第一個task的運行時間可能是另外兩個task的7倍,而整個stage的運行速度也由運行最慢的那個task所決定。

13.3 如何定位導致數據傾斜的代碼

數據傾斜只會發生在shuffle過程中。這里給大家羅列一些常用的並且可能會觸發shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出現數據傾斜時,可能就是你的代碼中使用了這些算子中的某一個所導致的。

13.4 數據傾斜的解決方案

13.4.1 解決方案一:使用Hive ETL預處理數據

方案適用場景:導致數據傾斜的是Hive表。如果該Hive表中的數據本身很不均勻(比如某個key對應了100萬數據,其他key才對應了10條數據),而且業務場景需要頻繁使用Spark對Hive表執行某個分析操作,那么比較適合使用這種技術方案。

方案實現思路:此時可以評估一下,是否可以通過Hive來進行數據預處理(即通過Hive ETL預先對數據按照key進行聚合,或者是預先和其他表進行join),然后在Spark作業中針對的數據源就不是原來的Hive表了,而是預處理后的Hive表。此時由於數據已經預先進行過聚合或join操作了,那么在Spark作業中也就不需要使用原先的shuffle類算子執行這類操作了。

方案實現原理:這種方案從根源上解決了數據傾斜,因為徹底避免了在Spark中執行shuffle類算子,那么肯定就不會有數據傾斜的問題了。但是這里也要提醒一下大家,這種方式屬於治標不治本。因為畢竟數據本身就存在分布不均勻的問題,所以Hive ETL中進行group by或者join等shuffle操作時,還是會出現數據傾斜,導致Hive ETL的速度很慢。我們只是把數據傾斜的發生提前到了Hive ETL中,避免Spark程序發生數據傾斜而已。

方案優點:實現起來簡單便捷,效果還非常好,完全規避掉了數據傾斜,Spark作業的性能會大幅度提升。

方案缺點:治標不治本,Hive ETL中還是會發生數據傾斜。

方案實踐經驗:在一些Java系統與Spark結合使用的項目中,會出現Java代碼頻繁調用Spark作業的場景,而且對Spark作業的執行性能要求很高,就比較適合使用這種方案。將數據傾斜提前到上游的Hive ETL,每天僅執行一次,只有那一次是比較慢的,而之后每次Java調用Spark作業時,執行速度都會很快,能夠提供更好的用戶體驗。

項目實踐經驗:在美團·點評的交互式用戶行為分析系統中使用了這種方案,該系統主要是允許用戶通過Java Web系統提交數據分析統計任務,后端通過Java提交Spark作業進行數據分析統計。要求Spark作業速度必須要快,盡量在10分鍾以內,否則速度太慢,用戶體驗會很差。所以我們將有些Spark作業的shuffle操作提前到了Hive ETL中,從而讓Spark直接使用預處理的Hive中間表,盡可能地減少Spark的shuffle操作,大幅度提升了性能,將部分作業的性能提升了6倍以上。

13.4.2 解決方案二:過濾少數導致傾斜的key

方案適用場景:如果發現導致傾斜的key就少數幾個,而且對計算本身的影響並不大的話,那么很適合使用這種方案。比如99%的key就對應10條數據,但是只有一個key對應了100萬數據,從而導致了數據傾斜。

方案實現思路:如果我們判斷那少數幾個數據量特別多的key,對作業的執行和計算結果不是特別重要的話,那么干脆就直接過濾掉那少數幾個key。比如,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對RDD執行filter算子過濾掉這些key。如果需要每次作業執行時,動態判定哪些key的數據量最多然后再進行過濾,那么可以使用sample算子對RDD進行采樣,然后計算出每個key的數量,取數據量最多的key過濾掉即可。

方案實現原理:將導致數據傾斜的key給過濾掉之后,這些key就不會參與計算了,自然不可能產生數據傾斜。

方案優點:實現簡單,而且效果也很好,可以完全規避掉數據傾斜。

方案缺點:適用場景不多,大多數情況下,導致傾斜的key還是很多的,並不是只有少數幾個。

方案實踐經驗:在項目中我們也采用過這種方案解決數據傾斜。有一次發現某一天Spark作業在運行的時候突然OOM了,追查之后發現,是Hive表中的某一個key在那天數據異常,導致數據量暴增。因此就采取每次執行前先進行采樣,計算出樣本中數據量最大的幾個key之后,直接在程序中將那些key給過濾掉。

13.4.3 解決方案三:提高shuffle操作的並行度

方案適用場景:如果我們必須要對數據傾斜迎難而上,那么建議優先使用這種方案,因為這是處理數據傾斜最簡單的一種方案。

方案實現思路:在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,比如reduceByKey(1000),該參數就設置了這個shuffle算子執行時shuffle read task的數量。對於Spark SQL中的shuffle類語句,比如group by、join等,需要設置一個參數,即spark.sql.shuffle.partitions,該參數代表了shuffle read task的並行度,該值默認是200,對於很多場景來說都有點過小。

方案實現原理:增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。舉例來說,如果原本有5個key,每個key對應10條數據,這5個key都是分配給一個task的,那么這個task就要處理50條數據。而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數據,那么自然每個task的執行時間都會變短了。具體原理如下圖所示。

方案優點:實現起來比較簡單,可以有效緩解和減輕數據傾斜的影響。

方案缺點:只是緩解了數據傾斜而已,沒有徹底根除問題,根據實踐經驗來看,其效果有限。

方案實踐經驗:該方案通常無法徹底解決數據傾斜,因為如果出現一些極端情況,比如某個key對應的數據量有100萬,那么無論你的task數量增加到多少,這個對應着100萬數據的key肯定還是會分配到一個task中去處理,因此注定還是會發生數據傾斜的。所以這種方案只能說是在發現數據傾斜時嘗試使用的第一種手段,嘗試去用嘴簡單的方法緩解數據傾斜而已,或者是和其他方案結合起來使用。

13.4.4 解決方案四:兩階段聚合(局部聚合+全局聚合)

方案適用場景:對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。

方案實現思路:這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每個key都打上一個隨機數,比如10以內的隨機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着對打上隨機數后的數據,執行reduceByKey等聚合操作,進行局部聚合,那么局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。然后將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果了,比如(hello, 4)。

方案實現原理:將原本相同的key通過附加隨機前綴的方式,變成多個不同的key,就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合,進而解決單個task處理數據量過多的問題。接着去除掉隨機前綴,再次進行全局聚合,就可以得到最終的結果。具體原理見下圖。

方案優點:對於聚合類的shuffle操作導致的數據傾斜,效果是非常不錯的。通常都可以解決掉數據傾斜,或者至少是大幅度緩解數據傾斜,將Spark作業的性能提升數倍以上。

方案缺點:僅僅適用於聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。

// 第一步,給RDD中的每個key都打上一個隨機前綴。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long,Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 第二步,對打上隨機前綴的key進行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

// 第三步,去除RDD中每個key的隨機前綴。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });

// 第四步,對去除了隨機前綴的RDD進行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

13.4.5 解決方案五:將reduce join轉為map join

方案適用場景:在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的數據量比較小(比如幾百M或者一兩G),比較適用此方案。

方案實現思路:不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現join操作,進而完全規避掉shuffle類的操作,徹底避免數據傾斜的發生和出現。將較小RDD中的數據直接通過collect算子拉取到Driver端的內存中來,然后對其創建一個Broadcast變量;接着對另外一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數據用你需要的方式連接起來。

方案實現原理:普通的join是會走shuffle過程的,而一旦shuffle,就相當於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數據+map算子來實現與join同樣的效果,也就是map join,此時就不會發生shuffle操作,也就不會發生數據傾斜。具體原理如下圖所示。

方案優點:對join操作導致的數據傾斜,效果非常好,因為根本就不會發生shuffle,也就根本不會發生數據傾斜。

方案缺點:適用場景較少,因為這個方案只適用於一個大表和一個小表的情況。畢竟我們需要將小表進行廣播,此時會比較消耗內存資源,driver和每個Executor內存中都會駐留一份小RDD的全量數據。如果我們廣播出去的RDD數據比較大,比如10G以上,那么就可能發生內存溢出了。因此並不適合兩個都是大表的情況。

// 首先將數據量比較小的RDD的數據,collect到Driver中來。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然后使用Spark的廣播功能,將小RDD的數據轉換成廣播變量,這樣每個Executor就只有一份RDD的數據。
// 可以盡可能節省內存空間,並且減少網絡傳輸性能開銷。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// 對另外一個RDD執行map類操作,而不再是join類操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // 在算子函數中,通過廣播變量,獲取到本地Executor中的rdd1數據。
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // 可以將rdd1的數據轉換為一個Map,便於后面進行join操作。
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for(Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // 獲取當前RDD數據的key以及value。
                String key = tuple._1;
                String value = tuple._2;
                // 從rdd1數據Map中,根據key獲取到可以join到的數據。
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });

// 這里得提示一下。
// 上面的做法,僅僅適用於rdd1中的key沒有重復,全部是唯一的場景。
// 如果rdd1中有多個相同的key,那么就得用flatMap類的操作,在進行join的時候不能用map,而是得遍歷rdd1所有數據進行join。
// rdd2中每條數據都可能會返回多條join后的數據。

13.4.6 解決方案六:采樣傾斜key並分拆join操作

方案適用場景:兩個RDD/Hive表進行join的時候,如果數據量都比較大,無法采用“解決方案五”,那么此時可以看一下兩個RDD/Hive表中的key分布情況。如果出現數據傾斜,是因為其中某一個RDD/Hive表中的少數幾個key的數據量過大,而另一個RDD/Hive表中的所有key都分布比較均勻,那么采用這個解決方案是比較合適的。

方案實現思路

對包含少數幾個數據量過大的key的那個RDD,通過sample算子采樣出一份樣本來,然后統計一下每個key的數量,計算出來數據量最大的是哪幾個key。
然后將這幾個key對應的數據從原來的RDD中拆分出來,形成一個單獨的RDD,並給每個key都打上n以內的隨機數作為前綴,而不會導致傾斜的大部分key形成另外一個RDD。
接着將需要join的另一個RDD,也過濾出來那幾個傾斜key對應的數據並形成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會導致傾斜的大部分key也形成另外一個RDD。
再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。
而另外兩個普通的RDD就照常join即可。
最后將兩次join的結果使用union算子合並起來即可,就是最終的join結果。

方案實現原理:對於join導致的數據傾斜,如果只是某幾個key導致了傾斜,可以將少數幾個key分拆成獨立RDD,並附加隨機前綴打散成n份去進行join,此時這幾個key對應的數據就不會集中在少數幾個task上,而是分散到多個task進行join了。具體原理見下圖。

方案優點:對於join導致的數據傾斜,如果只是某幾個key導致了傾斜,采用該方式可以用最有效的方式打散key進行join。而且只需要針對少數傾斜key對應的數據進行擴容n倍,不需要對全量數據進行擴容。避免了占用過多內存。

方案缺點:如果導致傾斜的key特別多的話,比如成千上萬個key都導致數據傾斜,那么這種方式也不適合。

// 首先從包含了少數幾個導致數據傾斜key的rdd1中,采樣10%的樣本數據。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// 對樣本數據RDD統計出每個key的出現次數,並按出現次數降序排序。
// 對降序排序后的數據,取出top 1或者top 100的數據,也就是key最多的前n個數據。
// 具體取出多少個數據量最多的key,由大家自己決定,我們這里就取1個作為示范。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }     
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair( 
        new PairFunction<Tuple2<Long,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

// 從rdd1中分拆出導致數據傾斜的key,形成獨立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// 從rdd1中分拆出不導致數據傾斜的普通key,形成獨立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            } 
        });

// rdd2,就是那個所有key的分布相對較為均勻的rdd。
// 這里將rdd2中,前面獲取到的key對應的數據,過濾出來,分拆成單獨的rdd,並對rdd中的數據使用flatMap算子都擴容100倍。
// 對擴容的每條數據,都打上0~100的前綴。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
         new Function<Tuple2<Long,Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                Random random = new Random();
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }

        });

// 將rdd1中分拆出來的導致傾斜的key的獨立rdd,每條數據都打上100以內的隨機前綴。
// 然后將這個rdd1中分拆出來的獨立rdd,與上面rdd2中分拆出來的獨立rdd,進行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedUserid2infoRDD)
        .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, Row>> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
                        }
                    });

// 將rdd1中分拆出來的包含普通key的獨立rdd,直接與rdd2進行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// 將傾斜key join后的結果與普通key join后的結果,uinon起來。
// 就是最終的join結果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

13.4.7 解決方案七:使用隨機前綴和擴容RDD進行join

方案適用場景:如果在進行join操作時,RDD中有大量的key導致數據傾斜,那么進行分拆key也沒什么意義,此時就只能使用最后一種方案來解決問題了。

方案實現思路
該方案的實現思路基本和“解決方案六”類似,首先查看RDD/Hive表中的數據分布情況,找到那個造成數據傾斜的RDD/Hive表,比如有多個key都對應了超過1萬條數據。
然后將該RDD的每條數據都打上一個n以內的隨機前綴。
同時對另外一個正常的RDD進行擴容,將每條數據都擴容成n條數據,擴容出來的每條數據都依次打上一個0~n的前綴。
最后將兩個處理后的RDD進行join即可。

方案實現原理:將原先一樣的key通過附加隨機前綴變成不一樣的key,然后就可以將這些處理后的“不同key”分散到多個task中去處理,而不是讓一個task處理大量的相同key。該方案與“解決方案六”的不同之處就在於,上一種方案是盡量只對少數傾斜key對應的數據進行特殊處理,由於處理過程需要擴容RDD,因此上一種方案擴容RDD后對內存的占用並不大;而這一種方案是針對有大量傾斜key的情況,沒法將部分key拆分出來進行單獨處理,因此只能對整個RDD進行數據擴容,對內存資源要求很高。

方案優點:對join類型的數據傾斜基本都可以處理,而且效果也相對比較顯著,性能提升效果非常不錯。

方案缺點:該方案更多的是緩解數據傾斜,而不是徹底避免數據傾斜。而且需要對整個RDD進行擴容,對內存資源要求很高。

方案實踐經驗:曾經開發一個數據需求的時候,發現一個join導致了數據傾斜。優化之前,作業的執行時間大約是60分鍾左右;使用該方案優化之后,執行時間縮短到10分鍾左右,性能提升了6倍。

// 首先將其中一個key分布相對較為均勻的RDD膨脹100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
        new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });

// 其次,將另一個有數據傾斜key的RDD,每條數據都打上100以內的隨機前綴。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 將兩個處理后的RDD進行join即可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

13.4.8 解決方案八:多種方案組合使用

在實踐中發現,很多情況下,如果只是處理較為簡單的數據傾斜場景,那么使用上述方案中的某一種基本就可以解決。但是如果要處理一個較為復雜的數據傾斜場景,那么可能需要將多種方案組合起來使用。比如說,我們針對出現了多個數據傾斜環節的Spark作業,可以先運用解決方案一和二,預處理一部分數據,並過濾一部分數據來緩解;其次可以對某些shuffle操作提升並行度,優化其性能;最后還可以針對不同的聚合或join操作,選擇一種方案來優化其性能。大家需要對這些方案的思路和原理都透徹理解之后,在實踐中根據各種不同的情況,靈活運用多種方案,來解決自己的數據傾斜問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM