在從WordCount看Spark大數據處理的核心機制(2)中我們看到Spark為了支持迭代和交互式數據挖掘,而明確提出了內存中可重用的數據集RDD。RDD的只讀特性,再加上粗粒度轉換操作形成的Lineage,形成了它獨立的高效容錯機制。
RDD的粗粒度的轉換是否有足夠的表達能力,來支持多種多樣的應用需求呢?先看看RDD究竟有哪些API,然后看它們如何模擬Google經典的MapReduce和圖數據處理框架Pregel。
RDD的API
轉換
def map[U](f: T => U): RDD[U]
將RDD[T]經過f轉換成RDD[U],T和U一一映射,兩個RDD元素個數相等
def flatMap[U](f: T => TraversableOnce[U]): RDD[U]
將RDD[T]經過f閉包轉換成RDD[U],一個T可以映射成0到多個U,兩個RDD元素通常不等
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U]): RDD[U]
mapPartitions是partition級的轉換,多元素到多元素或單元素的轉換。
還記得從WordCount看Spark大數據處理的核心機制(1)中我們扒開的countByValue函數嗎?它就是通過mapPartitions來統計每個partition上所有單詞的計數。
def union(other: RDD[T]): RDD[T]
將兩個RDD[T]合並成一個RDD[T]。
可能一開始會覺得union操作會耗時較大,實際上這個操作非常廉價。RDD的元信息中包含了Partition/Lineage等信息,union只是合並元信息,而並不涉及具體的數據,so easy。
def distinct(): RDD[T]
將原RDD[T]轉換成新的RDD[T],但每個元素只出現一次。
distinct從業務意義上很容易理解,但消耗卻不少,需要通過網絡交換各個Partition的數據,小伙伴們要注意了。
以下所有的轉換都僅針對RDD[(K, V)]有效,是通過把RDD[(K, V)]隱式轉換成PairRDDFunctions[K, V]獲得的。使用前一定要導入SparkContext內的隱式轉化函數,如下:
import org.apache.spark.SparkContext._
不然找不到下面的函數,不要說一碼不負責亂說哈。
隱式轉換是Scala帶來的好東西,類似於C#或Ruby中可以把類打開的功能,實在是寫出優雅代碼不可多得的工具。不清楚的小伙伴記得一定要GFSOSO哈。
def groupByKey(): RDD[(K, Iterable[V])]
將RDD[(K, V)]中所有(K, V)鍵值對按K進行分組,每組一個元素,形成新的RDD[(K, Iterable[V])]。
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
將RDD[(K, V)]中所有(K, V)鍵值對按K進行分組歸並,最終每個K只有一個(K, V)與之對應。
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
將RDD[(K, V)]與RDD[(K, W)做join操作,形成新的RDD[(K, (V, W)),最終形成的RDD中只有兩個RDD中共有的K。
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
將RDD[(K, V)]與RDD[(K, W)一起分組。
值得注意的是:結果里面包含兩個RDD中所有的K,也就是說Iterable[V]和Iterable[W]中可能某個為空。
def mapValues[U](f: V => U): RDD[(K, U)]
僅對(K, V)中的V進行轉換,轉換后和原先的K一起形成新的(K, U)鍵值對,通常和groupByKey一起使用。
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
將RDD[(K, V)]按照K進行重新分區。
重新分區對每個分區數據非常少的情況很有幫助。減少分區,任務數量也會隨着分區的減少而減少,降低大量任務調度的開銷。
所有轉換返回的一定是RDD。
動作
def count(): Long
統計RDD[T]中元素T的個數。
def reduce(f: (T, T) => T): T
對RDD[T]中的元素進行兩兩合並,最終合並成一個值。
比如對RDD[Int]中的所有元素求和:ints.reduce((i1, i2) => i1 + i2)
def collect(): Array[T]
將RDD[T]中所有元素從Slave上收集到Driver上。
該方法應該只應用在元素較少的RDD上,否則Driver一定OutOfMemory。
def saveAsTextFile(path: String): Unit
把RDD[T]中的所有元素保存到磁盤,通常是保存到分布式文件系統。
注意path是個目錄,RDD中的每個元素對應了目錄下的一個文件。保存后形成:/path/part-00000,
/path/part-00001等文件。最大的好處就是解決了通過collect到Driver再保存到磁盤的問題。
常用的RDD API就上面這些,更多請參考Spark官方文檔。
所有動作返回的一定不是RDD。
轉換不會加載數據,僅記錄Lineage而已,而動作會觸發數據的加載,並根據Lineage完成所有的轉換,是延遲計算,極大地提升效率。
RDD對MapReduce的模擬
來看這篇文章的小伙伴們應該都清楚MapReduce模型了,它很容易使用RDD進行描述。假設有一個輸入數據集(其元素類型為T),和兩個函數myMap: T => List[(Ki, Vi)] 和 myReduce: (Ki; List[Vi]) ) List[R],RDD API模擬代碼如下:
data.flatMap(myMap)
.groupByKey()
.map((k, vs) => myReduce(k, vs))
如果任務包含combiner,則相應的代碼為:
data.flatMap(myMap)
.reduceByKey(myCombiner)
.map((k, v) => myReduce(k, v))
RDD對Pregel圖計算的模擬
Pregel是面向圖算法的基於BSP范式的編程模型。程序由一系列超步(Superstep)協調迭代運行。在每個超步中,各個頂點執行用戶函數,並更新相應的頂點狀態,變異圖拓撲,然后向下一個超步的頂點集發送消息。這種模型能夠描述很多圖算法,包括最短路徑,雙邊匹配和PageRank等,我們以PageRank為例來說明。
PageRank可是搜索引擎的基礎,經典的大數據算法,還不知道的小伙伴請自行GFSOSO哈。
當前PageRank記為r,頂點表示狀態。在每個超步中,各個頂點向其所有鄰居發送貢獻值r/n,這里n是鄰居的數目。下一個超步開始時,每個頂點將其分值(rank)更新為 α/N + (1 - α) * Σci,這里的求和是各個頂點收到的所有貢獻值的和,N是頂點的總數。
Pregel的通信模式可以用RDD來描述,主要思想是:將每個超步中的頂點狀態和要發送的消息存儲為RDD,然后根據頂點ID分組,進行Shuffle通信(即cogroup操作)。然后對每個頂點ID上的狀態和消息應用用戶函數(即mapValues操作),產生一個新的RDD,即(VertexID, (NewState, OutgoingMessages))。然后執行map操作分離出下一次迭代的頂點狀態和消息(即mapValues和flatMap操作)。代碼如下:
val vertices = // RDD of (ID, Vertice) pairs
val incomingMessages = // RDD of (ID, Message) pairs
val grouped = vertices.cogroup(incomingMessages)
val newData = grouped.mapValues {
(vert, incomingMsgs) => spreadRank(vert, incomingMsgs)
// returns (newState, outgoingMsgs)
}.cache()
val newVerts = newData.mapValues((v,ms) => v)
val newMsgs = newData.flatMap((id,(v,ms)) => ms)
def spreadRank(...): ... = {
// spread the incoming rank to outgoing rank
}
需要注意的是,這種實現方法中,RDD grouped,newData和newVerts的分區方法與輸入RDD vertices一樣。所以,頂點狀態一直存在於它們開始執行的機器上,這跟原Pregel一樣,這樣就減少了通信成本。因為cogroup和mapValues保持了與輸入RDD相同的分區方法,所以分區是自動進行的。
如果覺得上面這一段有難度,請在微信公眾號上聯系一碼。
經過四篇文章,Spark基礎知識還剩下共享變量,下一篇文章講過共享變量后,開始講開發Spark應用經常遇到的問題,以及如何優化性能。
推薦
查看《Spark大數據處理》系列文章,請進入YoyaProgrammer公眾號,點擊 核心技術,點擊 Spark大數據處理。
分類 Spark大數據處理
優雅程序員 原創 轉載請注明出處