Spark基礎與Java Api介紹


 

原文引自:http://blog.csdn.net/u011497897/article/details/71440323

一、Spark簡介

  1、什么是Spark

    發源於AMPLab實驗室的分布式內存計算平台,它克服了MapReduce在迭代式計算和交互式計算方面的不足。

    相比於MapReduce,Spark能充分利用內存資源提高計算效率。

  2、Spark計算框架

    Driver程序啟動很多workers,然后workers在(分布式)文件系統中讀取數據后轉化為RDD(彈性分布式數據集),最后對RDD在內存中進行緩存和計算

3、為什么Spark計算速度快
    (1)內存計算

    (2)優化執行計划

   4、Spark Api語言支持

    (1)Scala

    (2)Java

    (3)Python

  5、怎么運行Spark

    Local本地模式、Spark獨立集群、Mesos、Yarn-Standalone、Yarn-Client

二、編程模型

  1、RDD(彈性分布式數據集)是什么

    只讀的、分塊的數據記錄集合

    可以通過讀取來不同存儲類型的數據進行創建、或者通過RDD操作生成(map、filter操作等)

    使用者只能控制RDD的緩存或者分區方式

    RDD的數據可以有多種類型存儲方式(可(序列化)存在內存或硬盤中) 

2、RDD 存儲類型 

    RDD可以設置不同類型存儲方式,只存硬盤、只存內存等。

 

 

3、RDD操作

    Transformation:根據已有RDD創建新的RDD數據集build

    Action:在RDD數據集運行計算后,返回一個值或者將結果寫入外部存儲

 

4、RDD如何創建
    
   首先創建JavaSparkContext對象實例sc

        JavaSparkContext  sc = new JavaSparkContext("local","SparkTest");

    接受2個參數:

      第一個參數表示運行方式(local、yarn-client、yarn-standalone等)

      第二個參數表示應用名字
       直接從集合轉化 sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
  從HDFS文件轉化 sc.textFile("hdfs://")
  從本地文件轉化 sc.textFile("file:/")
  下面例子中list2就是根據data2List生成的一個RDD

 

 
根據文件或者集合生成RDD后,接着就可以通過RDD的Transformation操作來完成對數據的各種轉化操作
  常用的map、flatMap、filter操作都是對單元素的操作
  常用的groupByKey、join都是對(key、value)類型元素操作
5、RDD操作例子Java Api
  (1)map
    map操作對數據集每行數據執行函數里面操作
    list1數據集("a,b,c,d,e"),("1,2,3,4,5"); 

執行結果:對list1數據集每行數據用","進行切分

2)flatMap
      flatMap相比於map操作,它對每行數據操作后會生成多行數據,而map操作只會生成一行。

執行結果:對list1數據集每行數據用","進行切分

3)filter

     filter對每行數據執行過濾操作,返回true則保留,返回false則過濾該行數據

執行結果:過濾list1數據集中包含‘a’字符的行

 

4)union

      union操作對兩個RDD數據進行合並。與SQL中的union一樣

      list2數據集("11,22,33,44,55"),("aa,bb,cc,dd,ee"); 

執行結果:合並list1與list2數據集

(5)groupByKey
      groupByKey對pair中的key進行group by操作
      pair1RDD數據集("a,1"),("b,2"),("a,3"),("b,4")

執行結果:對pair1RDD數據集按key進行group by

 

(6)reduceByKey

      reduceByKey對pair中的key先進行group by操作,然后根據函數對聚合數據后的數據操作

執行結果:先group by操作后進行concat

 

(7)mapValues

      mapValues操作對pair中的value部分執行函數里面的操作

執行結果:對pair1RDD中value部分加上test字符串

 

8)join

      join與sql中join含義一致,將兩個RDD中key一致的進行join連接操作

      pair2RDD數據集("a,11"),("b,22"),("a,13"),("c,4")

執行結果:對pair1RDD與pair2RDD按key進行join

(9)cogroup

      cogroup對兩個RDD數據集按key進行group by,並對每個RDD的value進行單獨group by

執行結果:對pair1RDD與pair2RDD按key進行cogroup

6、RDD數據如何輸出

    使用RDD的Transformation對數據操作后,需要再使用Action操作才能將結果數據輸出
    可以分別使用count、collect、save等操作來輸出或統計RDD結果
  
7、RDD Action實例

執行結果:

      count:統計輸出數據行數

collect:輸出所有輸出數據

save:保存輸出數據至外部存儲

 

7、WordCount實例   

 執行結果:

8、廣播變量& 累加器

    Broadcast variables(廣播變量) 

      廣播變量,類似於hadoop中的distribute cache,將同一份數據分發至每台機器。

    Accumulators(累加器)

      類似於MapReduce中的counter,用於計數
 

三、調度機制

  1、DAG Scheduler

    為每個job分割stage,同時會決定最佳路徑,並且DAG Scheduler會記錄哪個RDD或者stage的數據被checkpoint,從而找到最優調度方案                                         (transformations是延遲執行的原因)

 

2、DAG Scheduler優化

    單個Stage內Pipeline執行

    基於分區選擇合適的join算法最小化shuffle
    重用已經cache過的數據

  3、窄依賴& 寬依賴

    窄依賴:每個子分區只依賴有限數目的父分區 

    寬依賴:每個子分區只依賴所有的父分區

4、Stage

    調度器會在產生寬依賴的地方形成一個stage,同一個stage內的RDD操作會流式執行,不會發生數據遷移。

rdd join操作屬於寬依賴,從spark產生的日志可以看出需要分3個stage執行

rdd flatMap、Map操作屬於窄依賴,從spark產生的日志可以看出需要分1個stage執行

5、Shuffle

    每個RDD都可以選擇Partitioner進行shuffle操作

    任何在兩個RDD上的shuffle操作,將選擇其中一個RDD的Partitioner作為標准。如果兩個RDD都沒有設置Partitioner的話,就使用默認的HashPartitioner

    shuffle需要在node之間移動數據,會影響spark執行效率,應該盡量避免RDD操作中發生shuffle。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM