0.spark簡介
Spark是整個BDAS的核心組件,是一個大數據分布式編程框架,不僅實現了MapReduce的算子map 函數和reduce函數及計算模型,還提供更為豐富的算子,如filter、join、groupByKey等。是一個用來實現快速而同用的集群計算的平台。Spark將分布式數據抽象為彈性分布式數據集(RDD),實現了應用任務調度、RPC、序列化和壓縮,並為運行在其上的上層組件提供API。其底層采用Scala這種函數式語言書寫而成,並且所提供的API深度借鑒Scala函數式的編程思想,提供與Scala類似的編程接口
執行器作用:
負責運行組成Spark應用的任務,並將結果返回給驅動器進程;
通過自身的塊管理器(blockManager)為用戶程序中要求緩存的RDD提供內存式存儲。RDD是直接緩存在執行器進程內的,因此任務可以在運行時充分利用緩存數據加快運算。
驅動器的職責:
所有的Spark程序都遵循同樣的結構:程序從輸入數據創建一系列RDD,再使用轉化操作派生成新的RDD,最后使用行動操作收集或存儲結果RDD,Spark程序其實是隱式地創建出了一個由操作組成的邏輯上的有向無環圖DAG。當驅動器程序執行時,它會把這個邏輯圖轉為物理執行計划。這樣 Spark就把邏輯計划轉為一系列步驟(stage),而每個步驟又由多個任務組成。這些任務會被打包送到集群中。
1、RDD是什么
RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一個只讀的,可分區的分布式數據集,這個數據集的全部或部分可以緩存在內存中,在多次計算間重用。
分區是RDD內部並行計算的一個計算單元,RDD的數據集在邏輯上被划分為多個分片,每一個分片稱為分區,分區的格式決定了並行計算的粒度,而每個分區的數值計算都是在一個任務中進行的,因此任務的個數,也是由RDD(准確來說是作業最后一個RDD)的分區數決定。
RDD分區的一個分區原則:盡可能是得分區的個數等於集群核心數目
Spark中的RDD就是一個不可變的分布式對象集合。每個RDD都被分為多個分區,這些分區運行在集群的不同節點上。創建RDD的方法有兩種:一種是讀取一個外部數據集;一種是在群東程序里分發驅動器程序中的對象集合,不如剛才的示例,讀取文本文件作為一個字符串的RDD的示例。
創建出來后,RDD支持兩種類型的操作:轉化操作和行動操作
轉化操作會由一個RDD生成一個新的RDD。
行動操作會對RDD計算出一個結果,並把結果返回到驅動器程序中,或把結果存儲到外部存儲系統(比如HDFS)中。
RDD計算方式


下面對RDD的五個特性進行解釋:
1、有一個分片列表。就是能被切分,和hadoop一樣的,能夠切分的數據才能並行計算。
2、有一個函數計算每一個分片,這里指的是下面會提到的compute函數。
3、對其他的RDD的依賴列表,依賴還具體分為寬依賴和窄依賴,但並不是所有的RDD都有依賴。
4、可選:key-value型的RDD是根據哈希來分區的,類似於mapreduce當中的Paritioner接口,控制key分到哪個reduce。
5、可選:每一個分片的優先計算位置(preferred locations),比如HDFS的block的所在位置應該是優先計算的位置。(存儲的是一個表,可以將處理的分區“本地化”)
為什么會產生RDD?
(1)傳統的MapReduce雖然具有自動容錯、平衡負載和可拓展性的優點,但是其最大缺點是采用非循環式的數據流模型,使得在迭代計算式要進行大量的磁盤IO操作。RDD正是解決這一缺點的抽象方法
(2)RDD的具體描述RDD(彈性分布式數據集)是Spark提供的最重要的抽象的概念,它是一種有容錯機制的特殊集合,可以分布在集群的節點上,以函數式編程操作集合的方式,進行各種並行操作。可以將RDD理解為一個具有容錯機制的特殊集合,它提供了一種只讀、只能由已存在的RDD變換而來的共享內存,然后將所有數據都加載到內存中,方便進行多次重用。
a.它是分布式的,可以分布在多台機器上,進行計算。
b.它是彈性的,計算過程中內存不夠時它會和磁盤進行數據交換(緩存管理)。
c.這些限制可以極大的降低自動容錯開銷
d.實質是一種更為通用的迭代並行計算框架,用戶可以顯示地控制計算的中間結果,然后將其自由運用於之后的計算。
(3)RDD的容錯機制實現分布式數據集容錯方法有兩種:數據檢查點和記錄更新,RDD采用記錄更新的方式:記錄所有更新點的成本很高。所以,RDD只支持粗顆粒變換,即只記錄單個塊(分區)上執行的單個操作,然后創建某個RDD的變換序列(血統 lineage)存儲下來;變換序列指,每個RDD都包含了它是如何由其他RDD變換過來的以及如何重建某一塊數據的信息。因此RDD的容錯機制又稱“血統”容錯。 要實現這種“血統”容錯機制,最大的難題就是如何表達父RDD和子RDD之間的依賴關系。實際上依賴關系可以分兩種,窄依賴和寬依賴。窄依賴:子RDD中的每個數據塊只依賴於父RDD中對應的有限個固定的數據塊;寬依賴:子RDD中的一個數據塊可以依賴於父RDD中的所有數據塊。例如:map變換,子RDD中的數據塊只依賴於父RDD中對應的一個數據塊;groupByKey變換,子RDD中的數據塊會依賴於多塊父RDD中的數據塊,因為一個key可能分布於父RDD的任何一個數據塊中, 將依賴關系分類的兩個特性:第一,窄依賴可以在某個計算節點上直接通過計算父RDD的某塊數據計算得到子RDD對應的某塊數據;寬依賴則要等到父RDD所有數據都計算完成之后,並且父RDD的計算結果進行hash並傳到對應節點上之后才能計算子RDD。第二,數據丟失時,對於窄依賴只需要重新計算丟失的那一塊數據來恢復;對於寬依賴則要將祖先RDD中的所有數據塊全部重新計算來恢復。所以在“血統”鏈特別是有寬依賴的時候,需要在適當的時機設置數據檢查點。也是這兩個特性要求對於不同依賴關系要采取不同的任務調度機制和容錯恢復機制。
(4)RDD內部的設計每個RDD都需要包含以下四個部分:
a.源數據分割后的數據塊,源代碼中的splits變量
b.關於“血統”的信息,源碼中的dependencies變量
c.一個計算函數(該RDD如何通過父RDD計算得到),源碼中的iterator(split)和compute函數d.一些關於如何分塊和數據存放位置的元信息,如源碼中的partitioner和preferredLocations例如:a.一個從分布式文件系統中的文件得到的RDD具有的數據塊通過切分各個文件得到的,它是沒有父RDD的,它的計算函數知識讀取文件的每一行並作為一個元素返回給RDD;b.對與一個通過map函數得到的RDD,它會具有和父RDD相同的數據塊,它的計算函數式對每個父RDD中的元素所執行的一個函數
2、RDD在Spark中的地位及作用
(1)為什么會有Spark?因為傳統的並行計算模型無法有效的解決迭代計算(iterative)和交互式計算(interactive);而Spark的使命便是解決這兩個問題,這也是他存在的價值和理由。
(2)Spark如何解決迭代計算?其主要實現思想就是RDD,把所有計算的數據保存在分布式的內存中。迭代計算通常情況下都是對同一個數據集做反復的迭代計算,數據在內存中將大大提升IO操作。這也是Spark涉及的核心:內存計算。
(3)Spark如何實現交互式計算?因為Spark是用scala語言實現的,Spark和scala能夠緊密的集成,所以Spark可以完美的運用scala的解釋器,使得其中的scala可以向操作本地集合對象一樣輕松操作分布式數據集。
(4)Spark和RDD的關系?可以理解為:RDD是一種具有容錯性基於內存的集群計算抽象方法,Spark則是這個抽象方法的實現。
3、RDD底層實現原理
RDD是一個分布式數據集,顧名思義,其數據應該分部存儲於多台機器上。事實上,每個RDD的數據都以Block的形式存儲於多台機器上,下圖是Spark的RDD存儲架構圖,其中每個Executor會啟動一個BlockManagerSlave,並管理一部分Block;而Block的元數據由Driver節點的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注冊該Block,BlockManagerMaster管理RDD與Block的關系,當RDD不再需要存儲的時候,將向BlockManagerSlave發送指令刪除相應的Block。

4、RDD cache的原理
RDD的轉換過程中,並不是每個RDD都會存儲,如果某個RDD會被重復使用,或者計算其代價很高,那么可以通過顯示調用RDD提供的cache()方法,把該RDD存儲下來。那RDD的cache是如何實現的呢?
RDD中提供的cache()方法只是簡單的把該RDD放到cache列表中。當RDD的iterator被調用時,通過CacheManager把RDD計算出來,並存儲到BlockManager中,下次獲取該RDD的數據時便可直接通過CacheManager從BlockManager讀出。
What is Data Locality - RDD的位置可見性(location preference)
這個問題就不重復造輪子了,直接引用Quora上的一個問答了:
RDD is a dataset which is distributed, that is, it is divided into “partitions”. Each of these partitions can be present in the memory or disk of different machines. If you want Spark to process the RDD, then Spark needs to launch one task per partition of the RDD. It’s best that each task be sent to the machine have the partition that task is supposed to process. In that case, the task will be able to read the data of the partition from the local machine. Otherwise, the task would have to pull the partition data over the network from a different machine, which is less efficient. This scheduling of tasks (that is, allocation of tasks to machines) such that the tasks can read data “locally” is known as “locality aware scheduling”.
5、如何操作RDD?
RDD的所有轉換操作都是lazy模式,即Spark不會立刻計算結果,而只是簡單的記住所有對數據集的轉換操作。這些轉換只有遇到action操作的時候才會開始計算
(1)如何獲取RDDa.從共享的文件系統獲取,(如:HDFS)b.通過已存在的RDD轉換c.將已存在scala集合(只要是Seq對象)並行化 ,通過調用SparkContext的parallelize方法實現d.改變現有RDD的之久性;RDD是懶散,短暫的。(RDD的固化:cache緩存至內存; save保存到分布式文件系統)
(2)操作RDD的兩個動作a.Actions:對數據集計算后返回一個數值value給驅動程序;例如:Reduce將數據集的所有元素用某個函數聚合后,將最終結果返回給程序。b.Transformation:根據數據集創建一個新的數據集,計算后返回一個新RDD;例如:Map將數據的每個元素經過某個函數計算后,返回一個新的分布式數據集。
在spark新版中,也許會有更多的action和transformation,可以參照spark的主頁
6. RDD工作原理
RDD(Resilient DistributedDatasets)[1] ,彈性分布式數據集,是分布式內存的一個抽象概念,RDD提供了一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合,只能通過在其他RDD執行確定的轉換操作(如map、join和group by)而創建,然而這些限制使得實現容錯的開銷很低。對開發者而言,RDD可以看作是Spark的一個對象,它本身運行於內存中,如讀文件是一個RDD,對文件計算是一個RDD,結果集也是一個RDD ,不同的分片、數據之間的依賴、key-value類型的map數據都可以看做RDD。
主要分為三部分:創建RDD對象,DAG調度器創建執行計划,Task調度器分配任務並調度Worker開始運行。
SparkContext(RDD相關操作)→通過(提交作業)→(遍歷RDD拆分stage→生成作業)DAGScheduler→通過(提交任務集)→任務調度管理(TaskScheduler)→通過(按照資源獲取任務)→任務調度管理(TaskSetManager)
Transformation返回值還是一個RDD。它使用了鏈式調用的設計模式,對一個RDD進行計算后,變換成另外一個RDD,然后這個RDD又可以進行另外一次轉換。這個過程是分布式的。
Action返回值不是一個RDD。它要么是一個Scala的普通集合,要么是一個值,要么是空,最終或返回到Driver程序,或把RDD寫入到文件系統中
轉換(Transformations)(如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說從一個RDD轉換生成另一個RDD的操作不是馬上執行,Spark在遇到Transformations操作時只會記錄需要這樣的操作,並不會去執行,需要等到有Actions操作的時候才會真正啟動計算過程進行計算。
操作(Actions)(如:count, collect, save等),Actions操作會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啟動計算的動因。
它們本質區別是:Transformation返回值還是一個RDD。它使用了鏈式調用的設計模式,對一個RDD進行計算后,變換成另外一個RDD,然后這個RDD又可以進行另外一次轉換。這個過程是分布式的。Action返回值不是一個RDD。它要么是一個Scala的普通集合,要么是一個值,要么是空,最終或返回到Driver程序,或把RDD寫入到文件系統中。關於這兩個動作,在Spark開發指南中會有就進一步的詳細介紹,它們是基於Spark開發的核心。
7.RDD的寬窄依賴

窄依賴 (narrowdependencies) 和寬依賴 (widedependencies) 。窄依賴是指 父 RDD 的每個分區都只被子 RDD 的一個分區所使用 。相應的,那么寬依賴就是指父 RDD 的分區被多個子 RDD 的分區所依賴。例如, map 就是一種窄依賴,而 join 則會導致寬依賴
這種划分有兩個用處。首先,窄依賴支持在一個結點上管道化執行。例如基於一對一的關系,可以在 filter 之后執行 map 。其次,窄依賴支持更高效的故障還原。因為對於窄依賴,只有丟失的父 RDD 的分區需要重新計算。而對於寬依賴,一個結點的故障可能導致來自所有父 RDD 的分區丟失,因此就需要完全重新執行。因此對於寬依賴,Spark 會在持有各個父分區的結點上,將中間數據持久化來簡化故障還原,就像 MapReduce 會持久化 map 的輸出一樣。
Spark數據分區
Spark的特性是對數據集在節點間的分區進行控制。在分布式系統中,通訊的代價是巨大的,控制數據分布以獲得最少的網絡傳輸可以極大地提升整體性能。Spark程序可以通過控制RDD分區方式來減少通訊的開銷。
Spark中所有的鍵值對RDD都可以進行分區。確保同一組的鍵出現在同一個節點上。比如,使用哈希分區將一個RDD分成了100個分區,此時鍵的哈希值對100取模的結果相同的記錄會被放在一個節點上。
(可使用partitionBy(newHashPartitioner(100)).persist()來構造100個分區)
Spark中的許多操作都引入了將數據根據鍵跨界點進行混洗的過程。(比如:join(),leftOuterJoin(),groupByKey(),reducebyKey()等)對於像reduceByKey()這樣只作用於單個RDD的操作,運行在未分區的RDD上的時候會導致每個鍵的所有對應值都在每台機器上進行本地計算。
SparkSQL的shuffle過程


Spark SQL的核心是把已有的RDD,帶上Schema信息,然后注冊成類似sql里的”Table”,對其進行sql查詢。這里面主要分兩部分,一是生成SchemaRD,二是執行查詢。
如果是spark-hive項目,那么讀取metadata信息作為Schema、讀取hdfs上數據的過程交給Hive完成,然后根據這倆部分生成SchemaRDD,在HiveContext下進行hql()查詢。