Spark 2.0 Custom Accumulators



In Spark 2.0, a custom accumulator must extend the abstract class AccumulatorV2 and implement all six of the following methods:

1. reset: resets the accumulator;

abstract def reset(): Unit

Resets this accumulator to its zero value.

2. add: adds another value to the accumulator;

abstract def add(v: IN): Unit

3. merge: merges another accumulator of the same type into this one;

abstract def merge(other: AccumulatorV2[IN, OUT]): Unit

Merges another same-type accumulator into this one and updates its state, i.e. the merge happens in place.

4. value: returns the current value;

abstract def value: OUT

Defines the current value of this accumulator.

5. copy: creates a new copy of this accumulator;

abstract def copy(): AccumulatorV2[IN, OUT]

Creates a new copy of this accumulator.

6. isZero: reports whether the accumulator is in its zero state.

abstract def isZero: Boolean

Returns whether this accumulator is zero value or not.

Note that accumulator updates take effect only when executed inside an action: Spark applies each task's update to an accumulator exactly once, so restarted tasks will not update the value again. Inside a transformation, however, be aware that an update may be applied more than once if the task or job is re-executed. Accumulators also do not change Spark's lazy evaluation semantics.
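
A minimal sketch of this pitfall with a built-in long accumulator, assuming a SparkSession named spark is in scope (the names here are illustrative, not from the original post):

val acc = spark.sparkContext.longAccumulator("counter")
val data = spark.sparkContext.parallelize(1 to 10)
// Updating inside a transformation is lazy: nothing is counted yet.
val mapped = data.map { x => acc.add(1); x }
mapped.count()   // first action: acc.value is now 10
mapped.count()   // mapped is not cached, so map re-runs: acc.value becomes 20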

Business requirements often call for building mappings between data in several DataFrames, and doing this with a collectionAccumulator involves a fair amount of repetitive Map manipulation, so I wrote this custom accumulator that produces a Map directly. IN is a (String, String) tuple holding the key and value, and the output is the resulting Map. If the accumulator already contains the key being added and the new key->value pair is not a duplicate, the values are joined with the separator "||" and the accumulator is updated.

The code is as follows:

import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

/**
  * Created by Namhwik on 2016/12/27.
  *
  * Accumulates (key, value) pairs of Strings into a Map; distinct values
  * arriving for the same key are joined with the "||" separator.
  */
class MapAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, String]] {

  private val mapAccumulator = mutable.Map[String, String]()

  override def add(keyAndValue: (String, String)): Unit = {
    val (key, value) = keyAndValue
    if (!mapAccumulator.contains(key))
      mapAccumulator += key -> value
    else if (mapAccumulator(key) != value)
      mapAccumulator += key -> (mapAccumulator(key) + "||" + value)
  }

  override def isZero: Boolean = mapAccumulator.isEmpty

  override def copy(): AccumulatorV2[(String, String), mutable.Map[String, String]] = {
    val newMapAccumulator = new MapAccumulator()
    mapAccumulator.foreach(newMapAccumulator.add)
    newMapAccumulator
  }

  override def value: mutable.Map[String, String] = mapAccumulator

  override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, String]]): Unit =
    other match {
      case _: MapAccumulator =>
        other.value.foreach { case (key, joinedValues) =>
          if (!mapAccumulator.contains(key))
            this.add((key, joinedValues))
          else
            // Only append the value fragments this accumulator does not hold yet.
            joinedValues.split("\\|\\|").foreach { v =>
              if (!mapAccumulator(key).split("\\|\\|").contains(v))
                this.add((key, v))
            }
        }
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }

  override def reset(): Unit = mapAccumulator.clear()
}

Reference: <http://spark.apache.org/docs/latest/programming-guide.html>

P.S. The accumulator must be registered with the SparkContext before use.
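
A minimal usage sketch, again assuming a SparkSession named spark is in scope (the input data and names are illustrative):

val mapAcc = new MapAccumulator
spark.sparkContext.register(mapAcc, "mapAccumulator")

val pairs = Seq(("a", "1"), ("a", "2"), ("b", "3"))
spark.sparkContext.parallelize(pairs).foreach(p => mapAcc.add(p))

// Distinct values for the same key end up joined with "||", e.g.
// Map(a -> 1||2, b -> 3); fragment order depends on partitioning.
println(mapAcc.value)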

 

