[大數據之Spark]——Transformations轉換入門經典實例


Spark相比於Mapreduce的一大優勢就是提供了很多的方法,可以直接使用;另一個優勢就是執行速度快,這要得益於DAG的調度,想要理解這個調度規則,還要理解函數之間的依賴關系。

本篇就着重描述下Spark提供的Transformations方法.

依賴關系

寬依賴和窄依賴

窄依賴(narrow dependencies)

窄依賴是指父RDD僅僅被一個子RDD所使用,子RDD的每個分區依賴於常數個父分區(O(1),與數據規模無關)。

  • 輸入輸出一對一的算子,且結果RDD的分區結構不變。主要是map/flatmap
  • 輸入輸出一對一的算子,但結果RDD的分區結構發生了變化,如union/coalesce
  • 從輸入中選擇部分元素的算子,如filter、distinct、substract、sample
寬依賴(wide dependencies)

寬依賴是指父RDD被多個子分區使用,子RDD的每個分區依賴於所有的父RDD分區(O(n),與數據規模有關)

  • 對單個RDD基於key進行重組和reduce,如groupByKey,reduceByKey
  • 對兩個RDD基於key進行join和重組,如join(父RDD不是hash-partitioned )
  • 需要進行分區,如partitionBy

Transformations轉換方法實例

map(func)

map用於遍歷rdd中的每個元素,可以針對每個元素做操作處理:

scala> var data = sc.parallelize(1 to 9,3)
//內容為 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> data.map(x=>x*2).collect()
//輸出內容 Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

filter(func)

filter用於過濾元素信息,僅僅返回滿足過濾條件的元素

scala> var data = sc.parallelize(1 to 9,3)
//內容為 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> data.filter(x=> x%2==0).collect()
//輸出內容 Array[Int] = Array(2, 4, 6, 8)

flatMap(func)

flatMap與map相比,不同的是可以輸出多個結果,比如

scala> var data = sc.parallelize(1 to 4,1)
//輸出內容為 Array[Int] = Array(1, 2, 3, 4)

scala> data.flatMap(x=> 1 to x).collect()
//輸出內容為 Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

mapPartitions(func)

mapPartitions與map類似,只不過每個元素都是一個分區的迭代器,因此內部可以針對分區為單位進行處理。

比如,針對每個分區做和

//首先創建三個分區
scala> var data = sc.parallelize(1 to 9,3)
//輸出為 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

//查看分區的個數
scala> data.partitions.size
//輸出為 Int = 3

//使用mapPartitions
scala> var result = data.mapPartitions{ x=> {
     | var res = List[Int]()
     | var i = 0
     | while(x.hasNext){
     | i+=x.next()
     | }
     | res.::(i).iterator
     | }}

scala> result.collect
//輸出為 Array[Int] = Array(6, 15, 24)

mapPartitionsWithIndex(func)

這個方法與上面的mapPartitions相同,只不過多提供了一個Index參數。

//首先創建三個分區
scala> var data = sc.parallelize(1 to 9,3)
//輸出為 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

//查看分區的個數
scala> data.partitions.size
//輸出為 Int = 3

scala> var result = data.mapPartitionsWithIndex{
     | (x,iter) => {
     | var result = List[String]()
     | var i = 0
     | while(iter.hasNext){
     | i += iter.next()
     | }
     | result.::( x + "|" +i).iterator
     | }}
     
result.collect
//輸出結果為 Array[String] = Array(0|6, 1|15, 2|24)

sample(withReplacement, fraction, seed)

這個方法可以用於對數據進行采樣,比如從1000個數據里面隨機5個數據。

  • 第一個參數withReplacement代表是否進行替換,如果選true,上面的例子中,會出現重復的數據
  • 第二個參數fraction 表示隨機的比例
  • 第三個參數seed 表示隨機的種子
//創建數據
var data = sc.parallelize(1 to 1000,1)

//采用固定的種子seed隨機
data.sample(false,0.005,0).collect
//輸出為 Array[Int] = Array(53, 423, 433, 523, 956, 990)

//采用隨機種子
data.sample(false,0.005,scala.util.Random.nextInt(1000)).collect
//輸出為 Array[Int] = Array(136, 158)

union(otherDataset)

union方法可以合並兩個數據集,但是不會去重,僅僅合並而已。

//創建第一個數據集
scala> var data1 = sc.parallelize(1 to 5,1)

//創建第二個數據集
scala> var data2 = sc.parallelize(3 to 7,1)

//取並集
scala> data1.union(data2).collect
//輸出為 Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)

intersection(otherDataset)

這個方法用於取兩個數據集的交集

//創建第一個數據集
scala> var data1 = sc.parallelize(1 to 5,1)

//創建第二個數據集
scala> var data2 = sc.parallelize(3 to 7,1)

//取交集
scala> data1.intersection(data2).collect
//輸出為 Array[Int] = Array(4, 3, 5)

distinct([numTasks]))

這個方法用於對本身的數據集進行去重處理。

//創建數據集
scala> var data = sc.parallelize(List(1,1,1,2,2,3,4),1)

//執行去重
scala> data.distinct.collect
//輸出為 Array[Int] = Array(4, 1, 3, 2)

//如果是鍵值對的數據,kv都相同,才算是相同的元素
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))

//執行去重
scala> data.distinct.collect
//輸出為 Array[(String, Int)] = Array((A,1), (B,1), (A,2))

groupByKey([numTasks])

這個方法屬於寬依賴的方法,針對所有的kv進行分組,可以把相同的k的聚合起來。如果要想計算sum等操作,最好使用reduceByKey或者combineByKey

//創建數據集
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))

//分組輸出
scala> data.groupByKey.collect
//輸出為 Array[(String, Iterable[Int])] = Array((B,CompactBuffer(1)), (A,CompactBuffer(1, 1, 2)))

reduceByKey(func, [numTasks])

這個方法用於根據key作分組計算,但是它跟reduce不同,它還是屬於transfomation的方法。

//創建數據集
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))

scala> data.reduceByKey((x,y) => x+y).collect
//輸出為 Array[(String, Int)] = Array((B,1), (A,4))

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

aggregateByKey比較復雜,我也不是很熟練,不過試驗了下,大概的意思是針對分區內部使用seqOp方法,針對最后的結果使用combOp方法。

比如,想要統計分區內的最大值,然后再全部統計加和:

scala> var data = sc.parallelize(List((1,1),(1,2),(1,3),(2,4)),2)
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[54] at parallelize at <console>:27

scala> def sum(a:Int,b:Int):Int = { a+b }
sum: (a: Int, b: Int)Int

scala> data.aggregateByKey(0)(sum,sum).collect
res42: Array[(Int, Int)] = Array((2,4), (1,6))

scala> def max(a:Int,b:Int):Int = { math.max(a,b) }
max: (a: Int, b: Int)Int

scala> data.aggregateByKey(0)(max,sum).collect
res44: Array[(Int, Int)] = Array((2,4), (1,5))

sortByKey([ascending], [numTasks])

sortByKey用於針對Key做排序,默認是按照升序排序。

//創建數據集
scala> var data = sc.parallelize(List(("A",2),("B",2),("A",1),("B",1),("C",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:27

//對數據集按照key進行默認排序
scala> data.sortByKey().collect
res23: Array[(String, Int)] = Array((A,2), (A,1), (B,2), (B,1), (C,1))

//升序排序
scala> data.sortByKey(true).collect
res24: Array[(String, Int)] = Array((A,2), (A,1), (B,2), (B,1), (C,1))

//降序排序
scala> data.sortByKey(false).collect
res25: Array[(String, Int)] = Array((C,1), (B,2), (B,1), (A,2), (A,1))

join(otherDataset, [numTasks])

join方法為(K,V)和(K,W)的數據集調用,返回相同的K,所組成的數據集。相當於sql中的按照key做連接。

有點類似於 select a.value,b.value from a inner join b on a.key = b.key;

舉個例子

//創建第一個數據集
scala> var data1 = sc.parallelize(List(("A",1),("B",2),("C",3)))

//創建第二個數據集
scala> var data2 = sc.parallelize(List(("A",4)))

//創建第三個數據集
scala> var data3 = sc.parallelize(List(("A",4),("A",5)))

data1.join(data2).collect
//輸出為 Array[(String, (Int, Int))] = Array((A,(1,4)))

data1.join(data3).collect
//輸出為 Array[(String, (Int, Int))] = Array((A,(1,4)), (A,(1,5)))

cogroup(otherDataset, [numTasks])

在類型為(K,V)和(K,W)的數據集上調用,返回一個 (K, (Seq[V], Seq[W]))元組的數據集。

//創建第一個數據集
scala> var data1 = sc.parallelize(List(("A",1),("B",2),("C",3)))

//創建第二個數據集
scala> var data2 = sc.parallelize(List(("A",4)))

//創建第三個數據集
scala> var data3 = sc.parallelize(List(("A",4),("A",5)))

scala> data1.cogroup(data2).collect
//輸出為 Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (A,(CompactBuffer(1),CompactBuffer(4))), (C,(CompactBuffer(3),CompactBuffer())))

scala> data1.cogroup(data3).collect
//輸出為 Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (A,(CompactBuffer(1),CompactBuffer(4, 5))), (C,(CompactBuffer(3),CompactBuffer())))

cartesian(otherDataset)

這個方法用於計算兩個(K,V)數據集之間的笛卡爾積

//創建第一個數據集
scala> var a = sc.parallelize(List(1,2))

//創建第二個數據集
scala> var b = sc.parallelize(List("A","B"))

//計算笛卡爾積
scala> a.cartesian(b).collect
//輸出結果 res2: Array[(Int, String)] = Array((1,A), (1,B), (2,A), (2,B))

pipe(command, [envVars])

pipe方法用於針對每個分區的RDD執行一個shell腳本命令,可以使perl或者bash。分區的元素將會被當做輸入,腳本的輸出則被當做返回的RDD值。

//創建數據集
scala> var data = sc.parallelize(1 to 9,3)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:27

//測試腳本
scala> data.pipe("head -n 1").collect
res26: Array[String] = Array(1, 4, 7)

scala> data.pipe("tail -n 1").collect
res27: Array[String] = Array(3, 6, 9)

scala> data.pipe("tail -n 2").collect
res28: Array[String] = Array(2, 3, 5, 6, 8, 9)

coalesce(numPartitions)

這個方法用於對RDD進行重新分區,第一個參數是分區的數量,第二個參數是是否進行shuffle

//創建數據集
scala> var data = sc.parallelize(1 to 9,3)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27

//查看分區的大小
scala> data.partitions.size
res3: Int = 3

//不使用shuffle重新分區
scala> var result = data.coalesce(2,false)
result: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[19] at coalesce at <console>:29

scala> result.partitions.length
res12: Int = 2

scala> result.toDebugString
res13: String = 
(2) CoalescedRDD[19] at coalesce at <console>:29 []
 |  ParallelCollectionRDD[9] at parallelize at <console>:27 []

//使用shuffle重新分區
scala> var result = data.coalesce(2,true)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at coalesce at <console>:29

scala> result.partitions.length
res14: Int = 2

scala> result.toDebugString
res15: String = 
(2) MapPartitionsRDD[23] at coalesce at <console>:29 []
 |  CoalescedRDD[22] at coalesce at <console>:29 []
 |  ShuffledRDD[21] at coalesce at <console>:29 []
 +-(3) MapPartitionsRDD[20] at coalesce at <console>:29 []
    |  ParallelCollectionRDD[9] at parallelize at <console>:27 []

repartition(numPartitions)

這個方法作用於coalesce一樣,重新對RDD進行分區,相當於shuffle版的calesce

//創建數據集
scala> var data = sc.parallelize(1 to 9,3)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27

//查看分區的大小
scala> data.partitions.size
res3: Int = 3

scala> var result = data.repartition(2)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at repartition at <console>:29

scala> result.partitions.length
res16: Int = 2

scala> result.toDebugString
res17: String = 
(2) MapPartitionsRDD[27] at repartition at <console>:29 []
 |  CoalescedRDD[26] at repartition at <console>:29 []
 |  ShuffledRDD[25] at repartition at <console>:29 []
 +-(3) MapPartitionsRDD[24] at repartition at <console>:29 []
    |  ParallelCollectionRDD[9] at parallelize at <console>:27 []

scala>

repartitionAndSortWithinPartitions(partitioner)

這個方法是在分區中按照key進行排序,這種方式比先分區再sort更高效,因為相當於在shuffle階段就進行排序。

下面的例子中,由於看不到分區里面的數據。可以通過設置分區個數為1,看到排序的效果。

scala> var data = sc.parallelize(List((1,2),(1,1),(2,3),(2,1),(1,4),(3,5)),2)
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[60] at parallelize at <console>:27

scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(2)).collect
res52: Array[(Int, Int)] = Array((2,3), (2,1), (1,2), (1,1), (1,4), (3,5))

scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(1)).collect
res53: Array[(Int, Int)] = Array((1,2), (1,1), (1,4), (2,3), (2,1), (3,5))

scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(3)).collect
res54: Array[(Int, Int)] = Array((3,5), (1,2), (1,1), (1,4), (2,3), (2,1))

參考

spark 官方文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM