RDD:RDD分區數,若從HDFS創建RDD,RDD的分區就是和文件塊一一對應,若是集合並行化形式創建,RDD分區數可以指定,一般默認值是CPU的核數。
task:task數量就是和分區數量對應。
這個全:https://www.cnblogs.com/frankdeng/p/9301672.html
一、transformation算子:
(1)map(func):將函數應用於RDD中的每一個元素,將返回值構成新的RDD。輸入分區與輸出分區一對一,即:有多少個輸入分區,就有多少個輸出分區。
rdd.map(x=>x+1)
如:{1,2,3,3} 結果為 {2,3,4,4}
hadoop fs -cat /tmp/lxw1234/1.txt hello world hello spark hello hive
//讀取HDFS文件到RDD
scala> var data = sc.textFile("/tmp/lxw1234/1.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21
//使用map算子 scala> var mapresult = data.map(line => line.split("\\s+")) mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23
//結果 scala> mapresult.collect res0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))
(2)flatMap(func):比map多一步合並操作,首先將數組元素進行映射,然后合並壓平所有的數組。
//使用flatMap算子 scala> var flatmapresult = data.flatMap(line => line.split("\\s+")) flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23
//結果 scala> flatmapresult.collect res1: Array[String] = Array(hello, world, hello, spark, hello, hive)
參考博客:https://www.cnblogs.com/devin-ou/p/8028305.html
(3)mapPartitions(func):函數中傳入的參數是迭代器,迭代器里面保存的是一個分區里面的數據。
/** * makeRDD方法的第一個參數代表的是RDD中的 元素 * 第二個參數:RDD的分區數 * rdd[Int] */ val rdd = sc.makeRDD(1 to 10,3) /** * mapPartitions這個算子遍歷的單位是partition * 會將一個partition的數據量全部加載到一個集合里面 */ val mapPartitonsRDD = rdd.mapPartitions(iterator=>{ val list = new ListBuffer[Int]() //創建一個數據庫連接 while(iterator.hasNext){ val num = iterator.next() list.+=(num+100) } //批量插入數據庫 list.iterator }, false) /** * 想要執行,必須有action類的算子 * collect算子會將集群中計算的結果回收到Driver端,慎用 */ val resultArr = mapPartitonsRDD.collect() resultArr.foreach { println }
map和mapPartition的異同:
mapPartition function一次處理一個分區的數據,性能比較高;
map的function一次只處理一條數據。
如果在map過程中需要頻繁創建額外的對象(例如將rdd中的數據通過jdbc寫入數據庫,map需要為每個元素創建一個鏈接而mapPartition為每個partition創建一個鏈接),則mapPartitions效率比map高的多。
SparkSql或DataFrame默認會對程序進行mapPartition的優化。
參考博客:https://blog.csdn.net/wuxintdrh/article/details/80278479
(4)distinct:對RDD中的元素進行去重操作。
scala> data.flatMap(line => line.split("\\s+")).collect res61: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark) scala> data.flatMap(line => line.split("\\s+")).distinct.collect res62: Array[String] = Array(hive, hello, world, spark, hi)
(5)reduceByKey(func,[numTask]):找到相同的key,對其進行聚合,聚合的規則由func指定。
reduce任務的數量可以由numTask指定
goodsSaleRDD.reduceByKey((x,y) => x+y)
參考博客:https://www.jianshu.com/p/af175e66ce99
(6)groupByKey():對相同的key進行分組。
(7)aggregateByKey(zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U)
第一個參數代表着 初始值
第二個參數是中間聚合,在每個分區內部按照key執行聚合操作。這個分兩步,第一步先將每個value和初始值作為函數參數進行計算,返回的結果作為新的kv對。然后在對結果再帶入到函數中計算。
第三個參數是最終聚合,對中間聚合結果進行最終聚合。
例如:一個RDD有兩個分區,
patition1:(1,1) (1,2) (2,1)
patition2:(2,3)(2,4)(1,7)
首先,在每個patition中將value和初始值三帶入到seqFunc函數中,得到中間結果kv:
patition1:(1,3) (1,3) (2,3)
patition2:(2,3)(2,4)(1,7)
再將中間結果kv帶入到seqFunc函數中,按照key進行聚合
patition1:(1,3)(2,3)
patition2:(2,4)(1,7)
最后,進行整體聚合,將上一步結果帶入combFunc
(1,10)(2,7)
def seqFunc(a,b): print "seqFunc:%s,%s" %(a,b) return max(a,b) #取最大值 def combFunc(a,b): print "combFunc:%s,%s" %(a ,b) return a + b #累加起來 ''' aggregateByKey這個算子內部肯定有分組 ''' aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc)
參考博客:https://blog.csdn.net/qq_35440040/article/details/82691794 這個寫的挺亂,但有地方可以參考
(8)combineByKey ( createCombiner: V=>C, mergeValue: (C, V) =>C, mergeCombiners: (C,C) =>C ) :
主要分為三步,第一步,對value進行初始化處理;第二步,在分區內部對(key,value)進行處理,第三步,所有分區間對(key,value)進行處理。
https://www.jianshu.com/p/b77a6294f31c
參考博客:https://www.jianshu.com/p/b77a6294f31c
(9)sortBy():排序操作
二、action算子
基本RDD的action操作
1、reduce():接收一個函數作為參數,這個函數操作兩個相同元素類型的RDD並返回一個同樣類型的新元素。
val sum=rdd.reduce( (x,y) => x+y )
2、aggregate(zeroValue)(seqOp, combOp):期待返回的類型的初始值。然后通過一個函數把RDD中的元素合並起來並放入累加器。考慮到每個節點是在本地進行累加的,最終,還需要提供第二個函數來將累加器兩兩合並。
val result = input.aggregate((0,0))( (acc, value) => (acc._1 + value, acc._2+1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) val avg = result._1 / result._2.toDouble
2、collect():以普通集合或者值的形式,把數據返回驅動器程序,它會將整個RDD的內容返回。通常在單元測試中使用,由於需要將數據復制到驅動器進程中,collect()要求所有的數據都必須能一同放入單台機器的內存中。
rdd.collect()
結果:{1,2,3,3}
Spark的collect方法,是Action類型的一個算子,會從遠程集群拉取數據到driver端。最后,將大量數據匯集到一個driver節點上,將數據用數組存放,占用了jvm堆內存,非常用意造成內存溢出,只用作小型數據的觀察。
如何避免使用collect:
若需要遍歷RDD中元素,可以使用foreach語句;
若需要打印RDD中元素,可用take語句,返回數據集前n個元素,data.take(1000).foreach(println)
,這點官方文檔里有說明;
若需要查看其中內容,可用saveAsTextFile方法;
總之,單機環境下使用collect問題並不大,但分布式環境下盡量規避,如有其他需要,手動編寫代碼實現相應功能就好。
參考博客:https://blog.csdn.net/chaoshengmingyue/article/details/82021746
3、first:返回RDD中的第一個元素,不排序。
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21 scala> rdd1.first res14: (String, String) = (A,1)
4、take():返回RDD中的n個元素,並嘗試只訪問盡量少的分區,因此該操作會得到一個不均衡的集合。
rdd.take(2)
結果:{1,2}
5、foreach(fuc):對RDD中的每個元素使用給定的函數。
rdd.foreach(func)
6、saveAsTextFile:saveAsTextFile用於將RDD以文本文件的格式存儲到文件系統中。
var rdd1 = sc.makeRDD(1 to 10,2) scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS hadoop fs -ls /tmp/lxw1234.com Found 2 items -rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS -rw-r--r-- 2 lxw1234 supergroup 21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000 hadoop fs -cat /tmp/lxw1234.com/part-00000 1 2 3 4 5
Pair RDD的action操作:
1、countByKey ():對每個鍵對應的元素分別計數
rdd.countBykey()
結果:{(1,1),(3,2)}
2、collectAsMap():將結果以映射的形式返回,以便查詢。
rdd.collectAsMap()
結果:Map{(1,2),(3,6)}
3、lookup(key)返回給定鍵對應的所有值
rdd.lookup(3)
結果:[4, 6]