Spark RDD編程核心


一句話說,在Spark中對數據的操作其實就是對RDD的操作,而對RDD的操作不外乎創建、轉換、調用求值。

什么是RDD

 

  RDD(Resilient Distributed Dataset),彈性分布式數據集。

  它定義了如何在集群的每個節點上操作數據的一系列命令,而不是指真實的數據,Spark通過RDD可以對每個節點的多個分區進行並行的數據操作。

  之所以稱彈性,是因為其有高容錯性。默認情況下,Spark會在每一次行動操作后進行RDD重計算,如想在多個行動操作中使用RDD,可以將其緩存(以分區的方式持久化)到集群中每台機器的內存或者磁盤中。當一台機器失效無法讀取RDD數據時,可通過此特性重算丟掉的分區,從而恢復數據,此過程對用戶透明。

  

如何創建RDD

 

  可通過以下幾種方式創建RDD:

  • 通過讀取外部數據集 (本地文件系統/HDFS/...)
  • 通過讀取集合對象    (List/Set/...)
  • 通過已有的RDD生成新的RDD    

 

 

Spark對RDD操作方式

  Spark對RDD的操作分兩種,即轉換操作(Transformation)和行動操作(Action)。

    • 轉換操作:不觸發實際計算,返回一個新的RDD,例如對數據的匹配操作map和過濾操作filter,惰性求值
    • 行動操作:會觸發實際計算,會向驅動器返回結果或將結果寫到外部系統。

  如何區別兩種操作?

    看返回值類型,返回RDD類型的為轉換操作,返回其他數據類型的是行動操作。

  何為惰性求值?

    Spark在執行轉換操作時不會觸發實際的計算,而等到執行行動操作時才會實際計算。

  為何會有惰性求值?

    我們應把RDD看做是Spark通過轉換操作后構建出來的一套定義如何計算數據的指令列表,而非存放着數據的數據集。

    如果每經過一次轉換操作都觸發計算,將會有系統負擔,而惰性求值會將多個轉換操作合並到一起,抵消不必要的步驟后,在最后必要的時才進行運算,獲得性能的提升同時又減輕系統運算負擔。如涉及多次轉換操作時情景需求如下,我想找  轉換1:深圳市的人  > 轉換2:南山區的人> 轉換3:騰訊大廈的人    ==惰性求值、合並操作==>騰訊大廈的人。 

 

轉換操作

  1. 基本轉換操作,以{1,2,3,3}為例,f代表函數

函數名 目的 示例 結果
map(f) 將函數應用於每一個元素中,返回值構成新的RDD rdd.map(x=>x+1) {2,3,4,4}
flatMap(f) 將函數應用於每一個元素中,並把元素中迭代器內所有內容一並生成新的RDD,常用於切分單詞 rdd.flatMap(x=>x.to(3)) {1,2,3,,2,3,3,3}
filter(f) 過濾元素 rdd.filter(x=>x!=1)  {2,3,3}
distinct() 元素去重 rdd.distinct()  {1,2,3}
sample( withReplacement, fraction , [seed] ) 元素采樣,以及是否需要替換 rdd.sample(false,0.5)  不確定值,不確定數目

  2. 集合轉換操作,以{1,2,3}{3,4,5}為例,rdd代表已生成的RDD實例

函數名 目的 示例 結果
union(rdd) 合並兩個RDD所有元素(不去重) rdd1.union(rdd2) {1,2,3,3,4,5}
intersection(rdd) 求兩個RDD的交集 rdd1.intersection(rdd2) {3}
substract(rdd) 移除在RDD2中存在的RDD1元素 rdd1.substract(rdd2) {1,2}
cartesian(rdd) 求兩個RDD的笛卡爾積 rdd1.cartesian(rdd2) {(1,3),(1,4),(1,5)...(3,5)}

 

行動操作

   基本行動操作,以{1,2,3,3}為例,f代表函數

函數名 目的 示例 結果
collect() 收集並返回RDD中所有元素 rdd.collect() {1,2,3,3}
count() RDD中元素的個數 rdd.count() 4
countByValue() 各元素出現的個數 rdd.countByValue() {(1,1),(2,1),(3,2)}
take(num) 從RDD中返回num個元素 rdd.take(2) {1,2}
top(num) 返回最前面的num個元素 rdd.take(2) {3,3}
takeOrdered(num,[ordering]) 按提供的順序返回前num個元素 rdd.takeOrdered(2,[myOrdering]) {3,3}
takeSample(withReplacement, num ,[seed]) 返回任意元素 takeSample(false,1) 不確定值
reduce(f) 並行整合RDD中所有元素,返回一個同一類型元素 rdd.reduce((x,y) => x+y ) 9
fold(zeroValue)(f) 與reduce一樣,不過需要提供初始值 rdd.fold(0)((x,y) => x+y ) 9
aggregate(zeroValue)(seqOp , combOp) 與reduce相似,不過返回不同類型的元素

rdd. aggregate(( 0, 0)) ((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))

{9,4}
foreach(f) 給每個元素使用給定的函數,結果不需發回本地 rdd.foreach(f)

 *后面有詳解

 

q1: take()、top()和takeOrdered() 的區別,順序在其中如何理解

take(): 用於獲取RDD中從0到num-1下標的元素,不排序。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
 
scala> rdd1.take(1)
res0: Array[Int] = Array(10)                                                    
 
scala> rdd1.take(2)
res1: Array[Int] = Array(10, 4)

top():用於從RDD中,按照默認(降序)或者指定的排序規則,返回前num個元素。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
 
scala> rdd1.top(1)
res2: Array[Int] = Array(12)
 
scala> rdd1.top(2)
res3: Array[Int] = Array(12, 10)
 
//指定排序規則
scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef
 
scala> rdd1.top(1)
res4: Array[Int] = Array(2)
 
scala> rdd1.top(2)
res5: Array[Int] = Array(2, 3)

takeOrdered():按自然順序輸出

scala> val rdd = sc.makeRDD(Seq(3,2,5,1,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> val result = rdd.takeOrdered(2)
result: Array[Int] = Array(1, 2)

 

q2: fold()詳解

fold(zeroValue)(fun), 使用zeroValue和每個分區的元素進行聚合運算,最后各分區結果和zeroValue再進行一次聚合運算。

object LFold {
  def main(args:Array[String]) {
    val conf = new SparkConf ().setMaster ("local").setAppName ("app_1")
    val sc = new SparkContext (conf)
    val listRDD = sc.parallelize(List(1,2,3,4,5),1)
    val sum= listRDD.fold(3)((x,y)=>printLn(x,y))
    println("Sum -> "+sum)

  }
  def printLn(param : (Int,Int) ): Int = {
    println("=============="+param.toString()+"==============")
    val ret : Int = param._1+param._2
    ret
  }
}

運行結果:

==============(3,1)==============
==============(4,2)==============
==============(6,3)==============
==============(9,4)==============
==============(13,5)==============

==============(3,18)==============

Sum -> 21

解析:

a. 第一次執行相加時,此時無匯總值,所以取默認值3作補充加法。(3,1)

b. 隨后逐個元素相加,至最后一個元素5。(13,5)

c. 匯總相加所有的值,此時無匯總值,所以取默認值3作補充加法。(3,18)

d. 最后相加 3+1+2+3+4+5+3 = 21

 

為方便理解再舉例子:

val listRDD = sc.parallelize(List(1,2,3,4,5),2)
val sum= listRDD.fold(3)((x,y)=> x + y ) 

2個分區,zeroValue為3 ,經過3次聚合操作,結果應為24 ,詳細分析如圖

 

 

q3: aggregate()詳解

語法格式: aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U 

與fold相近,aggregate函數將每個分區里面的元素進行聚合,然后用combine函數將每個分區的結果和初始值(zeroValue)進行combine操作。這個函數最終返回的類型不需要和RDD中元素類型一致。

參數解析:

@param zeroValue the initial value for the accumulated result of each partition for the
`seqOp` operator, and also the initial value for the combine results from
different partitions for the `combOp` operator - this will typically be the
neutral element (e.g. `Nil` for list concatenation or `0` for summation)
//zeroValue是為seqOp函數定義的每個分區計算結果的初始值,也是為combOp函數定義的不同分區的聚合值的初始值。
//--這通常是一個典型的中間元素(如:'Nil')代表字符串拼接操作,'0'代表求和操作。

@param seqOp an operator used to accumulate results within a partition
//用於計算毎個分區的元素聚合的結果
@param combOp an associative operator used to combine results from different partitions
//用於計算不同分區聚合的結果

 求{1,2,3,3}的平均值:

val rdd= sc.parallelize(List(1,2,3,3,4,5),2)
val result = rdd. aggregate((1, 0)) (
   (x, y) => (x._1 + y, x._2 + 1),      // 單個分區(單個分區元素相加總數,單個分區元素個數相加)
   (x, y) => (x._1 + y._1, x._2 + y._2) // 不同分區(所有分區元素總數相加,所有分區元素個數相加)
)
val avg = result._1 / result._2. toDouble  //avg = 3.5

解析:

元組1,求出單個分區里的元素聚合的總和以及元素個數

元組2,把不同分區里的元素聚合的總和以及元素個數進行最后的聚合

分析如圖

 

RDD持久化

  持久化即以序列化的形式緩存。

  如上所述,RDD轉換操作會惰性求值,如果多次訪問同一個RDD調用行動操作,Spark每次都要重算RDD,消耗極大。

  為了避免多次計算同一個RDD,可以對數據進行持久化。  

  出於不同目的和場景需求,我們可選擇的持久化級別有:

級別 使用空間 CPU時間 是否在內存中 是否在磁盤上
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK 部分 部分
MEMORY_AND_DISK_SER 部分 部分
DISK_ONLY

  在Scala中的調用方法為

val result = input. map( x => x * x) 
result. persist( StorageLevel.DISK_ ONLY) 
println( result. count()) 
println( result. collect(). mkString(","))

  如果要緩存的數據太多,內存放不下,Spark會自動使用LRU(最近最小使用)的緩存策略把最老的分區從內存中移除。

  最后,可調用rdd.unpersist()方法手動移除RDD緩存。 

 

  詳細內容參考《Spark快速大數據分析第三章》

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM