介紹:
RDD--Resilient Distributed Dataset
Spark中RDD是一個不可變的分布式對象集合。每個RDD被分為多個分區,這些分區運行在集群的不同的節點上。
RDD可以包含Python、Java、Scala中的任意類型的對象,以及自定義的對象。
創建RDD的兩種方法:
1 讀取一個數據集(SparkContext.textFile()) : lines = sc.textFile("README.md")
2 讀取一個集合(SparkContext.parallelize()) : lines = sc.paralelize(List("pandas","i like pandas"))
RDD的兩種操作:
1 轉化操作(transformation) : 由一個RDD生成一個新的RDD
2 行動操作(action) : 對RDD中的元素進行計算,並把結果返回
RDD的惰性計算:
可以在任何時候定義新的RDD,但Spark會惰性計算這些RDD。它們只有在第一次行動操作中用到的時候才會真正的計算。
此時也不是把所有的計算都完成,而是進行到滿足行動操作的行為為止。
lines.first() : Spark只會計算RDD的第一個元素的值
常見的轉化操作:
對一個RDD的轉化操作:
原始RDD:
scala> val rdd = sc.parallelize(List(1,2,3,3)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
map() : 對每個元素進行操作,返回一個新的RDD
scala> rdd.map(x => x +1 ).collect() res0: Array[Int] = Array(2, 3, 4, 4)
flatMap() : 對每個元素進行操作,將返回的迭代器的所有元素組成一個新的RDD返回
scala> rdd.flatMap(x => x.to(3)).collect() res2: Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)
filter() : 最每個元素進行篩選,返回符合條件的元素組成的一個新RDD
scala> rdd.filter(x => x != 1).collect() res3: Array[Int] = Array(2, 3, 3)
distinct() : 去掉重復元素
scala> rdd.distinct().collect() res5: Array[Int] = Array(1, 2, 3)
sample(withReplacement,fration,[seed]) : 對RDD采樣,以及是否去重
第一個參數如果為true,可能會有重復的元素,如果為false,不會有重復的元素;
第二個參數取值為[0,1],最后的數據個數大約等於第二個參數乘總數;
第三個參數為隨機因子。
scala> rdd.sample(false,0.5).collect() res7: Array[Int] = Array(3, 3) scala> rdd.sample(false,0.5).collect() res8: Array[Int] = Array(1, 2) scala> rdd.sample(false,0.5,10).collect() res9: Array[Int] = Array(2, 3)
對兩個RDD的轉化操作:
原始RDD:
scala> val rdd1 = sc.parallelize(List(1,2,3)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:27 scala> val rdd2 = sc.parallelize(List(3,4,5)) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:27
union() :合並,不去重
scala> rdd1.union(rdd2).collect() res10: Array[Int] = Array(1, 2, 3, 3, 4, 5)
intersection() :交集
scala> rdd1.intersection(rdd2).collect() res11: Array[Int] = Array(3)
subtract() : 移除相同的內容
scala> rdd1.subtract(rdd2).collect() res12: Array[Int] = Array(1, 2)
cartesian() :笛卡兒積
scala> rdd1.cartesian(rdd2).collect() res13: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))
常見的行動操作:
原始RDD:
scala> val rdd = sc.parallelize(List(1,2,3,3)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
collect() :返回所有元素
scala> rdd.collect() res14: Array[Int] = Array(1, 2, 3, 3)
count() :返回元素個數
scala> rdd.count() res15: Long = 4
countByValue() : 各個元素出現的次數
scala> rdd.countByValue() res16: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)
take(num) : 返回num個元素
scala> rdd.take(2) res17: Array[Int] = Array(1, 2)
top(num) : 返回前num個元素
scala> rdd.top(2) res18: Array[Int] = Array(3, 3)
takeOrdered(num)[(ordering)] :按提供的順序,返回最前面的num個元素(需要好好再研究一下)
scala> rdd.takeOrdered(2) res28: Array[Int] = Array(1, 2) scala> rdd.takeOrdered(3) res29: Array[Int] = Array(1, 2, 3)
takeSample(withReplacement,num,[seed]) :采樣
scala> rdd.takeSample(false,1) res19: Array[Int] = Array(2) scala> rdd.takeSample(false,2) res20: Array[Int] = Array(2, 3) scala> rdd.takeSample(false,2,20) res21: Array[Int] = Array(3, 3)
reduce(func) :並行整合RDD中的所有數據(最常用的)
scala> rdd.reduce((x,y) => x + y) res22: Int = 9
aggregate(zeroValue)(seqOp,combOp) :先使用seqOp將RDD中每個分區中的T類型元素聚合成U類型,再使用combOp將之前每個分區聚合后的U類型聚合成U類型, 特別注意seqOp和combOp都會使用zeroValue的值,zeroValue的類型為U
scala> rdd.aggregate((0,0))((x, y) => (x._1 + y, x._2 +1), (x,y) => (x._1 + y._1, x._2 + y._2)) res24: (Int, Int) = (9,4)
fold(zero)(func) :將aggregate中的seqOp和combOp使用同一個函數op
scala> rdd.fold(0)((x, y) => x + y) res25: Int = 9
foreach(func):對每個元素使用func
scala> rdd.foreach(x => println(x*2)) 4 6 6 2
持久化:
因為RDD是惰性求值的,有時我們需要多次使用同一個RDD,為了避免多次計算同一個RDD,可以讓Spark對數據進行持久化。
用persist()把數據以序列化的形式緩存到JVM的堆空間中(先序列化再做其他操作)
持久化級別
org.apache.spark.storage.StorageLevel中的持久化級別,想保存兩份持久化數據的話,在持久化級別末尾加上"_2"

如果緩存的數據太多,內存不足的話,spark會利用LRU策略把老的分區從內存中移除,對於使用存放在內存中的緩存級別,下次再使用這些數據需要從新計算,
對於使用內存和磁盤的緩存級別,移除的分區會寫入磁盤。
scala> val result = rdd.map(x => x * x)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[36] at map at <console>:29
scala> result.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
res30: result.type = MapPartitionsRDD[36] at map at <console>:29
scala> println(result.count())
4
scala> println(result.collect())
[I@381d7867
scala> println(result.collect().mkString(","))
1,4,9,9
最后,用unpersist()手動把持久化的RDD從緩存中移除。
參照於《Spark快速大數據分析》
