RDD,彈性分布式數據集,即分布式的元素集合。在spark中,對所有數據的操作不外乎是創建RDD、轉化已有的RDD以及調用RDD操作進行求值。在這一切的背后,Spark會自動將RDD中的數據分發到集群中,並將操作並行化。
Spark中的RDD就是一個不可變的分布式對象集合。每個RDD都被分為多個分區,這些分區運行在集群中的不同節點上。RDD可以包含Python,Java,Scala中任意類型的對象,甚至可以包含用戶自定義的對象。
用戶可以使用兩種方法創建RDD:
讀取一個外部數據集,或在驅動器程序中分發驅動器程序中的對象集合,比如list或者set。
RDD的轉化操作都是惰性求值的,這意味着我們對RDD調用轉化操作,操作不會立即執行。相反,Spark會在內部記錄下所要求執行的操作的相關信息。我們不應該把RDD看做存放着特定數據的數據集,而最好把每個RDD當做我們通過轉化操作構建出來的、記錄如何計算數據的指令列表。數據讀取到RDD中的操作也是惰性的,數據只會在必要時讀取。轉化操作和讀取操作都有可能多次執行。
2.創建RDD數據集
(1)讀取一個外部數據集
val input=sc.textFile(inputFileDir)
(2)分發對象集合,這里以list為例
val lines =sc.parallelize(List("hello world","this is a test"));
3.RDD操作
(1)轉化操作
實現過濾器轉化操作:
val lines =sc.parallelize(List("error:a","error:b","error:c","test"));
val errors=lines.filter(line => line.contains("error"));
errors.collect().foreach(println);
輸出:
error:a
error:b
error:c
可見,列表list中包含詞語error的表項都被正確的過濾出來了。
(2)合並操作
將兩個RDD數據集合並為一個RDD數據集
接上述程序示例:
val lines =sc.parallelize(List("error:a","error:b","error:c","test","warnings:a"));
val errors=lines.filter(line => line.contains("error"));
val warnings =lines.filter(line => line.contains("warnings"));
val unionLines =errors.union(warnings);
unionLines.collect().foreach(println);
輸出:
error:a
error:b
error:c
warning:a
可見,將原始列表項中的所有error項和warning項都過濾出來了。
(3)獲取RDD數據集中的部分或者全部元素
①獲取RDD數據集中的部分元素 .take(int num) 返回值List<T>
獲取RDD數據集中的前num項。
/**程序示例:接上
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
*/
def take(num: Int): JList[T]
unionLines.take(2).foreach(println);
輸出:
error:a
error:b
可見,輸出了RDD數據集unionLines的前2項
②獲取RDD數據集中的全部元素 .collect() 返回值 List<T>
程序示例:
val all =unionLines.collect();
all.foreach(println);
遍歷輸出RDD數據集unionLines的每一項
4.向spark傳遞函數
在scala中,我們可以把定義的內聯函數、方法的引用或靜態方法傳遞給Spark,就像Scala的其他函數式API一樣。我們還要考慮其他一些細節,必須所傳遞的函數及其引用的數據需要是可序列化的(實現了Java的Serializable接口)。除此之外,與Python類似,傳遞一個對象的方法或者字段時,會包含對整個對象的引用。我們可以把需要的字段放在一個局部變量中,來避免包含該字段的整個對象。
class searchFunctions (val query:String){
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchFunctionReference(rdd: RDD[String]) :RDD[String]={
//問題: isMach表示 this.isMatch ,因此我們需要傳遞整個this
rdd.filter(isMatch)
}
def getMatchesFunctionReference(rdd: RDD[String]) :RDD[String] ={
//問題: query表示 this.query ,因此我們需要傳遞整個this
rdd.flatMap(line => line.split(query))
}
def getMatchesNoReference(rdd:RDD[String]):RDD[String] ={
//安全,只把我們需要的字段拿出來放入局部變量之中
val query1=this.query;
rdd.flatMap(x =>x.split(query1)
)
}
}
5.針對每個元素的轉化操作:
轉化操作map()接收一個函數,把這個函數用於RDD中的每個元素,將函數的返回結果作為結果RDD中對應的元素。關鍵詞:
轉化
轉化操作filter()接受一個函數,並將RDD中滿足該函數的元素放入新的RDD中返回。關鍵詞:
過濾
示例圖如下所示:

①map()
計算RDD中各值的平方
val rdd=sc.parallelize(List(1,2,3,4));
val result=rdd.map(value => value*value);
println(result.collect().mkString(","));
輸出:
1,4,9,16
filter()
② 去除RDD集合中值為1的元素:
val rdd=sc.parallelize(List(1,2,3,4));
val result=rdd.filter(value => value!=1);
println(result.collect().mkString(","));
結果:
2,3,4
我們也可以采取傳遞函數的方式,就像這樣:
函數:
def filterFunction(value:Int):Boolean ={
value!=1
}
使用:
val rdd=sc.parallelize(List(1,2,3,4));
val result=rdd.filter(filterFunction);
println(result.collect().mkString(","));
③ 有時候,我們希望對每個輸入元素生成多個輸出元素。實現該功能的操作叫做flatMap()。和map()類似,我們提供給flatMap()的函數被分別應用到了輸入的RDD的每個元素上。不過返回的不是一個元素,而是一個返回值序列的迭代器。輸出的RDD倒不是由迭代器組成的。我們得到的是一個包含各個迭代器可以訪問的所有元素的RDD。flatMap()的一個簡單用途是將輸入的字符串切分成單詞,如下所示:
val rdd=sc.parallelize(List("Hello world","hello you","world i love you"));
val result=rdd.flatMap(line => line.split(" "));
println(result.collect().mkString("\n"));
輸出:
hello
world
hello
you
world
i
love
you
6.集合操作

RDD中的集合操作
函數
|
用途
|
RDD1.distinct()
|
生成一個只包含不同元素的新RDD。需要數據混洗。 |
RDD1.union(RDD2)
|
返回一個包含兩個RDD中所有元素的RDD |
RDD1.intersection(RDD2)
|
只返回兩個RDD中都有的元素 |
RDD1.substr(RDD2)
|
返回一個只存在於第一個RDD而不存在於第二個RDD中的所有元素組成的RDD。需要數據混洗。 |
集合操作對笛卡爾集的處理:

RDD1.cartesian(RDD2)
|
返回兩個RDD數據集的笛卡爾集 |
程序示例:生成RDD集合{1,2} 和{1,2}的笛卡爾集
val rdd1=sc.parallelize(List(1,2));
val rdd2=sc.parallelize(List(1,2));
val rdd=rdd1.cartesian(rdd2);
println(rdd.collect().mkString("\n"));
輸出:
(1,1)
(1,2)
(2,1)
(2,2)
7.行動操作
(1)reduce操作
reduce()接收一個函數作為參數,這個函數要操作兩個RDD的元素類型的數據並返回一個同樣類型的新元素。一個簡單的例子就是函數+,可以用它來對我們的RDD進行累加。使用reduce(),可以很方便地計算出RDD中所有元素的總和,元素的個數,以及其他類型的聚合操作。
以下是求RDD數據集所有元素和的程序示例:
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.reduce((x,y) =>x+y);
println(results);
輸出:55
(2)fold()操作
接收一個與reduce()接收的函數簽名相同的函數,再加上一個初始值來作為每個分區第一次調用時的結果。你所提供的初始值應當是你提供的操作的單位元素,也就是說,使用你的函數對這個初始值進行多次計算不會改變結果(例如+對應的0,*對應的1,或者拼接操作對應的空列表)。
程序實例:
①計算RDD數據集中所有元素的和:
zeroValue=0;//求和時,初始值為0。
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.fold(0)((x,y) =>x+y);
println(results);
②計算RDD數據集中所有元素的積:
zeroValue=1;//求積時,初始值為1。
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.fold(1)((x,y) =>x*y);
println(results);
(3)aggregate()操作
aggregate()函數返回值類型
不必與所操作的RDD類型相同。
與fold()類似,使用aggregate()時,需要提供我們期待返回的類型的初始值。然后通過一個函數把RDD中的元素合並起來放入累加器。考慮到每個節點是在本地進行累加的,最終,還需要提供第二個函數來將累加器兩兩合並。
以下是程序實例:
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val result=rdd.aggregate((0,0))(
(acc,value) =>(acc._1+value,acc._2+1),
(acc1,acc2) => (acc1._1+acc2._1, acc1._2+acc2._2)
)
val average=result._1/result._2;
println(average)
輸出:5
最終返回的是一個Tuple2<int,int>對象, 他被初始化為(0,0),當遇到一個int值時,將該int數的值加到Tuple2對象的_1中,並將_2值加1,如果遇到一個Tuple2對象時,將這個Tuple2的_1和_2的值歸並到最終返回的Tuple2值中去。
表格:對一個數據為{1,2,3,3}的RDD進行基本的RDD行動操作
函數名 | 目的 | 示例 | 結果 |
collect() | 返回RDD的所有元素 | rdd.collect() | {1,2,3,3} |
count() | RDD的元素個數 | rdd.count() | 4 |
countByValue() | 各元素在RDD中出現的次數 | rdd.countByValue() | {(1,1), (2,1), (3,2) } |
take(num) | 從RDD中返回num個元素 | rdd.take(2) | {1,2} |
top(num) | 從RDD中返回最前面的num個元素 | rdd.takeOrdered(2)(myOrdering) | {3,3} |
takeOrdered(num) (ordering) |
從RDD中按照提供的順序返回最前面的num個元素 |
rdd.takeSample(false,1) | 非確定的 |
takeSample(withReplacement,num,[seed]) | 從RDD中返回任意一些元素 | rdd.takeSample(false,1) | 非確定的 |
reduce(func) | 並行整合RDD中所有數據 | rdd.reduce((x,y) => x+y) |
9 |
fold(zero)(func) | 和reduce()一樣,但是需要提供初始值 | rdd.fold(0)((x,y) => x+y) |
9 |
aggregate(zeroValue)(seqOp,combOp) | 和reduce()相似,但是通常返回不同類型的函數 | rdd.aggregate((0,0)) ((x,y) => (x._1+y,x._2+1), (x,y)=> (x._1+y._1,x._2+y._2) ) |
(9,4) |
foreach(func) | 對RDD中的每個元素使用給定的函數 | rdd.foreach(func) | 無 |
8.持久化緩存
因為Spark RDD是惰性求值的,而有時我們希望能多次使用同一個RDD。
如果簡單地對RDD調用行動操作,Spark每次都會重算RDD以及它的所有依賴。這在迭代算法中消耗格外大,因為迭代算法常常會多次使用同一組數據。
為了避免多次計算同一個RDD,可以讓Spark對數據進行持久化。當我們讓Spark持久化存儲一個RDD時,計算出RDD的節點會分別保存它們所求出的分區數據。
出於不同的目的,我們可以為RDD選擇不同的持久化級別。默認情況下persist()會把數據以序列化的形式緩存在JVM的堆空間中
不同關鍵字對應的存儲級別表
級別 |
使用的空間
|
cpu時間
|
是否在內存
|
是否在磁盤
|
備注
|
MEMORY_ONLY
|
高 |
低
|
是
|
否
|
直接儲存在內存 |
MEMORY_ONLY_SER |
低
|
高
|
是
|
否
|
序列化后儲存在內存里
|
MEMORY_AND_DISK
|
低 |
中等
|
部分
|
部分
|
如果數據在內存中放不下,溢寫在磁盤上 |
MEMORY_AND_DISK_SER
|
低 |
高
|
部分
|
部分
|
數據在內存中放不下,溢寫在磁盤中。內存中存放序列化的數據。 |
DISK_ONLY
|
低
|
高
|
否
|
是
|
直接儲存在硬盤里面
|
程序示例:
將RDD數據集持久化在內存中。
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)).persist(StorageLevel.MEMORY_ONLY);
println(rdd.count())
println(rdd.collect().mkString(","));
RDD還有unpersist()方法,調用該方法可以手動把持久化的RDD從緩存中移除。
9.不同的RDD類型
在scala中,將RDD轉為由特定函數的RDD(比如在RDD[Double]上進行數值操作),是由隱式轉換來自動處理的。這些隱式轉換可以隱式地將一個RDD轉為各種封裝類,比如DoubleRDDFunctions(數值數據的RDD)和PairRDDFunctions(鍵值對RDD),這樣我們就有了諸如mean()和variance()之類的額外的函數。
示例程序:
val rdd=sc.parallelize(List(1.0,2.0,3.0,4.0,5.0));
println(rdd.mean());
其實RDD[T]中並沒有mean()函數,只是隱式轉換自動將其轉換為DoubleRDDFunctions。