Summary:
RDD (Resilient Distributed Dataset): a special kind of collection that supports multiple data sources, provides fault tolerance, can be cached, and supports parallel operations. An RDD is a dataset divided across partitions.
An RDD supports two kinds of operators:
Transformation: transformations are lazily evaluated. When one RDD is transformed into another, no computation happens immediately; Spark only records the logical operation on the dataset.
Action: triggers the execution of a Spark job, which in turn actually computes the recorded transformations.
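A minimal sketch of this contrast (assuming an existing SparkContext named sc):
val rdd = sc.parallelize(1 to 10)
val doubled = rdd.map(_ * 2)      // transformation: recorded, nothing computed yet
val total = doubled.reduce(_ + _) // action: triggers the actual job
println(total)                    // 110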
This series covers the commonly used operations on Spark RDDs:
1. Basic RDD transformations
Functions covered in this section
Basic transformation operations:
1.map(func): returns a new RDD formed by passing every element of the source RDD through the function func
(Example 1)
import org.apache.spark.{SparkConf, SparkContext}

object Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10) // create the RDD
    val map = rdd.map(_ * 2)          // multiply every element by 2
    map.foreach(x => print(x + " "))
    sc.stop()
  }
}
Output:
2 4 6 8 10 12 14 16 18 20
(RDD dependency graph: a red block represents an RDD partition, a black block represents that partition's dataset; same below)
2.flatMap(func): similar to map, but each input element can be mapped to 0 or more output elements, and the results are flattened into a single collection
(Example 2)
//...sc omitted
val rdd = sc.parallelize(1 to 5)
val fm = rdd.flatMap(x => (1 to x)).collect()
fm.foreach( x => print(x + " "))
Output:
1 1 2 1 2 3 1 2 3 4 1 2 3 4 5
With map instead, each element maps to a whole Range and the output would be:
Range(1) Range(1, 2) Range(1, 2, 3) Range(1, 2, 3, 4) Range(1, 2, 3, 4, 5)
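For comparison, a sketch of the map version that produces the nested Ranges above (same rdd and sc as before; Ranges print in this form in the Scala version used here):
val m = rdd.map(x => (1 to x)).collect()
m.foreach(x => print(x + " "))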
(RDD dependency graph)

3.mapPartitions(func): similar to map, but operates on one whole partition at a time; func has type: Iterator[T] => Iterator[U]
Suppose there are N elements in M partitions: the function passed to map is called N times, while the one passed to mapPartitions is called only M times. When the mapping has to keep creating heavyweight objects, mapPartitions is therefore much more efficient than map. For example, when writing to a database, map would have to create a connection object for every element, whereas mapPartitions creates one connection object per partition, as the sketch below shows.
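A sketch of that database pattern; createConnection and insert are hypothetical placeholders, not a real driver API:
rdd.mapPartitions { iter =>
  val conn = createConnection()     // hypothetical: one connection per partition, not per element
  val results = iter.map { x =>
    conn.insert(x)                  // hypothetical write
    x
  }.toList                          // materialize before closing the connection
  conn.close()
  results.iterator
}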
(Example 3): print the names of the females:
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitions {
  // define the per-partition function
  def partitionsFun(/*index: Int,*/ iter: Iterator[(String, String)]): Iterator[String] = {
    var woman = List[String]()
    while (iter.hasNext) {
      val next = iter.next()
      next match {
        case (_, "female") => woman = /*"[" + index + "]" +*/ next._1 :: woman
        case _ =>
      }
    }
    woman.iterator
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("mappartitions")
    val sc = new SparkContext(conf)
    val l = List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female"))
    val rdd = sc.parallelize(l, 2)
    val mp = rdd.mapPartitions(partitionsFun)
    /*val mp = rdd.mapPartitionsWithIndex(partitionsFun)*/
    mp.collect.foreach(x => print(x + " ")) // collect gathers the partitions' elements into an Array, then prints
    sc.stop()
  }
}
Output:
kpop lucy
In fact the same effect can be achieved in a single statement:
val mp = rdd.mapPartitions(x => x.filter(_._2 == "female")).map(x => x._1)
It is not written that way above only so the example can demonstrate defining the function separately.
(RDD dependency graph)
4.mapPartitionsWithIndex(func): similar to mapPartitions, but func also receives the partition index; func has type: (Int, Iterator[T]) => Iterator[U]
(Example 4): take Example 3 and uncomment the commented-out parts (the index parameter and the "[" + index + "]" prefix), switching to the mapPartitionsWithIndex call
Output (with partition indices):
[0]kpop [1]lucy
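A compact standalone sketch of mapPartitionsWithIndex (assuming the same sc; a lambda replaces the separately defined function):
val rdd = sc.parallelize(1 to 6, 2)
val mpi = rdd.mapPartitionsWithIndex((index, iter) => iter.map(x => "[" + index + "]" + x))
mpi.collect.foreach(x => print(x + " "))
// expected: [0]1 [0]2 [0]3 [1]4 [1]5 [1]6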
5.sample(withReplacement, fraction, seed): randomly samples a fraction of the data, using the given random seed. withReplacement controls whether the sampling is done with replacement: true samples with replacement, false without. Note that fraction is an expected fraction, not an exact element count.
(Example 5): randomly sample about 50% of the RDD's elements with replacement, using 3 as the random seed (the seed only fixes the random sequence so that the sample is reproducible)
// omitted
val rdd = sc.parallelize(1 to 10)
val sample1 = rdd.sample(true,0.5,3)
sample1.collect.foreach(x => print(x + " "))
sc.stop
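For contrast, a sketch of sampling without replacement on the same rdd, where each element can be drawn at most once (exact output depends on the seed):
val sample2 = rdd.sample(false, 0.5, 3)
sample2.collect.foreach(x => print(x + " "))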
6.union(otherRDD): returns the union of the source RDD and the argument RDD, without removing duplicates (note the repeated 3 in the output below)
// sc omitted
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val unionRDD = rdd1.union(rdd2)
unionRDD.collect.foreach(x => print(x + " "))
sc.stop
Output:
1 2 3 3 4 5
7.intersection(otherRDD): returns the intersection of the source RDD and the argument RDD
// sc omitted
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val intersectionRDD = rdd1.intersection(rdd2)
intersectionRDD.collect.foreach(x => print(x + " "))
sc.stop
Output:
3
8.distinct([numPartitions]): returns a new RDD containing the distinct elements of the source RDD
// sc omitted
val list = List(1, 1, 2, 5, 2, 9, 6, 1)
val distinctRDD = sc.parallelize(list)
val resultRDD = distinctRDD.distinct()
resultRDD.collect.foreach(x => print(x + " "))
Output:
1 6 9 5 2
9.cartesian(otherRDD): returns the Cartesian product of the two RDDs, i.e. all pairs (a, b) with a from the source RDD and b from otherRDD; use with care, since the result contains size(rdd1) × size(rdd2) elements
// omitted
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(2 to 5)
val cartesianRDD = rdd1.cartesian(rdd2)
cartesianRDD.foreach(x => println(x + " "))
Output:
(1,2) (1,3) (1,4) (1,5) (2,2) (2,3) (2,4) (2,5) (3,2) (3,3) (3,4) (3,5)
(RDD dependency graph)

10.coalesce(numPartitions, shuffle): repartitions the RDD into numPartitions partitions. When shuffle is false, the partition count can only be reduced; asking for more partitions does not raise an error, the partition count simply stays what it was.
(Example 9):
shuffle = false
// omitted
val rdd = sc.parallelize(1 to 16, 4)
val coalesceRDD = rdd.coalesce(3) // with shuffle = false the partition count cannot be increased (e.g. not from 4 to 7)
println("Partition count after coalesce: " + coalesceRDD.partitions.size)
Output:
Partition count after coalesce: 3
// datasets in the resulting partitions
List(1, 2, 3, 4)
List(5, 6, 7, 8)
List(9, 10, 11, 12, 13, 14, 15, 16)
(Example 9.1):
shuffle = true
//...omitted
val rdd = sc.parallelize(1 to 16, 4)
val coalesceRDD = rdd.coalesce(5, true) // with shuffle = true the partition count can be increased (here from 4 to 5, matching the output below)
println("Partition count after coalesce: " + coalesceRDD.partitions.size)
println("RDD lineage: " + coalesceRDD.toDebugString)
Output:
Partition count after coalesce: 5
RDD lineage:
(5) MapPartitionsRDD[4] at coalesce at Coalesce.scala:14 []
 |  CoalescedRDD[3] at coalesce at Coalesce.scala:14 []
 |  ShuffledRDD[2] at coalesce at Coalesce.scala:14 []
 +-(4) MapPartitionsRDD[1] at coalesce at Coalesce.scala:14 []
    |  ParallelCollectionRDD[0] at parallelize at Coalesce.scala:13 []
// datasets in the resulting partitions
List(10, 13)
List(1, 5, 11, 14)
List(2, 6, 12, 15)
List(3, 7, 16)
List(4, 8, 9)
(RDD dependency graph: coalesce(3, false))

(RDD dependency graph: coalesce(3, true))
11.repartition(numPartitions): implemented as coalesce(numPartitions, true); the effect is the same as the coalesce(numPartitions, true) call in Example 9.1
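Since no example is given above, a minimal sketch (assuming the same sc):
val rdd = sc.parallelize(1 to 16, 4)
val repartitionRDD = rdd.repartition(5) // equivalent to coalesce(5, true): always shuffles
println("Partition count after repartition: " + repartitionRDD.partitions.size) // 5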
12.glom(): gathers the elements of each partition into an array, yielding an RDD[Array[T]]
// omitted
val rdd = sc.parallelize(1 to 16, 4)
val glomRDD = rdd.glom() // RDD[Array[Int]]
glomRDD.foreach(arr => println(arr.getClass.getSimpleName))
sc.stop
Output:
int[] // shows that each partition's elements were gathered into an Array[Int]
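To see each partition's contents rather than just the element type, a follow-up sketch on the same rdd:
rdd.glom().collect().foreach(arr => println(arr.mkString("List(", ", ", ")")))
// List(1, 2, 3, 4)
// List(5, 6, 7, 8)
// List(9, 10, 11, 12)
// List(13, 14, 15, 16)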
13.randomSplit(weights, [seed]): splits the RDD into several RDDs according to the given weights; the weights are normalized, so Array(1.0, 2.0, 7.0) means roughly 10%, 20%, and 70% of the elements
// sc omitted
val rdd = sc.parallelize(1 to 10)
val randomSplitRDD = rdd.randomSplit(Array(1.0, 2.0, 7.0))
randomSplitRDD(0).foreach(x => print(x + " "))
randomSplitRDD(1).foreach(x => print(x + " "))
randomSplitRDD(2).foreach(x => print(x + " "))
sc.stop
Output:
2 4 3 8 9 1 5 6 7 10 // the three splits print back to back; with only 10 elements the split sizes are only roughly 1:2:7
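A sketch that makes the weighting visible by printing each split's size; the seed argument (assumed here for reproducibility) is optional:
val splits = rdd.randomSplit(Array(1.0, 2.0, 7.0), seed = 11L)
splits.zipWithIndex.foreach { case (s, i) => println("split " + i + " size = " + s.count()) }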
Source code for the above examples: https://github.com/Mobin-F/SparkExample/tree/master/src/main/scala/com/mobin/SparkRDDFun/TransFormation/KVRDD
