一句話說,在Spark中對數據的操作其實就是對RDD的操作,而對RDD的操作不外乎創建、轉換、調用求值。
什么是RDD
RDD(Resilient Distributed Dataset),彈性分布式數據集。
它定義了如何在集群的每個節點上操作數據的一系列命令,而不是指真實的數據,Spark通過RDD可以對每個節點的多個分區進行並行的數據操作。
之所以稱彈性,是因為其有高容錯性。默認情況下,Spark會在每一次行動操作后進行RDD重計算,如想在多個行動操作中使用RDD,可以將其緩存(以分區的方式持久化)到集群中每台機器的內存或者磁盤中。當一台機器失效無法讀取RDD數據時,可通過此特性重算丟掉的分區,從而恢復數據,此過程對用戶透明。
如何創建RDD
可通過以下幾種方式創建RDD:
- 通過讀取外部數據集 (本地文件系統/HDFS/...)
- 通過讀取集合對象 (List/Set/...)
- 通過已有的RDD生成新的RDD
Spark對RDD操作方式
Spark對RDD的操作分兩種,即轉換操作(Transformation)和行動操作(Action)。
-
- 轉換操作:不觸發實際計算,返回一個新的RDD,例如對數據的匹配操作map和過濾操作filter,惰性求值。
- 行動操作:會觸發實際計算,會向驅動器返回結果或將結果寫到外部系統。
如何區別兩種操作?
看返回值類型,返回RDD類型的為轉換操作,返回其他數據類型的是行動操作。
何為惰性求值?
Spark在執行轉換操作時不會觸發實際的計算,而等到執行行動操作時才會實際計算。
為何會有惰性求值?
我們應把RDD看做是Spark通過轉換操作后構建出來的一套定義如何計算數據的指令列表,而非存放着數據的數據集。
如果每經過一次轉換操作都觸發計算,將會有系統負擔,而惰性求值會將多個轉換操作合並到一起,抵消不必要的步驟后,在最后必要的時才進行運算,獲得性能的提升同時又減輕系統運算負擔。如涉及多次轉換操作時情景需求如下,我想找 轉換1:深圳市的人 > 轉換2:南山區的人> 轉換3:騰訊大廈的人 ==惰性求值、合並操作==>騰訊大廈的人。
轉換操作
1. 基本轉換操作,以{1,2,3,3}為例,f代表函數
函數名 | 目的 | 示例 | 結果 |
map(f) | 將函數應用於每一個元素中,返回值構成新的RDD | rdd.map(x=>x+1) | {2,3,4,4} |
flatMap(f) | 將函數應用於每一個元素中,並把元素中迭代器內所有內容一並生成新的RDD,常用於切分單詞 | rdd.flatMap(x=>x.to(3)) | {1,2,3,,2,3,3,3} |
filter(f) | 過濾元素 | rdd.filter(x=>x!=1) | {2,3,3} |
distinct() | 元素去重 | rdd.distinct() | {1,2,3} |
sample( withReplacement, fraction , [seed] ) | 元素采樣,以及是否需要替換 | rdd.sample(false,0.5) | 不確定值,不確定數目 |
2. 集合轉換操作,以{1,2,3}{3,4,5}為例,rdd代表已生成的RDD實例
函數名 | 目的 | 示例 | 結果 |
union(rdd) | 合並兩個RDD所有元素(不去重) | rdd1.union(rdd2) | {1,2,3,3,4,5} |
intersection(rdd) | 求兩個RDD的交集 | rdd1.intersection(rdd2) | {3} |
substract(rdd) | 移除在RDD2中存在的RDD1元素 | rdd1.substract(rdd2) | {1,2} |
cartesian(rdd) | 求兩個RDD的笛卡爾積 | rdd1.cartesian(rdd2) | {(1,3),(1,4),(1,5)...(3,5)} |
行動操作
基本行動操作,以{1,2,3,3}為例,f代表函數
函數名 | 目的 | 示例 | 結果 |
collect() | 收集並返回RDD中所有元素 | rdd.collect() | {1,2,3,3} |
count() | RDD中元素的個數 | rdd.count() | 4 |
countByValue() | 各元素出現的個數 | rdd.countByValue() | {(1,1),(2,1),(3,2)} |
take(num)* | 從RDD中返回num個元素 | rdd.take(2) | {1,2} |
top(num)* | 返回最前面的num個元素 | rdd.take(2) | {3,3} |
takeOrdered(num,[ordering])* | 按提供的順序返回前num個元素 | rdd.takeOrdered(2,[myOrdering]) | {3,3} |
takeSample(withReplacement, num ,[seed]) | 返回任意元素 | takeSample(false,1) | 不確定值 |
reduce(f) | 並行整合RDD中所有元素,返回一個同一類型元素 | rdd.reduce((x,y) => x+y ) | 9 |
fold(zeroValue)(f)* | 與reduce一樣,不過需要提供初始值 | rdd.fold(0)((x,y) => x+y ) | 9 |
aggregate(zeroValue)(seqOp , combOp)* | 與reduce相似,不過返回不同類型的元素 | rdd. aggregate(( 0, 0)) ((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) |
{9,4} |
foreach(f) | 給每個元素使用給定的函數,結果不需發回本地 | rdd.foreach(f) | 無 |
*后面有詳解
q1: take()、top()和takeOrdered() 的區別,順序在其中如何理解
take(): 用於獲取RDD中從0到num-1下標的元素,不排序。
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 scala> rdd1.take(1) res0: Array[Int] = Array(10) scala> rdd1.take(2) res1: Array[Int] = Array(10, 4)
top():用於從RDD中,按照默認(降序)或者指定的排序規則,返回前num個元素。
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 scala> rdd1.top(1) res2: Array[Int] = Array(12) scala> rdd1.top(2) res3: Array[Int] = Array(12, 10) //指定排序規則 scala> implicit val myOrd = implicitly[Ordering[Int]].reverse myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef scala> rdd1.top(1) res4: Array[Int] = Array(2) scala> rdd1.top(2) res5: Array[Int] = Array(2, 3)
takeOrdered():按自然順序輸出
scala> val rdd = sc.makeRDD(Seq(3,2,5,1,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24 scala> val result = rdd.takeOrdered(2) result: Array[Int] = Array(1, 2)
q2: fold()詳解
fold(zeroValue)(fun), 使用zeroValue和每個分區的元素進行聚合運算,最后各分區結果和zeroValue再進行一次聚合運算。
object LFold { def main(args:Array[String]) { val conf = new SparkConf ().setMaster ("local").setAppName ("app_1") val sc = new SparkContext (conf) val listRDD = sc.parallelize(List(1,2,3,4,5),1) val sum= listRDD.fold(3)((x,y)=>printLn(x,y)) println("Sum -> "+sum) } def printLn(param : (Int,Int) ): Int = { println("=============="+param.toString()+"==============") val ret : Int = param._1+param._2 ret } }
運行結果:
==============(3,1)==============
==============(4,2)==============
==============(6,3)==============
==============(9,4)==============
==============(13,5)==============
==============(3,18)==============
Sum -> 21
解析:
a. 第一次執行相加時,此時無匯總值,所以取默認值3作補充加法。(3,1)
b. 隨后逐個元素相加,至最后一個元素5。(13,5)
c. 匯總相加所有的值,此時無匯總值,所以取默認值3作補充加法。(3,18)
d. 最后相加 3+1+2+3+4+5+3 = 21
為方便理解再舉例子:
val listRDD = sc.parallelize(List(1,2,3,4,5),2) val sum= listRDD.fold(3)((x,y)=> x + y )
2個分區,zeroValue為3 ,經過3次聚合操作,結果應為24 ,詳細分析如圖
q3: aggregate()詳解
語法格式: aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
與fold相近,aggregate函數將每個分區里面的元素進行聚合,然后用combine函數將每個分區的結果和初始值(zeroValue)進行combine操作。這個函數最終返回的類型不需要和RDD中元素類型一致。
參數解析:
@param zeroValue the initial value for the accumulated result of each partition for the
`seqOp` operator, and also the initial value for the combine results from
different partitions for the `combOp` operator - this will typically be the
neutral element (e.g. `Nil` for list concatenation or `0` for summation)
//zeroValue是為seqOp函數定義的每個分區計算結果的初始值,也是為combOp函數定義的不同分區的聚合值的初始值。
//--這通常是一個典型的中間元素(如:'Nil')代表字符串拼接操作,'0'代表求和操作。
@param seqOp an operator used to accumulate results within a partition
//用於計算毎個分區的元素聚合的結果@param combOp an associative operator used to combine results from different partitions
//用於計算不同分區聚合的結果
求{1,2,3,3}的平均值:
val rdd= sc.parallelize(List(1,2,3,3,4,5),2)
val result = rdd. aggregate((1, 0)) ( (x, y) => (x._1 + y, x._2 + 1), // 單個分區(單個分區元素相加總數,單個分區元素個數相加) (x, y) => (x._1 + y._1, x._2 + y._2) // 不同分區(所有分區元素總數相加,所有分區元素個數相加) ) val avg = result._1 / result._2. toDouble //avg = 3.5
解析:
元組1,求出單個分區里的元素聚合的總和以及元素個數
元組2,把不同分區里的元素聚合的總和以及元素個數進行最后的聚合
分析如圖
RDD持久化
持久化即以序列化的形式緩存。
如上所述,RDD轉換操作會惰性求值,如果多次訪問同一個RDD調用行動操作,Spark每次都要重算RDD,消耗極大。
為了避免多次計算同一個RDD,可以對數據進行持久化。
出於不同目的和場景需求,我們可選擇的持久化級別有:
級別 | 使用空間 | CPU時間 | 是否在內存中 | 是否在磁盤上 |
MEMORY_ONLY | 高 | 低 | 是 | 否 |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 |
MEMORY_AND_DISK | 高 | 中 | 部分 | 部分 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 |
DISK_ONLY | 低 | 高 | 否 | 是 |
在Scala中的調用方法為
val result = input. map( x => x * x) result. persist( StorageLevel.DISK_ ONLY) println( result. count()) println( result. collect(). mkString(","))
如果要緩存的數據太多,內存放不下,Spark會自動使用LRU(最近最小使用)的緩存策略把最老的分區從內存中移除。
最后,可調用rdd.unpersist()方法手動移除RDD緩存。
詳細內容參考《Spark快速大數據分析第三章》