由於spark是分布式的計算,所以使得每個task間不存在共享的變量,而為了實現共享變量spark實現了兩種類型 - 累加器與廣播變量,
對於其概念與理解可以參考:共享變量(廣播變量和累加器) 。可能需要注意:Spark累加器(Accumulator)陷阱及解決辦法
因此,我們便可以利用累加器與廣播變量來構造一些比較常用的關系,以Map的形式廣播出去,提高效率。
如下通過累加器構造了一個DF數據間
的映射關系,
defgetMap(spark:SparkSession,data:DataFrame){ //通過collectionAccumulator構造Map關系 valmyAccumulator=spark.sparkContext.collectionAccumulator[(String,Long)] data.foreach( row=>{ valname=row.getAs[String]("name") valage=row.getAs[Long]("age") myAccumulator.add(name,age) } ) valaiterator:util.Iterator[(String,Long)]=myAccumulator.value.iterator() varnewMap:Map[String,Long]=Map() while(aiterator.hasNext){ vala=aiterator.next() valkey=a._1 valvalue=a._2 if(!newMap.contains(key)){ newMap+=(key->value) } else{ valoldvalue=newMap(key) newMap+=(key->(oldvalue+value)) } } }