SparkCore的常用算子


 SparkCore算子簡介

SparkCore中的算子可以分為2類:Transformations Operation   和 Action Operation

在Spark的提交過程中,會將RDD及作用於其上的一系列算子(即:RDD及其之間的依賴關系)構建成一個DAG有向無環視圖。當遇到action算子的時候就會觸發一個job的提交,而Driver程序 則會將觸發的job提交給DAGScheduler,由DAGSchedule將job構建成一張DAG

因此,action類算子就是spark application程序分為job的依據,也就是觸發job提交的決定性因素

Spark的RDD空間

  • Stage:

        stage是對job的划分,遇到shuffle就划分,一個stage有多個tasks,同一個job間的stage具有依賴關系,前者必須結束才能進行后者的計算。

  • RDD的創建

(1)通過集合創建RDD,主要用於進行測試,可以在實際部署到集群運行之前,自己使用集合構造測試數據,來測試后面的spark應用的流程 

 

(2) 使用本地文件創建RDD ,在本地臨時性地處理一些存儲了大量數據的文件

 

 

 (3)HDFS文件創建RDD,主要用於測試大量數據

其實本地創建RDD和HDFS文件創建RDD是一樣的,只是在路徑上,要指明是HDFS

hdfs://spark1:9000/data.txt

scala> val linesLength = sc.textFile("hdfs://spark1:9000/rdd.txt").map(line => line.length).reduce(_+_)

linesLength: Int = 9
  •  常用的算子

將worker / executor中的RDD數據或其計算數據拉取到Driver程序中來,形成Scala的集合或變量的操作
算子 描述
collect() 無參,以數組的形式返回RDD中的所有的元素:本質上:是將executor中運算得到的RDD--->拉取到Driver程序中來,形成Scala的集合
take(n) 返回RDD中的前n個元素,無參時,默認為前10個
takeOrdered(n, [ordering]) 和top類似,先排序(升序/降序),再取前n,不過獲取的元素的順序與top相反
takeSample(withReplacement, num, [seed]) 返回一個隨機采樣數組,該數組由從RDD中采樣獲得,可以選擇是否用隨機數來替換不足的部分,seed用於指定隨機數生成器的種子
first() 返回RDD的第一個元素(並不一定是最大的,看RDD是否排序,類似於take(1))
top(n) 返回由RDD前n個最大的元素構成的數組(最大:元素具備可比性,默認取字典順序最大的)
reduce(func) 通過func函數來聚集RDD中的所有元素,且要求func必須滿足:1.可交換;2.可並聯。
reduceByKeyLocally(func:(V, V)=>V) 功能與reduceByKey相同,以key為組進行聚合,但是 唯一不同的是:該算子返回的是一個Map[K, V]的集合
sum() 只能作用於純數值形式的RDD,返回元素的總和
count() 無參,()可以省略,返回RDD的元素個數
countByValue() 無參,針對於任意類型的RDD,統計RDD中各種元素值及其出現的次數,返回Map[value,count]集合
countByKey() 無參,針對於PairRDD,返回每種Key對應的元素的個數,返回Map[key, count]形式的Map集合

 

 

將RDD中的數據拉取到Driver程序中進行處理(如:遍歷,打印輸出),如:
算子 描述
foreach(func) 針對於RDD中的每一個元素,運行func進行更新 func 的沒有返回值(Unit)
foreachPartition(func) 以Partition為單位進行遍歷,遍歷每個分區。foreachPartition(func: Iterator[T]=>Unit): Unit

 

 

將executor中的RDD數據寫出到文件系統去
算子 描述
saveAsTextFile(path) 將RDD數據集中的元素,以textFile的格式保存到HDFS或者其他文件系統中去。對於每個元素,Spark程序都會調用toString()方法,將元素轉換為文本格式
saveAsSequenceFile(path) 將數據集的元素以Hadoop SequenceFile的格式保存到指定的目錄中,可以使用HDFS或者其他Hadoop支持的文件系統
saveAsObjectFile(path) 用於將RDD中的元素序列化為對象,存儲於磁盤中。對於HDFS,默認采用SequenceFile的格式存儲

 

  •  collect算子

 

collect(): 收集數據,將RDD轉換為Scala的 Array數組
         * 本質上:是將executor中運算得到的RDD--->拉取到Driver程序中來,形成Scala的集合
  • take 算子

* take(n): 獲取 RDD的前n個元素
    *     ---返回前n個元素組成的數組,而不是返回新的RDD(屬於Action類的算子)
  • first 算子

first(): 返回RDD的第一個元素(Scala變量),== take(1)
      *          並不會排序(區別於top())
  • top 算子

 

top(n):取 RDD的最大的前 n個元素,返回Array集合,屬於action算子;
      *
      *    1) 當為普通單值RDD時:按照元素值的字典順序,取最大的前n個;
      *
      *    2) 當為PairRDD時:先按照key值進行降序排序,當KEY值相同時,再按照value降序排序,
      *                     最后取最大的前N個;
  • count算子

 * count():統計 RDD的元素個數!
  • countByKey 算子:

* countByKey(): 針對於PairRDD,按照key統計每一種 key的元素的個數
      *               統計的是每一種 key的數量(與value無關)

 

 

  • countByValue()

 
        
      * countByValue(): 針對於各種RDD,統計其中每一種唯一的元素  的出現次數!!
      *                 此處的value指的是RDD的元素,並不是k,v中的value;
      *                 與是PairRDD、還是普通單值RDD無關!
      *                 ---返回Map集合,屬於action類算子

 

 

  • foreach 算子

* foreach(func): 用於遍歷 RDD,將函數func應用於每一個元素。
      *       -- 無返回值(不會返回新的RDD,也不會返回scala集合)
      *       func必須為沒有返回值的方法(返回值為 Unit類型)
  • foreachPartition 算子

/**
      * foreachPartition():func中的參數iterator包含了一個分區中的所有元素構成的迭代器;
      *   ---與foreach的效果類似,但是能夠以分區為單位進行處理,對於多數場景的處理效率要高!
      */

 

wc案例演示

scala> val file = sc.textFile("/home/hadoop/data/ruozedata.txt").collect
file: Array[String] = Array(word    ruoze    jepson, xiaohai    ruoze    word, dashu    xiaohai    jepson, xiaoshiqi)

scala> val file = sc.textFile("/home/hadoop/data/ruozedata.txt").flatMap(_.split("\t")).collect
file: Array[String] = Array(word, ruoze, jepson, xiaohai, ruoze, word, dashu, xiaohai, jepson, xiaoshiqi)
                                                                                    
scala> val file = sc.textFile("/home/hadoop/data/ruozedata.txt").flatMap(_.split("\t")).map((_,1)).collect
file: Array[(String, Int)] = Array((word,1), (ruoze,1), (jepson,1), (xiaohai,1), (ruoze,1), (word,1), (dashu,1), 
(xiaohai,1), (jepson,1), (xiaoshiqi,1)) scala> val file = sc.textFile("/home/hadoop/data/ruozedata.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).collect file: Array[(String, Int)] = Array((word,2), (jepson,2), (ruoze,2), (xiaohai,2), (xiaoshiqi,1), (dashu,1))

scala>val file = sc.textFile("/home/hadoop/data/ruozedata.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_). 

 sortBy(_._2,false).collect

scala> file
res0: Array[(String, Int)] = Array((word,2), (jepson,2), (ruoze,2), (xiaohai,2), (xiaoshiqi,1), (dashu,1))

 

 

 

下一節出常用Transiformation算子

 參考博客:https://blog.csdn.net/qq_16759443/article/details/82801332


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM