Spark累加器(Accumulator)陷阱及解決辦法


累加器(accumulator)是Spark中提供的一種分布式的變量機制,其原理類似於mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個常見用途是在調試時對作業執行過程中的事件進行計數。

Spark內置的提供了Long和Double類型的累加器。下面是一個簡單的使用示例,在這個例子中我們在過濾掉RDD中奇數的同時進行計數,最后計算剩下整數的和。

 

 1     val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]")
 2     val sc = new SparkContext(sparkConf)
 3     val accum = sc.longAccumulator("longAccum") //統計奇數的個數
 4     val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{
 5       if(n%2!=0) accum.add(1L) 
 6       n%2==0
 7     }).reduce(_+_)
 8  
 9     println("sum: "+sum)
10     println("accum: "+accum.value)
11  
12     sc.stop()

 

 

 

 

結果為:

sum: 20
accum: 5

 

這是結果正常的情況,但是在使用累加器的過程中如果對於spark的執行過程理解的不夠深入就會遇到兩類典型的錯誤:少加(或者沒加)、多加。

少加的情況:

對於如下代碼:

 

1     val accum = sc.longAccumulator("longAccum")
2     val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{
3       accum.add(1L)
4       n+1
5     })
6     println("accum: "+accum.value)


執行完畢,打印的值是多少呢?答案是0,因為累加器不會改變spark的lazy的計算模型,即在打印的時候像map這樣的transformation還沒有真正的執行,從而累加器的值也就不會更新。

 

多加的情況:

對於如下代碼:

 

1     val accum = sc.longAccumulator("longAccum")
2     val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{
3       accum.add(1L)
4       n+1
5     })
6     numberRDD.count
7     println("accum1:"+accum.value)
8     numberRDD.reduce(_+_)
9     println("accum2: "+accum.value)
結果我們得到了:

accum1:9

accum2: 18

 

我們雖然只在map里進行了累加器加1的操作,但是兩次得到的累加器的值卻不一樣,這是由於count和reduce都是action類型的操作,觸發了兩次作業的提交,所以map算子實際上被執行了了兩次,在reduce操作提交作業后累加器又完成了一輪計數,所以最終累加器的值為18。究其原因是因為count雖然促使numberRDD被計出來,但是由於沒有對其進行緩存,所以下次再次需要使用numberRDD這個數據集是,還需要從並行化數據集的部分開始執行計算。解釋到這里,這個問題的解決方法也就很清楚了,就是在count之前調用numberRDD的cache方法(或persist),這樣在count后數據集就會被緩存下來,reduce操作就會讀取緩存的數據集而無需從頭開始計算了。改成如下代碼即可:

 

1     val accum = sc.longAccumulator("longAccum")
2     val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{
3       accum.add(1L)
4       n+1
5     })
6     numberRDD.cache().count
7     println("accum1:"+accum.value)
8     numberRDD.reduce(_+_)
9     println("accum2: "+accum.value)

這次兩次打印的值就會保持一致了。

 

自定義累加器

自定義累加器類型的功能在1.X版本中就已經提供了,但是使用起來比較麻煩,在2.0版本后,累加器的易用性有了較大的改進,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現方式。官方同時給出了一個實現的示例:CollectionAccumulator類,這個類允許以集合的形式收集spark應用執行過程中的一些信息。例如,我們可以用這個類收集Spark處理數據時的一些細節,當然,由於累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問題,需要對收集的信息的規模要加以控制,不宜過大。
實現自定義類型累加器需要繼承AccumulatorV2並至少覆寫下例中出現的方法,下面這個累加器可以用於在程序運行過程中收集一些文本類信息,最終以Set[String]的形式返回。
 1 import java.util
 2  
 3 import org.apache.spark.util.AccumulatorV2
 4  
 5 class LogAccumulator extends AccumulatorV2[String, java.util.Set[String]] {
 6   private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
 7  
 8   override def isZero: Boolean = {
 9     _logArray.isEmpty
10   }
11  
12   override def reset(): Unit = {
13     _logArray.clear()
14   }
15  
16   override def add(v: String): Unit = {
17     _logArray.add(v)
18   }
19  
20   override def merge(other: AccumulatorV2[String, java.util.Set[String]]): Unit = {
21     other match {
22       case o: LogAccumulator => _logArray.addAll(o.value)
23     }
24  
25   }
26  
27   override def value: java.util.Set[String] = {
28     java.util.Collections.unmodifiableSet(_logArray)
29   }
30  
31   override def copy(): AccumulatorV2[String, util.Set[String]] = {
32     val newAcc = new LogAccumulator()
33     _logArray.synchronized{
34       newAcc._logArray.addAll(_logArray)
35     }
36     newAcc
37   }
38 }

 

 


測試類:
 
 1 import java.util
 2  
 3 import org.apache.spark.util.AccumulatorV2
 4  
 5 class LogAccumulator extends AccumulatorV2[String, java.util.Set[String]] {
 6   private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
 7  
 8   override def isZero: Boolean = {
 9     _logArray.isEmpty
10   }
11  
12   override def reset(): Unit = {
13     _logArray.clear()
14   }
15  
16   override def add(v: String): Unit = {
17     _logArray.add(v)
18   }
19  
20   override def merge(other: AccumulatorV2[String, java.util.Set[String]]): Unit = {
21     other match {
22       case o: LogAccumulator => _logArray.addAll(o.value)
23     }
24  
25   }
26  
27   override def value: java.util.Set[String] = {
28     java.util.Collections.unmodifiableSet(_logArray)
29   }
30  
31   override def copy(): AccumulatorV2[String, util.Set[String]] = {
32     val newAcc = new LogAccumulator()
33     _logArray.synchronized{
34       newAcc._logArray.addAll(_logArray)
35     }
36     newAcc
37   }
38 }

 


本例中利用自定義的收集器收集過濾操作中被過濾掉的元素,當然這部分的元素的數據量不能太大。運行結果如下:
sum; 32
7cd 4b 2a 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM