spark常見的transformation和action算子


RDD:RDD分區數,若從HDFS創建RDD,RDD的分區就是和文件塊一一對應,若是集合並行化形式創建,RDD分區數可以指定,一般默認值是CPU的核數。

task:task數量就是和分區數量對應。

這個全:https://www.cnblogs.com/frankdeng/p/9301672.html

一、transformation算子:

(1)map(func):將函數應用於RDD中的每一個元素,將返回值構成新的RDD。輸入分區與輸出分區一對一,即:有多少個輸入分區,就有多少個輸出分區。

rdd.map(x=>x+1)

如:{1,2,3,3}    結果為 {2,3,4,4}

hadoop fs -cat /tmp/lxw1234/1.txt
hello world
hello spark
hello hive

//讀取HDFS文件到RDD
scala> var data = sc.textFile("/tmp/lxw1234/1.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21
//使用map算子
scala> var mapresult = data.map(line => line.split("\\s+"))
mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23
//結果
scala> mapresult.collect
res0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))

 

(2)flatMap(func):比map多一步合並操作,首先將數組元素進行映射,然后合並壓平所有的數組。

//使用flatMap算子
scala> var flatmapresult = data.flatMap(line => line.split("\\s+"))
flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23
//結果
scala> flatmapresult.collect
res1: Array[String] = Array(hello, world, hello, spark, hello, hive)

參考博客:https://www.cnblogs.com/devin-ou/p/8028305.html

 

(3)mapPartitions(func):函數中傳入的參數是迭代器,迭代器里面保存的是一個分區里面的數據。

/**

* makeRDD方法的第一個參數代表的是RDD中的 元素

* 第二個參數:RDD的分區數

* rdd[Int]

*/

val rdd = sc.makeRDD(1 to 10,3)

/**

* mapPartitions這個算子遍歷的單位是partition

* 會將一個partition的數據量全部加載到一個集合里面

*/

val mapPartitonsRDD = rdd.mapPartitions(iterator=>{

val list = new ListBuffer[Int]()

//創建一個數據庫連接

while(iterator.hasNext){

val num = iterator.next()

list.+=(num+100)

}

//批量插入數據庫

list.iterator

}, false)

/**

* 想要執行,必須有action類的算子

* collect算子會將集群中計算的結果回收到Driver端,慎用

*/

val resultArr = mapPartitonsRDD.collect()

resultArr.foreach { println }

map和mapPartition的異同:

  mapPartition  function一次處理一個分區的數據,性能比較高;

  map的function一次只處理一條數據。

  如果在map過程中需要頻繁創建額外的對象(例如將rdd中的數據通過jdbc寫入數據庫,map需要為每個元素創建一個鏈接而mapPartition為每個partition創建一個鏈接),則mapPartitions效率比map高的多。

SparkSql或DataFrame默認會對程序進行mapPartition的優化。

參考博客:https://blog.csdn.net/wuxintdrh/article/details/80278479

 

(4)distinct:對RDD中的元素進行去重操作。

scala> data.flatMap(line => line.split("\\s+")).collect
res61: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)
 
scala> data.flatMap(line => line.split("\\s+")).distinct.collect
res62: Array[String] = Array(hive, hello, world, spark, hi)

 

(5)reduceByKey(func,[numTask]):找到相同的key,對其進行聚合,聚合的規則由func指定。

reduce任務的數量可以由numTask指定

goodsSaleRDD.reduceByKey((x,y) => x+y)

 參考博客:https://www.jianshu.com/p/af175e66ce99

 

(6)groupByKey():對相同的key進行分組。

 

(7)aggregateByKey(zeroValue: U,  numPartitions: Int)(seqOp: (U, V) => U,  combOp: (U, U) => U)

第一個參數代表着 初始值

第二個參數是中間聚合,在每個分區內部按照key執行聚合操作。這個分兩步,第一步先將每個value和初始值作為函數參數進行計算,返回的結果作為新的kv對。然后在對結果再帶入到函數中計算。

第三個參數是最終聚合,對中間聚合結果進行最終聚合。

 

例如:一個RDD有兩個分區,

patition1:(1,1) (1,2) (2,1)

patition2:(2,3)(2,4)(1,7)

首先,在每個patition中將value和初始值三帶入到seqFunc函數中,得到中間結果kv:

patition1:(1,3) (1,3) (2,3)

patition2:(2,3)(2,4)(1,7)

再將中間結果kv帶入到seqFunc函數中,按照key進行聚合

patition1:(1,3)(2,3)

patition2:(2,4)(1,7)

最后,進行整體聚合,將上一步結果帶入combFunc

(1,10)(2,7)

def seqFunc(a,b):
    print "seqFunc:%s,%s" %(a,b)
    return max(a,b) #取最大值
def combFunc(a,b):
    print "combFunc:%s,%s" %(a ,b)
    return a + b #累加起來
'''
    aggregateByKey這個算子內部肯定有分組
'''
aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc)

參考博客:https://blog.csdn.net/qq_35440040/article/details/82691794 這個寫的挺亂,但有地方可以參考

 

(8)combineByKey ( createCombiner: V=>C,  mergeValue: (C, V) =>C,  mergeCombiners: (C,C) =>C )   :

主要分為三步,第一步,對value進行初始化處理;第二步,在分區內部對(key,value)進行處理,第三步,所有分區間對(key,value)進行處理。

https://www.jianshu.com/p/b77a6294f31c

參考博客:https://www.jianshu.com/p/b77a6294f31c

 

(9)sortBy():排序操作

 

二、action算子

基本RDD的action操作

1、reduce():接收一個函數作為參數,這個函數操作兩個相同元素類型的RDD並返回一個同樣類型的新元素。

val sum=rdd.reduce( (x,y) => x+y )

 

2、aggregate(zeroValue)(seqOp, combOp):期待返回的類型的初始值。然后通過一個函數把RDD中的元素合並起來並放入累加器。考慮到每個節點是在本地進行累加的,最終,還需要提供第二個函數來將累加器兩兩合並。

val result = input.aggregate((0,0))(
                    (acc, value) => (acc._1 + value, acc._2+1),
                    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))     
val avg = result._1 / result._2.toDouble       

 

2、collect():以普通集合或者值的形式,把數據返回驅動器程序,它會將整個RDD的內容返回。通常在單元測試中使用,由於需要將數據復制到驅動器進程中,collect()要求所有的數據都必須能一同放入單台機器的內存中。

rdd.collect()

結果:{1,2,3,3}

  Spark的collect方法,是Action類型的一個算子,會從遠程集群拉取數據到driver端。最后,將大量數據匯集到一個driver節點上,將數據用數組存放,占用了jvm堆內存,非常用意造成內存溢出,只用作小型數據的觀察。

如何避免使用collect:

若需要遍歷RDD中元素,可以使用foreach語句;
若需要打印RDD中元素,可用take語句,返回數據集前n個元素,data.take(1000).foreach(println),這點官方文檔里有說明;
若需要查看其中內容,可用saveAsTextFile方法;
總之,單機環境下使用collect問題並不大,但分布式環境下盡量規避,如有其他需要,手動編寫代碼實現相應功能就好。


參考博客:https://blog.csdn.net/chaoshengmingyue/article/details/82021746

 

3、first:返回RDD中的第一個元素,不排序。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
 
scala> rdd1.first
res14: (String, String) = (A,1)


4、take():返回RDD中的n個元素,並嘗試只訪問盡量少的分區,因此該操作會得到一個不均衡的集合。

rdd.take(2)

結果:{1,2}

 

5、foreach(fuc):對RDD中的每個元素使用給定的函數。

rdd.foreach(func)

 

6、saveAsTextFile:saveAsTextFile用於將RDD以文本文件的格式存儲到文件系統中。

復制代碼
var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS
hadoop fs -ls /tmp/lxw1234.com
Found 2 items
-rw-r--r--   2 lxw1234 supergroup        0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
-rw-r--r--   2 lxw1234 supergroup        21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000
 
hadoop fs -cat /tmp/lxw1234.com/part-00000
1
2
3
4
5
復制代碼

 

Pair RDD的action操作:

1、countByKey ():對每個鍵對應的元素分別計數

rdd.countBykey()

結果:{(1,1),(3,2)}

 

2、collectAsMap():將結果以映射的形式返回,以便查詢。

rdd.collectAsMap()

結果:Map{(1,2),(3,6)}

 

3、lookup(key)返回給定鍵對應的所有值

rdd.lookup(3)

結果:[4, 6]

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM