通過繼承 Spark 的 AccumulatorV2 抽象類，可以實現自定義累加器。
官方文檔中的示例可參考：https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
下面是我自己寫的一個案例：用一個累加器同時統計兩種卡（card1 和 card2）各自的數量。
package com.shuai7boy.myscalacode

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

/**
 * Running totals for the two card types.
 *
 * The fields are `var`s because [[CalcCardCount]] updates them in place.
 */
case class Card(var card1Count: Int, var card2Count: Int)

/**
 * A custom Spark accumulator that sums `card1Count` and `card2Count` independently,
 * so a single accumulator can track two counters at once.
 */
class CalcCardCount extends AccumulatorV2[Card, Card] {

  /** Current totals held by this accumulator instance. */
  var result: Card = Card(0, 0)

  /**
   * Whether this accumulator holds the zero value.
   * Must agree with the state produced by `reset()`.
   *
   * @return true when both counts are 0
   */
  override def isZero: Boolean = result.card1Count == 0 && result.card2Count == 0

  /**
   * Creates a new accumulator holding the same totals.
   *
   * The `Card` is copied rather than shared: `add`, `merge` and `reset` mutate `result`
   * in place, so a shared instance would let changes made to the copy (for example a
   * reset of the copy) leak into this accumulator.
   *
   * @return an independent copy of this accumulator
   */
  override def copy(): AccumulatorV2[Card, Card] = {
    val newCalcCardCount = new CalcCardCount()
    newCalcCardCount.result = result.copy()
    newCalcCardCount
  }

  /** Resets both totals to zero (the state `isZero` checks for). */
  override def reset(): Unit = {
    result.card1Count = 0
    result.card2Count = 0
  }

  /**
   * Adds one input's counts to the running totals.
   *
   * @param v the counts to add
   */
  override def add(v: Card): Unit = {
    result.card1Count += v.card1Count
    result.card2Count += v.card2Count
  }

  /**
   * Merges another accumulator's totals (e.g. one partition's result) into this one.
   *
   * @param other the accumulator to merge; must be a [[CalcCardCount]]
   * @throws java.lang.UnsupportedOperationException if `other` is a different accumulator type
   */
  override def merge(other: AccumulatorV2[Card, Card]): Unit = other match {
    case o: CalcCardCount =>
      result.card1Count += o.result.card1Count
      result.card2Count += o.result.card2Count
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  /**
   * The current totals.
   *
   * @return a snapshot copy, so callers cannot mutate the accumulator's internal state
   */
  override def value: Card = result.copy()
}

/** Demo driver: counts card1/card2 totals with a single [[CalcCardCount]] accumulator. */
object CardCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("calcCardCountDemo").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val cc = new CalcCardCount
      sc.register(cc)
      val cardList = sc.parallelize(
        List[String]("card1 1", "card1 3", "card1 7", "card2 5", "card2 2"), 2)

      // Update the accumulator inside an action (foreach) rather than a transformation
      // such as map: Spark only guarantees exactly-once accumulator updates for actions;
      // updates made in transformations may be re-applied if a task or stage is re-run.
      cardList.foreach { line =>
        val cardInfo = line.split(" ") match {
          case Array("card1", count, _*) => Card(count.toInt, 0)
          case Array("card2", count, _*) => Card(0, count.toInt)
          case _                         => Card(0, 0) // unknown card type or missing count: adds nothing
        }
        cc.add(cardInfo)
      }

      val total = cc.value
      println(s"card1總數量為:${total.card1Count},card2總數量為:${total.card2Count}")
    } finally {
      sc.stop()
    }
  }
}
打印結果為：
card1總數量為:11,card2總數量為:7
通過上面的代碼，就可以用一個累加器同時統計兩個變量的值；如果需要統計更多變量，只需擴展 Card 類的字段並相應修改累加器的各個方法即可。Spark 內置的數值累加器（如 LongAccumulator、DoubleAccumulator）的值只是單個數值，無法直接同時統計多個變量。