Spark使用總結與分享


背景

   

使用spark開發已有幾個月。相比於python/hive,scala/spark學習門檻較高。尤其記得剛開時,舉步維艱,進展十分緩慢。不過謝天謝地,這段苦澀(bi)的日子過去了。憶苦思甜,為了避免項目組的其他同學走彎路,決定總結和梳理spark的使用經驗。

   

Spark基礎

   

基石RDD

   

spark的核心是RDD(彈性分布式數據集),一種通用的數據抽象,封裝了基礎的數據操作,如map,filter,reduce等。RDD提供數據共享的抽象,相比其他大數據處理框架,如MapReduce,Pegel,DryadLINQ和HIVE等均缺乏此特性,所以RDD更為通用。

   

簡要地概括RDD:RDD是一個不可修改的,分布的對象集合。每個RDD由多個分區組成,每個分區可以同時在集群中的不同節點上計算。RDD可以包含Python,Java和Scala中的任意對象。

   

Spark生態圈中應用都是基於RDD構建(下圖),這一點充分說明RDD的抽象足夠通用,可以描述大多數應用場景。

   

RDD操作類型—轉換和動作

   

RDD的操作主要分兩類:轉換(transformation)和動作(action)。兩類函數的主要區別是,轉換接受RDD並返回RDD,而動作接受RDD但是返回非RDD。轉換采用惰性調用機制,每個RDD記錄父RDD轉換的方法,這種調用鏈表稱之為血緣(lineage);而動作調用會直接計算。

采用惰性調用,通過血緣連接的RDD操作可以管道化(pipeline),管道化的操作可以直接在單節點完成,避免多次轉換操作之間數據同步的等待

使用血緣串聯的操作可以保持每次計算相對簡單,而不用擔心有過多的中間數據,因為這些血緣操作都管道化了,這樣也保證了邏輯的單一性,而不用像MapReduce那樣,為了竟可能的減少map reduce過程,在單個map reduce中寫入過多復雜的邏輯。

   

   

RDD使用模式

   

RDD使用具有一般的模式,可以抽象為下面的幾步

  1. 加載外部數據,創建RDD對象
  2. 使用轉換(如filter),創建新的RDD對象
  3. 緩存需要重用的RDD
  4. 使用動作(如count),啟動並行計算

   

RDD高效的策略

   

Spark官方提供的數據是RDD在某些場景下,計算效率是Hadoop的20X。這個數據是否有水分,我們先不追究,但是RDD效率高的由一定機制保證的:

  1. RDD數據只讀,不可修改。如果需要修改數據,必須從父RDD轉換(transformation)到子RDD。所以,在容錯策略中,RDD沒有數據冗余,而是通過RDD父子依賴(血緣)關系進行重算實現容錯。
  2. RDD數據在內存中,多個RDD操作之間,數據不用落地到磁盤上,避免不必要的I/O操作。
  3. RDD存放的數據可以是java對象,所以避免的不必要的對象序列化和反序列化。

總而言之,RDD高效的主要因素是盡量避免不必要的操作和犧牲數據的操作精度,用來提高計算效率。

   

   

Spark使用技巧

   

RDD基本函數擴展

   

RDD雖然提供了很多函數,但是畢竟還是有限的,有時候需要擴展,自定義新的RDD的函數。在spark中,可以通過隱式轉換,輕松實現對RDD擴展。畫像開發過程中,平凡的會使用rollup操作(類似HIVE中的rollup),計算多個級別的聚合數據。下面是具體實,

/**

* 擴展spark rdd,為rdd提供rollup方法

*/

implicit class RollupRDD[T: ClassTag](rdd: RDD[(Array[String], T)]) extends Serializable {

 

/**

* 類似Sql中的rollup操作

*

* @param aggregate 聚合函數

* @param keyPlaceHold key占位符,默認采用FaceConf.STAT_SUMMARY

* @param isCache,確認是否緩存數據

* @return 返回聚合后的數據

*/

def rollup[U: ClassTag](

aggregate: Iterable[T] => U,

keyPlaceHold: String = FaceConf.STAT_SUMMARY,

isCache: Boolean = true): RDD[(Array[String], U)] = {

 

if (rdd.take(1).isEmpty) {

return rdd.map(x => (Array[String](), aggregate(Array[T](x._2))))

}

 

if (isCache) {

rdd.cache // 提高計算效率

}

val totalKeyCount = rdd.first._1.size

val result = { 1 to totalKeyCount }.par.map(untilKeyIndex => { // 並行計算

rdd.map(row => {

val combineKey = row._1.slice(0, untilKeyIndex).mkString(FaceConf.KEY_SEP) // 組合key

(combineKey, row._2)

}).groupByKey.map(row => { // 聚合計算

val oldKeyList = row._1.split(FaceConf.KEY_SEP)

val newKeyList = oldKeyList ++ Array.fill(totalKeyCount - oldKeyList.size) { keyPlaceHold }

(newKeyList, aggregate(row._2))

})

}).reduce(_ ++ _) // 聚合結果

 

result

}

 

}

上面代碼聲明了一個隱式類,具有一個成員變量rdd,類型是RDD[(Array[String], T)],那么如果應用代碼中出現了任何這樣的rdd對象,並且import當前的隱式轉換,那么編譯器就會將這個rdd當做上面的隱式類的對象,也就可以使用rollup函數,和一般的map,filter方法一樣。

   

   

RDD操作閉包外部變量原則

   

RDD相關操作都需要傳入自定義閉包函數(closure),如果這個函數需要訪問外部變量,那么需要遵循一定的規則,否則會拋出運行時異常。閉包函數傳入到節點時,需要經過下面的步驟:

  1. 驅動程序,通過反射,運行時找到閉包訪問的所有變量,並封成一個對象,然后序列化該對象
  2. 將序列化后的對象通過網絡傳輸到worker節點
  3. worker節點反序列化閉包對象
  4. worker節點執行閉包函數,

注意:外部變量在閉包內的修改不會被反饋到驅動程序。

簡而言之,就是通過網絡,傳遞函數,然后執行。所以,被傳遞的變量必須可以序列化,否則傳遞失敗。本地執行時,仍然會執行上面四步。

   

廣播機制也可以做到這一點,但是頻繁的使用廣播會使代碼不夠簡潔,而且廣播設計的初衷是將較大數據緩存到節點上,避免多次數據傳輸,提高計算效率,而不是用於進行外部變量訪問。

   

   

RDD數據同步

   

RDD目前提供兩個數據同步的方法:廣播和累計器。

   

廣播 broadcast

前面提到過,廣播可以將變量發送到閉包中,被閉包使用。但是,廣播還有一個作用是同步較大數據。比如你有一個IP庫,可能有幾G,在map操作中,依賴這個ip庫。那么,可以通過廣播將這個ip庫傳到閉包中,被並行的任務應用。廣播通過兩個方面提高數據共享效率:1,集群中每個節點(物理機器)只有一個副本,默認的閉包是每個任務一個副本;2,廣播傳輸是通過BT下載模式實現的,也就是P2P下載,在集群多的情況下,可以極大的提高數據傳輸速率。廣播變量修改后,不會反饋到其他節點。

   

累加器 Accumulator

累加器是一個write-only的變量,用於累加各個任務中的狀態,只有在驅動程序中,才能訪問累加器。而且,截止到1.2版本,累加器有一個已知的缺陷,在action操作中,n個元素的RDD可以確保累加器只累加n次,但是在transformation時,spark不確保,也就是累加器可能出現n+1次累加。

   

目前RDD提供的同步機制粒度太粗,尤其是轉換操作中變量狀態不能同步,所以RDD無法做復雜的具有狀態的事務操作。不過,RDD的使命是提供一個通用的並行計算框架,估計永遠也不會提供細粒度的數據同步機制,因為這與其設計的初衷是違背的。

   

RDD優化技巧

   

RDD緩存

需要使用多次的數據需要cache,否則會進行不必要的重復操作。舉個例子

val data = … // read from tdw

println(data.filter(_.contains("error")).count)

println(data.filter(_.contains("warning")).count)

上面三段代碼中,data變量會加載兩次,高效的做法是在data加載完后,立刻持久化到內存中,如下

val data = … // read from tdw

data.cache

println(data.filter(_.contains("error")).count)

println(data.filter(_.contains("warning")).count)

這樣,data在第一加載后,就被緩存到內存中,后面兩次操作均直接使用內存中的數據。

   

轉換並行化

RDD的轉換操作時並行化計算的,但是多個RDD的轉換同樣是可以並行的,參考如下

val dataList:Array[RDD[Int]] = …

val sumList = data.list.map(_.map(_.sum))

上面的例子中,第一個map是便利Array變量,串行的計算每個RDD中的每行的sum。由於每個RDD之間計算是沒有邏輯聯系的,所以理論上是可以將RDD的計算並行化的,在scala中可以輕松試下,如下

val dataList:Array[RDD[Int]] = …

val sumList = data.list.par.map(_.map(_.sum))

注意紅色代碼。

   

減少shuffle網絡傳輸

一般而言,網絡I/O開銷是很大的,減少網絡開銷,可以顯著加快計算效率。任意兩個RDD的shuffle操作(join等)的大致過程如下,

用戶數據userData和事件events數據通過用戶id連接,那么會在網絡中傳到另外一個節點,這個過程中,有兩個網絡傳輸過程。Spark的默認是完成這兩個過程。但是,如果你多告訴spark一些信息,spark可以優化,只執行一個網絡傳輸。可以通過使用、HashPartition,在userData"本地"先分區,然后要求events直接shuffle到userData的節點上,那么就減少了一部分網絡傳輸,減少后的效果如下,

虛線部分都是在本地完成的,沒有網絡傳輸。在數據加載時,就按照key進行partition,這樣可以經一部的減少本地的HashPartition的過程,示例代碼如下

val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://…")

.partitionBy(new HashPartitioner(100)) // Create 100 partitions

.persist()

注意,上面一定要persist,否則會重復計算多次。100用來指定並行數量。

   

Spark其他

   

Spark開發模式

   

由於spark應用程序是需要在部署到集群上運行的,導致本地調試比較麻煩,所以經過這段時間的經驗累積,總結了一套開發流程,目的是為了盡可能的提高開發調試效率,同時保證開發質量。當然,這套流程可能也不是最優的,后面需要持續改進。

整個流程比較清楚,這里主要談談為什么需要單元測試。公司內的大多數項目,一般不提倡單元測試,而且由於項目進度壓力,開發人員會非常抵觸單元測試,因為會花費"額外"的精力。Bug這東西不會因為項目趕進度而消失,而且恰好相反,可能因為趕進度,而高於平均水平。所以,如果不花時間進行單元測試,那么會花同樣多,甚至更多的時間調試。很多時候,往往一些很小的bug,卻導致你花了很長時間去調試,而這些bug,恰好是很容易在單元測試中發現的。而且,單元測試還可以帶來兩個額外的好處:1)API使用范例;2)回歸測試。所以,還是單元測試吧,這是一筆投資,而且ROI還挺高!不過凡事需要掌握分寸,單元測試應該根據項目緊迫程度調整粒度,做到有所為,有所不為。

 

Spark其他功能

   

前面提到了spark生態圈,spark除了核心的RDD,還提供了之上的幾個很使用的應用:

  1. Spark SQL: 類似hive,使用rdd實現sql查詢
  2. Spark Streaming: 流式計算,提供實時計算功能,類似storm
  3. MLLib:機器學習庫,提供常用分類,聚類,回歸,交叉檢驗等機器學習算法並行實現。
  4. GraphX:圖計算框架,實現了基本的圖計算功能,常用圖算法和pregel圖編程框架。

   

后面需要繼續學習和使用上面的功能,尤其是與數據挖掘強相關的MLLib。

   

參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM