#####1. 翻譯
Apache Spark是一個快速的、通用的集群計算系統。它提供Java、Scala、Python和R中的高級api,以及一個支持通用執行圖的優化引擎。它還支持一組豐富的高級工具,包括用於SQL和結構化數據處理的[Spark SQL]、用於機器學習的[MLlib]、用於圖形處理的[GraphX]和用於流媒體的[Spark streams]。
#####2.什么是spark?
> Spark是一種基於內存的快速、通用、可擴展的大數據分析引擎。
##### 3.spark生態:
> + spark core: spark 的核心計算
> + spark sql (結構化數據):對歷史數據做交互式查詢(即席查詢:用戶根據自己的需求 自定義查詢)
> + spark Streaming : 近實時計算
> + spark mlib :機器學習
> + spark graphX:圖計算(關注事物本身而且關注事物之間的聯系)
##### 4. 什么是結構化和非結構化?
結構化數據——能夠用數據或統一的結構加以表示,如數字、文字、符號。結構化數據也稱作行數據,是由二維表結構來邏輯表達和實現的數據,嚴格地遵循數據格式與長度規范,主要通過關系型數據庫進行存儲和管理。
半結構化數據——是介於完全結構化數據(如關系型數據庫、面向對象數據庫中的數據)和完全無結構的數據(如聲音、圖像文件等)之間的數據,XML、HTML文檔就屬於半結構化數據。它一般是自描述的,數據的結構和內容混在一起,沒有明顯的區分。
非結構化數據——非結構化數據是數據結構不規則或不完整,沒有預定義的數據模型,不方便用數據庫二維邏輯表來表現的數據。包括圖像和音頻/視頻信息等等。丟失的視頻數據就屬於非結構化數據。
#####5.實時計算框架Storm sparkString flink 區別?
> Storm ,flink 實時處理性好 一條一條處理
> sparkString 可以批處理 非實時
> flink 批處理 flink SQL
##### 6.spark 的資源調度? standalone yarn mesos
> local模式:利用多線程對程序進行單機調試;分布式部署模式:
> (1)、standalone:獨立模式,類似於MapReduce1;
> (2)、spark on yarn:利用yarn進行統一的資源調度和管理;
> (3)、spark on mesos:類似於yarn進行資源調度,支持粗粒度和細粒度的調度,前者節約資源調度時間,后者減少資源的浪費;
##### 7.什么是Rdd?
> 彈性分布式數據集
> 這三個特性分別為:分區,不可變,並行操作。
#####8.spark on hive 和 hive on spark?
> + spark on hive:hive 作為數據源,spark計算
> + hive on spark:spark 作為hive 底層的計算引擎
##### 9.spark 為什么比hadoop 的mr快?
> 1. 基於內存
> 2. spark實現了DAG引擎
> 3. spark的容錯
1.消除了冗余的HDFS讀寫
Hadoop每次shuffle操作后,必須寫到磁盤,而Spark在shuffle后不一定落盤,可以cache到內存中,以便迭代時使用。如果操作復雜,很多的shufle操作,那么Hadoop的讀寫IO時間會大大增加。
2.消除了冗余的MapReduce階段
Hadoop的shuffle操作一定連着完整的MapReduce操作,冗余繁瑣。而Spark基於RDD提供了豐富的算子操作,且reduce操作產生shuffle數據,可以緩存在內存中。
3.JVM的優化
Spark Task的啟動時間快。Spark采用fork線程的方式,Spark每次MapReduce操作是基於線程的,只在啟動。而Hadoop采用創建新的進程的方式,啟動一個Task便會啟動一次JVM。
Spark的Executor是啟動一次JVM,內存的Task操作是在線程池內線程復用的。
##### 10.什么是DAG? 有向無環圖
##### 11.spark 的特點?
1)快:與 Hadoop的 Mapreduce/相比, Spark基於內存的運算要快100倍以上,基於硬盤的運算也要快10上。Spak實現了高效的DAG執行引擎,可以通過基於內存來高效處理數據流。計算的中間結果是存在於內存中
2)易用:spaは支持ava、 Python和a的AP,還支持超過so種高級算法,使用戶可以快速構建不同的應用。而且Spak支持交互式的 Python和 Scala的 Shell,,可以非常方便地在這些Shel中使用 Spark集群來驗證解決問題的方法。
3)通用: Spark提供了統一的解決方案。 Spark可以用於批處理、交互式查詢( Spark SQL)、實時流處理( Spark Streaming)、機器學習( Spark Mllib)和圖計算( Graphx)。這些不同類型的處理都可以在同一個應用中無縫使用。減少了開發和維護的人力成本和部署平台的物力成本
4)兼容性: Spark可以非常方便地與其他的開源產品進行融合。比如, Spark可以使用 Hadoop的YARN和Apache Mesos作為它的資源管理和調度器,並且可以處理所有 Hadoop支持的數據,包括HDFS、 Hbase等。這對於已經部署 Hadoop集群的用戶特別重要,因為不需要做任何數據遷移就可以使用Spak的強大處理能力。
##### 12.spark 能代替hadoop 嗎?
不能,僅僅是mr的替代方案
##### 13.spark Rdd 的緩存?
cache / persist 內存中緩存 ,內部的優化機制。當Rdd 重復被使用了,不需要在重新計算,直接從內存中獲取使用
1、提高性能 (默認將數據緩存在內存中)
2.緩存使用的函數cache,Persist,**標識RDD可以被緩存**
cache函數底層調用Persist
storage level:標識緩存的位置
MEMORY_ONLY 存在內存中的
MEMORY_ONLY_2 存在內存中的
MEMORY_ONLY_SER 存在內存中,以序列化形式存儲
DISK_ONLY 存儲在磁盤中
##### 14.driver 的作用?
Spark 的驅動器是執行開發程序中的 main 方法的進程。它負責開發人員編寫的用來創 建 SparkContext、創建 RDD,以及進行 RDD 的轉化操作和行動操作代碼的執行。
啟動 Spark shell 的時候,系統后台自啟了一個 Spark 驅動器程序, 就是在 Spark shell 中預加載的一個叫作 sc 的 SparkContext 對象。如果驅動器程序終止,那 么 Spark 應用也就結束了。
主要負責:
1)把用戶程序轉為作業(JOB)
2)跟蹤 Executor 的運行狀況
3)為執行器節點調度任務
4)UI 展示應用運行狀況
##### 15.excutor 的作用?
Spark Executor 是一個工作進程,負責在 Spark 作業中運行任務,任務間相互獨立。Spark 應用啟動時,Executor 節點被同時啟動,並且始終伴隨着整個 Spark 應用的生命周期而存 在。如果有 Executor 節點發生了故障或崩潰,Spark 應用也可以繼續執行,會將出錯節點上 的任務調度到其他 Executor 節點上繼續運行。
主要負責:
1)負責運行組成 Spark 應用的任務,並將結果返回給驅動器進程;
2)通過自身的塊管理器(Block Manager)為用戶程序中要求緩存的 RDD 提供內存式 存儲。RDD 是直接緩存在 Executor 進程內的,因此任務可以在運行時充分利用緩存數據加速運算。
##### 16.spark spark-submit腳本的參數有哪些?
參數名 參數說明
--master master 的地址,提交任務到哪里執行,例如 spark://host:port, yarn, local
--deploy-mode 在本地 (client) 啟動 driver 或在 cluster 上啟動,默認是 client
--class 應用程序的主類,僅針對 java 或 scala 應用
--name 應用程序的名稱
--jars 用逗號分隔的本地 jar 包,設置后,這些 jar 將包含在 driver 和 executor 的 classpath 下
--packages 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐標
--exclude-packages 為了避免沖突 而指定不包含的 package
--repositories 遠程 repository
--conf PROP=VALUE
指定 spark 配置屬性的值,
例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"
--properties-file 加載的配置文件,默認為 conf/spark-defaults.conf
--driver-memory Driver內存,默認 1G
--driver-java-options 傳給 driver 的額外的 Java 選項
--driver-library-path 傳給 driver 的額外的庫路徑
--driver-class-path 傳給 driver 的額外的類路徑
--driver-cores Driver 的核數,默認是1。在 yarn 或者 standalone 下使用
--executor-memory 每個 executor 的內存,默認是1G
--total-executor-cores 所有 executor 總共的核數。僅僅在 mesos 或者 standalone 下使用
--num-executors 啟動的 executor 數量。默認為2。在 yarn 下使用
--executor-core 每個 executor 的核數。在yarn或者standalone下使用
##### 17.spark 配置的方式?
> 代碼 腳本 配置文件 , 優先級一次降低的
##### 18.spark的資源調度方式?
> local模式
> standalone
> on-yarn : yarn-cluster yarn-client
> mesos (粗細粒度)
##### 19.spark 的提交方式有兩種?
> 發布模式: client cluster
##### 20.spark collect 算子的作用?
> 收集一個彈性分布式數據集的所有元素到一個數組中,從遠程集群拉取數據到driver端。
##### 21.oom?
> OOM - Out of Memory,內存溢出
##### 22.spark yarn 的提交方式有:
> --master yarn-client :資源調度yarn 提交方式 client
> --master yarn-cluster :資源調度yarn 提交方式 cluster
cluster 模式會在集群的某個節點上為 Spark 程序啟動一個稱為 Master 的進程,然后 Driver 程序會運行正在這個 Master 進程內部,由這種進程來啟動 Driver 程序,客戶端完成提交的步驟后就可以退出,不需要等待 Spark 程序運行結束,這是四一職中適合生產環境的運行方式
client 模式也有一個 Master 進程,但是 Driver 程序不會運行在這個 Master 進程內部,而是運行在本地,只是通過 Master 來申請資源,直到運行結束,這種模式非常適合需要交互的計算。顯然 Driver 在 client 模式下會對本地資源造成一定的壓力。
##### 23.spark client 和 cluster 提交方式的區別?
> 主要區別在於:Driver程序的運行節點
> + client:driver 在你當前提交的節點運行,driver 需要頻繁的和集群通信占用大量的網絡帶寬,容易掛掉,好處是方便查看日志便於調試,多用於學習和測試
> + cluster :driver 在集群的節點。掛掉會自動重啟
##### 24.Rdd的屬性?
> 1. RDD是一個分片的數據集合;
> 2. RDD的函數針對每個分片進行計算,並行計算
> 3. rdd之間的依賴關系(寬窄依賴)
> 4. key-value型RDD是根據哈希來分區的
##### 25.Rdd的並行度?
> 一個Rdd 可以有多個分片,一個分片對應一個task,分片的個數決定並行度
> 並行度並不是越高越好, 還要考慮資源的情況
> 當並行度過低時,Spark集群會出現資源閑置的情況。
> 當並行度過高時,每個分區產生的間接開銷累計起來會更大。
##### 26.spark的算子有兩種?
> 轉換算子:transformation 由一個Rdd 生成一個新的Rdd 不會被立即執行,記錄的都是一系列的操作
>
> 動作算子:action 立即執行,返回都是結果
##### 27.spark的容錯?
> lineage:血緣關系,根據血緣關系從新計算 進行容錯
> checkpoint:設置檢查點,一般都是文件系統,磁盤io
##### 28.spark的持久化?
> cache 、persist、checkpoint
> cache持久化:
> cache實際上是persist的一種簡化方式,是一種**懶執行**的,執行action類算子才會觸發,cahce后返回值要賦值給一個變量,下一個job直接基於變量進行操作。
> checkpoint:會將RDD的數據存儲到HDFS中,安全系數較高,因為HDFS會有備份
##### 29.什么是寬依賴,窄依賴?
> 窄依賴:父RDD與子RDD,partition之前的關系是一對一(或者多對一)的;
> 寬依賴:父RDD與子RDD,partition之前的關系是一對多的;
> 作用:切割job,划分stage;Application,一個算子或者多個算子拆分(寬依賴和窄依賴)
> - l 是碰到一個寬依賴,就會切割一個stage
> - l rdd里面存儲的不是數據,而一個代碼邏輯
> - l 數據只有在stage與stage之間的時候將數據落地,到底數據寫到哪個小文件(Reduce輸出)中,是由分區器決定的.
> - l stage里面的並行度是由最后一個rdd來決定的.
##### 31.創建Rdd的方式有幾種?
> 1. 使用程序中的集合創建RDD val arr = Array() sc.parallelize(arr) sc.makeRDD(arr )
> 2. 使用本地文件創建RDD
> 3. 使用HDFS文件創建RDD
##### 32.map和mapPartitons有什么區別?
> map是對rdd中的每一個元素進行操作
> mapPartitions則是對rdd中的每個分區的迭代器進行操作
##### 30.總結Rdd 的算子(30個以上)
| **算子** | **類型** | **說明** |
| ----------------------------- | ------------ | ------------------------------------------------------------ |
| **++** | | 合並兩個RDD |
| **aggregate** | 執行算子 | 根據初始化值進行對rdd種的元素進行聚合,結束之后每個分區會有一個結果,后面會根據這個分區結果再進行一次聚合 |
| **aggregateByKey** | 執行算子 | 和aggregate類似,但是操作的是RDD是Pair類型 |
| **cache()** | 控制操作 | 當第一次遇到Action算子時才會觸發持久化操作。Cache()是persis的一種特殊情況,將RDD持久化到內存中 |
| 轉換算子 | | |
| **Cartesian** | 轉換算子 | 計算兩個RDD之間的笛卡爾乘積,並將它們作為新的RDD返回 |
| **coalesce** | 轉換算子 | 將RDD進行重分區,使用HashPartitioner。它的簡版是repartition算子 |
| **Cogroup** | 轉換算子 | 相當於SQL中的**全外關聯**full outer join,返回左右RDD中的記錄,關聯不上的為空。 |
| **Collect** | **執行算子** | 一個RDD轉換成數組。根據一個偏函數返回一個符合偏函數的結果集RDD。即將RDD轉換成數組 |
| **collectAsMap** | | 一個RDD轉換成Map |
| **combineByKey** | 轉換算子 | **將RDD[K,V]轉換成RDD[K,C],**這里的V類型和C類型可以相同也可以不同。(單個值類型v操作 , 同分區內合並操作 , 多分區間的合並操作 ) |
| **Count** | **執行算子** | 返回RDD中的元素數量 |
| **countByKey** | **執行算子** | 統計RDD[K,V]中每個K的數量。 |
| **createCombiner** | 參數 | 組合器函數,用於將V類型轉換成C類型,輸入參數未RDD[K,V]中的V,輸出為C |
| **distinct** | 轉換算子 | 去除RDD重復的元素,返回所有元素不重復的RDD |
| **flatMap** | 轉換算子 | 類似於map。1對多,可以理解成將原來的數據集拍扁了。RDD中每個元素可生成一個或多個元素構成的新RDD例如將數組、列表拆分成單個值或字符串拆分成單個字符 |
| **flatMapValues** | 轉換算子 | 類似於flatMap,只不過flatMapValues是針對[K,V]中的V值進行flatMap操作。 |
| **filter** | 轉換算子 | 過濾,根據里面的規則返回(true的)一個過濾過后的rdd |
| **First** | **執行算子** | 返回RDD中的第一個元素,不排序。 |
| **Fold** | 執行算子 | 是aggregate的簡化版,將aggregate中的seqOp和combOp使用同一個函數op。 |
| **foldByKey** | 轉換算子 | 作用於RDD[K,V],根據K將V做折疊、合並處理。 |
| **foreach** | **執行算子** | 遍歷RDD,將函數f應用於每一個元素。需要注意如果RDD執行foreach,只會在Executor端有效,並且不是Driver端 |
| **foreachPartition** | **執行算子** | 與foreach類似,只不過是對每一個分區使用f |
| **fullOuterJoin** | 轉換算子 | 類似於SQL的全連接, |
| **glom** | 轉換算子 | 將RDD中每一個分區中所有類型為T的元素轉換成Array[T] |
| **groupBy** | | 根據業務需求,按照自定義的返回值來分區 |
| **groupByKey** | 轉換算子 | 根據key將value進行分組。該函數用於將RDD[K,V]中每個K對應的V值,合並到一個集合Iterable[V]中 |
| **intersection** | 轉換算子 | 取交集。返回兩個RDD中相同的數據集合,返回元素去重。類似於SQL中的inner join |
| **join** | 轉換算子 | 類似於SQL中的內關聯join,只返回兩個RDD根據K可以關聯上的結果 |
| **leftOuterJoin** | 轉換算子 | 類似於SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空 |
| **Lookup** | 執行算子 | 用於(K,V)類型的RDD,指定K值,返回RDD中該K對應的所有V值。 |
| **map** | 轉換算子 | 對RDD中的每一個元素經過func函數轉換后形成一個新的RDD |
| **mapPartitions** | 轉換算子 | 是map的一個變種。mapPartitions的輸入函數是應用於每個分區,也就是把每個分區中的內容作為整體來處理的。**在映射過程中頻繁創建額外的對象時mapPartitions比map高效,例如在RDD中創建數據庫的連接等。** |
| **mapPartitionsWithIndex** | 轉換算子 | 函數作用同mapPartitions,不過提供了兩個參數,第一個參數為分區的索引。 |
| **mapValues** | 轉換算子 | 類似於map算子,只不過此算子針對[K,V]值中的V值進行map。進行輸入函數應用於RDD中Kev-Value的Value,原RDD中的Key保持不變與新的Value一起組成新的RDD中的元素。如(panda,0)轉成(panda,(0,1)) |
| mergeValue | 參數 | 合並值函數,將一個C類型和V類型值合並成一個C類型,輸入參數為(C,V),輸出為C |
| mergeCombiners | 參數 | 合並組合器函數,用於將兩個C類型值合並成一個C類型,輸入參數為(C,C),輸出為C |
| numPartition | 參數 | 結果RDD分區數,默認保持原有分區數 |
| **partitionBy** | 轉換算子 | 該函數根據partitioner函數生成新的ShuffleRDD,將原RDD重新分區。 |
| persist() | 控制操作 | persis(level:StorageLevel)可以傳入緩存級別,默認是MEMORY_ONLY,此時同cache()操作 |
| 轉換算子 | | |
| **randomSplit** | 轉換算子 | 該函數根據weights權重,將一個RDD切分成多個RDD。 |
| **Reduce** | 執行算子 | 將RDD中元素兩兩傳遞給輸入函數,同時產生一個新的值。根據映射函數,對RDD中的元素進行二元計算 |
| **reduceByKey** | 轉換算子 | 對元素為RDD[K,V]對的RDD中Key相同的元素的Value進行reduce |
| **reduceByKeyLocally** | 轉換算子 | 和reduceByKey類似。RDD[K,V]中每個K對應的V值根據映射函數來運算,運算結果映射到一個Map[K,V]中,而不是RDD[K,V]。 |
| **repartition** | 轉換算子 | 該函數其實就是coalesce函數第二個參數為true的實現 |
| **rightOuterJoin** | 轉換算子 | 類似於SQL中的右外關聯right outer join,返回結果以參數中的RDD為主,關聯不上的記錄為空 |
| **saveAsHadoopFile** | 存儲操作 | 將RDD存儲在HDFS上的文件中,支持老版本Hadoop API。可以指定outputKeyClass、outputValueClass以及壓縮格式 |
| **執行算子** | | |
| **saveAsHadoopDataset** | 存儲操作 | 可以用於將RDD保存到除了HDFS的其他存儲中,比如HBase。在JobConf中通常需要關注或設置5個參數:文件保存路徑、Key值的class類型、value值的class類型、RDD的輸出格式(OutputFormat)以及壓縮相關的參數 |
| **執行算子** | | |
| **saveAsNewAPIHadoopFile** | 存儲操作 | 用於將RDD數據保存到HDFS上,使用新版本的Hadoop API,用法基本同**saveAsHadoopFile** |
| **執行算子** | | |
| **saveAsNewAPIHadoopDataset** | 存儲操作 | 該方法作用同**saveAsHadoopDataset,**只不過采用新版本的Hadoop API |
| **執行算子** | | |
| **saveAsObjectFile** | 存儲操作 | 將RDD中的元素序列化成對象,存儲到文件中。對於HDFS,默認采用SequenceFile保存 |
| **執行算子** | | |
| **saveAsSequenceFile** | 存儲操作 | 將RDD以SequenceFile的文件格式**保存到HDFS**上。 |
| **執行算子** | | |
| **saveAsTextFile** | 存儲操作 | 將RDD以文本文件的格式存儲到文件系統中。 |
| **執行算子** | | |
| **sortBy** | **執行算子** | 排序。根據規則來定義排序。true升序 false,升序 |
| **sortByKey** | **執行算子** | 排序。根據按value的排序 |
| **subtract** | 轉換算子 | 該函數類似於intersection,但返回在RDD中出現,並且不在otherRDD中出現的元素,不去重。 |
| **subtractByKey** | 轉換算子 | 和基本轉換操作中的subtract類似。只不過這里是針對K的,返回在主RDD中出現,並且不在otherRDD中出現的元素。 |
| **take(n)** | 執行算子 | 用於獲取RDD中從0到n-1下標的元素,不排序。 |
| **takeOrdered** | 執行算子 | 和top類似,只不過以和top相反的順序返回元素 |
| **top** | 執行算子 | 從RDD中,按照默認(降序)或者指定的排序規則,返回前num個元素 |
| **union** | 轉換算子 | 就是將兩個RDD進行合並(類型需一致),返回兩個RDD的並集,不去重。 |
| **zip** | 轉換算子 | 將兩個RDD組合成Key/Value形式的RDD。這里默認兩個RDD的partitio數以及元素數量都相同,否則會拋出異常 |
| **zipPartitions** | 轉換算子 | 將多個RDD按照partition組合成為新的RDD,該操作需要組合的RDD具有相同的分區數,但對於每個分區內的元素數量沒有要求 |
| **zipWithIndex** | 轉換算子 | 將RDD中的元素和這個元素在RDD中的ID(索引號)組合成鍵/值對。 |
| **zipWithUniqueId** | 轉換算子 | 將RDD中元素和一個唯一ID組合成鍵/值對。該唯一ID生成的算法如下:①每個分區中第一個元素的唯一ID值為該分區索引號;②每個分區中第N個元素的唯一ID值為(前一個元素唯一ID-ID值)+(該RDD總的分區數) |
##### 33.Spark的重分區算子? 區別?
> coalesce 縮減分區數,用於大數據集過濾后,提高小數據集的執行效率
>
> repartition 根據分區數,重新通過網絡隨機洗牌所有數據
>
> 區別:
>
> - coalesce: 重新分區,可以選擇是否進行shuffle過程。由參數shuffle: Boolean = false/true決定
> - repartition: 實際上是調用的coalesce,默認是進行shuffle的
##### 34.總結寬依賴算子?
> 1. 所有bykey 的算子 groupbykey reducebyky…
> 2. repatition,cartesian算子
> 3. 部分join算子
##### 35.使用什么方法可以代替join?
> 廣播變量+map+filter
##### 36.reducebykey和groupbykey的區別?
> 1. reduceByKey:按照 key 進行聚合,在 shuffle 之前有 combine(預聚合)操作,返回結果 是 RDD[k,v].
> 2. groupByKey:按照 key 進行分組,直接進行 shuffle。
> 3. 開發指導:reduceByKey 比 groupByKey,建議使用。但是需要注意是否會影響業務邏輯
##### 37.DAG如果划分stage?
(底層對應了三個RDD:MapPartitionsRDD、ShuffleRDD、MapPartitionsRDD)
>1、使用出發job的最后一個RDD,創建finalStage(創建一個stage對象,並且將stage加入到DAGScheduler內部的內存緩存中)
>
>2、使用finalStage創建一個job(這個job的最后一個stage,就是 finalStage)
>
>3、將job加入到內存緩存中
>
>4、使用 submitStage() 提交 finalStage
>提交stage的方法(stage划分算法入口):
>
>調用 getMissingParentStage() 獲取當前這個 stage 的父 stage:
>
> 往棧中推入stage的最后一個RDD
>
> while循環對stage的最后一個RDD,調用自己定義的visit()方法
>
> visit():如果是窄依賴,將RDD放入棧中,如果是寬依賴,使用寬依賴的那個RDD創建一個stage,將isShuffleMap設為true
>
>提交stage,為stage創建一批task,task數量與Partition數量相同
>
>計算每個task對應的Partition的最佳位置(就是從stage最后一個RDD開始,去找被cache或checkpoint的RDD的Partition,task的最佳位置,就是該Partition的位置,這樣task就在那個節點上執行,不需要計算之前的RDD;如果從最后一個RDD到最開始的RDD,都沒有被cache或checkpoint,那么最佳位置就是Nil,就是沒有最佳位置)
>
>5.、針對stage的task,創建TaskSet對象,調用TaskScheduler的submitTask方法,提交TaskSet,提交到Excutor上去執行
##### 38.如何划分job?
> 1)Application:初始化一個SparkContext即生成一個Application;
>
> 2)Job:一個Action算子就會生成一個Job;
>
> 3)Stage:根據RDD之間的依賴關系的不同將Job划分成不同的Stage,遇到一個寬依賴則划分一個Stage;
>
> ==對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計算,因此寬依賴是划分Stage的依據.==
>
> 4)Task:Stage是一個TaskSet,將Stage划分的結果發送到不同的Executor執行即為一個Task。
>
> 注意:Application->Job->Stage-> Task 每一層都是 1 對 n 的關系。
##### 39.spark的提交流程總結?
1、Driver端會調用SparkSubmit類(內部執行submit->doRunMain->通過反射
獲取應用程序的主類對象->執行主類的main方法)
2、構建sparkConf和sparkContext對象,在sparkContext入口做了三件事,創建
了sparkEnv對象(創建了ActorSystem對象)TaskScheduler(用來生成並發送
task給Executor)DAGScheduler(用來划分Stage)
3、clientActor 將任務封裝到 ApplicationDescription 對象並且提交給Master
4、Master收到任務信息后,將任務信息存到內存中,同時放到隊列中(waitingApp)
5、任務信息開始執行后,調用schedule方法,進行資源的調度。
6、將調度好的資源封裝到LaunchExecutor並發送給對應的worker。
7、worker 接收到 master 發送來的任務調度信息(LaunchExecutor),將信息封裝
成一個ExecutorRunner對象。
8、封裝成ExecutorRunner后,調用ExecutorRunner的·start方法,開始啟動GoarseGrainedExecutorBackend對象
9、Executor啟動后DriverActor進行反向注冊。
10、與DriverActor注冊成功后,創建一個線程池(TreadPool)用來執行任務
11、當所有的Executor注冊完成后,意味着作業環境准備好了,Driver端會結束與
sparkContext 對象的初始化。
12、當Driver初始化完成后(創建一個sc實例)會繼續執行我們自己提交的App
代碼,當觸發了action算子時就會觸發一個job,這時就會調用DAGScheduler對象
進行Stage划分。
13、DagScheduler開始進行stage划分。
14、將划分好的stage按照分區生成一個一個的task,並且封裝到TaskSet對象中
然后TaskSet提交到TaskScheduler
15、TaskScheduler按照提交過來的TaskSet,拿到一個序列化器,將TaskSet序列化
,將序列化好的Task封裝到LaunchExecutor並且提交到DriverActor。
16、DriverActor把LauchExcutor發送到Excutro上。
17、Executor接收到DriverActor發送過來的任務(LaunchExecutro),會將其封裝成為
TaskRunner,然后從線程池中獲取線程來執行TaskRunner。
18、TaskRunner拿到反序列化器,反序列Taskset,然后執行App代碼,也就是對
RDD分區上執行的算子和自定義函數。
ClientActor:負責和Master通信,向Master注冊任務信息
DriverActor:負責和Executor進行通信,接收到 Executor反向注冊和把任務發送到Executer。
##### 40.spark 持久化的級別?
| StorageLevel | 說明 |
| :------------------ | :----------------------------------------------------------- |
| NONE | 沒有一個 |
| MEMORY_ONLY | 使用未序列化的Java對象格式,將數據保存在內存中。如果內存不夠存放所有的數據,則數據可能就不會進行持久化,默認的持久化策略 |
| MEMORY_AND_DISK | 使用未序列化的Java對象格式,優先嘗試將數據保存在內存中。如果內存不夠存放所有的數據,會將數據寫入磁盤文件中。不會立刻輸出到磁盤 |
| MEMORY_ONLY_SER | RDD的每個partition會被序列化成一個字節數組,節省空間,讀取時間更占CPU |
| MEMORY_AND_DISK_SER | 序列化存儲,超出部分寫入磁盤文件中 |
| DISK_ONLY | 使用未序列化的Java對象格式,將數據全部寫入磁盤文件中 |
| MEMORY_ONLY_2 | 對於上述任意一種持久化策略,如果加上后綴_2,代表的是將每個持久化的數據,都復制一份副本,並將副本保存到其他節點上 |
| OFF_HEAP | RDD序列化存儲在Tachyon 優點:減少頻繁的GC,減少內存的使用,提高程序性能。 缺點:沒有數據備份,也不能像alluxio那樣保證數據高可用,丟失數據則需要重新計算。 |
##### 41.spark 持久化的選擇?
> 如果默認能滿足使用默認的
> 如果不能於MEMORY_ONLY很好的契合,建議使用MEMORY_ONLY_SER
> 盡可能不要存儲數據到磁盤上,除非數據集函數計算量特別大,或者它過濾了大量數據,否則從新計算一個分區的速度和從磁盤中讀取差不多
> 如果想擁有快速故障恢復能力,可以使用復制存儲級別(_2)
> 可以自定義存儲級別(如復制因子為3),使用StorageLevel單例對象apply方法
##### 42.spark 持久化和容錯的應用場景?
> 1, checkpoint(容錯)是考慮安全性,
> RDD 可以使用 persist() 方法或 cache() 方法進行持久化。數據將會在第一次 action 操作時進行計算,並緩存在節點的內存中。Spark 的緩存具有容錯機制,如果一個緩存的 RDD 的某個分區丟失了,Spark 將按照原來的計算過程,自動重新計算並進行緩存。
> 2,持久化是為高效性
> 持久化到磁盤、已序列化的 Java 對象形式持久化到內存(可以節省空間)、跨節點間復制、以 off-heap 的方式存儲在Tachyon 。
> 這些存儲級別通過傳遞一個 StorageLevel 對象(Scala、Java、Python)給 persist() 方法進行設置。cache() 方法是使用默認存儲級別的快捷設置方法,默認的存儲級別是 StorageLevel.MEMORY_ONLY(將反序列化的對象存儲到內存中)。
#####43. spark的序列化? java kryo
Java序列化
在默認情況下,Spark采用Java的ObjectOutputStream序列化一個對象。該方式適用於所有實現了java.io.Serializable的類。通過繼承java.io.Externalizable,你能進一步控制序列化的性能。Java序列化非常靈活,但是速度較慢,在某些情況下序列化的結果也比較大。
Kryo序列化
Spark也能使用Kryo(版本2)序列化對象。Kryo不但速度極快,而且產生的結果更為緊湊(通常能提高10倍)。Kryo的缺點是不支持所有類型,為了更好的性能,你需要提前注冊程序中所使用的類(class)。
可以在創建SparkContext之前,通過調用System.setProperty("spark.serializer", "spark.KryoSerializer"),將序列化方式切換成Kryo。
但是Kryo需要用戶進行注冊,這也是為什么Kryo不能成為Spark序列化默認方式的唯一原因,但是建議對於任何“網絡密集型”(network-intensive)的應用,都采用這種方式進行序列化方式。
#####44. 什么是累加器?
> 累加器用來把Executor端變量信息聚合到Driver端。在Driver程序中定義的變量,在Executor端的每個Task都會得到這個變量的一份新的副本,每個task更新這些副本的值后,傳回Driver端進行merge(合並)。
> 當內置的Accumulator無法滿足要求時,可以繼承AccumulatorV2實現自定義的累加器。
> 實現自定義累加器的步驟:
> 1. 繼承AccumulatorV2,實現相關方法
> 2. 創建自定義Accumulator的實例,然后在SparkContext上注冊它
#####45. 什么是廣播變量?
廣播變量用來高效分發較大的對象。向所有工作節點發送一個較大的只讀值,以供一個或多個 Spark 操作使用
使用廣播變量的過程如下:
(1)通過對一個類型 T 的對象調用 SparkContext.broadcast 創建出一個 Broadcast[T]
對象。 任何可序列化的類型都可以這么實現。
(2)通過 value 屬性訪問該對象的值(在 Java 中為 value() 方法)。
(3)變量只會被發到各個節點一次,應作為只讀值處理(修改這個值不會影響到別的節 點)。
#####46. 節點和task 執行的關系?
1) 每個節點可以起一個或多個Executor。
2) 每個Executor由若干core組成,每個Executor的每個core一次只能執行一個Task。
3) 每個Task執行的結果就是生成了目標RDD的一個partiton。
注意: 這里的core是虛擬的core而不是機器的物理CPU核,可以理解為就是Executor的一個工作線程。而 Task被執行的並發度 = Executor數目 * 每個Executor核數。
#####47. diver 失敗了如何恢復?
1. --supervise 在集群模式下,driver 失敗了自動重啟
2. 對driver 的元數據做ck
#####48. cluster 模式如何查看日志?
第一種 是聚合日志方式(推薦,比較常用)
這種方式的話,顧名思義,就是說,將散落在集群中各個機器上的日志,最后都給聚合起來,讓我們可以統一查看
如果打開了日志聚合的選項,即yarn.log-aggregation-enable,container的日志會拷貝到hdfs上去,並從機器中刪除
對於這種情況,可以使用yarn logs -applicationId <app ID>命令,來查看日志
yarn logs命令,會打印出application對應的所有container的日志出來,當然,因為日志是在hdfs上的,我們自然也可以通過hdfs的命令行來直接從hdfs中查看日志
日志在hdfs中的目錄,可以通過查看yarn.nodemanager.remote-app-log-dir和yarn.nodemanager.remote-app-log-dir-suffix屬性來獲知
第二種 web ui(如果你有精力的話,可以去配一下)
日志也可以通過spark web ui來查看executor的輸出日志
但是此時需要啟動History Server,需要讓spark history server和mapreduce history server運行着
並且在yarn-site.xml文件中,配置yarn.log.server.url屬性
spark history server web ui中的log url,會將你重定向到mapreduce history server上,去查看日志
第三種 分散查看(通常不推薦)
如果沒有打開聚合日志選項,那么日志默認就是散落在各個機器上的本次磁盤目錄中的,在YARN_APP_LOGS_DIR目錄下
根據hadoop版本的不同,通常在/tmp/logs目錄下,或者$HADOOP_HOME/logs/userlogs目錄下
如果你要查看某個container的日志,那么就得登錄到那台機器上去,然后到指定的目錄下去,找到那個日志文件,然后才能查看
#####49. spark的優化? 10條以上
為什么需要調優??
程序都是能跑的,集群還是那個集群,但是有可能另外一個會調優的人和你寫的代碼的運行的速度要幾倍甚至幾十倍
1.開發調優
1.1 原則一:避免創建重復的RDD
我們有一份數據 ,student.txt
第一個需求 :wordCount val stuRDD = sc.textFile("e://sparkData//student.txt")
第二個需求:算有多少個學生 val stuRDD01 = sc.textFile("e://sparkData//student.txt")
如果創建兩份就會加載兩次,浪費性能。但是根據我們的需要來說,同樣的算子,需要使用兩次,那怎么辦??
進行持久化即可:
sc.textFile("e://sparkData//student.txt").cache()
1.2 原則二:盡可能使用同一個RDD
這種是大家在開發中,經常寫着寫着就忘記了
舉個例子:
val namesRDD = starsRDD.map(_._1)
val name2LengthRDD = namesRDD.map(name => (name,name.length))
// 這兩個map是可以合並的
val name2LengthRDD01 = starsRDD.map(tuple => (tuple._1,tuple._1.length))
下面的這種方式寫RDD性能更優,因為減少了一次RDD的計算
1.3 原則三:對多次使用的RDD進行持久化
要注意的持久化級別的選擇:
1.優先采用MEMORY_ONLY,d但是前提是你的內存足夠大,否則可能導致OOM(out of memory 異常)
2.如果MEMORY_ONLY內存不足,就采用MEMORY_ONLY_SER持久化級別,序列化之后,把數據占用的內存變少了, 但是序列化和之后使用的反序列化得消耗cpu
3.以上是純內存持久化,速度很快,但是如果MEMORY_ONLY_SER還是內存不夠,那么就采用 MEMORY_AND_DISK_SER,采用這種策略,會優先的把數據放在內存中,內存不足放入磁盤
4.不建議使用純的DISK方案,這樣很慢,_2在一些特殊場景(Spark Streaming 容錯要求更高)使用以外,一般 不建議
1.4 原則四:盡量避免使用shuffle類算子
減少分區
Broadcast + map + filter 代替 join
對於join,大表join小表,可以考慮把小表的數據廣播到executor中,通過map + filter的操作完成join的功 能
1.5 原則五:使用map-side預聚合的shuffle操作
定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預聚合的算子。
就是用reduceByKey 代替groupByKey
如果同樣的一個需求,用reduceByKey的性能比groupByKey好很多,可以大大減少數據的網絡傳輸
1.6 原則六:使用高性能的算子
有些需求,很多算子都能使用,但是性能不一樣,用性能更高算子解決
例如:
使用reduceByKey/aggregateByKey替代groupByKey
使用mapPartitions替代普通map,使用mapPartitions會出現OOM(內存溢出)的問題
使用foreachPartitions替代foreach,類似mapPartitions替代普通map,相比於上面的來說 ,這是一個 action算子 ,讀寫數據庫例子
使用filter之后進行coalesce操作
補充:repartition 和 coalesce 的使用場景
repatriation有shuffle,一般是把分區數變多,目的提高並行度
val rdd02 = rdd01.filter(xxx) --> 有的分區會過濾很多,有的可能過濾的很少
coalesce 一般來說是把分區數變少,就是把分區數合並,
rdd02.coalesce() ,並行度雖然降低了,但是資源利用率更高,反而可能提高性能
如果需要減少的分區特別少,
rdd01 是20個分區--》rdd02: 5個分區, val rdd02 = rdd01.coalesce(5,true)/rdd01.repartition(5),這樣比較好
1.7 原則七:廣播大變量
好處是:
如果使用的外部變量比較大,建議使用Spark的廣播功能,對該變量進行廣播。廣播后的變量,會保證每個 Executor的內存中,
只駐留一份變量副本,Executor有多個,而Executor中的task執行時共享該Executor中的那份變量副本。這樣 的話,
可以大大減少變量副本的數量,從而減少網絡傳輸的性能開銷,並減少對Executor內存的占用開銷,降低GC的頻 率。
1.8 原則八:使用Kryo優化序列化性能
spark的序列化? java Kryo
做如下配置:
//創建SparkConf對象。
val conf =new SparkConf().setMaster(...).setAppName(...)
//設置序列化器為KryoSerializer。
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
//注冊要序列化的自定義類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
1.9 原則九:優化數據結構
對象,字符串,集合都比較占用內存
字符串代替對象
數組代替集合
使用原始類型(比如Int、Long)替代字符串
使用起來太難,不實用
2.0 資源調優
在executor里面,內存會被分為幾個部分:
第一塊是讓task執行我們自己編寫的代碼時使用,默認是占Executor總內存的20%;
第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用,默認也是占 Executor總內存的20%;
spark.shuffle.memoryFraction
用來調節executor中,進行數據shuffle所占用的內存大小默認是0.2
第三塊是讓RDD持久化時使用,默認占Executor總內存的60%。
spark.storage.memoryFraction
用來調節executor中,進行數據持久化所占用的內存大小,默認是0.6
2.1
理解
補充:spark如何配置參數
1.在代碼中如何配置參數:
conf.set(key,value)
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
對於資源調優,有下面幾個參數:
num-executors
作業中,一共給多少個executor
參數調優建議:每個Spark作業的運行一般設置50~100個左右的Executor進程比較合適,設置太少或太多的Executor進程都不好。
設置的太少,無法充分利用集群資源;設置的太多的話,大部分隊列可能無法給予充分的資源。
executor-memory
參數說明:該參數用於設置每個Executor進程的內存。Executor內存的大小,很多時候直接決定了Spark作業的性能,
而且跟常見的JVM OOM異常,也有直接的關聯。
640G內存 32g * 20 = 640G
20個executor
可以看看自己團隊的資源隊列的最大內存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作業申請到的總內存量(也就是所有Executor進程的內存總和),
這個量是不能超過隊列的最大內存量的。此外,如果你是跟團隊里其他人共享這個資源隊列,那么申請的總內存量最好不要超過資源隊列最大總內存的1/3~1/2,避免你自己的Spark作業
占用了隊列所有的資源,導致別的同學的作業無法運行。
executor-cores
每個executor有多少個cpu 核心
這個核心指的並不是物理核心,指的是邏輯核心
i7 4核8線程 16/32
參數調優建議:Executor的CPU core數量設置為2~4個較為合適。同樣得根據不同部門的資源隊列來定,可以看看自己的資源隊列的最大CPU core限制是多少,再依據設置的Executor數量,
來決定每個Executor進程可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個隊列,那么num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,
也是避免影響其他同學的作業運行。
driver-memory
給dirver程序分配的內存,當有collect操作的時候,需要把dirver的內存給大一點
spark.default.parallelism
參數說明:該參數用於設置每個stage的默認task數量。這個參數極為重要,如果不設置可能會直接影響你的Spark作業性能。
spark.default.parallelism = num-executors * executor-cores (2--3倍)
這樣設置之后,那么每個cpu 都是有2-3個task
10個executor 每個executor里面有4個core ,設置spark.default.parallelism = 120
運行executor的core有多少個 40個 ,120/40 = 每個core的task
task的總數,肯定得比分配到cpu core的數量多, 反之浪費資源,一般就是2-3倍比較合適
如何設置並行度:
如何設置一個Spark Application的並行度?
1. spark.defalut.parallelism 默認是沒有值的,如果設置了值比如說10,是在shuffle的過程才會起作用(val rdd2 = rdd1.reduceByKey(_+_) //rdd2的分區數就是10,rdd1的分區數不受這個參數的影響)
new SparkConf().set(“spark.defalut.parallelism”,”500“)
2、如果讀取的數據在HDFS上,增加block數,默認情況下split與block是一對一的,而split又與RDD中的partition對應,所以增加了block數,也就提高了並行度。
3、RDD.repartition,給RDD重新設置partition的數量
4、reduceByKey的算子指定partition的數量
val rdd2 = rdd1.reduceByKey(_+_,10) val rdd3 = rdd2.map.filter.reduceByKey(_+_)
5、val rdd3 = rdd1.join(rdd2) rdd3里面partiiton的數量是由父RDD中最多的partition數量來決定,因此使用join算子的時候,增加父RDD中partition的數量。
6、spark.sql.shuffle.partitions //spark sql中shuffle過程中partitions的數量
3.spark調優數據傾斜調優
map filter 這種會發生數據傾斜嗎??
問題:數據傾斜肯定是某些key發生了數據傾斜,那么如何知道是哪些key傾斜了??
1000萬條取2萬條出來
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
解決:
1.打散
2.過濾
2.腳本里面可以配置參數
使用格式:
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
舉例:./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
3.在配置文件中調節參數
conf/spark-defaults.conf配置文件中讀取配置選項。 在conf/spark-defaults.conf配置文件中,
每行是key-value對,中間可以是用空格進行分割,也可以直接用等號進行分割
問題 :這三個地方都可以配置參數 ,對於同樣的一個參數在三個地方都配置了,而且參數的value不一樣,那么
到底是那個生效??
優先級 :
在代碼中的優先級最高 --》 意味一旦寫入,在其他地方都不能修改了,除非出現修改代碼,打包運行,這樣不可取,除非有些參數寫入就不修改了,在這里面配置比較合適
在腳本中優先級其次 --》 非常靈活,一般來說比較適合在這里面寫入參數
在配置文件中最低 --》 以上兩種的配置參數,都是針對於該應用的參數,配置文件是全局的參數,優先級最低,更加適合寫有些全部都需要用到的參數
2.資源調優
3.數據傾斜調優
4.shuffle調優幾個部分
//////////////////////////////////
spark 優化原則
1.盡量讓計算操作在一個rdd里面進行
// 錯誤的做法。
// 有一個<Long, String>格式的RDD,即rdd1。
// 接着由於業務需要,對rdd1執行了一個map操作,創建了一個rdd2,而rdd2中的數據僅僅是rdd1中
的value值而已,也就是說,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)
// 分別對rdd1和rdd2執行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)
說明: rdd2 是 k-v類型的rdd1 的v 經過某個操作轉變過來的
正確做法:
JavaPairRDD<Long, String> rdd1 = ... .Cache()
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
減少了RDD的生成
2. 盡量減少shuffle
// 傳統的join操作會導致shuffle操作。
// 因為兩個RDD中,相同的key都需要通過網絡拉取到一個節點上,由一個task進行join操作。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操作,不會導致shuffle操作。
// 使用Broadcast將一個數據量較小的RDD作為廣播變量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數據。
// 然后進行遍歷,如果發現rdd2中某條數據的key與rdd1的當前數據的key是相同的,那么就判定可以
進行join。
// 此時就可以根據自己需要的方式,將rdd1當前數據與rdd2中可以連接的數據,拼接在一起(String
或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注意,以上操作,建議僅僅在rdd2的數據量比較少(比如幾百M,或者一兩G)的情況下使用。
// 因為每個Executor的內存中,都會駐留一份rdd2的全量數據。
#####50. excutor內存的分配?
> 在executor里面,內存會被分為幾個部分:
> 第一塊是讓task執行我們自己編寫的代碼時使用,默認是占Executor總內存的20%;
> 第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用,默認也是占Executor總內存的20%;
> spark.shuffle.memoryFraction
> 用來調節executor中,進行數據shuffle所占用的內存大小默認是0.2
> 第三塊是讓RDD持久化時使用,默認占Executor總內存的60%。
> spark.storage.memoryFraction
> 用來調節executor中,進行數據持久化所占用的內存大小,默認是0.6
#####51. Rdd partition的個數有什么來決定的?
1. 默認的
2. 指定的
3. 從hdfs 讀取數據,由塊的個數
4. 從kafka讀取數據。由topic 的partition個數決定
#####52. 總結 spark Shuffle?
https://blog.csdn.net/young_0609/article/details/89643087
一、未經優化的HashShuffleManager
shuffle write:
stage結束之后,每個task處理的數據按key進行“分類”
數據先寫入內存緩沖區
緩沖區滿,溢出到磁盤文件
最終,相同key被寫入同一個磁盤文件
創建的磁盤文件數量 = 當前stagetask數量 * 下一個stage的task數量
shuffle read:
從上游stage的所有task節點上拉取屬於自己的磁盤文件
每個read task會有自己的buffer緩沖,每次只能拉取與buffer緩沖相同大小的數據,然后聚合,聚合完一批后拉取下一批
該拉取過程,邊拉取邊聚合
二、Sort shuffle
shuffle過程容易出現的主要問題就是內存溢出和頻繁的IO操作,導致程序異常和特別慢
1、寫入內存數據結構
注意:
shuffle中的定時器:定時器會檢查內存數據結構的大小,如果內存數據結構空間不夠,那么會申請額外的內存。申請到了,內存數據結構的大小變大,內存不夠,申請不到,則發生溢寫
2、排序
在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序。
3、溢寫
排序過后,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。
4、merge
一個task將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,也就會產生多個臨時文件。
最后會將之前所有的臨時磁盤文件都進行合並,這就是merge過程,
此時會將之前所有臨時磁盤文件中的數據讀取出來,然后依次寫入最終的磁盤文件之中。
此外,由於一個task就只對應一個磁盤文件,也就意味着該task為Reduce端的stage的task准備的數據都在這一個文件中,
因此還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。
SortShuffleManager由於有一個磁盤文件merge的過程,因此大大減少了文件數量。
比如第一個stage有50個task,總共有10個Executor,每個Executor執行5個task,
而第二個stage有100個task。由於每個task最終只有一個磁盤文件,
因此此時每個Executor上只有5個磁盤文件,所有Executor只有50個磁盤文件。
三、bypass sort shuffle
1、
bypass運行機制的觸發條件如下:不需要排序和聚合的shuffle操作。
1)shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值。
2)不是聚合類的shuffle算子(比如reduceByKey)。
此時task會為每個reduce端的task都創建一個臨時磁盤文件,並將數據按key進行hash然后根據key的hash值,
將key寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。
最后,同樣會將所有臨時磁盤文件都合並成一個磁盤文件,並創建一個單獨的索引文件。
該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要創建數量驚人的磁盤文件,
只是在最后會做一個磁盤文件的合並而已。因此少量的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來說,
shuffle read的性能會更好。
53. sparkStreaming 的核心抽象?
> - 一旦啟動上下文,就無法設置新的流計算或將其添加到該流計算中
> - 上下文停止后,將無法重新啟動
> - JVM 中只能同時激活一個 StreamingContext
> - StreamingContext 上的 stop() 也會停止 SparkContext,如要僅停止 StreamingContext,請將名為 stopSparkContext 的 stop() 的可選參數設置為 false
> - 只要在創建下一個 StreamingContext 之前停止了上一個 StreamingContext(不停止 SparkContext),就可以重復使用 SparkContext 創建多個 StreamingContext
54. 什么是DStream?
一連串不間斷的batch,一個batch就是一個時間段的Rdd
DStream 由一系列連續的 RDD 表示,這是 Spark 對不可變的分布式數據集的抽象
##### 55.sparkStreaming和Storm對比?
> Storm
>
> 1. 實時計算模型 純實時,來一條數據,處理一條數據
> 2. 實時計算延遲度 毫秒級
> 3. 吞吐量 低
> 4. 事務機制 支持完善
> 5. 健壯性 / 容錯性 ZooKeeper,Acker,非常強
> 6. 動態調整並行度 支持
>
> Spark Streaming
>
> 1. 實時計算模型 准實時,對一個時間段內的數據收集起來,作為一個RDD,再處理
> 2. 實時計算延遲度 秒級
> 3. 吞吐量 高
> 4. 事務機制 支持,但不夠完善
> 5. 健壯性 / 容錯性 Checkpoint,WAL,一般
> 6. 動態調整並行度 不支持
##### 56.sparkStreaming的算子類型?
> Transformations 和 Output
##### 57.sparkStreaming 的receiver ?
> 會啟動一個線程單獨拉取數據
> 給定的cpu核數大於需要計算的流數
##### 58.sparkStreaming從kafka 獲取數據的兩種方式?
> - [Receiver](https://www.cnblogs.com/heml/p/6796414.html#_label0)
> - [Direct](https://www.cnblogs.com/heml/p/6796414.html#_label1)
> - [Direct代碼](https://www.cnblogs.com/heml/p/6796414.html#_label2)
>
> 簡單理解為:Receiver方式是通過zookeeper來連接kafka隊列,Direct方式是直接連接到kafka的節點上獲取數據
>
> 1、拉取數據的方式
>
> Receiver采用kafka高級api,一次性拉取固定時間的數據后再進行處理,這可能造成一個問題:拉取的數據過多,放不下怎么辦?
>
> Direct采用kafka低級api,直接連接到kafka的分區,rdd中的分區與kafka中的分區是一一對應的,他是一邊拉取數據,一邊處理數據,到達設置的時間間隔后,就作為一個批次進行計算結果。
>
> 2、可靠性保證
>
> Receiver要保證數據不丟失,需要WAL,提供至少一次的語義。
>
> Direct可以提供一次且緊一次語義。
>
> 3、 高峰數據量過大的處理
>
> Receiver方式只能手動設置最大接收速率,不能自動調節數據接收速率。
>
> Direct可以使用反壓機制自動調節數據接收速率。
>
> 4、直連方式的區別
>
> Receiver接收固定時間間隔的數據(放在內存中),使用Kafka高級的API,自動維護偏移量,達到固定的時間才處理,效率低且容易丟失數據;
>
> Direct直連方式,相當於直接連接到Kafka分區上,使用Kafka底層的API,需要自己維護偏移量,效率高。
##### 59.sparkStreaming從kafka 獲取的 message 包含哪些?
record.checksum() // 記錄的校驗和
record.offset() // 偏移量
record.partition() // 分區
record.timestamp() // 時間戳
record.timestampType() // 時間戳類型
record.topic() // 主題
record.key() // 鍵(如果未指定鍵,則為null)
record.hashCode() // 字符串的哈希碼
record.value().toString //值