Spark學習(二):RDD編程


介紹:

RDD--Resilient Distributed Dataset

Spark中RDD是一個不可變的分布式對象集合。每個RDD被分為多個分區,這些分區運行在集群的不同的節點上。
RDD可以包含Python、Java、Scala中的任意類型的對象,以及自定義的對象。

創建RDD的兩種方法:
1 讀取一個數據集(SparkContext.textFile()) : lines = sc.textFile("README.md")
2 讀取一個集合(SparkContext.parallelize()) : lines = sc.paralelize(List("pandas","i like pandas"))

RDD的兩種操作:
1 轉化操作(transformation) : 由一個RDD生成一個新的RDD
2 行動操作(action) : 對RDD中的元素進行計算,並把結果返回

RDD的惰性計算:
可以在任何時候定義新的RDD,但Spark會惰性計算這些RDD。它們只有在第一次行動操作中用到的時候才會真正的計算。
此時也不是把所有的計算都完成,而是進行到滿足行動操作的行為為止。
lines.first() : Spark只會計算RDD的第一個元素的值

常見的轉化操作:
對一個RDD的轉化操作:

原始RDD:

scala> val rdd = sc.parallelize(List(1,2,3,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

map() : 對每個元素進行操作,返回一個新的RDD

scala> rdd.map(x => x +1 ).collect()
res0: Array[Int] = Array(2, 3, 4, 4)

flatMap() : 對每個元素進行操作,將返回的迭代器的所有元素組成一個新的RDD返回

scala> rdd.flatMap(x => x.to(3)).collect()
res2: Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)

filter() : 最每個元素進行篩選,返回符合條件的元素組成的一個新RDD

scala> rdd.filter(x => x != 1).collect()
res3: Array[Int] = Array(2, 3, 3)

distinct() : 去掉重復元素

scala> rdd.distinct().collect()
res5: Array[Int] = Array(1, 2, 3)

sample(withReplacement,fration,[seed]) : 對RDD采樣,以及是否去重
  第一個參數如果為true,可能會有重復的元素,如果為false,不會有重復的元素;
  第二個參數取值為[0,1],最后的數據個數大約等於第二個參數乘總數;
  第三個參數為隨機因子。

scala> rdd.sample(false,0.5).collect()
res7: Array[Int] = Array(3, 3)

scala> rdd.sample(false,0.5).collect()
res8: Array[Int] = Array(1, 2)

scala> rdd.sample(false,0.5,10).collect()
res9: Array[Int] = Array(2, 3)

對兩個RDD的轉化操作:

原始RDD:

scala> val rdd1 = sc.parallelize(List(1,2,3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:27

scala> val rdd2 = sc.parallelize(List(3,4,5))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:27

union() :合並,不去重

scala> rdd1.union(rdd2).collect()
res10: Array[Int] = Array(1, 2, 3, 3, 4, 5)

intersection() :交集

scala> rdd1.intersection(rdd2).collect()
res11: Array[Int] = Array(3)

subtract() : 移除相同的內容

scala> rdd1.subtract(rdd2).collect()
res12: Array[Int] = Array(1, 2)

cartesian() :笛卡兒積

scala> rdd1.cartesian(rdd2).collect()
res13: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))

 常見的行動操作:

原始RDD:

scala> val rdd = sc.parallelize(List(1,2,3,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

collect() :返回所有元素

scala> rdd.collect()
res14: Array[Int] = Array(1, 2, 3, 3)

count() :返回元素個數

scala> rdd.count()
res15: Long = 4

countByValue() : 各個元素出現的次數

scala> rdd.countByValue()
res16: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)

take(num) : 返回num個元素

scala> rdd.take(2)
res17: Array[Int] = Array(1, 2)

top(num) : 返回前num個元素

scala> rdd.top(2)
res18: Array[Int] = Array(3, 3)

takeOrdered(num)[(ordering)] :按提供的順序,返回最前面的num個元素(需要好好再研究一下)

scala> rdd.takeOrdered(2)
res28: Array[Int] = Array(1, 2)

scala> rdd.takeOrdered(3)
res29: Array[Int] = Array(1, 2, 3)

takeSample(withReplacement,num,[seed]) :采樣

scala> rdd.takeSample(false,1)
res19: Array[Int] = Array(2)

scala> rdd.takeSample(false,2)
res20: Array[Int] = Array(2, 3)

scala> rdd.takeSample(false,2,20)
res21: Array[Int] = Array(3, 3)

reduce(func) :並行整合RDD中的所有數據(最常用的)

scala> rdd.reduce((x,y) => x + y)
res22: Int = 9

aggregate(zeroValue)(seqOp,combOp) :先使用seqOp將RDD中每個分區中的T類型元素聚合成U類型,再使用combOp將之前每個分區聚合后的U類型聚合成U類型, 特別注意seqOp和combOp都會使用zeroValue的值,zeroValue的類型為U

scala> rdd.aggregate((0,0))((x, y) => (x._1 + y, x._2 +1), (x,y) => (x._1 + y._1, x._2 + y._2))
res24: (Int, Int) = (9,4)

fold(zero)(func) :將aggregate中的seqOp和combOp使用同一個函數op

scala> rdd.fold(0)((x, y) => x + y)
res25: Int = 9

foreach(func):對每個元素使用func

scala> rdd.foreach(x => println(x*2))
4
6
6
2

 

持久化:

因為RDD是惰性求值的,有時我們需要多次使用同一個RDD,為了避免多次計算同一個RDD,可以讓Spark對數據進行持久化。

用persist()把數據以序列化的形式緩存到JVM的堆空間中(先序列化再做其他操作)

持久化級別

org.apache.spark.storage.StorageLevel中的持久化級別,想保存兩份持久化數據的話,在持久化級別末尾加上"_2"

如果緩存的數據太多,內存不足的話,spark會利用LRU策略把老的分區從內存中移除,對於使用存放在內存中的緩存級別,下次再使用這些數據需要從新計算,
對於使用內存和磁盤的緩存級別,移除的分區會寫入磁盤。

scala> val result = rdd.map(x => x * x)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[36] at map at <console>:29

scala> result.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
res30: result.type = MapPartitionsRDD[36] at map at <console>:29

scala> println(result.count())
4

scala> println(result.collect())
[I@381d7867

scala> println(result.collect().mkString(","))
1,4,9,9

  

最后,用unpersist()手動把持久化的RDD從緩存中移除。

 參照於《Spark快速大數據分析》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM