一:累加器簡介
(一)累加器用途
在spark應用程序中,我們經常會有這樣的需求,如異常監控,調試,記錄符合某特性的數據的數目,這種需求都需要用到計數器,
如果一個變量不被聲明為一個累加器,那么它將在被改變時不會再driver端進行全局匯總,
即在分布式運行時每個task運行的只是原始變量的一個副本,並不能改變原始變量的值,
但是當這個變量被聲明為累加器后,該變量就會有分布式計數的功能。
累加器是用於跨執行器聚合信息的變量。例如,這些信息可以與數據或API診斷相關,比如有多少記錄被破壞了,或者某個特定的庫API被調用了多少次。
(二)累加器概念
在Spark中如果想在Task計算的時候統計某些事件的數量,使用filter/reduce也可以,但是使用累加器是一種更方便的方式,累加器一個比較經典的應用場景是用來在Spark Streaming應用中記錄某些事件的數量。使用累加器時需要注意只有Driver能夠取到累加器的值,Task端進行的是累加操作。(可以認為在task端使用寫鎖,一次只能一個task寫入,不會出現競爭導致數據出錯)
Spark提供的Accumulator,主要用於多個節點對一個變量進行共享性的操作。Accumulator只提供了累加的功能,只能累加,不能減少。累加器只能在Driver端構建,並只能從Driver端讀取結果,在Task端只能進行累加。
注意:在每個執行器上更新累加器,都會將累加數據轉發回Driver驅動程序。(所以為了避免網絡傳輸次數過大,可以將多次更新的值放入本地變量,到達指定數值后,更新給累加器,減少網絡傳輸次數)
(三)累加器概念圖
二:累加器實現原理
每個Executor各自可以在累加器變量中增加delta值,Executor把delta值發送給Driver,Driver將所有的delta值加在一起。
1.Executor1增加42,Driver接收到之后,VALID的值為42。

2.Executor2增加8,VALID的值為50。Executor3增加10,VALID的值為60。在Driver中讀取該變量時,得到的結果為60。


三:累加器案例講解
(一)前提
處理目錄下的日志文件,某些日志被破壞,導致部分行數據丟失,為空白。我們要統計出來一共有多少行數據被破壞。
(二)不使用累加器的代碼(分析錯誤)---重點
讀取some log file目錄下的所有日志,甚至分區為4.
上述代碼的問題是,當Driver驅動程序打印變量blankLines時,它的值將是零。
這是因為,當Spark將這段代碼發送給每個Task執行程序時,這些變量將成為該執行程序的局部變量,並且其更新后的值不會被轉發回驅動程序。
為了避免這個問題,我們需要在累加器中設置變量:空白行,這樣每個執行器中對這個變量的所有更新都將被轉發回驅動程序。
(三)改進用累加器
這保證了在每個Task執行器上更新累加器空白行,並將更新轉發回驅動程序。
熟悉Hadoop Map-Reduce的人會注意到Spark的累加器類似於Hadoop的Map-Reduce計數器。
三:累加器注意事項
(一) 累加器在Driver端定義賦初始值,累加器只能在Driver端讀取最后的值,在Excutor端更新。
(二)累加器不是一個調優的操作,因為如果不這樣做,結果是錯的(必須這樣做)
(三)累加器沒有改變Spark的lazy計算模型,如果累計器在RDD的操作中更新了,累計器的值只會在RDD作為action的一部分被計算時更新。所以,在lazy的transformation中(如map()),累加器的更新不能保證被執行。
為了安全起見,始終只在action中使用累加器。
(四)更多關於transformation和action可以見四中
四:累加器的使用
(一)累加器累加一個數組
//在driver中定義 val acc = sc.longAccumulator("Test Accumulator") //在task中進行累加 sc.parallelize(1 to 10).foreach(x => acc.add(x)) //在driver中輸出 println(acc.value)
(二)累加器錯誤用法---少加(transformation)
val accum = sc.longAccumulator("Error Accumulator") val numberRDD = sc.parallelize(1 to 10).map(n => { accum.add(1) n + 1 }) println("accumulator: " + accum.value)
執行完畢,打印的值時多少呢?
答案是0,累加器沒有改變Spark的lazy計算模型,如果累計器在RDD的操作中更新了,累計器的值只會在RDD作為action的一部分被計算時更新。所以,在lazy的transformation中(如map()),累加器的更新不能保證被執行。
(三)累加器錯誤用法---多加(transformation和action)
val accum = sc.longAccumulator("Error2 Accumulator") val numberRDD = sc.parallelize(1 to 10).map(n => { accum.add(1) n + 1 }) numberRDD.count() println("accum1: " + accum.value) numberRDD.reduce(_+_) println("accum2: " + accum.value)
結果得到:
accum1:10 accum2: 20
雖然只在map里進行了累加器加1的操作,但是兩次得到的累加器的值卻不一樣,這是由於count和reduce都是action類型的操作,觸發了兩次作業的提交,所以map算子實際上被執行了兩次,在reduce操作提交作業后累加器又完成了一輪計數,所以最終的累加器的值為20。究其原因是因為count雖然促使numberRDD累計出來,但是由於沒有對其進行緩存,所以下次再次需要使用numberRDD這個數據集時,還需要從並行化數據集的部分開始執行計算。解釋道這里,這個問題的解決方法也就很清楚了,就是在count之前調用numberRDD的cache方法(或persist),這樣在count后數據集就會被緩存下來,reduce操作就會讀取緩存的數據集,而無需從頭開始計算。
(四)使用cache保持數據一致
val accum = sc.longAccumulator("Error2 Accumulator") val numberRDD = sc.parallelize(1 to 10).map(n => { accum.add(1) n + 1 }) numberRDD.cache().count() println("accum1: " + accum.value) numberRDD.reduce(_+_) println("accum2: " + accum.value)
這次兩次打印的值就會保持一致了。都是10。
(五)如果累計器在actions操作算子里面執行時,只會累加一遍。可以實現(四)中情況
val accum = sc.longAccumulator("Error2 Accumulator") val numberRDD = sc.parallelize(1 to 10).map(n => { n + 1 }) numberRDD.foreach(t =>{ accum.add(1) }) println("accum1: " + accum.value) numberRDD.reduce(_+_) println("accum2: " + accum.value)
輸出結果:
accum1: 10 accum2: 10
(六)對比transformation和action
對於只在actions執行更新操作的累加器,Spark會保證任務對累加器的更新操作只會應用一次,例如,重啟任務不會更新累加器的值。在transformations中用戶應該意識到,如果任務或作業階段重新執行,每個任務的更新操作會應用多次。
(七)transformation和action補充:聚合算子
def method2(lines: RDD[String]): Unit = { val totalAcc: LongAccumulator = lines.sparkContext.longAccumulator("total") //定義累加器 val dbRDD: RDD[String] = lines.filter(line => { //filter transformation延時計算 val fields = line.split("\\^") val jobs = fields(1) jobs.contains("大數據") || jobs.contains("hadoop") || jobs.contains("spark") }) val eduRDD = dbRDD.map( line => { //map transformation延時計算 totalAcc.add(1) val fields = line.split("\\^") val edu = fields(6) (edu, 1) }).reduceByKey(_ + _) //reduce transformation延時計算 //確保累加器執行 eduRDD.count() //count action直接觸發計算 val sum = totalAcc.value println(sum) eduRDD.foreach{ case (edu, count) =>{ println(s"學歷是${edu}的人數為${count},占得比例為: "+count/(sum+0.0)) }} println(totalAcc.value) }
上面代碼結果為sum=448,但是經過foreach()后,得到的totalAcc.value的結果仍然是448?
分析:
測試發現eduRDD是dbRDD經過map和reduceByKey兩個算子得到的結果,所以eduRDD經過多次action算子也不會重復累加了。
但是如果將map和reduceByKey算子分開,並且是在map中添加的累加器,那重復調用reduceByKey后生成的RDD的action算子,將會翻倍累加。(eduRDD1.reduceByKey,其中eduRDD1是通過dbRDD.map添加累加器后生成的),此時為重復調用eduRDD1,即重復進行累加。
但是連着使用兩個算子后,調用將不會重復累加,通過測試發現當經過聚合算子的操作后,得到的RDD重復調用action,累加類不再進行累加。
五:內置累加器
上面的代碼使用了內置支持的Long類型累加器,當然還存在其他內置支持的累加器:double類型、集合Collection類型
(一)集合累加器---官方提供的一個自定義累加器案例
def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("wordcount").setMaster("local[2]") val sc = new SparkContext(conf) case class User(name:String,telephone:String) val collect_plus=Array(User("Alice","15837312345") //這是我們將要進行累加的值 ,User("Bob","13937312666") ,User("Thomas","13637312345") ,User("Tom","18537312777") ,User("Boris","13837312998") ) val rdd05=sc.parallelize(collect_plus,2) val user_plustool=sc.collectionAccumulator[User]("用戶累加器") //定義累加器 rdd05.foreach(user => { val teletephone=user.telephone.reverse if (teletephone(0)==teletephone(1) && teletephone(0)==teletephone(2)){ user_plustool.add(user) //進行累加 } } ) println(user_plustool) } }
六:自定義累加器
(一)自定義累加器講解
可以通過AccumulatorV2創建自己的類型。AccumulatorV2抽象類由多個方法,其中必須重寫的是:
reset:用於重置累加器為0
add:用於向累加器加一個值
merge:用於合並另一個同類型的累加器到當前累加器
其它必須重寫的方法有:
copy():創建此累加器的新副本 isZero():返回該累加器是否為零值 value():獲取此累加器的當前值
(二)自定義累加器實現
下面這個累加器可以用於在程序運行過程中收集一些文本類信息,最終以Set[String]的形式返回。
package com.rk.spark import java.util import org.apache.spark.util.AccumulatorV2 class MyAccumulatorV2 extends AccumulatorV2[String, java.util.Set[String]]{ private val set:java.util.Set[String] = new util.HashSet[String]() override def isZero: Boolean = { set.isEmpty } override def reset(): Unit = { set.clear() } override def add(v: String): Unit = { set.add(v) } override def merge(other: AccumulatorV2[String, util.Set[String]]): Unit = { other match { case o:MyAccumulatorV2 => set.addAll(o.value) } } override def value: java.util.Set[String] = { java.util.Collections.unmodifiableSet(set) } override def copy(): AccumulatorV2[String, util.Set[String]] = { val newAcc = new MyAccumulatorV2() set.synchronized{ newAcc.set.addAll(set) } newAcc } }
(三)測試自定義累加器
package com.rk.spark import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, SparkContext} object AccumulatorTest { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) Logger.getLogger("org.spark-project").setLevel(Level.OFF) //設置日志打印級別 val conf = new SparkConf() .setMaster("local[*]") .setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) //自定義 val myAccum = new MyAccumulatorV2 //定義自定義累加器 sc.register(myAccum, "MyAccumulator") val sum: Int = sc.parallelize( Array("1", "2a", "3", "4f", "a5", "6", "2a"), 2) .filter(line => { //用於過濾掉一些數據 val pattern = """^-?(\d+)""" val flag = line.matches(pattern) //返回為0,沒有匹配成功,過濾掉。返回1,匹配成功,保留 if (!flag) { myAccum.add(line) //將我們filter要過濾掉的數據保存到累加器中 } flag }).map(_.toInt).reduce(_ + _) println("sum: " + sum) for (v <- myAccum.value.toArray) { print(v + " ") } println() sc.stop() } }
本例中利用自定義的收集器收集過濾操作中被過濾掉的元素,當然這部分的元素的數據量不能太大。
sum: 10 4f a5 2a