Spark_總結一
1.Spark介紹
1.1什么是Spark?
Apache Spark是一個開源的集群計算框架,使數據計算更快(
高效運行,快速開發)
1.2Spark比Hadoop快的兩個原因
第一,內存計算
第二,DAG(有向無環圖)
2.Spark運行模式(四種 )
Local | 多用於測試 |
Standalone | Spark自帶的資源調度器(默認情況下就跑在這里面) |
MeSOS | 資源調度器,同Hadoop中的YARN |
YARN | 最具前景,公司里大部分都是 Spark on YRAN |
3.Spark內核之RDD的五大特性

Resilient Distributed Dataset
RDD是基礎-->彈性分布式數據集
第一大特性:
RDD由一系列的partitions組成(如果數據源在HDFS上,默認partition的數量與block的個數一致,Spark並沒有讀取HDFS的方法,它是沿用MR的方法,MR讀取HDFS上的數據時首先會進行split,RDD中每一個partition與split對應,split默認與block的大小一致,所以
默認partition的數量與block的個數一致
)
第二大特性:每一個函數實際上是作用在RDD中的每一個partition上
第三大特性:RDD是由一系列的依賴關系的(這里體現出了RDD的彈性,彈性一,數據容錯;彈性二,partition可大可小)
第四大特性:partitioner(分區器)是作用在KV格式的RDD上(RDD執行聚合類函數的時候會產生shuffle,Spark產生shuffle肯定會有partitioner,而partitioner是作用在KV格式的RDD上,推測出聚合類函數必須作用在KV格式的RDD上)
第五大特性:每一個RDD提供了最佳的計算位置,告訴我們每一個partition所在的節點,然后相對應的task就會移動到該節點進行計算(移動計算,而不是移動數據)
4.Spark運行機制
開機啟動時
Driver 、
Worker
、和
Application
會將自己的資源信息注冊到
Master
中,當初始化的時候,
Master
先為
Driver
分配資源然后啟動
Driver。
Driver 運行時先從 main()方法開始,任務在 Worker 上執行,Worker 可以是一台真實的物理機,也可以是虛擬機,擁有 RAM 和 Core。
然后會將
Task
移動到本地的數據上執行運算。
最優計算位置Inputdata 和 Task 在一起(避免了網絡間的信息傳輸)
。實際情況很少會這樣, 有可能存在當前那個計算節點的計算資源和計算能力都滿了,默認配置不變的情況下 Spark 計算框架會等待 3s(
spark.locality.wait
設置的,在 SparkConf()可以修改),默認重試 5 次。如果均失敗了,會選擇一個比較差的本地節點;Spark 分配算法會將其分配到計算數據附近的節點, Task 會通過其所在節點的 BlockManager 來獲取數據,BlockManager 發現自己本地沒有數據,會通過getRemote()方法,通過 TransferService(網絡數據傳輸組件)從原 task 所在節點的 BlockManager 中,獲取數據,通過網絡傳輸回 Task 所在節 點----->(性能大幅度下降,大量的網絡 IO 占用資源) 計算后的結果會返回 到 Driver 上
5.Spark運行時

Driver(SparkContext運行所在的節點可以看做一個Driver)作用:
分發task給對應的Worker,可以和其他節點(Worker)進行通信
接收task的計算結果
Worker作用:
Worker 可以是一台真實的物理機,也可以是虛擬機,擁有 RAM 和 Core,執行運算
6.Spark算子--Transformations || Actions
Transformations || Actions 這兩類算子的區別
|
||
Transformations
|
Transformations類的算子會返回一個新的RDD,懶執行 | |
Actions
|
Actions類的算子會返回基本類型或者一個集合,能夠觸發一個job的 執行,代碼里面有多少個action類算子,那么就有多少個job |
常見的算子
Transformation類算子 | map | 輸入一條,輸出一條 將原來 RDD 的每個數據項通過 map 中的用戶自定義函數映射轉變為一個新的 元素。輸入一條輸出一條; |
flatMap | 輸入一條輸出多條 先進行map后進行flat |
|
mapPartitions | 與 map 函數類似,只不過映射函數的參數由 RDD 中的每一個元素變成了 RDD 中每一個分區的迭代器。將 RDD 中的所有數據通過 JDBC 連接寫入數據庫,如果使 用 map 函數,可能要為每一個元素都創建一個 connection,這樣開銷很大,如果使用 mapPartitions,那么只需要針對每一個分區建立一個 connection。 | |
mapPartitionsWithIndex | ||
filter | 依據條件過濾的算子 | |
join | 聚合類的函數,會產生shuffle,必須作用在KV格式的數據上 join 是將兩個 RDD 按照 Key 相同做一次聚合;而 leftouterjoin 是依 據左邊的 RDD 的 Key 進行聚 |
|
union | 不會進行數據的傳輸,只不過將這兩個的RDD標識一下 (代表屬於一個RDD) |
|
reduceByKey | 先分組groupByKey,后聚合根據傳入的匿名函數聚合,適合在 map 端進行 combiner | |
sortByKey | 依據 Key 進行排序,默認升序,參數設為 false 為降序 | |
mapToPair | 進行一次 map 操作,然后返回一個鍵值對的 RDD。(所有的帶 Pair 的算子返回值均為鍵值對) | |
sortBy | 根據后面設置的參數排序 | |
distinct | 對這個 RDD 的元素或對象進行去重操作 | |
Actions類算子 | foreach | foreach 對 RDD 中的每個元素都應用函數操作,傳入一條處理一條數據,返回值為空 |
collect | 返回一個集合(RDD[T] => Seq[T]) collect 相當於 toArray, collect 將分布式的 RDD 返回為一個單機的 Array 數組。 |
|
count | 一個 action 算子,計數功能,返回一個 Long 類型的對象 | |
take(n) | 取前N條數據 | |
save | 將RDD的數據存入磁盤或者HDFS | |
reduce | 返回T和原來的類型一致(RDD[T] => T) | |
foreachPartition | foreachPartition 也是根據傳入的 function 進行處理,但不 同處在於 function 的傳入參數是一個 partition 對應數據的 iterator,而不是直接使用 iterator 的 foreach。 |
map和flatMap者兩個算子的區別

7.Spark中WordCount演變流程圖_Scala和Java代碼

這里以Scala代碼為例
package com.hzf.spark.exercise
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* 統計每一個單詞出現的次數
*/
object WordCount{
def main(args:Array[String]):Unit={
/**
* 設置Spark運行時候環境參數 ,可以在SparkConf對象里面設置
* 我這個應用程序使用多少資源 appname 運行模式
*/
val conf =newSparkConf()
.setAppName("WordCount")
.setMaster("local")
/**
* 創建Spark的上下文 SparkContext
*
* SparkContext是通往集群的唯一通道。
* Driver
*/
val sc =newSparkContext(conf)
//將文本中數據加載到linesRDD中
val linesRDD = sc.textFile("userLog")
//對linesRDD中每一行數據進行切割
val wordsRDD = linesRDD.flatMap(_.split(" "))
val pairRDD = wordsRDD.map{(_,1)}
/**
* reduceByKey是一個聚合類的算子,實際上是由兩步組成
*
* 1、groupByKey
* 2、recuce
*/
val resultRDD = pairRDD.reduceByKey(_+_)
/*(you,2)
(Hello,2)
(B,2)
(a,1)
(SQL,2)
(A,3)
(how,2)
(core,2)
(apple,1)
(H,1)
(C,1)
(E,1)
(what,2)
(D,2)
(world,2)*/
resultRDD.foreach(println)
/*(Spark,5)
(A,3)
(are,2)
(you,2)
(Hello,2)*/
val sortRDD = resultRDD.map(x=>(x._2,x._1))
val topN = sortRDD.sortByKey(false).map(x=>(x._2,x._1)).take(5)
topN.foreach(println)
}
}
並行化:把一個本地集合或數據轉化為RDD的過程就是並行化

7.Spark_RDD持久化
7.1cache需要注意的事項
1.cache的返回值,必須賦值給一個新的變量(或者原來的是var類型的變量),然后在其他job中直接使用這個變量即可
2.cache是一個懶執行的算子,所以必須有Actions類型的算子(比如:count)觸發它
3.cache算子的后面不能立即添加Actions類型的算子(比如:val aRDD = linesRDD.cache()是正確的,而val bRDD = linesRDD.cache().count就是錯誤的)
7.2cache 和 persist 聯系 || 區別?
聯系:
cache和persist都為懶執行,所以需要觸發
Actions
類型的算子才會將RDD的結果持久化到內存
區別:
cache是persist的一個簡化版(cache是持久化到內存),persist里面可以手動指定其他持久化級別
liensRDD = liensRDD.cache() 等價於
liensRDD = liensRDD.persist(StorageLevel.MEMORY_ONLY)

參數的含義:
(1)持久化到磁盤
(2)持久化到內存
(3)使用對外內存(一般都是 false)
(4) 表示“不序列化”:true 表示不序列化;false 表示序列化
(5)表示副本個數
持久化的單位是partition,上面的2是指partition的備份數,不是指持久化到幾個節點上
7.3另一個持久化的算子--checkpoint
checkpoin也是懶執行,為了使RDD持久化的數據更加安全,可以使用checkpoint
checkpoint流程

1.在RDD的job執行完成之后,會自動的從
finalRDD(RDD3)從后往前進行
回溯(為什么能夠回溯?因為RDD的第三大特性,RDD之間是有一系列的依賴關系),遇到哪一個RDD(這里是RDD2)調用了checkpoint這個方法,就會對這個RDD做一個標記
maked for checkpoint
2.另外
重新啟動一個新的job,重新計算被標記的RDD,將RDD的結果寫入到HDFS中
3.
如何對第二步進行優化:重新計算被標記的RDD,這樣的話這個RDD就被計算了兩次,最好調用checkpoint之前進行cache一下,這樣的話,重新啟動這個job只需要將內存中的數據拷貝到HDFS上就可以(省去了計算的過程)
4.checkpoint的job執行完成之后,會將這個RDD的依賴關系切斷(即RDD2不需要再依賴RDD1,因為已經將RDD2這一步持久化了,以后需要數據的時候直接從持久化的地方取就可以了),並統一更名為checkpointRDD(RDD3的父RDD更名為checkpointRDD)