Enumerating All Spark Operators


I. RDD Overview
     1. What is an RDD
          An RDD (Resilient Distributed Dataset) is Spark's most fundamental data abstraction. It represents an immutable, partitionable collection whose elements can be computed in parallel. RDDs have the characteristics of a dataflow model: automatic fault tolerance, locality-aware scheduling, and scalability. RDDs let users explicitly cache a working set in memory across multiple queries so that later queries can reuse it, which greatly speeds up those queries.
     2. RDD Properties
     (1) A set of partitions, the basic building blocks of the dataset. Each partition is processed by one task, so the number of partitions determines the degree of parallelism. The user can specify the number of partitions when creating an RDD; if none is given, the default is used, which is the number of CPU cores allocated to the program.
     (2) A function for computing each partition. RDD computation in Spark is done partition by partition, and every RDD implements a compute function for this purpose. compute composes iterators and does not need to materialize the result of each intermediate step.
     (3) Dependencies on other RDDs. Every transformation produces a new RDD, so RDDs form pipeline-like dependencies on one another. When some partitions' data is lost, Spark can use these dependencies to recompute just the lost partitions rather than recomputing every partition of the RDD.
     (4) A Partitioner, i.e. the RDD's partitioning function. Spark currently ships two kinds: the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs have a Partitioner; for non key-value RDDs the partitioner is None. The Partitioner determines not only the number of partitions of this RDD but also the number of partitions of the parent RDD's shuffle output.
     (5) A list of preferred locations for each partition. For an HDFS file this list holds the location of the block backing each partition. Following the principle that it is cheaper to move computation than to move data, Spark tries to schedule each task onto the node that stores the data block it will process. (See the sketch below.)
     3. Creating an RDD
          (1) From an existing Scala collection:
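          A minimal sketch of how these properties can be inspected from spark-shell (assuming sc is already in scope; all calls below are standard RDD methods):

// reduceByKey produces a key-value RDD that carries a HashPartitioner
val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)), 4)
val reduced = pairs.reduceByKey(_ + _)

reduced.partitions.length                         // (1) the number of partitions
reduced.partitioner                               // (4) Some(HashPartitioner) for this key-value RDD
pairs.partitioner                                 // None: parallelize assigns no partitioner
reduced.dependencies                              // (3) the lineage back to the parent RDD
// (5) preferred locations are empty for an in-memory collection; for an HDFS-backed RDD
// they would list the nodes holding each block
reduced.preferredLocations(reduced.partitions(0))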
                         val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
          (2) From a dataset in an external storage system: the local file system, or any data source Hadoop supports, such as HDFS, Cassandra, or HBase.
                         val rdd2 = sc.textFile("hdfs://hadoop141:8020/words.txt")
          (3) Checking the RDD's number of partitions. The default is the number of CPU cores allocated to the program; it can also be specified when the RDD is created.
                         rdd1.partitions.length
                     Specifying the number of partitions at creation time:
                         val rdd1 = sc.parallelize(Array(1,2,3,4), 3)
II. The RDD Programming API: Two Kinds of Operators
     1. Transformations
          All transformations on RDDs are lazily evaluated; they do not compute their results right away. Instead, they only record the transformations applied to the underlying dataset (for example, a file). The transformations actually run only when an action requires a result to be returned to the driver. This design lets Spark run more efficiently, as the short sketch below illustrates.
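          A minimal sketch of this laziness, assuming a spark-shell session (sc in scope): the map call only records the transformation, and nothing runs until the collect action is invoked.

val nums = sc.parallelize(1 to 5)
// Recorded, but not executed yet: no job shows up in the Spark UI at this point
val doubled = nums.map(_ * 2)
// The action triggers the actual computation and returns the result to the driver
doubled.collect          // Array(2, 4, 6, 8, 10)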
     2. Common transformations:
          (1) map(func): returns a new RDD formed by passing each input element through the function func.
          (2) filter(func): returns a new RDD formed by the input elements for which func returns true.
          (3) sortBy(func, [ascending], [numTasks]): returns a new RDD whose elements are sorted by the key computed by func. (ascending defaults to true, i.e. ascending order; pass false for descending order.)
val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)
val rdd3 = rdd2.filter(_>10)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+"",true)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)
          (4) flatMap(func): similar to map, but each input element can be mapped to zero or more output elements (so func should return a sequence rather than a single element). Equivalent to a map followed by a flatten.
val rdd4 = sc.parallelize(Array("a b c", "d e f", "h i j"))
rdd4.flatMap(_.split(' ')).collect
------------------------------------------------------------------
val rdd5 = sc.parallelize(List(List("a b c", "a b b"),List("e f g", "a f g"), List("h i j", "a a b")))
rdd5.flatMap(_.flatMap(_.split(" "))).collect
          (5) union: union of two RDDs; note that the element types must match.
          (6) intersection: intersection of two RDDs.
          (7) distinct: removes duplicates.
val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)
rdd8.distinct.sortBy(x=>x).collect
--------------------------------------------
val rdd9 = rdd6.intersection(rdd7)
          (8) join, leftOuterJoin, rightOuterJoin
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
--------------------------------------------------------------------------
val rdd3 = rdd1.join(rdd2).collect
rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (jerry,(2,9)))
---------------------------------------------------------------------------
val rdd3 = rdd1.leftOuterJoin(rdd2).collect
rdd3: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (jerry,(2,Some(9))), (kitty,(3,None)))
---------------------------------------------------------------------------
val rdd3 = rdd1.rightOuterJoin(rdd2).collect
rdd3: Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (jerry,(Some(2),9)), (shuke,(None,7)))
          (9) groupByKey([numTasks]): called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]) pairs. It only applies to RDDs whose elements are key-value tuples.
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
val rdd3 = rdd1 union rdd2
val rdd4 = rdd3.groupByKey.collect
rdd4: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(8, 1)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(9, 2)))
-----------------------------------------------------------------------------------
val rdd5 = rdd4.map(x=>(x._1,x._2.sum))
rdd5: Array[(String, Int)] = Array((tom,9), (shuke,7), (kitty,3), (jerry,11))
                groupBy: takes a one-argument function; the value it returns is used as the key, and a new RDD[(K, Iterable[T])] is produced in which each value is an iterable of all input elements that map to the same key.
                              Source:
/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
* mapping to that key. The ordering of elements within each group is not guaranteed, and
* may even differ each time the resulting RDD is evaluated.
*
* @note This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
* or `PairRDDFunctions.reduceByKey` will provide much better performance.
*/
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy[K](f, defaultPartitioner(this))
}

                A concrete code example:
scala> val rdd1=sc.parallelize(List(("a",1,2),("b",1,1),("a",4,5)))
rdd1: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[47] at parallelize at <console>:24
 
scala> rdd1.groupBy(_._1).collect
res18: Array[(String, Iterable[(String, Int, Int)])] = Array((a,CompactBuffer((a,1,2), (a,4,5))), (b,CompactBuffer((b,1,1))))
          (10) reduceByKey(func, [numTasks]): called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs in which the values for each key are aggregated with the given reduce function. Similar to groupByKey; the number of reduce tasks can be set with the optional second parameter.
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
val rdd3 = rdd1 union rdd2
val rdd6 = rdd3.reduceByKey(_+_).collect
rdd6: Array[(String, Int)] = Array((tom,9), (shuke,7), (kitty,3), (jerry,11))
          (11) cogroup(otherDataset, [numTasks]): called on RDDs of type (K, V) and (K, W), returns an RDD of type (K, (Iterable<V>, Iterable<W>)).
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
val rdd3 = rdd1.cogroup(rdd2).collect
rdd3: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((tom,(CompactBuffer(2, 1),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
----------------------------------------------------------------------------------------
val rdd4 = rdd3.map(x=>(x._1,x._2._1.sum+x._2._2.sum))
rdd4: Array[(String, Int)] = Array((tom,4), (jerry,5), (shuke,2), (kitty,2))
          (12) cartesian(otherDataset): Cartesian product.
val rdd1 = sc.parallelize(List("tom", "jerry"))
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
val rdd3 = rdd1.cartesian(rdd2).collect
rdd3: Array[(String, String)] = Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))
     3. Actions
          An action triggers the execution of a job and returns a result to the driver (or writes it to external storage). A few common actions are sketched below.
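          A quick sketch of some frequently used actions, assuming a spark-shell session where sc is already defined (the HDFS path in the last line is illustrative only):

val rdd = sc.parallelize(List(1, 2, 3, 4, 5), 2)
rdd.collect                 // Array(1, 2, 3, 4, 5): bring every element back to the driver
rdd.count                   // 5
rdd.reduce(_ + _)           // 15
rdd.first                   // 1
rdd.take(3)                 // Array(1, 2, 3)
rdd.top(2)                  // Array(5, 4)
rdd.foreach(println)        // runs on the executors, not the driver
// rdd.saveAsTextFile("hdfs://hadoop141:8020/out")   // write the RDD out to external storage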
 
III. RDD Programming: Advanced API
     1. mapPartitions, mapPartitionsWithIndex
          mapPartitions: operates on each partition as a whole. As the source below shows, the supplied function receives an Iterator and must return an Iterator.
/**
* Return a new RDD by applying a function to each partition of this RDD.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
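          A short usage sketch (sc assumed), summing each partition in a single function call rather than touching elements one by one:

val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
// The function receives the whole partition as an Iterator and must return an Iterator
val partitionSums = nums.mapPartitions(iter => Iterator(iter.sum))
partitionSums.collect    // Array(10, 35): partition 0 holds 1..4, partition 1 holds 5..9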
          mapPartitionsWithIndex: operates on each partition, exposing both the partition index and the values in that partition. It is a transformation.
          (1) Source:
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
* (Note: preservesPartitioning indicates whether the returned RDD keeps a partitioner. It may be set to true only when the RDD is a key-value RDD and the keys are not modified. Non key-value RDDs generally have no partitioner, and once the keys of a key-value RDD are modified the elements no longer satisfy the partitioner's placement; in those cases it must be false, meaning the returned RDD is not partitioned by a partitioner.)
*/
def mapPartitionsWithIndex[U: ClassTag](          // expects a function to be passed in...
    f: (Int, Iterator[T]) => Iterator[U],         // ...which takes two parameters: the partition index and an iterator
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning)
}
     (2) Code example:
(1) First define a function whose signature matches what mapPartitionsWithIndex expects:
scala> val func = (index : Int,iter : Iterator[Int]) => {
     | iter.toList.map(x=>"[PartID:" + index + ",val:" + x + "]").iterator
     | }
func: (Int, Iterator[Int]) => Iterator[String] = <function2>
(2) Create an RDD with 2 partitions:
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(3) Call the method, passing in the custom function:
scala> rdd1.mapPartitionsWithIndex(func).collect
res0: Array[String] = Array([PartID:0,val:1], [PartID:0,val:2], [PartID:0,val:3], [PartID:0,val:4], [PartID:1,val:5], [PartID:1,val:6], [PartID:1,val:7], [PartID:1,val:8], [PartID:1,val:9])
     2. aggregate: an aggregation operation; it is an action.
          (1) Source:
/**
* Aggregate the elements of each partition, and then the results for all the partitions, using
* given combine functions and a neutral "zero value". This function can return a different result
* type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
* and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
* (Note: aggregates the elements of the RDD. A zero value must be supplied, since elements are accumulated and the accumulator needs an initial value. Elements are first aggregated within each partition with seqOp, folding elements of type T into a partition result of type U; the per-partition results are then combined with combOp, and that final result is returned.)
*
* @param zeroValue the initial value for the accumulated result of each partition for the
*                  `seqOp` operator, and also the initial value for the combine results from
*                  different partitions for the `combOp` operator - this will typically be the
*                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
* @param seqOp an operator used to accumulate results within a partition
* @param combOp an associative operator used to combine results from different partitions
* (Note: the first parameter list takes the initial value; the second takes two functions, each with two arguments: the first function merges elements within each partition, the second merges the per-partition results, and each returns a single value.)
*/
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
          (2) Code examples:
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
//First the two partitions are summed separately, then the two partition sums are added to give the final result
scala> rdd1.aggregate(0)(_+_,_+_)
res0: Int = 45                                                                 
//First the maximum of each partition is found, then the per-partition maxima are added to give the final result
scala> rdd1.aggregate(0)(math.max(_,_),_+_)
res1: Int = 13
//Note that the initial value takes part in every step. In the call below, partition 0 holds 1,2,3,4; compared with the
//initial value 5, its maximum is 5. Partition 1 holds 5,6,7,8,9; compared with 5, its maximum is 9. The initial value
//also takes part when the partition results are combined: 5 + (5 + 9) = 19
scala> rdd1.aggregate(5)(math.max(_,_),_+_)
res0: Int = 19
-----------------------------------------------------------------------------------------------
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
//Because the partitions are computed in parallel, their results can arrive in either order, so two results are possible:
scala> rdd2.aggregate("")(_+_,_+_)
res0: String = defabc                                                                                                                    
 
scala> rdd2.aggregate("")(_+_,_+_)
res2: String = abcdef
//This example makes the point about the initial value even clearer: the "=" initial value takes part in the computation three times
scala> rdd2.aggregate("=")(_+_,_+_)
res0: String = ==def=abc
--------------------------------------------------------------------------------------
scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24
 
scala> rdd3.aggregate("")((x,y)=>math.max(x.length,y.length).toString,_+_)
res1: String = 42                                                               
 
scala> rdd3.aggregate("")((x,y)=>math.max(x.length,y.length).toString,_+_)
res3: String = 24
-------------------------------------------------------------------------------------------
scala> val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
//Note: in partition 0 the elements plus the initial value are "", "12", "23"; min("".length, "12".length) = 0 becomes "0",
//and min("0".length, "23".length) = 1, so the partition result is "1". In partition 1 the elements are "", "345", "";
//the same process ends with min("0".length, "".length) = 0, so its result is "0".
scala> rdd4.aggregate("")((x,y)=>math.min(x.length,y.length).toString,_+_)
res4: String = 10                                                               
 
scala> rdd4.aggregate("")((x,y)=>math.min(x.length,y.length).toString,_+_)
res9: String = 01                                                               
------------------------------------------------------------------------------------
//Note the difference from the previous example: the element order in this RDD is different, which changes the result
scala> val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
 
scala> rdd5.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y)
res1: String = 11 
     3. aggregateByKey: aggregates values by key.
//Create the RDD
scala> val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
//Define a function to pass to mapPartitionsWithIndex
scala> val func=(index:Int,iter:Iterator[(String, Int)])=>{
     | iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
     | }
func: (Int, Iterator[(String, Int)]) => Iterator[String] = <function2>
//Inspect how the data is distributed across partitions
scala> pairRDD.mapPartitionsWithIndex(func).collect
res2: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])
//Note the difference between an initial value of 0 and other initial values
scala> pairRDD.aggregateByKey(0)(_+_,_+_).collect
res4: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))               
 
scala> pairRDD.aggregateByKey(10)(_+_,_+_).collect
res5: Array[(String, Int)] = Array((dog,22), (cat,39), (mouse,26))
//The differences among the next three calls: the first is the easiest to follow. With an initial value of 0, each
//partition emits the largest value per animal, and those maxima are then summed across partitions.
scala> pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
 
//The next two calls have a non-zero initial value, which must be taken into account. Partition 0 holds ("cat",2),
//("cat",5), ("mouse",4); with an initial value of 10, the per-animal maximum inside the partition includes 10, so
//partition 0 yields (cat,10), (mouse,10). Partition 1 works the same way and yields (dog,12), (cat,12), (mouse,10).
//Summing per key gives (dog,12), (cat,22), (mouse,20). Note that the initial value does not take part when the
//per-partition results are combined.
scala> pairRDD.aggregateByKey(10)(math.max(_,_),_+_).collect
res7: Array[(String, Int)] = Array((dog,12), (cat,22), (mouse,20))
//Similar to the case above
scala> pairRDD.aggregateByKey(100)(math.max(_,_),_+_).collect
res8: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
     4. coalesce: returns a new RDD
          Redistributes the RDD's elements across a new number of partitions.
          When the partition count is reduced moderately, e.g. 1000 -> 100, Spark merges every 10 of the original partitions into one; the parallelism becomes 100 and no shuffle is triggered.
          When the partition count is reduced drastically, e.g. 1000 -> 1, the computation would run with a parallelism of 1. To avoid that loss of parallelism, set shuffle to true: the work before and after the shuffle is split into separate stages, with parallelism 1000 and 1 respectively.
          When the partition count is increased, a shuffle is unavoidable, so shuffle must be set to true. (A short sketch follows.)
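          A quick sketch of the two reduction cases (spark-shell, sc in scope):

val big = sc.parallelize(1 to 1000, 1000)
val narrowed = big.coalesce(100)               // no shuffle: groups of 10 partitions are merged
narrowed.partitions.length                     // 100
val single = big.coalesce(1, shuffle = true)   // shuffle = true splits the job into two stages
single.partitions.length                       // 1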
          
          partitionBy: repartitions the RDD using the Partitioner instance passed in, which can be a custom partitioner or the built-in HashPartitioner. Source:
/**
* Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
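          A brief usage sketch (sc assumed), partitioning a pair RDD with the built-in HashPartitioner:

import org.apache.spark.HashPartitioner

val pairRDD = sc.parallelize(List(("cat", 2), ("dog", 1), ("cat", 5), ("mouse", 4)), 3)
val partitioned = pairRDD.partitionBy(new HashPartitioner(2))
partitioned.partitions.length    // 2
partitioned.partitioner          // Some(HashPartitioner@...)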
 
          repartition: returns a new RDD
               Repartitions the RDD to the specified number of partitions; this always involves a shuffle.
               When the target partition count is smaller than the current one, consider coalesce instead, which can avoid the shuffle.
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
 
scala> val rdd2 = rdd1.repartition(6)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at <console>:26
 
scala> rdd2.partitions.length
res0: Int = 6
 
scala> val rdd3 = rdd2.coalesce(2,true)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at coalesce at <console>:28
 
scala> rdd3.partitions.length
res1: Int = 2
     5. collectAsMap: turns the RDD into a Map (the RDD's elements must be key-value tuples).
scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2),("c", 2),("d", 4),("e", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24
 
scala> rdd1.collectAsMap
res3: scala.collection.Map[String,Int] = Map(e -> 1, b -> 2, d -> 4, a -> 1, c -> 2)
     6. combineByKey: achieves the same effect as reduceByKey; under the hood, reduceByKey is implemented on top of combineByKey.
(1) Source:
/**
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. This method is here for backward compatibility. It does not provide combiner
* classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
    partitioner, mapSideCombine, serializer)(null)
}
 
/**
* Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
* This method is here for backward compatibility. It does not provide combiner
* classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    numPartitions: Int): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
}
 
          (2) Parameters:
                    The first parameter, createCombiner: V => C: creates the combiner. For each key, the first value encountered is passed in, and you return the type you want to combine into.
                    The second parameter, mergeValue: (C, V) => C: a function performing the local, per-partition aggregation.
                    The third parameter, mergeCombiners: (C, C) => C: a function that merges the per-partition results.
          (3) Code example
//First create two RDDs, then zip them into one, rdd6
scala> val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:24
 
scala> val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
 
scala> val rdd6 = rdd5.zip(rdd4)
rdd6: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[23] at zip at <console>:28
 
scala> rdd6.collect
res6: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
 
//We want to group by key and collect all values for the same key into a List
//The first argument, List(_): puts the first value for a key into a List
//The second argument, (x:List[String],y:String)=>x :+ y: local aggregation, appending each value to the List
//The third argument, (m:List[String],n:List[String])=>m++n: merges the per-partition results
 
scala> val rdd7 = rdd6.combineByKey(List(_),(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n)
rdd7: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[24] at combineByKey at <console>:30
 
scala> rdd7.collect
res7: Array[(Int, List[String])] = Array((1,List(dog, cat, turkey)), (2,List(wolf, bear, bee, salmon, rabbit, gnu)))
 
//The first argument can be written in other ways, for example the two below
scala> val rdd7 = rdd6.combineByKey(_::List(),(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n).collect
rdd7: Array[(Int, List[String])] = Array((1,List(turkey, dog, cat)), (2,List(wolf, bear, bee, gnu, salmon, rabbit)))
 
scala> val rdd7 = rdd6.combineByKey(_::Nil,(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n).collect
rdd7: Array[(Int, List[String])] = Array((1,List(turkey, dog, cat)), (2,List(wolf, bear, bee, gnu, salmon, rabbit)))
     7. countByKey, countByValue: countByKey counts elements per key; countByValue counts occurrences of each distinct element (for a pair RDD, the whole (key, value) tuple).
scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[27] at parallelize at <console>:24
 
scala> rdd1.countByKey
res8: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)           
 
scala> rdd1.countByValue
res9: scala.collection.Map[(String, Int),Long] = Map((c,2) -> 1, (a,1) -> 1, (b,2) -> 2, (c,1) -> 1)
     8. filterByRange: keeps the elements whose keys fall within the given range.
scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1),("b",6)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
//Note: the range passed in here is inclusive at both ends
scala> val rdd2 = rdd1.filterByRange("b","d")
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[34] at filterByRange at <console>:26
 
scala> rdd2.collect
res10: Array[(String, Int)] = Array((c,3), (d,4), (c,2), (b,6))
     9. flatMapValues: processes the values, similar to flatMap; each value produced is paired with the original key.
scala> val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
rdd3: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[35] at parallelize at <console>:24
 
scala> val rdd4 = rdd3.flatMapValues(_.split(" "))
rdd4: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[36] at flatMapValues at <console>:26
 
scala> rdd4.collect
res11: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))
          mapValues: leaves the key unchanged and applies the given function only to the value of each pair, similar to map. Note the difference from flatMapValues above: mapValues keeps one output pair per input pair and only transforms the value.
scala> val rdd3 = sc.parallelize(List(("a",(1,2)),("b",(2,4))))
rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ParallelCollectionRDD[57] at parallelize at <console>:24
 
scala> rdd3.mapValues(x=>x._1 + x._2).collect
res34: Array[(String, Int)] = Array((a,3), (b,6))
------------------------------------------------------------------------
If flatMapValues is used instead, the result is as follows: the value is split apart entirely and each piece is paired with the key
scala> rdd3.flatMapValues(x=>x + "").collect
res36: Array[(String, Char)] = Array((a,(), (a,1), (a,,), (a,2), (a,)), (b,(), (b,2), (b,,), (b,4), (b,)))
     10. foldByKey: groups by key and folds the values of each group.
scala> val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[37] at parallelize at <console>:24
 
scala> val rdd2 = rdd1.map(x=>(x.length,x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[38] at map at <console>:26
 
scala> rdd2.collect
res12: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))
-----------------------------------------------------------------------------
scala> val rdd3 = rdd2.foldByKey("")(_+_)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[39] at foldByKey at <console>:28
 
scala> rdd3.collect
res13: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))
 
scala> val rdd3 = rdd2.foldByKey(" ")(_+_).collect
rdd3: Array[(Int, String)] = Array((4," bear wolf"), (3," dog cat"))
-----------------------------------------------------------------------------
//A word count computed with foldByKey
val rdd = sc.textFile("hdfs://node-1.itcast.cn:9000/wc").flatMap(_.split(" ")).map((_, 1))
rdd.foldByKey(0)(_+_)
     11. keyBy: uses the result of the supplied function as the key, producing a new RDD.
scala> val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[41] at parallelize at <console>:24
 
scala> val rdd2 = rdd1.keyBy(_.length)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[42] at keyBy at <console>:26
 
scala> rdd2.collect
res14: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
     12. keys, values: extract the keys or the values of the RDD, producing a new RDD.
scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[43] at parallelize at <console>:24
 
scala> rdd1.keys.collect
res16: Array[String] = Array(e, c, d, c, a)
 
scala> rdd1.values.collect
res17: Array[Int] = Array(5, 3, 4, 2, 1)

 

