Spark基礎與Java Api介紹


原創文章,轉載請注明: 轉載自http://www.cnblogs.com/tovin/p/3832405.html 

 

一、Spark簡介

  1、什么是Spark

    發源於AMPLab實驗室的分布式內存計算平台,它克服了MapReduce在迭代式計算和交互式計算方面的不足。

    相比於MapReduce,Spark能充分利用內存資源提高計算效率。

  2、Spark計算框架

    Driver程序啟動很多workers,然后workers在(分布式)文件系統中讀取數據后轉化為RDD(彈性分布式數據集),最后對RDD在內存中進行緩存和計算

  

        

  3、為什么Spark計算速度快
    (1)內存計算

    (2)優化執行計划

   4、Spark Api語言支持

    (1)Scala

    (2)Java

    (3)Python

  5、怎么運行Spark

    Local本地模式、Spark獨立集群、Mesos、Yarn-Standalone、Yarn-Client

 

二、編程模型

  1、RDD(彈性分布式數據集)是什么

    只讀的、分塊的數據記錄集合

    可以通過讀取來不同存儲類型的數據進行創建、或者通過RDD操作生成(map、filter操作等)

    使用者只能控制RDD的緩存或者分區方式

    RDD的數據可以有多種類型存儲方式(可(序列化)存在內存或硬盤中) 

  2、RDD 存儲類型 

    RDD可以設置不同類型存儲方式,只存硬盤、只存內存等。

    

  3、RDD操作

    Transformation:根據已有RDD創建新的RDD數據集build

    Action:在RDD數據集運行計算后,返回一個值或者將結果寫入外部存儲
    
 
 4、RDD如何創建
    
   首先創建JavaSparkContext對象實例sc

        JavaSparkContext  sc = new JavaSparkContext("local","SparkTest");

    接受2個參數:

      第一個參數表示運行方式(local、yarn-client、yarn-standalone等)

      第二個參數表示應用名字
 
  直接從集合轉化 sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
  從HDFS文件轉化 sc.textFile("hdfs://")
  從本地文件轉化 sc.textFile("file:/")
  下面例子中list2就是根據data2List生成的一個RDD
    
  
  根據文件或者集合生成RDD后,接着就可以通過RDD的Transformation操作來完成對數據的各種轉化操作
  常用的map、flatMap、filter操作都是對單元素的操作
  常用的groupByKey、join都是對(key、value)類型元素操作
 
  5、RDD操作例子Java Api
  (1)map
    map操作對數據集每行數據執行函數里面操作
    list1數據集("a,b,c,d,e"),("1,2,3,4,5"); 
      

    執行結果:對list1數據集每行數據用","進行切分

      
   (2)flatMap
      flatMap相比於map操作,它對每行數據操作后會生成多行數據,而map操作只會生成一行。
      

      執行結果:對list1數據集每行數據用","進行切分

       

    (3)filter

     filter對每行數據執行過濾操作,返回true則保留,返回false則過濾該行數據

       

       執行結果:過濾list1數據集中包含‘a’字符的行

       

    (4)union

      union操作對兩個RDD數據進行合並。與SQL中的union一樣

      list2數據集("11,22,33,44,55"),("aa,bb,cc,dd,ee"); 
        
      執行結果:合並list1與list2數據集
        
    (5)groupByKey
      groupByKey對pair中的key進行group by操作
      pair1RDD數據集("a,1"),("b,2"),("a,3"),("b,4")

        

      執行結果:對pair1RDD數據集按key進行group by

       

    (6)reduceByKey

      reduceByKey對pair中的key先進行group by操作,然后根據函數對聚合數據后的數據操作

      

      執行結果:先group by操作后進行concat

        

    (7)mapValues

      mapValues操作對pair中的value部分執行函數里面的操作

        

      執行結果:對pair1RDD中value部分加上test字符串

        

    (8)join

      join與sql中join含義一致,將兩個RDD中key一致的進行join連接操作

      pair2RDD數據集("a,11"),("b,22"),("a,13"),("c,4")
      

      執行結果:對pair1RDD與pair2RDD按key進行join

      

    (9)cogroup

      cogroup對兩個RDD數據集按key進行group by,並對每個RDD的value進行單獨group by

       

       執行結果:對pair1RDD與pair2RDD按key進行cogroup

        

  6、RDD數據如何輸出

    使用RDD的Transformation對數據操作后,需要再使用Action操作才能將結果數據輸出
    可以分別使用count、collect、save等操作來輸出或統計RDD結果
  
  7、RDD Action實例
     
    執行結果:

      count:統計輸出數據行數

      

      collect:輸出所有輸出數據

      

             save:保存輸出數據至外部存儲

      

  7、WordCount實例                                                                  

  

     執行結果:

    

 

  8、廣播變量& 累加器

    Broadcast variables(廣播變量) 

      廣播變量,類似於hadoop中的distribute cache,將同一份數據分發至每台機器。

    Accumulators(累加器)

      類似於MapReduce中的counter,用於計數
 
 

三、調度機制

  1、DAG Scheduler

    為每個job分割stage,同時會決定最佳路徑,並且DAG Scheduler會記錄哪個RDD或者stage的數據被checkpoint,從而找到最優調度方案                                         (transformations是延遲執行的原因)

    

  2、DAG Scheduler優化

    單個Stage內Pipeline執行

    基於分區選擇合適的join算法最小化shuffle
    重用已經cache過的數據

  3、窄依賴& 寬依賴

    窄依賴:每個子分區只依賴有限數目的父分區 

    寬依賴:每個子分區只依賴所有的父分區

      

  4、Stage

    調度器會在產生寬依賴的地方形成一個stage,同一個stage內的RDD操作會流式執行,不會發生數據遷移。

    

    rdd join操作屬於寬依賴,從spark產生的日志可以看出需要分3個stage執行

      

        rdd flatMap、Map操作屬於窄依賴,從spark產生的日志可以看出需要分1個stage執行

      

  5、Shuffle

    每個RDD都可以選擇Partitioner進行shuffle操作

    任何在兩個RDD上的shuffle操作,將選擇其中一個RDD的Partitioner作為標准。如果兩個RDD都沒有設置Partitioner的話,就使用默認的HashPartitioner

    shuffle需要在node之間移動數據,會影響spark執行效率,應該盡量避免RDD操作中發生shuffle。

  

 

 

原創文章,轉載請注明: 轉載自http://www.cnblogs.com/tovin/p/3832405.html 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM