Spark 2.0 Custom Accumulators
To use a custom accumulator in Spark 2.0, extend the abstract class AccumulatorV2 and implement the following six methods:
1. reset: resets the accumulator to its zero value;
abstract def reset(): Unit
Resets this accumulator to its zero value.
2. add: adds another value into the accumulator;
abstract def add(v: IN): Unit
3. merge: merges another accumulator of the same type into this one;
abstract def merge(other: AccumulatorV2[IN, OUT]): Unit
Merges another same-type accumulator into this one and updates its state.
4. value: returns the current value of the accumulator;
abstract def value: OUT
Defines the current value of this accumulator.
5. copy: creates a new copy of this accumulator;
abstract def copy(): AccumulatorV2[IN, OUT]
Creates a new copy of this accumulator.
6. isZero: returns whether the accumulator is at its zero value;
abstract def isZero: Boolean
Returns whether this accumulator is zero value or not.
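As a minimal sketch of what implementing these six methods looks like (this simple Long-sum accumulator is illustrative only and not part of the original post):

import org.apache.spark.util.AccumulatorV2

// Illustrative only: a custom accumulator that sums Long values.
class LongSumAccumulator extends AccumulatorV2[Long, Long] {
  private var sum = 0L
  override def isZero: Boolean = sum == 0L
  override def copy(): LongSumAccumulator = {
    val acc = new LongSumAccumulator
    acc.sum = this.sum
    acc
  }
  override def reset(): Unit = { sum = 0L }
  override def add(v: Long): Unit = sum += v
  override def merge(other: AccumulatorV2[Long, Long]): Unit = sum += other.value
  override def value: Long = sum
}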
Note that accumulator updates are only guaranteed when performed inside actions: Spark applies each task's update exactly once, so restarted tasks will not update the accumulator again. Inside transformations, however, each task's update may be applied more than once if the task or job is re-executed. Accumulators also do not change Spark's lazy evaluation strategy.
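A quick illustration of this behavior (hedged sketch only; spark, rdd, and the accumulator name are assumptions, not from the original post):

val acc = spark.sparkContext.longAccumulator("counter")
// Inside a transformation: nothing is added until an action materializes the RDD,
// and the update may run more than once if the stage is recomputed.
val mapped = rdd.map { x => acc.add(1); x }
// Inside an action: each task's update is applied exactly once, even if tasks are restarted.
rdd.foreach(_ => acc.add(1))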
Business requirements often call for building mappings between data in several DataFrames, and doing this with collectionAccumulator involves a fair amount of repetitive Map manipulation, so I wrote this custom accumulator that builds a Map directly. IN is a (String, String) tuple holding the key and value, and the output is a Map. If the accumulator already contains the key and the incoming key->value pair is not a duplicate, the values are joined with the separator "||" and the accumulator's value is updated.
The code is as follows:
import scala.collection.mutable
import org.apache.spark.util.AccumulatorV2

/**
  * Created by Namhwik on 2016/12/27.
  *
  * Accumulates (key, value) String pairs into a mutable Map.
  * Values for the same key are concatenated with "||", skipping duplicates.
  */
class MapAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, String]] {

  private val mapAccumulator = mutable.Map[String, String]()

  // Add a (key, value) pair; if the key already exists and the value differs,
  // append the new value with the "||" separator.
  override def add(keyAndValue: (String, String)): Unit = {
    val (key, value) = keyAndValue
    if (!mapAccumulator.contains(key))
      mapAccumulator += key -> value
    else if (mapAccumulator(key) != value)
      mapAccumulator += key -> (mapAccumulator(key) + "||" + value)
  }

  // The accumulator is at its zero value when the map is empty.
  override def isZero: Boolean = mapAccumulator.isEmpty

  // Create a new accumulator holding a copy of the current entries.
  override def copy(): AccumulatorV2[(String, String), mutable.Map[String, String]] = {
    val newMapAccumulator = new MapAccumulator()
    mapAccumulator.foreach(newMapAccumulator.add)
    newMapAccumulator
  }

  override def value: mutable.Map[String, String] = mapAccumulator

  // Merge another MapAccumulator into this one, de-duplicating "||"-separated values.
  override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, String]]): Unit =
    other match {
      case map: MapAccumulator =>
        map.value.foreach { case (key, values) =>
          if (!this.value.contains(key))
            this.add((key, values))
          else
            values.split("\\|\\|").foreach { v =>
              if (!this.value(key).split("\\|\\|").contains(v))
                this.add((key, v))
            }
        }
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }

  override def reset(): Unit = mapAccumulator.clear()
}
Reference: <http://spark.apache.org/docs/latest/programming-guide.html>
P.S.: the accumulator must be registered with the SparkContext before use.
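A sketch of how the MapAccumulator might be registered and used (the spark session and the someDf DataFrame with columns "id" and "name" are assumptions for illustration, not from the original post):

val mapAcc = new MapAccumulator
spark.sparkContext.register(mapAcc, "mapAccumulator")

// Collect id -> name mappings while iterating the DataFrame in an action.
someDf.select("id", "name").rdd.foreach { row =>
  mapAcc.add((row.getString(0), row.getString(1)))
}

// Read the accumulated map on the driver once the action has finished.
val idToName = mapAcc.value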