Spark 累加器


由於spark是分布式的計算,所以使得每個task間不存在共享的變量,而為了實現共享變量spark實現了兩種類型 - 累加器與廣播變量,

對於其概念與理解可以參考:共享變量(廣播變量和累加器)  。可能需要注意:Spark累加器(Accumulator)陷阱及解決辦法

因此,我們便可以利用累加器與廣播變量來構造一些比較常用的關系,以Map的形式廣播出去,提高效率。

如下通過累加器構造了一個DF數據間
的映射關系,

defgetMap(spark:SparkSession,data:DataFrame){
//通過collectionAccumulator構造Map關系
valmyAccumulator=spark.sparkContext.collectionAccumulator[(String,Long)]
data.foreach(
row=>{
valname=row.getAs[String]("name")
valage=row.getAs[Long]("age")
myAccumulator.add(name,age)
}
)
valaiterator:util.Iterator[(String,Long)]=myAccumulator.value.iterator()
varnewMap:Map[String,Long]=Map()
while(aiterator.hasNext){
vala=aiterator.next()
valkey=a._1
valvalue=a._2
if(!newMap.contains(key)){
newMap+=(key->value)
}
else{
valoldvalue=newMap(key)
newMap+=(key->(oldvalue+value))
}
}
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM