Spark的核心RDD
Resilient Distributed Datasets(彈性分布式數據集)
Spark運行原理與RDD理論
Spark與MapReduce對比,MapReduce的計算和迭代是基於磁盤的,而Spark的迭代和計算是盡量基於內存,只有在內存空間不能容納計算結果時才將溢出的部分數據緩沖到磁盤存儲,因此Spark是將內存與磁盤結合起來使用的一種架構,它既可以適應超大型的批量離線數據集處理(因為它可以基於磁盤),也可以適應基於實時的流數據分析計算(因為它可以基於內存迭代)。
Spark是分布式的基於內存的迭代式計算框架,當然它也可以基於磁盤做迭代計算,根據官方說法,基於磁盤的迭代計算也會比基於磁盤的MapReduce計算快10多倍,而基於內存的迭代計算則比基於內存的MapReduce計算快100倍以上;同時由於Spark是迭代式的計算,因此它天生擅長於將多步驟的Job通過應用層面向過程的流水化操作將其轉換成底層的多個Job串聯操作,這在第一代計算框架MapReduce中需要我們程序員手動分解多個步驟的操作到各個Job作業中去,於此我們如果基於MR來編寫Job應用會耗費大量的代碼;雖然Hive可以通過HQL來簡化MR的編寫過程,但是畢竟Hive的簡化是很有限度的,而且Hive數據倉庫的HQL查詢仍然是基於磁盤的(因為底層仍然是執行MR代碼)。
Spark的迭代計算過程中,從一個節點計算完成之后的數據到下一個節點的傳送過程需要歷經一個shuffle過程,如果一個應用需要多個Job步驟來支撐則自然會涉及到多個節點傳送數據的多次shuffle過程。
Spark的主要功能模塊
交互式查詢:基於Spark-Shell的交互式查詢和基於Spark-Sql的交互式查詢
流式數據處理:實時計算分析
批量數據處理:離線分析計算
Spark客戶端Driver—>SparkContext
RDD(Resilient Distributed Datasets(彈性分布式數據集))
RDD是Spark的運行時(Spark Runtime)內核實現,內核涵蓋兩個部分:
A、數據源:
Spark處理的數據來源可以是HDFS、Hive、HBase、S3(亞馬遜雲儲存平台)、MySql、Oracle等數據源;處理之后的數據也可以被輸出到HDFS、Hive、HBase、S3、MySql、Oracle及其當前的控制台終端
B、調度器:Yarn、Mesos、AWS
RDD被稱為彈性分布式數據集,它具有如下特征:
A、彈性與伸縮性:
1)、自動進行磁盤數據迭代和內存數據迭代兩種操作模式的切換,數據優先在內存中進行迭代,當內存容量有限時再將溢出部分數據緩沖到磁盤進行迭代
2)、作業鏈步驟的高度容錯機制
當一個應用存在多個操作步驟(Stage:階段)時,如果在作業執行過程中某個步驟出錯,則只需要從出錯步驟的前一個步驟后重新執行后面的步驟來恢復;而無需從整個應用的第一個步驟開始恢復
3)、作業步驟如果出錯或失敗則會自動進行特定次數(默認為3次)的重試(該重試包括步驟本身的重試和基於步驟操作底層節點Task計算失敗的重試(默認4次))
4)、重新提交或重試時僅提交計算失敗的那些數據分片來重新計算,這可以從細粒度上控制重新計算的數據量
注:Spark中對操作步驟(操作階段)Stage的定義:
1)、如果數據涉及到從一個節點轉移並重新分配數據到另一個節點則表示從一個Stage過渡到另一個Stage;從MapReduce原理的角度來理解就是當數據發生reduce、shuffle、sort等操作時都將涉及到數據在節點之間轉移;而從SQL概念的角度來理解就是當發生group by、distinct、order by、join、union等操作時都將涉及到數據在節點之間轉移;
2)、如果數據操作並沒有導致數據在節點之間發生轉移(即在同一個節點上進行的本地操作)則表示為Spark概念中的一個Stage內執行的操作,一個Stage內執行的操作相當於MapReduce概念中的map任務操作,這種操作執行的並發度很高(因為這種操作充分利用了集群節點的並發計算能力),而對於SQL概念而言則相當於where和having操作。
application>job>stage>task(jvm)
在以下情況下RDD會緩存操作步驟中的中間數據(注意:並不一定會緩存每一個步驟的中間結果):
1、某個計算步驟非常耗時則會緩存此步驟的計算結果
2、計算步驟鏈很長時會加大緩存步驟計算結果的頻次
3、shuffle到其它節點上的數據會被shuffle到的目標節點緩存一次
4、從一個作業切換到下一個作業時將發生一次checkpoint操作,在此,於checkpoint操作之前會緩存上一個作業的中間結果;checkpoint操作會將數據放置於磁盤文件系統中去以保證數據不發生丟失
RDD從邏輯上看是一個抽象分布式數據集的概念,它的底層數據存儲於集群中不同節點上的磁盤文件系統中,存儲是按照分區(partition)方式進行存儲;所有Spark操作都可以看成是一系列對RDD對象的操作,而RDD是數據集合的抽象,它可以使用SparkContext(Spark上下文)來創建,SparkContext是Spark集群操作的入口,如果是在Spark-Shell下操作,則Spark會自動創建一個基於已有配置的默認SparkContext對象,如果是自己編寫作業Jar則需要自己手動創建(與Hadoop中的FileSystem一樣可以通過Configution配置參數來構建,也可以基於classpath中的配置文件來構建);
一個簡單的測試:
//下面是通過Spark上下文調用textFile函數創建一個包裝好底層數據集的RDD對象: //這里沒有指定前綴hdfs://YunMaster01:9000,但是SparkContext(Spark上下文,即sc對象)中已經封裝了這些默認的配置信息,即創建Spark上下文對象時就已經封裝了這些上下文配置信息(包括操作的底層數據源,如這里的HDFS);這里最好不要加上hdfs前綴,因為加上了之后代碼就被編譯后固化了,造成不可配置,如果不加則SparkContext可以基於配置來自動選擇(數據源類型可以通過配置來改變),提升高可維護性。 scala>val dataSet=sc.textFile("/spark/input") //SparkContext調用textFile函數將返回一個org.apache.spark.rdd.RDD[String]類型的對象(RDD在Spark的架構源碼中被Scala定義為一個接口(trait)類型;方括號中的String表示RDD集合中的元素類型為字符串類型);此RDD的具體實現是MapPartitionRDD(即:字典分區RDD);textFile操作僅僅是執行數據抽象,並沒有立即執行數據的讀取(read)操作,數據的讀取操作會延遲到執行action動作時才發生;因此textFile函數屬於一種transformation動作,transformation動作是lazy級別的操作。 //查看數據源的輸入路徑(如果數據源使用HDFS則顯示為HDFS路徑),以下如不特別加以說明默認都是基於HDFS的數據源(因為這個最常用) //toDebugString函數會根據數據集的來源路徑(RDD依賴)進行反向推理並從上到下依次顯示RDD路徑源列表 scala>dataSet.toDebugString() //返回數據集中的記錄數量,這會啟動一個action(textFile函數是一種轉換(transformation),不會啟動action);action是指Spark會在底層啟動作業(Job)或任務(Task)的計算操作;而轉換(transformation)僅僅是實現數據的封裝和抽象(只涉及到數據在節點本地上的抽象讀(注意:沒有真正意義上的讀,因為沒有產生任何IO操作),但是數據的寫操作則屬於action操作,因為它涉及到IO操作過程,比如:saveAsTextFile函數) scala>dataSet.count //Spark中的一個分區(partition)相當於HDFS中的一個塊(Block),即Spark中一個partition的尺寸等於一個HDFS文件塊的尺寸(BlockSize=128M) //下面調用flatMap函數處理dataSet集合中的每一行,此函數調用完成之后又將產生一個新的RDD對象結果集,實際上所有的Spark操作都是基於RDD集合對象的操作(一切皆為RDD操作) scala>val dataSet02=dataSet.flatMap(_.split(" ")) //實現第二次映射操作產生新的RDD集合對象 scala>val dataSet03=dataSet02.map(word=>(word,1)) //上面的兩個map算子相當於MapReduce中的map方法的操作;而下面與reduce相關的算子則相當於MapReduce中的reduce方法的操作 //由於reduce操作涉及到在節點之間轉移和重新分配數據,因此此過程涉及到shuffle機制,執行reduce操作過程中產生的shuffle是為了歸並相同Key的記錄到同一分組以執行SQL概念中的分組統計 scala>val dataSet04=dataSet03.reduceByKey(_+_) //從上面的執行過程來看,Spark的底層計算原理與Hadoop的底層計算原理是類同的,只是Spark在此基礎上引入了RDD的概念,將一切的操作歸入集合RDD的操作(實際上,Hadoop中MapReduce操作本身就是針對集合對象進行操作,只不過Hadoop沒有抽象出這樣一個概念而已),由於引入了RDD的概念,所以Spark可以將一切操作(應用的所有操作步驟)的輸入源和輸出源都抽象為RDD對象的操作,這樣以來,各個操作之間就可以通過RDD對象鏈接成操作鏈(上一個操作的輸出源是一個RDD對象,該RDD對象可以直接作為下一個操作的輸入源) //將應用執行過程中處理后的數據最終存儲到HDFS文件系統中去(此過程是action操作) scala>dataSet04.saveAsTextFile("/spark/output")
從瀏覽器終端看到的Tasks任務詳情列表中的Locality Level字段表示RDD迭代計算的數據來源:
A、PROCESS_LOCAL:來自於內存 |
B、NODE_LOCAL:來自於磁盤 |
C、ANY:來自於其它節點的shuffle |
WordCount計算原理介紹:
A、textFile函數:返回一個HadoopRDD,HadoopRDD在執行action操作計算數據時會將數據從分布式磁盤讀取到分布式內存中,即從worker節點的本地磁盤讀取到worker節點的本地內存中,RDD更多的是指分布式內存存儲,一種數據的邏輯存儲系統;
HadoopRDD產生的是一個元組集合(即集合中的每個元素是一個元組類型),元組的第一個元素是行記錄的索引(Key,字節偏移量),元組的第二個元素是行記錄內容本身(Value);
基於HadoopRDD會繼續產生一個字符串集合MapPartitionRDD,此集合中的每個字符串代表行記錄內容(沒有行記錄的索引,丟棄了字節偏移量Key)
B、flatMap函數:此函數仍然產生一個字符串MapPartitionRDD集合
C、map函數:此函數產生一個二元素元組類型的MapPartitionRDD集合
D、reduceByKey函數:基於相同Key的Value進行統計;先進行本地統計(執行卡賓函數統計可以減少數據傳送量,降低網絡負載),再進行集群模式統計(shuffle);本地統計之后的結果會根據分區策略將統計后的基於二元素元組的MapPartitionRDD集合數據寫出到不同的本地文件中;在數據被傳送到另一個節點之前的所有操作都是在同一個節點上的操作;在此,於同一個節點上的所有操作都是在同一個Stage中;同一個Stage中的所有操作都是基於內存的鏈式迭代操作(你也可以在這個迭代過程中手動緩存中間結果);我們通常說Spark是基於內存迭代的,其緣由就在於此。
E、shuffle是整個分布式計算的性能瓶頸所在,shuffle過程是從一個節點將數據傳送到另一個節點的過程(即便在同一個節點上也是切換到另一個JVM進程來處理),這個過程中會針對每一份數據至少產生兩次IO(本次磁盤IO、網絡IO);shuffle過程是依據數據分區策略來進行的,shuffle之后將從一個Stage階段過渡到另一個新的Stage階段,shuffle之后將在另一個新的Stage階段產生一個基於二元素元組的ShuffledRDD集合。
F、saveAsTextFile函數:將ShuffledRDD集合轉換為MapPartitionRDD(實際上就是追加一個字節偏移量Key將每一個元組再包裝成一個二元素元組(第一個元素是字節偏移量Key)),然后將其寫出到HDFS文件系統中去。