(1) Start the Spark shell
./bin/spark-shell
(2) Create an RDD
val rdd=sc.parallelize(Array(1,2,3,4,5,6,8))
or
val rdd1=sc.makeRDD(Array(1,2,3,4,5,6,8))
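(3) map example
1. Purpose: returns a new RDD made of the results of passing each input element through func.
2. Task: create an RDD from the numbers 1-10 and multiply every element by 2 to form a new RDD.
val source = sc.parallelize(1 to 10) // create the RDD
source.collect() // print it
val mapadd = source.map(_ * 2) // multiply each element by 2
mapadd.collect() // print it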
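(4) mapPartitions(func) example
1. Purpose: similar to map, but it runs separately on each partition of the RDD. When it runs on an RDD of type T, func must therefore have type Iterator[T] => Iterator[U]. For N elements in M partitions, map calls its function N times, while mapPartitions calls its function M times. Each call processes all the elements of one partition.
2. Task: create an RDD and multiply each element by 2 to form a new RDD.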
scala> val add=sc.parallelize(Array(1,2,3,4))
add: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> add.mapPartitions(x=>x.map(_*2))
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at mapPartitions at <console>:26
scala> res3.collect
res4: Array[Int] = Array(2, 4, 6, 8)
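The N-versus-M difference can be shown with a plain-Scala sketch that needs no Spark. As an assumption for illustration, an RDD is modeled as a Seq of partitions. The hypothetical helpers mapModel and mapPartitionsModel count how often the user function runs. The sketch reproduces only the calling pattern, not Spark's execution.

```scala
// Plain-Scala model (no Spark): an "RDD" is a Seq of partitions, each a Seq of elements.
// The object and method names are hypothetical, for illustration only.
object MapVsMapPartitions {
  type Partitioned[T] = Seq[Seq[T]]

  // Like map: f runs once per element. Returns the result and the number of calls.
  def mapModel[T, U](parts: Partitioned[T])(f: T => U): (Partitioned[U], Int) = {
    var calls = 0
    val out = parts.map(_.map { x => calls += 1; f(x) }.toList)
    (out, calls)
  }

  // Like mapPartitions: f runs once per partition and receives an Iterator over it.
  def mapPartitionsModel[T, U](parts: Partitioned[T])(f: Iterator[T] => Iterator[U]): (Partitioned[U], Int) = {
    var calls = 0
    val out = parts.map { p => calls += 1; f(p.iterator).toList }
    (out, calls)
  }
}
```

With 4 elements in 2 partitions, mapModel reports 4 calls and mapPartitionsModel reports 2. Both produce the same data.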
(5) mapPartitionsWithIndex(func) example
1. Purpose: similar to mapPartitions, but func also receives an integer argument: the index of the partition. When it runs on an RDD of type T, func must therefore have type (Int, Iterator[T]) => Iterator[U].
2. Task: create an RDD and pair each element with the index of its partition, forming a new RDD of tuples.
scala> val rdd=sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> val indexRdd=rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_))))//index is the partition index; here each element ended up in its own partition
indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:25
scala> indexRdd.collect
res5: Array[(Int, Int)] = Array((0,1), (1,2), (2,3), (3,4))
scala> val indexRdd=rdd.mapPartitionsWithIndex((index,items)=>(items.map((3,_))))//with the constant 3 instead of index, every tuple gets 3
indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[8] at mapPartitionsWithIndex at <console>:25
scala> indexRdd.collect
res6: Array[(Int, Int)] = Array((3,1), (3,2), (3,3), (3,4))
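The sketch below shows what mapPartitionsWithIndex hands to func. As before, an RDD is modeled as a Seq of partitions, which is an assumption for illustration. The name mapPartitionsWithIndexModel is hypothetical. With one element per partition, it reproduces the (0,1), (1,2), (2,3), (3,4) pattern above.

```scala
// Plain-Scala model (no Spark); the name is hypothetical.
object WithIndexModel {
  // f receives (partition index, iterator over that partition's elements).
  def mapPartitionsWithIndexModel[T, U](parts: Seq[Seq[T]])(f: (Int, Iterator[T]) => Iterator[U]): Seq[U] =
    parts.zipWithIndex.flatMap { case (p, i) => f(i, p.iterator) }
}
```

With two elements per partition, both elements of a partition share its index, e.g. (0,1), (0,2), (1,3), (1,4).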
(6) flatMap(func) example
1. Purpose: similar to map, but each input element can be mapped to zero or more output elements, so func should return a sequence rather than a single element.
2. Task: create an RDD with the elements 1-5 and use flatMap to create a new RDD.
scala> val sourceFlat=sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> sourceFlat.collect()
res8: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val flatMap=sourceFlat.flatMap(1 to _)//create a new RDD from the previous one: each element n expands to 1 to n
flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[11] at flatMap at <console>:25
scala> flatMap.collect
res9: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
(7) glom example
1. Purpose: turns each partition into an array, producing a new RDD of type RDD[Array[T]].
2. Task: create an RDD with 4 partitions and put the data of each partition into an array.
scala> val rdd=sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
scala> rdd.glom.collect()
res10: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
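Why did 1 to 16 end up as four arrays of four? The sketch below assumes that parallelize cuts a local collection into contiguous slices at positions start = i*n/numSlices and end = (i+1)*n/numSlices. This is my reading of ParallelCollectionRDD, not a documented guarantee. slice and glom here are hypothetical plain-Scala helpers.

```scala
import scala.reflect.ClassTag

// Plain-Scala model (no Spark); names are hypothetical.
object SliceModel {
  // Assumed slicing rule: partition i holds data[i*n/numSlices, (i+1)*n/numSlices).
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "numSlices must be positive")
    val len = data.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * len) / numSlices).toInt
      val end = (((i + 1) * len) / numSlices).toInt
      data.slice(start, end)
    }
  }

  // glom: each partition becomes one Array.
  def glom[T: ClassTag](parts: Seq[Seq[T]]): Seq[Array[T]] = parts.map(_.toArray)
}
```

Under this rule, 10 elements in 3 slices become sizes 3, 3 and 4, and Array(1,2,3,4) in 4 slices gives one element per partition. That matches the indices seen in (5).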
(8) groupBy(func) example
1. Purpose: groups elements by the return value of the supplied function. The values that share a key are put into one iterator.
2. Task: create an RDD and group its elements by their value modulo 2.
scala> val rdd=sc.parallelize(1 to 15)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24
scala> val group=rdd.groupBy(_%2)//divide every number by 2 and group by the remainder
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[19] at groupBy at <console>:25
scala> group.collect
res13: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8, 10, 12, 14)), (1,CompactBuffer(1, 3, 5, 7, 9, 11, 13, 15)))
(9) filter(func) example
1. Purpose: filtering. Returns a new RDD made of the input elements for which func returns true.
2. Task: create an RDD of strings and filter it into a new RDD that keeps only the strings containing the substring "xiao".
scala> val sourceFilter=sc.parallelize(Array("xiaoming","xiaohu","mxlg","uzi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[20] at parallelize at <console>:24
scala> sourceFilter.collect
res14: Array[String] = Array(xiaoming, xiaohu, mxlg, uzi)
scala> val filter=sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at filter at <console>:25
scala> filter.collect
res15: Array[String] = Array(xiaoming, xiaohu)
(10) sample(withReplacement, fraction, seed) example
1. Purpose: randomly samples the data using the given random seed.
- fraction is the expected fraction of the data to keep, not an exact count. Without replacement, it is the probability that each element is chosen; with replacement, it is the expected number of times each element is chosen.
- withReplacement says whether sampled elements are put back: true samples with replacement, false samples without replacement.
- seed sets the seed of the random number generator.
2. Task: create an RDD of 1-10 and sample it both with and without replacement.
scala> val rdd=sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val sample1=rdd.sample(true,0.4,2)//with replacement
sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[1] at sample at <console>:25
scala> sample1.collect
res1: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)
scala> var sample2=rdd.sample(false,0.2,3)//without replacement
sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[2] at sample at <console>:25
scala> sample2.collect
res2: Array[Int] = Array(1, 9)
scala> var sample2=rdd.sample(false,0.4,2)
sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[3] at sample at <console>:25
scala> sample2.collect
res3: Array[Int] = Array(1, 7, 8)
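Because fraction is a probability rather than a count, the sample size varies. The seven-element sample1 above comes from fraction 0.4 on ten elements. Below is a minimal sketch of sampling without replacement, assuming an independent per-element Bernoulli trial driven by a seeded scala.util.Random. Spark uses its own samplers, so the same seed will not select the same elements as Spark does. bernoulliSample is a hypothetical name.

```scala
import scala.util.Random

// Plain-Scala sketch (no Spark); the name is hypothetical.
object SampleModel {
  // Without replacement: keep each element independently with probability `fraction`,
  // so the result size is only about fraction * n, not exactly that.
  def bernoulliSample[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    require(fraction >= 0.0 && fraction <= 1.0, "fraction must be in [0, 1]")
    val rng = new Random(seed)
    data.filter(_ => rng.nextDouble() < fraction)
  }
}
```

The same seed gives the same sample, and no element can appear twice. Repeated elements such as the two 7s in sample1 only occur with replacement.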
(11) distinct([numTasks]) example
1. Purpose: removes duplicates from the source RDD and returns a new RDD. By default the result has as many partitions (parallel tasks) as the source RDD; pass the optional numTasks argument to change this.
2. Task: create an RDD and remove its duplicates with distinct().
scala> val distinctRdd=sc.parallelize(List(1,2,1,5,3,5,4,8,6,4))
distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> val unionRDD =distinctRdd.distinct()
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at distinct at <console>:25
scala> unionRDD.collect
res5: Array[Int] = Array(4, 8, 1, 5, 6, 2, 3)
scala> val unionRDD =distinctRdd.distinct(2)//set the parallelism to 2
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at distinct at <console>:25
scala> unionRDD.collect
res6: Array[Int] = Array(4, 6, 8, 2, 1, 3, 5)
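The sketch below shows why distinct's output is not sorted. It assumes distinct is built on hash partitioning: equal values are sent to the same partition and deduplicated there, so the result comes back grouped by partition. The distinct(2) output above lists 4, 6, 8, 2 before 1, 3, 5. That is consistent with even and odd numbers landing in different hash partitions. distinctModel and partitionOf are hypothetical plain-Scala helpers. Sets are used because the order inside a partition is not modeled.

```scala
// Plain-Scala model (no Spark); names are hypothetical.
object DistinctModel {
  // Hash-partitioner-style index: non-negative hashCode modulo numPartitions.
  def partitionOf(x: Any, numPartitions: Int): Int =
    java.lang.Math.floorMod(x.hashCode, numPartitions)

  // Route each element to a partition by hash, then drop duplicates inside each partition.
  // Equal values share a hash code, so they always meet in the same partition.
  def distinctModel[T](data: Seq[T], numPartitions: Int): Seq[Set[T]] =
    (0 until numPartitions).map(p => data.filter(x => partitionOf(x, numPartitions) == p).toSet[T])
}
```
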
(12) coalesce(numPartitions) example
1. Purpose: reduces the number of partitions. This is useful after filtering a large dataset, so that the resulting small dataset runs more efficiently.
2. Task: create an RDD with 4 partitions and reduce its number of partitions.
scala> val rdd=sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> rdd.partitions.size
res7: Int = 4
scala> val coalRDD=rdd.coalesce(3)
coalRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[12] at coalesce at <console>:25
scala> coalRDD.partitions.size
res9: Int = 3
(13) repartition(numPartitions) example
1. Purpose: redistributes all the data randomly over the network (a shuffle) into the given number of partitions.
2. Task: create an RDD with 4 partitions and repartition it.
scala> val rdd=sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24
scala> val rerdd=rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[17] at repartition at <console>:25
scala> rerdd.partitions.size
res11: Int = 2
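Difference between coalesce and repartition
1. coalesce changes the number of partitions and lets you choose whether to shuffle, through the parameter shuffle: Boolean (false by default). Without a shuffle it can only reduce the number of partitions; asking for more leaves the count unchanged.
2. repartition simply calls coalesce with shuffle = true, so it always shuffles.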
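The two points above can be sketched in plain Scala, again modeling an RDD as a Seq of partitions (an assumption). coalesceNoShuffle only glues whole partitions together, so it cannot raise the partition count. repartitionModel redistributes every element. The contiguous grouping and the round-robin placement are simplifications for illustration. Spark's real coalescing also weighs data locality, and its shuffle placement differs. Both helper names are hypothetical.

```scala
// Plain-Scala model (no Spark); names are hypothetical.
object CoalesceModel {
  // Simplified coalesce(n, shuffle = false): merge whole partitions into contiguous groups.
  // No element moves on its own, and the partition count never grows.
  def coalesceNoShuffle[T](parts: Seq[Seq[T]], n: Int): Seq[Seq[T]] = {
    require(n >= 1, "n must be positive")
    val target = math.min(n, parts.length)
    (0 until target).map { g =>
      val from = g * parts.length / target
      val until = (g + 1) * parts.length / target
      parts.slice(from, until).flatten
    }
  }

  // Simplified repartition(n), i.e. coalesce(n, shuffle = true): every element is
  // redistributed (round-robin here), so n may be larger or smaller than before.
  def repartitionModel[T](parts: Seq[Seq[T]], n: Int): Seq[Seq[T]] = {
    require(n >= 1, "n must be positive")
    val all = parts.flatten.zipWithIndex
    (0 until n).map(p => all.collect { case (x, i) if i % n == p => x })
  }
}
```
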
(14) sortBy(func,[ascending], [numTasks]) example
1. Purpose: first applies func to each element, then sorts the elements by comparing those results. The default order is ascending.
2. Task: create an RDD and sort it by different rules.
scala> val rdd=sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> rdd.sortBy(x=>x).collect//sort by the element itself, ascending by default
res12: Array[Int] = Array(1, 2, 3, 4)
scala> rdd.sortBy(x=>x%3).collect//sort by the remainder modulo 3
res13: Array[Int] = Array(3, 1, 4, 2)
scala> rdd.sortBy(p=>p%3).collect//the parameter name does not have to be x
res14: Array[Int] = Array(3, 1, 4, 2)
(15) union(otherDataset) example
1. Purpose: returns a new RDD containing the union of the source RDD and the argument RDD. Duplicates are kept.
2. Task: create two RDDs and compute their union.
scala> val rdd1=sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(3 to 7)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[35] at parallelize at <console>:24
scala> val rdd3=rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[36] at union at <console>:27
scala> rdd3.collect
res15: Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)
(16) subtract(otherDataset) example
1. Purpose: computes a difference. It removes from the source RDD every element that also appears in the other RDD and keeps the rest.
2. Task: create two RDDs and compute the difference of the first RDD minus the second.
scala> val rdd1=sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(3 to 7)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[35] at parallelize at <console>:24
scala> rdd1.subtract(rdd2).collect
res17: Array[Int] = Array(1, 2)
(17) intersection(otherDataset) example
1. Purpose: returns a new RDD containing the intersection of the source RDD and the argument RDD.
2. Task: create two RDDs and compute their intersection.
scala> val rdd1=sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(3 to 7)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd3=rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at intersection at <console>:27
scala> rdd3.collect
res0: Array[Int] = Array(4, 5, 3)
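The three set operations treat duplicates differently. The sketch below models them on local Seqs with hypothetical helper names:
- union keeps duplicates.
- subtract keeps every element of the first collection that does not occur in the second, including repeated ones.
- intersection returns each common element once. This is consistent with the Array(4, 5, 3) result above, whose order is not guaranteed.

```scala
// Plain-Scala sketch (no Spark); names are hypothetical.
object SetOpsModel {
  // union: plain concatenation, duplicates kept.
  def union[T](a: Seq[T], b: Seq[T]): Seq[T] = a ++ b

  // subtract: drop elements of a that occur in b; surviving duplicates in a are kept.
  def subtract[T](a: Seq[T], b: Seq[T]): Seq[T] = {
    val drop = b.toSet
    a.filterNot(drop)
  }

  // intersection: elements present in both, each returned once.
  def intersection[T](a: Seq[T], b: Seq[T]): Set[T] = a.toSet intersect b.toSet
}
```
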
(18) cartesian(otherDataset) example
1. Purpose: Cartesian product. Avoid it where possible: for RDDs of m and n elements, the result has m × n elements.
2. Task: create two RDDs and compute their Cartesian product.
scala> val rdd1=sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(3 to 7)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd1.cartesian(rdd2).collect
res1: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (1,6), (1,7), (2,3), (2,4), (2,5), (2,6), (2,7), (3,3), (3,4), (3,5), (3,6), (3,7), (4,3), (5,3), (4,4), (5,4), (4,5), (5,5), (4,6), (4,7), (5,6), (5,7))
(19) zip(otherDataset) example
1. Purpose: combines two RDDs into a key/value RDD. Both RDDs must have the same number of partitions and the same number of elements in each partition; otherwise an exception is thrown.
2. Task: create two RDDs and combine them into one (k,v) RDD.
scala> val rdd1=sc.parallelize(Array(1,2,3),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(Array("a","b","c"),3)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> rdd1.zip(rdd2).collect
res2: Array[(Int, String)] = Array((1,a), (2,b), (3,c))
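Here is a sketch of zip's precondition in plain Scala, with an RDD modeled as a Seq of partitions (an assumption). zipModel is a hypothetical helper. It signals a violation with require, i.e. an IllegalArgumentException. Spark raises its own exception, and the messages here are not Spark's.

```scala
// Plain-Scala model (no Spark); the name and messages are hypothetical.
object ZipModel {
  // Pairs elements position by position, partition by partition.
  def zipModel[A, B](left: Seq[Seq[A]], right: Seq[Seq[B]]): Seq[(A, B)] = {
    require(left.length == right.length, "partition counts differ")
    left.zip(right).flatMap { case (l, r) =>
      require(l.length == r.length, "element counts differ within a partition")
      l.zip(r)
    }
  }
}
```
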
Key-Value types
(20) partitionBy example
1. Purpose: partitions a pair RDD. If the RDD's existing partitioner equals the new one, nothing is repartitioned. Otherwise a ShuffledRDD is created, which means a shuffle takes place.
2. Task: create an RDD with 4 partitions and repartition it.
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[13] at parallelize at <console>:24
scala> rdd.partitions.size
res3: Int = 4
scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[14] at partitionBy at <console>:25
scala> rdd2.partitions.size
res4: Int = 2
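Which partition does each key go to? A HashPartitioner places a key by its hashCode modulo the partition count, adjusted to be non-negative, and puts null keys in partition 0. The sketch below reproduces that rule in plain Scala. partitionFor and partitionBy are hypothetical helper names, and the result is a local Seq of partitions rather than an RDD.

```scala
// Plain-Scala model (no Spark); names are hypothetical.
object HashPartitionModel {
  // Non-negative (hashCode mod numPartitions); null keys go to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else java.lang.Math.floorMod(key.hashCode, numPartitions)

  // Move every (k, v) pair into the partition chosen from its key.
  def partitionBy[K, V](pairs: Seq[(K, V)], numPartitions: Int): Seq[Seq[(K, V)]] =
    (0 until numPartitions).map(p => pairs.filter { case (k, _) => partitionFor(k, numPartitions) == p })
}
```

For the four pairs above and HashPartitioner(2), Int keys hash to themselves, so keys 2 and 4 land in partition 0 and keys 1 and 3 in partition 1.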
(21) groupByKey example
1. Purpose: groupByKey also works per key, but it only gathers the values of each key into one sequence; it does not aggregate them.
2. Task: create a pair RDD, gather the values of each key into one sequence, and then compute the sum of the values for each key.
scala> val words=Array("one","two","three","three","three")//data for the pair RDD
words: Array[String] = Array(one, two, three, three, three)
scala> val wordPairRDD=sc.parallelize(words).map(word=>(word,1))//create the pair RDD
wordPairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[16] at map at <console>:26
scala> val group=wordPairRDD.groupByKey()//gather the values of each key into one sequence
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[17] at groupByKey at <console>:25
scala> group.collect//print
res5: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
scala> group.map(t=>(t._1,t._2.sum))//sum the values of each key
res6: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:26
scala> res6.collect
res7: Array[(String, Int)] = Array((two,1), (one,1), (three,3))
(22) reduceByKey(func, [numTasks]) example
1. Purpose: called on an RDD of (K,V) pairs, it returns an RDD of (K,V) pairs in which the values of each key are aggregated with the given reduce function. The optional second argument sets the number of reduce tasks.
2. Task: create a pair RDD and compute the sum of the values for each key.
scala> val rdd=sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[19] at parallelize at <console>:24
scala> val reduce=rdd.reduceByKey((x,y)=>x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:25
scala> reduce.collect()
res8: Array[(String, Int)] = Array((female,6), (male,7))
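Difference between reduceByKey and groupByKey
1. reduceByKey: aggregates by key, with a combine (pre-aggregation) step inside each partition before the shuffle. It returns RDD[(K, V)].
2. groupByKey: groups by key and shuffles all records directly, without pre-aggregation, so it usually moves more data.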
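To see what the pre-aggregation saves, the sketch below counts the records that would cross the shuffle for the word example from (21). It assumes, hypothetically, that the five pairs sit in two partitions split 3 + 2. mapSideCombine and the other helpers are illustrative plain-Scala names, not Spark APIs.

```scala
// Plain-Scala model (no Spark); names are hypothetical.
object ShuffleSizeModel {
  type Parts[K, V] = Seq[Seq[(K, V)]]

  // reduceByKey-style map-side combine: inside each partition, merge the values
  // of each key with f, leaving at most one record per key per partition.
  def mapSideCombine[K, V](parts: Parts[K, V], f: (V, V) => V): Parts[K, V] =
    parts.map(p => p.groupBy(_._1).toSeq.map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) })

  // groupByKey ships every record; reduceByKey ships only the pre-combined ones.
  def shuffledByGroupByKey[K, V](parts: Parts[K, V]): Int = parts.map(_.length).sum
  def shuffledByReduceByKey[K, V](parts: Parts[K, V], f: (V, V) => V): Int =
    mapSideCombine(parts, f).map(_.length).sum

  // The final per-key result is the same either way.
  def reduceByKeyModel[K, V](parts: Parts[K, V], f: (V, V) => V): Map[K, V] =
    mapSideCombine(parts, f).flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
}
```

In this split, groupByKey would shuffle 5 records and reduceByKey 4, because the two ("three", 1) pairs in the second partition are combined first. The gap grows with the number of repeated keys per partition.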
(23) foldByKey example
Signature: (zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
- Purpose: a simplified form of aggregateByKey in which seqOp and combOp are the same function.
- Task: create a pair RDD and compute the sum of the values for each key.
Contents of the pair RDD rdd used here:
res11: Array[(Int, Int)] = Array((1,3), (1,2), (1,4), (2,3), (3,6), (3,8))
scala> val agg=rdd.foldByKey(0)(_+_)//0 is the initial (zero) value
agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[24] at foldByKey at <console>:25
scala> agg.collect
res12: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))
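Because seqOp and combOp are the same function, zeroValue is applied once per key in every partition, not once per key overall. A non-neutral zero value therefore makes the result depend on the partitioning. The plain-Scala sketch below shows this. foldByKeyModel is a hypothetical name, and partitions are modeled as a Seq of Seqs.

```scala
// Plain-Scala model (no Spark); the name is hypothetical.
object FoldByKeyModel {
  // Within each partition, fold every key's values starting from zeroValue,
  // then merge the per-partition results for the same key with func.
  def foldByKeyModel[K, V](parts: Seq[Seq[(K, V)]], zeroValue: V)(func: (V, V) => V): Map[K, V] = {
    val perPartition: Seq[Map[K, V]] = parts.map { p =>
      p.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zeroValue)(func) }
    }
    perPartition.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }
  }
}
```

With zeroValue 0 and addition, any split of the data above gives (1,9), (2,3), (3,14). With zeroValue 10, key 1 split over two partitions gets 10 added twice.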
(24) combineByKey[C] example
Parameters: (createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
- Purpose: for each key K, combines its values V into a single combined value (accumulator) of type C.
- Parameter description:
(1) createCombiner: combineByKey() goes through all the elements of a partition, so each element's key either has not been seen yet in that partition or matches the key of an earlier element. For a key that is new, combineByKey() calls createCombiner() to create the initial value of that key's accumulator.
(2) mergeValue: if the key has already been seen while processing the current partition, mergeValue() merges the accumulator's current value for that key with the new value.
(3) mergeCombiners: each partition is processed independently, so the same key can have several accumulators. If two or more partitions have an accumulator for the same key, the user-supplied mergeCombiners() merges the per-partition results.
- Task: create a pair RDD and compute the average value of each key (first compute each key's count and the sum of its values, then divide).
scala> val input=sc.parallelize(Array(("a",88),("b",95),("a",91),("b",93),("a",95),("b",98)),2)//input data as 2-tuples, in 2 partitions
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:24
scala> val combine=input.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2))//sum the values of each key and count how often it occurs, kept together in a 2-tuple
combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[27] at combineByKey at <console>:25
scala> combine.collect//print
res15: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))
scala> val result=combine.map{case(key,value)=>(key,value._1/value._2.toDouble)}//compute the average
result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[28] at map at <console>:25
scala> result.collect//print
res16: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))
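The three callbacks can be traced in a plain-Scala sketch of combineByKey over explicit partitions. combineByKeyModel is a hypothetical helper. The six records are assumed to be split 3 + 3 across the two partitions, which is an assumption about the session above. Under that split, the sketch reproduces the (274,3) and (286,3) accumulators.

```scala
// Plain-Scala model (no Spark); the name is hypothetical.
object CombineByKeyModel {
  def combineByKeyModel[K, V, C](parts: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Per partition: createCombiner for a key's first value, mergeValue for later ones.
    val perPartition: Seq[Map[K, C]] = parts.map { p =>
      p.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case None    => acc.updated(k, createCombiner(v)) // key not seen yet in this partition
          case Some(c) => acc.updated(k, mergeValue(c, v))  // key already seen in this partition
        }
      }
    }
    // Across partitions: merge the accumulators of the same key with mergeCombiners.
    perPartition.flatten.groupBy(_._1).map { case (k, kcs) => k -> kcs.map(_._2).reduce(mergeCombiners) }
  }
}
```

Dividing sum by count afterwards gives the per-key averages, as in the result step above.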
(25) sortByKey([ascending], [numTasks]) example
1. Purpose: called on a (K,V) RDD whose keys can be ordered (an implicit Ordering[K] must exist, e.g. because K implements Ordered), it returns a (K,V) RDD sorted by key.
2. Task: create a pair RDD and sort it by key in ascending and descending order.
scala> var rdd=sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> rdd.sortByKey(true).collect//ascending
res17: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
scala> rdd.sortByKey(false).collect//descending
res18: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))
(26) mapValues example
1. Purpose: for (K,V) RDDs, operates only on V; the keys stay unchanged.
2. Task: create a pair RDD and append the string "|||" to each value.
scala> val rdd3=sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[36] at parallelize at <console>:24
scala> rdd.mapValues(_+"|||").collect//note: this call uses rdd from (25), not rdd3, which is why the keys are 3, 6, 2, 1
res19: Array[(Int, String)] = Array((3,aa|||), (6,cc|||), (2,bb|||), (1,dd|||))
(27) join(otherDataset, [numTasks]) example
1. Purpose: called on RDDs of types (K,V) and (K,W), it returns an RDD of (K,(V,W)) that pairs up all the elements sharing a key. Keys present in only one RDD are dropped (inner join).
2. Task: create two pair RDDs and combine the data that share a key into one tuple.
scala> val rdd=sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[38] at parallelize at <console>:24
scala> val rdd1=sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[39] at parallelize at <console>:24
scala> rdd.join(rdd1).collect
res20: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
(28) cogroup(otherDataset, [numTasks]) example
1. Purpose: called on RDDs of types (K,V) and (K,W), it returns an RDD of type (K,(Iterable[V],Iterable[W])).
2. Task: create two pair RDDs and collect the values that share a key into iterables.
scala> val rdd =sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[43] at parallelize at <console>:24
scala> val rdd1=sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[44] at parallelize at <console>:24
scala> rdd.cogroup(rdd1).collect
res21: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))
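join and cogroup are closely related. An inner join can be derived from cogroup by pairing every left value with every right value of the same key and dropping keys that are missing on either side. As far as I know, Spark's own join follows the same idea. The plain-Scala sketch below works on local Seqs. cogroupModel and joinModel are hypothetical names. A key present on only one side shows the difference: cogroup keeps it with an empty group, while join drops it.

```scala
// Plain-Scala model (no Spark); names are hypothetical.
object JoinModel {
  // cogroup: for every key on either side, collect all left values and all right values.
  def cogroupModel[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      k -> (left.collect { case (`k`, v) => v }, right.collect { case (`k`, w) => w })
    }.toMap
  }

  // Inner join: one output pair per (left value, right value) combination of a key;
  // keys with an empty group on either side produce nothing.
  def joinModel[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    cogroupModel(left, right).toSeq.flatMap { case (k, (vs, ws)) =>
      for (v <- vs; w <- ws) yield (k, (v, w))
    }
}
```
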