Spark_總結一


 


Spark_總結一

1.Spark介紹

    1.1什么是Spark?
    Apache Spark是一個開源的集群計算框架,使數據計算更快( 高效運行,快速開發
    
    1.2Spark比Hadoop快的兩個原因
     第一,內存計算
     第二,DAG(有向無環圖)

2.Spark運行模式(四種 

Local     多用於測試
Standalone Spark自帶的資源調度器(默認情況下就跑在這里面)
MeSOS 資源調度器,同Hadoop中的YARN
YARN     最具前景,公司里大部分都是  Spark on YRAN

3.Spark內核之RDD的五大特性

 
Resilient Distributed Dataset
RDD是基礎-->彈性分布式數據集
 
第一大特性: RDD由一系列的partitions組成(如果數據源在HDFS上,默認partition的數量與block的個數一致,Spark並沒有讀取HDFS的方法,它是沿用MR的方法,MR讀取HDFS上的數據時首先會進行split,RDD中每一個partition與split對應,split默認與block的大小一致,所以 默認partition的數量與block的個數一致
第二大特性:每一個函數實際上是作用在RDD中的每一個partition上
第三大特性:RDD是由一系列的依賴關系的(這里體現出了RDD的彈性,彈性一,數據容錯;彈性二,partition可大可小)
第四大特性:partitioner(分區器)是作用在KV格式的RDD上(RDD執行聚合類函數的時候會產生shuffle,Spark產生shuffle肯定會有partitioner,而partitioner是作用在KV格式的RDD上,推測出聚合類函數必須作用在KV格式的RDD上)
第五大特性:每一個RDD提供了最佳的計算位置,告訴我們每一個partition所在的節點,然后相對應的task就會移動到該節點進行計算(移動計算,而不是移動數據)

4.Spark運行機制

   開機啟動時  Driver 、 Worker Application 會將自己的資源信息注冊到 Master 中,當初始化的時候, Master 先為 Driver 分配資源然后啟動 Driver。
    Driver 運行時先從 main()方法開始,任務在 Worker 上執行,Worker 可以是一台真實的物理機,也可以是虛擬機,擁有 RAM 和 Core。 然后會將 Task 移動到本地的數據上執行運算。 最優計算位置Inputdata 和 Task 在一起(避免了網絡間的信息傳輸) 。實際情況很少會這樣, 有可能存在當前那個計算節點的計算資源計算能力都滿了,默認配置不變的情況下 Spark 計算框架會等待 3s( spark.locality.wait 設置的,在 SparkConf()可以修改),默認重試 5 次。如果均失敗了,會選擇一個比較差的本地節點;Spark 分配算法會將其分配到計算數據附近的節點, Task 會通過其所在節點的 BlockManager 來獲取數據,BlockManager 發現自己本地沒有數據,會通過getRemote()方法,通過 TransferService(網絡數據傳輸組件)從原 task 所在節點的 BlockManager 中,獲取數據,通過網絡傳輸回 Task 所在節 點----->(性能大幅度下降,大量的網絡 IO 占用資源) 計算后的結果會返回 到 Driver 上

5.Spark運行時

Driver(SparkContext運行所在的節點可以看做一個Driver)作用:
       分發task給對應的Worker,可以和其他節點(Worker)進行通信
       接收task的計算結果
 
Worker作用:
        
Worker 可以是一台真實的物理機,也可以是虛擬機,擁有 RAM 和 Core,執行運算


6.Spark算子--Transformations   ||   Actions

                               Transformations  ||  Actions  這兩類算子的區別  
   Transformations
Transformations類的算子會返回一個新的RDD,懶執行
           Actions
Actions類的算子會返回基本類型或者一個集合,能夠觸發一個job的 執行,代碼里面有多少個action類算子,那么就有多少個job
 
常見的算子
   Transformation類算子 map     輸入一條,輸出一條
將原來 RDD 的每個數據項通過 map 中的用戶自定義函數映射轉變為一個新的 元素。輸入一條輸出一條;
flatMap     輸入一條輸出多條
先進行map后進行flat
mapPartitions 與 map 函數類似,只不過映射函數的參數由 RDD 中的每一個元素變成了 RDD 中每一個分區的迭代器。將 RDD 中的所有數據通過 JDBC 連接寫入數據庫,如果使 用 map 函數,可能要為每一個元素都創建一個 connection,這樣開銷很大,如果使用 mapPartitions,那么只需要針對每一個分區建立一個 connection。
mapPartitionsWithIndex  
filter 依據條件過濾的算子
join     聚合類的函數,會產生shuffle,必須作用在KV格式的數據上
join 是將兩個 RDD 按照 Key 相同做一次聚合;而 leftouterjoin 是依 據左邊的 RDD 的 Key 進行聚
union     不會進行數據的傳輸,只不過將這兩個的RDD標識一下
(代表屬於一個RDD)
reduceByKey 先分組groupByKey,后聚合根據傳入的匿名函數聚合,適合在 map 端進行 combiner
sortByKey 依據 Key 進行排序,默認升序,參數設為 false 為降序
mapToPair 進行一次 map 操作,然后返回一個鍵值對的 RDD。(所有的帶 Pair 的算子返回值均為鍵值對)
sortBy 根據后面設置的參數排序
distinct 對這個 RDD 的元素或對象進行去重操作
Actions類算子 foreach foreach 對 RDD 中的每個元素都應用函數操作,傳入一條處理一條數據,返回值為空
collect 返回一個集合(RDD[T] => Seq[T])
collect 相當於 toArray, collect 將分布式的 RDD 返回為一個單機的 Array 數組。
count 一個 action 算子,計數功能,返回一個 Long 類型的對象
  take(n) 取前N條數據
   save     將RDD的數據存入磁盤或者HDFS
reduce 返回T和原來的類型一致(RDD[T] => T)
foreachPartition foreachPartition 也是根據傳入的 function 進行處理,但不 同處在於 function 的傳入參數是一個 partition 對應數據的 iterator,而不是直接使用 iterator 的 foreach。
 
map和flatMap者兩個算子的區別
 
 

7.Spark中WordCount演變流程圖_Scala和Java代碼

這里以Scala代碼為例
  1. package com.hzf.spark.exercise
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.SparkContext
  4. /**
  5. * 統計每一個單詞出現的次數
  6. */
  7. object WordCount{
  8. def main(args:Array[String]):Unit={
  9. /**
  10. * 設置Spark運行時候環境參數 ,可以在SparkConf對象里面設置
  11. * 我這個應用程序使用多少資源 appname 運行模式
  12. */
  13. val conf =newSparkConf()
  14. .setAppName("WordCount")
  15. .setMaster("local")
  16. /**
  17. * 創建Spark的上下文 SparkContext
  18. *
  19. * SparkContext是通往集群的唯一通道。
  20. * Driver
  21. */
  22. val sc =newSparkContext(conf)
  23. //將文本中數據加載到linesRDD中
  24. val linesRDD = sc.textFile("userLog")
  25. //對linesRDD中每一行數據進行切割
  26. val wordsRDD = linesRDD.flatMap(_.split(" "))
  27. val pairRDD = wordsRDD.map{(_,1)}
  28. /**
  29. * reduceByKey是一個聚合類的算子,實際上是由兩步組成
  30. *
  31. * 1、groupByKey
  32. * 2、recuce
  33. */
  34. val resultRDD = pairRDD.reduceByKey(_+_)
  35. /*(you,2)
  36. (Hello,2)
  37. (B,2)
  38. (a,1)
  39. (SQL,2)
  40. (A,3)
  41. (how,2)
  42. (core,2)
  43. (apple,1)
  44. (H,1)
  45. (C,1)
  46. (E,1)
  47. (what,2)
  48. (D,2)
  49. (world,2)*/
  50. resultRDD.foreach(println)
  51. /*(Spark,5)
  52. (A,3)
  53. (are,2)
  54. (you,2)
  55. (Hello,2)*/
  56. val sortRDD = resultRDD.map(x=>(x._2,x._1))
  57. val topN = sortRDD.sortByKey(false).map(x=>(x._2,x._1)).take(5)
  58. topN.foreach(println)
  59. }
  60. }
 
並行化:把一個本地集合或數據轉化為RDD的過程就是並行化
 
7.Spark_RDD持久化
7.1cache需要注意的事項 
    1.cache的返回值,必須賦值給一個新的變量(或者原來的是var類型的變量),然后在其他job中直接使用這個變量即可
    2.cache是一個懶執行的算子,所以必須有Actions類型的算子(比如:count)觸發它
    3.cache算子的后面不能立即添加Actions類型的算子(比如:val aRDD = linesRDD.cache()是正確的,而val bRDD = linesRDD.cache().count就是錯誤的)
 
7.2cache 和 persist 聯系  ||  區別?
    聯系: cache和persist都為懶執行,所以需要觸發 Actions 類型的算子才會將RDD的結果持久化到內存
    區別: cache是persist的一個簡化版(cache是持久化到內存),persist里面可以手動指定其他持久化級別
liensRDD = liensRDD.cache()    等價於     liensRDD = liensRDD.persist(StorageLevel.MEMORY_ONLY)
參數的含義:
        (1)持久化到磁盤 
        (2)持久化到內存 
        (3)使用對外內存(一般都是 false)
        (4) 表示“不序列化”:true 表示不序列化;false 表示序列化 
        (5)表示副本個數
持久化的單位是partition,上面的2是指partition的備份數,不是指持久化到幾個節點上
 
7.3另一個持久化的算子--checkpoint
    checkpoin也是懶執行,為了使RDD持久化的數據更加安全,可以使用checkpoint
 
checkpoint流程
 
    1.在RDD的job執行完成之后,會自動的從 finalRDD(RDD3)從后往前進行 回溯(為什么能夠回溯?因為RDD的第三大特性,RDD之間是有一系列的依賴關系),遇到哪一個RDD(這里是RDD2)調用了checkpoint這個方法,就會對這個RDD做一個標記 maked for checkpoint
    2.另外 重新啟動一個新的job,重新計算被標記的RDD,將RDD的結果寫入到HDFS中
    3. 如何對第二步進行優化:重新計算被標記的RDD,這樣的話這個RDD就被計算了兩次,最好調用checkpoint之前進行cache一下,這樣的話,重新啟動這個job只需要將內存中的數據拷貝到HDFS上就可以(省去了計算的過程)
    4.checkpoint的job執行完成之后,會將這個RDD的依賴關系切斷(即RDD2不需要再依賴RDD1,因為已經將RDD2這一步持久化了,以后需要數據的時候直接從持久化的地方取就可以了),並統一更名為checkpointRDD(RDD3的父RDD更名為checkpointRDD)
         


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM