Spark常用函數講解之Action操作


摘要:

RDD:彈性分布式數據集,是一種特殊集合 ‚ 支持多種來源 ‚ 有容錯機制 ‚ 可以被緩存 ‚ 支持並行操作,一個RDD代表一個分區里的數據集
RDD有兩種操作算子:

        Transformation(轉換):Transformation屬於延遲計算,當一個RDD轉換成另一個RDD時並沒有立即進行轉換,僅僅是記住       了數據集的邏輯操作
         Ation(執行):觸發Spark作業的運行,真正觸發轉換算子的計算
 
本系列主要講解Spark中常用的函數操作:
         1.RDD基本轉換
         2.鍵-值RDD轉換
         3.Action操作篇
本發所講函數
 
1.reduce(func):通過函數func 先聚集各分區的數據集,再聚集分區之間的數據 ,func接收兩個參數,返回一個新值,新值再做為參數繼續傳遞給函數func,直到最后一個元素
 
2.collect():以數據的形式返回數據集中的所有元素給Driver程序,為防止Driver程序內存溢出,一般要控制返回的數據集大小
 
3.count():返回數據集元素個數
 
4.first():返回數據集的第一個元素
 
5.take(n):以數組的形式返回數據集上的前n個元素
 
6.top(n):按默認或者指定的排序規則返回前n個元素,默認按降序輸出
 
7.takeOrdered(n,[ordering]):  按自然順序或者指定的排序規則返回前n個元素
例1:
def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("reduce")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10,2)
    val reduceRDD = rdd.reduce(_ + _)
    val reduceRDD1 = rdd.reduce(_ - _) //如果分區數據為1結果為 -53
    val countRDD = rdd.count()
    val firstRDD = rdd.first()
    val takeRDD = rdd.take(5)    //輸出前個元素
    val topRDD = rdd.top(3)      //從高到底輸出前三個元素
    val takeOrderedRDD = rdd.takeOrdered(3)    //按自然順序從底到高輸出前三個元素

    println("func +: "+reduceRDD)
    println("func -: "+reduceRDD1)
    println("count: "+countRDD)
    println("first: "+firstRDD)
    println("take:")
    takeRDD.foreach(x => print(x +" "))
    println("\ntop:")
    topRDD.foreach(x => print(x +" "))
    println("\ntakeOrdered:")
    takeOrderedRDD.foreach(x => print(x +" "))
    sc.stop
  }
輸出:
func +: 55
func -: 15 //如果分區數據為1結果為 -53
count: 10
first: 1
take:
1 2 3 4 5
top:
10 9 8
takeOrdered:
1 2 3
  (RDD依賴圖:紅色塊表示一個RDD區,黑色塊表示該分區集合,下同)
 
         (RDD依賴圖)
 
8.countByKey():作用於K-V類型的RDD上,統計每個key的個數,返回(K,K的個數)
 
9.collectAsMap():作用於K-V類型的RDD上,作用與collect不同的是collectAsMap函數不包含重復的key,對於重復的key。后面的元素覆蓋前面的元素
 
10.lookup(k):作用於K-V類型的RDD上,返回指定K的所有V值
例2:
 def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("KVFunc")
    val sc = new SparkContext(conf)
    val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
    val rdd = sc.parallelize(arr,2)
    val countByKeyRDD = rdd.countByKey()
    val collectAsMapRDD = rdd.collectAsMap()

    println("countByKey:")
    countByKeyRDD.foreach(print)

    println("\ncollectAsMap:")
    collectAsMapRDD.foreach(print)
    sc.stop
  }
輸出:
countByKey:
(B,2)(A,2)
collectAsMap:
(A,2)(B,3)
 
        (RDD依賴圖)
 
11.aggregate(zeroValue:U)(seqOp:(U,T) => U,comOp(U,U) => U):
seqOp函數將每個分區的數據聚合成類型為U的值,comOp函數將各分區的U類型數據聚合起來得到類型為U的值
def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("Fold")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(1,2,3,4),2)
    val aggregateRDD = rdd.aggregate(2)(_+_,_ * _)
    println(aggregateRDD)
    sc.stop
  }
輸出:
90
步驟1:分區1:zeroValue+1+2=5   分區2:zeroValue+3+4=9
 
步驟2:zeroValue*分區1的結果*分區2的結果=90
 
            (RDD依賴圖)
 
12.fold(zeroValue:T)(op:(T,T) => T):通過op函數聚合各分區中的元素及合並各分區的元素,op函數需要兩個參數,在開始時第一個傳入的參數為zeroValue,T為RDD數據集的數據類型,,其作用相當於SeqOp和comOp函數都相同的aggregate函數
例3
def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("Fold")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)), 2)
    val foldRDD = rdd.fold(("d", 0))((val1, val2) => { if (val1._2 >= val2._2) val1 else val2
    })
    println(foldRDD)
  }
輸出:
c,5
其過程如下:
1.開始時將(“d”,0)作為op函數的第一個參數傳入,將Array中和第一個元素("a",1)作為op函數的第二個參數傳入,並比較value的值,返回value值較大的元素
 
2.將上一步返回的元素又作為op函數的第一個參數傳入,Array的下一個元素作為op函數的第二個參數傳入,比較大小
 
3.重復第2步驟
 
每個分區的數據集都會經過以上三步后匯聚后再重復以上三步得出最大值的那個元素,對於其他op函數也類似,只不過函數里的處理數據的方式不同而已
 
             (RDD依賴圖)
 
13.saveAsFile(path:String):將最終的結果數據保存到指定的HDFS目錄中
 
14.saveAsSequenceFile(path:String):將最終的結果數據以sequence的格式保存到指定的HDFS目錄中
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM