Spark(Accumulator)陷阱及解決辦法


Accumulator簡介

Accumulator是spark提供的累加器,顧名思義,該變量只能夠增加。 
只有driver能獲取到Accumulator的值(使用value方法),Task只能對其做增加操作(使用 +=)。你也可以在為Accumulator命名(不支持Python),這樣就會在spark web ui中顯示,可以幫助你了解程序運行的情況。

Accumulator使用

使用示例

舉個最簡單的accumulator的使用例子:

//在driver中定義
val accum = sc.accumulator(0, "Example Accumulator")
//在task中進行累加
sc.parallelize(1 to 10).foreach(x=> accum += 1)

//在driver中輸出
accum.value
//結果將返回10
res: 10

累加器的錯誤用法

val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//用accumulator統計偶數出現的次數,同時偶數返回0,奇數返回1
val newData = data.map{x => {
if(x%2 == 0){
accum += 1
0
}else 1
}}
//使用action操作觸發執行
newData.count
//此時accum的值為5,是我們要的結果
accum.value

//繼續操作,查看剛才變動的數據,foreach也是action操作
newData.foreach(println)
//上個步驟沒有進行累計器操作,可是累加器此時的結果已經是10了
//這並不是我們想要的結果
accum.value

原因分析

官方對這個問題的解釋如下描述:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
我們都知道,spark中的一系列transform操作會構成一串長的任務鏈,此時需要通過一個action操作來觸發,accumulator也是一樣。因此在一個action操作之前,你調用value方法查看其數值,肯定是沒有任何變化的。

所以在第一次count(action操作)之后,我們發現累加器的數值變成了5,是我們要的答案。

之后又對新產生的的newData進行了一次foreach(action操作),其實這個時候又執行了一次map(transform)操作,所以累加器又增加了5。最終獲得的結果變成了10。


解決辦法

看了上面的分析,大家都有這種印象了,那就是使用累加器的過程中只能使用一次action的操作才能保證結果的准確性。

事實上,還是有解決方案的,只要將任務之間的依賴關系切斷就可以了。什么方法有這種功能呢?你們肯定都想到了,cache,persist。調用這個方法的時候會將之前的依賴切除,后續的累加器就不會再被之前的transfrom操作影響到了。

//
val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)

//代碼和上方相同
val newData = data.map{x => {...}}
//使用cache緩存數據,切斷依賴。
newData.cache.count
//此時accum的值為5
accum.value

newData.foreach(println)
//此時的accum依舊是5
accum.value

總結

使用Accumulator時,為了保證准確性,只使用一次action操作。如果需要使用多次則使用cache或persist操作切斷依賴。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM