Spark_總結一
1.Spark介紹
1.1什么是Spark?
Apache Spark是一個開源的集群計算框架,使數據計算更快(
高效運行,快速開發)
1.2Spark比Hadoop快的兩個原因
第一,內存計算
第二,DAG(有向無環圖)
2.Spark運行模式(四種 )
| Local | 多用於測試 |
| Standalone | Spark自帶的資源調度器(默認情況下就跑在這里面) |
| MeSOS | 資源調度器,同Hadoop中的YARN |
| YARN | 最具前景,公司里大部分都是 Spark on YRAN |
3.Spark內核之RDD的五大特性
Resilient Distributed Dataset
RDD是基礎-->彈性分布式數據集
第一大特性:
RDD由一系列的partitions組成(如果數據源在HDFS上,默認partition的數量與block的個數一致,Spark並沒有讀取HDFS的方法,它是沿用MR的方法,MR讀取HDFS上的數據時首先會進行split,RDD中每一個partition與split對應,split默認與block的大小一致,所以
默認partition的數量與block的個數一致
)
第二大特性:每一個函數實際上是作用在RDD中的每一個partition上
第三大特性:RDD是由一系列的依賴關系的(這里體現出了RDD的彈性,彈性一,數據容錯;彈性二,partition可大可小)
第四大特性:partitioner(分區器)是作用在KV格式的RDD上(RDD執行聚合類函數的時候會產生shuffle,Spark產生shuffle肯定會有partitioner,而partitioner是作用在KV格式的RDD上,推測出聚合類函數必須作用在KV格式的RDD上)
第五大特性:每一個RDD提供了最佳的計算位置,告訴我們每一個partition所在的節點,然后相對應的task就會移動到該節點進行計算(移動計算,而不是移動數據)
4.Spark運行機制
開機啟動時
Driver 、
Worker
、和
Application
會將自己的資源信息注冊到
Master
中,當初始化的時候,
Master
先為
Driver
分配資源然后啟動
Driver。
Driver 運行時先從 main()方法開始,任務在 Worker 上執行,Worker 可以是一台真實的物理機,也可以是虛擬機,擁有 RAM 和 Core。
然后會將
Task
移動到本地的數據上執行運算。
最優計算位置Inputdata 和 Task 在一起(避免了網絡間的信息傳輸)
。實際情況很少會這樣, 有可能存在當前那個計算節點的計算資源和計算能力都滿了,默認配置不變的情況下 Spark 計算框架會等待 3s(
spark.locality.wait
設置的,在 SparkConf()可以修改),默認重試 5 次。如果均失敗了,會選擇一個比較差的本地節點;Spark 分配算法會將其分配到計算數據附近的節點, Task 會通過其所在節點的 BlockManager 來獲取數據,BlockManager 發現自己本地沒有數據,會通過getRemote()方法,通過 TransferService(網絡數據傳輸組件)從原 task 所在節點的 BlockManager 中,獲取數據,通過網絡傳輸回 Task 所在節 點----->(性能大幅度下降,大量的網絡 IO 占用資源) 計算后的結果會返回 到 Driver 上
5.Spark運行時
Driver(SparkContext運行所在的節點可以看做一個Driver)作用:
分發task給對應的Worker,可以和其他節點(Worker)進行通信
接收task的計算結果
Worker作用:
Worker 可以是一台真實的物理機,也可以是虛擬機,擁有 RAM 和 Core,執行運算
6.Spark算子--Transformations || Actions
|
Transformations || Actions 這兩類算子的區別
|
||
|
Transformations
|
Transformations類的算子會返回一個新的RDD,懶執行 | |
|
Actions
|
Actions類的算子會返回基本類型或者一個集合,能夠觸發一個job的 執行,代碼里面有多少個action類算子,那么就有多少個job | |
常見的算子
| Transformation類算子 | map | 輸入一條,輸出一條 將原來 RDD 的每個數據項通過 map 中的用戶自定義函數映射轉變為一個新的 元素。輸入一條輸出一條; |
| flatMap | 輸入一條輸出多條 先進行map后進行flat |
|
| mapPartitions | 與 map 函數類似,只不過映射函數的參數由 RDD 中的每一個元素變成了 RDD 中每一個分區的迭代器。將 RDD 中的所有數據通過 JDBC 連接寫入數據庫,如果使 用 map 函數,可能要為每一個元素都創建一個 connection,這樣開銷很大,如果使用 mapPartitions,那么只需要針對每一個分區建立一個 connection。 | |
| mapPartitionsWithIndex | ||
| filter | 依據條件過濾的算子 | |
| join | 聚合類的函數,會產生shuffle,必須作用在KV格式的數據上 join 是將兩個 RDD 按照 Key 相同做一次聚合;而 leftouterjoin 是依 據左邊的 RDD 的 Key 進行聚 |
|
| union | 不會進行數據的傳輸,只不過將這兩個的RDD標識一下 (代表屬於一個RDD) |
|
| reduceByKey | 先分組groupByKey,后聚合根據傳入的匿名函數聚合,適合在 map 端進行 combiner | |
| sortByKey | 依據 Key 進行排序,默認升序,參數設為 false 為降序 | |
| mapToPair | 進行一次 map 操作,然后返回一個鍵值對的 RDD。(所有的帶 Pair 的算子返回值均為鍵值對) | |
| sortBy | 根據后面設置的參數排序 | |
| distinct | 對這個 RDD 的元素或對象進行去重操作 | |
| Actions類算子 | foreach | foreach 對 RDD 中的每個元素都應用函數操作,傳入一條處理一條數據,返回值為空 |
| collect | 返回一個集合(RDD[T] => Seq[T]) collect 相當於 toArray, collect 將分布式的 RDD 返回為一個單機的 Array 數組。 |
|
| count | 一個 action 算子,計數功能,返回一個 Long 類型的對象 | |
| take(n) | 取前N條數據 | |
| save | 將RDD的數據存入磁盤或者HDFS | |
| reduce | 返回T和原來的類型一致(RDD[T] => T) | |
| foreachPartition | foreachPartition 也是根據傳入的 function 進行處理,但不 同處在於 function 的傳入參數是一個 partition 對應數據的 iterator,而不是直接使用 iterator 的 foreach。 |
map和flatMap者兩個算子的區別
7.Spark中WordCount演變流程圖_Scala和Java代碼
這里以Scala代碼為例
package com.hzf.spark.exerciseimport org.apache.spark.SparkConfimport org.apache.spark.SparkContext/*** 統計每一個單詞出現的次數*/object WordCount{def main(args:Array[String]):Unit={/*** 設置Spark運行時候環境參數 ,可以在SparkConf對象里面設置* 我這個應用程序使用多少資源 appname 運行模式*/val conf =newSparkConf().setAppName("WordCount").setMaster("local")/*** 創建Spark的上下文 SparkContext** SparkContext是通往集群的唯一通道。* Driver*/val sc =newSparkContext(conf)//將文本中數據加載到linesRDD中val linesRDD = sc.textFile("userLog")//對linesRDD中每一行數據進行切割val wordsRDD = linesRDD.flatMap(_.split(" "))val pairRDD = wordsRDD.map{(_,1)}/*** reduceByKey是一個聚合類的算子,實際上是由兩步組成** 1、groupByKey* 2、recuce*/val resultRDD = pairRDD.reduceByKey(_+_)/*(you,2)(Hello,2)(B,2)(a,1)(SQL,2)(A,3)(how,2)(core,2)(apple,1)(H,1)(C,1)(E,1)(what,2)(D,2)(world,2)*/resultRDD.foreach(println)/*(Spark,5)(A,3)(are,2)(you,2)(Hello,2)*/val sortRDD = resultRDD.map(x=>(x._2,x._1))val topN = sortRDD.sortByKey(false).map(x=>(x._2,x._1)).take(5)topN.foreach(println)}}
並行化:把一個本地集合或數據轉化為RDD的過程就是並行化
7.Spark_RDD持久化
7.1cache需要注意的事項
1.cache的返回值,必須賦值給一個新的變量(或者原來的是var類型的變量),然后在其他job中直接使用這個變量即可
2.cache是一個懶執行的算子,所以必須有Actions類型的算子(比如:count)觸發它
3.cache算子的后面不能立即添加Actions類型的算子(比如:val aRDD = linesRDD.cache()是正確的,而val bRDD = linesRDD.cache().count就是錯誤的)
7.2cache 和 persist 聯系 || 區別?
聯系:
cache和persist都為懶執行,所以需要觸發
Actions
類型的算子才會將RDD的結果持久化到內存
區別:
cache是persist的一個簡化版(cache是持久化到內存),persist里面可以手動指定其他持久化級別
liensRDD = liensRDD.cache() 等價於
liensRDD = liensRDD.persist(StorageLevel.MEMORY_ONLY)
參數的含義:
(1)持久化到磁盤
(2)持久化到內存
(3)使用對外內存(一般都是 false)
(4) 表示“不序列化”:true 表示不序列化;false 表示序列化
(5)表示副本個數
持久化的單位是partition,上面的2是指partition的備份數,不是指持久化到幾個節點上
7.3另一個持久化的算子--checkpoint
checkpoin也是懶執行,為了使RDD持久化的數據更加安全,可以使用checkpoint
checkpoint流程
1.在RDD的job執行完成之后,會自動的從
finalRDD(RDD3)從后往前進行
回溯(為什么能夠回溯?因為RDD的第三大特性,RDD之間是有一系列的依賴關系),遇到哪一個RDD(這里是RDD2)調用了checkpoint這個方法,就會對這個RDD做一個標記
maked for checkpoint
2.另外
重新啟動一個新的job,重新計算被標記的RDD,將RDD的結果寫入到HDFS中
3.
如何對第二步進行優化:重新計算被標記的RDD,這樣的話這個RDD就被計算了兩次,最好調用checkpoint之前進行cache一下,這樣的話,重新啟動這個job只需要將內存中的數據拷貝到HDFS上就可以(省去了計算的過程)
4.checkpoint的job執行完成之后,會將這個RDD的依賴關系切斷(即RDD2不需要再依賴RDD1,因為已經將RDD2這一步持久化了,以后需要數據的時候直接從持久化的地方取就可以了),並統一更名為checkpointRDD(RDD3的父RDD更名為checkpointRDD)
