Spark大數據處理 之 RDD粗粒度轉換的威力


從WordCount看Spark大數據處理的核心機制(2)中我們看到Spark為了支持迭代和交互式數據挖掘,而明確提出了內存中可重用的數據集RDD。RDD的只讀特性,再加上粗粒度轉換操作形成的Lineage,形成了它獨立的高效容錯機制。

RDD的粗粒度的轉換是否有足夠的表達能力,來支持多種多樣的應用需求呢?先看看RDD究竟有哪些API,然后看它們如何模擬Google經典的MapReduce和圖數據處理框架Pregel。

RDD的API

轉換

def map[U](f: T => U): RDD[U]

將RDD[T]經過f轉換成RDD[U],T和U一一映射,兩個RDD元素個數相等

def flatMap[U](f: T => TraversableOnce[U]): RDD[U]

將RDD[T]經過f閉包轉換成RDD[U],一個T可以映射成0到多個U,兩個RDD元素通常不等

def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U]): RDD[U]

mapPartitions是partition級的轉換,多元素到多元素或單元素的轉換。

還記得從WordCount看Spark大數據處理的核心機制(1)中我們扒開的countByValue函數嗎?它就是通過mapPartitions來統計每個partition上所有單詞的計數。

def union(other: RDD[T]): RDD[T]

將兩個RDD[T]合並成一個RDD[T]。

可能一開始會覺得union操作會耗時較大,實際上這個操作非常廉價。RDD的元信息中包含了Partition/Lineage等信息,union只是合並元信息,而並不涉及具體的數據,so easy。

def distinct(): RDD[T]

將原RDD[T]轉換成新的RDD[T],但每個元素只出現一次。

distinct從業務意義上很容易理解,但消耗卻不少,需要通過網絡交換各個Partition的數據,小伙伴們要注意了。

以下所有的轉換都僅針對RDD[(K, V)]有效,是通過把RDD[(K, V)]隱式轉換成PairRDDFunctions[K, V]獲得的。使用前一定要導入SparkContext內的隱式轉化函數,如下:

import org.apache.spark.SparkContext._

不然找不到下面的函數,不要說一碼不負責亂說哈。

隱式轉換是Scala帶來的好東西,類似於C#或Ruby中可以把類打開的功能,實在是寫出優雅代碼不可多得的工具。不清楚的小伙伴記得一定要GFSOSO哈。

def groupByKey(): RDD[(K, Iterable[V])]

將RDD[(K, V)]中所有(K, V)鍵值對按K進行分組,每組一個元素,形成新的RDD[(K, Iterable[V])]。

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

將RDD[(K, V)]中所有(K, V)鍵值對按K進行分組歸並,最終每個K只有一個(K, V)與之對應。

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

將RDD[(K, V)]與RDD[(K, W)做join操作,形成新的RDD[(K, (V, W)),最終形成的RDD中只有兩個RDD中共有的K。

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

將RDD[(K, V)]與RDD[(K, W)一起分組。

值得注意的是:結果里面包含兩個RDD中所有的K,也就是說Iterable[V]和Iterable[W]中可能某個為空。

def mapValues[U](f: V => U): RDD[(K, U)]

僅對(K, V)中的V進行轉換,轉換后和原先的K一起形成新的(K, U)鍵值對,通常和groupByKey一起使用。

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

將RDD[(K, V)]按照K進行重新分區。

重新分區對每個分區數據非常少的情況很有幫助。減少分區,任務數量也會隨着分區的減少而減少,降低大量任務調度的開銷。

所有轉換返回的一定是RDD。

動作

def count(): Long

統計RDD[T]中元素T的個數。

def reduce(f: (T, T) => T): T

對RDD[T]中的元素進行兩兩合並,最終合並成一個值。

比如對RDD[Int]中的所有元素求和:ints.reduce((i1, i2) => i1 + i2)

def collect(): Array[T]

將RDD[T]中所有元素從Slave上收集到Driver上。

該方法應該只應用在元素較少的RDD上,否則Driver一定OutOfMemory。

def saveAsTextFile(path: String): Unit

把RDD[T]中的所有元素保存到磁盤,通常是保存到分布式文件系統。

注意path是個目錄,RDD中的每個元素對應了目錄下的一個文件。保存后形成:/path/part-00000,
/path/part-00001等文件。最大的好處就是解決了通過collect到Driver再保存到磁盤的問題。

常用的RDD API就上面這些,更多請參考Spark官方文檔。

所有動作返回的一定不是RDD。

轉換不會加載數據,僅記錄Lineage而已,而動作會觸發數據的加載,並根據Lineage完成所有的轉換,是延遲計算,極大地提升效率。

RDD對MapReduce的模擬

來看這篇文章的小伙伴們應該都清楚MapReduce模型了,它很容易使用RDD進行描述。假設有一個輸入數據集(其元素類型為T),和兩個函數myMap: T => List[(Ki, Vi)] 和 myReduce: (Ki; List[Vi]) ) List[R],RDD API模擬代碼如下:

data.flatMap(myMap)
	.groupByKey()
	.map((k, vs) => myReduce(k, vs))

如果任務包含combiner,則相應的代碼為:

data.flatMap(myMap)
	.reduceByKey(myCombiner)
	.map((k, v) => myReduce(k, v))

RDD對Pregel圖計算的模擬

Pregel是面向圖算法的基於BSP范式的編程模型。程序由一系列超步(Superstep)協調迭代運行。在每個超步中,各個頂點執行用戶函數,並更新相應的頂點狀態,變異圖拓撲,然后向下一個超步的頂點集發送消息。這種模型能夠描述很多圖算法,包括最短路徑,雙邊匹配和PageRank等,我們以PageRank為例來說明。

PageRank可是搜索引擎的基礎,經典的大數據算法,還不知道的小伙伴請自行GFSOSO哈。

當前PageRank記為r,頂點表示狀態。在每個超步中,各個頂點向其所有鄰居發送貢獻值r/n,這里n是鄰居的數目。下一個超步開始時,每個頂點將其分值(rank)更新為 α/N + (1 - α) * Σci,這里的求和是各個頂點收到的所有貢獻值的和,N是頂點的總數。

Pregel的通信模式可以用RDD來描述,主要思想是:將每個超步中的頂點狀態和要發送的消息存儲為RDD,然后根據頂點ID分組,進行Shuffle通信(即cogroup操作)。然后對每個頂點ID上的狀態和消息應用用戶函數(即mapValues操作),產生一個新的RDD,即(VertexID, (NewState, OutgoingMessages))。然后執行map操作分離出下一次迭代的頂點狀態和消息(即mapValues和flatMap操作)。代碼如下:

val vertices = // RDD of (ID, Vertice) pairs
val incomingMessages = // RDD of (ID, Message) pairs
val grouped = vertices.cogroup(incomingMessages)
val newData = grouped.mapValues {
    (vert, incomingMsgs) => spreadRank(vert, incomingMsgs)
    // returns (newState, outgoingMsgs)
}.cache()
val newVerts = newData.mapValues((v,ms) => v)
val newMsgs = newData.flatMap((id,(v,ms)) => ms)

def spreadRank(...): ... = {
	// spread the incoming rank to outgoing rank
}

需要注意的是,這種實現方法中,RDD grouped,newData和newVerts的分區方法與輸入RDD vertices一樣。所以,頂點狀態一直存在於它們開始執行的機器上,這跟原Pregel一樣,這樣就減少了通信成本。因為cogroup和mapValues保持了與輸入RDD相同的分區方法,所以分區是自動進行的。

如果覺得上面這一段有難度,請在微信公眾號上聯系一碼。

經過四篇文章,Spark基礎知識還剩下共享變量,下一篇文章講過共享變量后,開始講開發Spark應用經常遇到的問題,以及如何優化性能。

推薦

動手寫Count

從WordCount看Spark大數據處理的核心機制(1)

從WordCount看Spark大數據處理的核心機制(2)

RDD粗粒度轉換的威力

查看《Spark大數據處理》系列文章,請進入YoyaProgrammer公眾號,點擊 核心技術,點擊 Spark大數據處理。

分類 Spark大數據處理

優雅程序員 原創 轉載請注明出處

圖片二維碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM