一、累加器簡介
在Spark中如果想在Task計算的時候統計某些事件的數量,使用filter/reduce也可以,但是使用累加器是一種更方便的方式,累加器一個比較經典的應用場景是用來在Spark Streaming應用中記錄某些事件的數量。
使用累加器時需要注意只有Driver能夠取到累加器的值,Task端進行的是累加操作。
創建的Accumulator變量的值能夠在Spark Web UI上看到,在創建時應該盡量為其命名,下面探討如何在Spark Web UI上查看累加器的值。
示例代碼:
package cc11001100.spark.sharedVariables.accumulators;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* @author CC11001100
*/
public class SparkWebUIShowAccumulatorDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
LongAccumulator fooCount = spark.sparkContext().longAccumulator("fooCount");
spark.createDataset(Collections.singletonList(1024), Encoders.INT())
.foreach((ForeachFunction<Integer>) fooCount::add);
try {
TimeUnit.DAYS.sleep(365 * 10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
啟動的時候注意觀察控制台上輸出的Spark Web UI的地址:
打開此鏈接,點進去Jobs-->Stage,可以看到fooCount累加器的值已經被累加到了1024:
二、Accumulator的簡單使用
Spark內置了三種類型的Accumulator,分別是LongAccumulator用來累加整數型,DoubleAccumulator用來累加浮點型,CollectionAccumulator用來累加集合元素。
package cc11001100.spark.sharedVariables.accumulators;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.CollectionAccumulator;
import org.apache.spark.util.DoubleAccumulator;
import org.apache.spark.util.LongAccumulator;
import java.util.Arrays;
/**
* 累加器的基本使用
*
* @author CC11001100
*/
public class AccumulatorsSimpleUseDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
SparkContext sc = spark.sparkContext();
// 內置的累加器有三種,LongAccumulator、DoubleAccumulator、CollectionAccumulator
// LongAccumulator: 數值型累加
LongAccumulator longAccumulator = sc.longAccumulator("long-account");
// DoubleAccumulator: 小數型累加
DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account");
// CollectionAccumulator:集合累加
CollectionAccumulator<Integer> collectionAccumulator = sc.collectionAccumulator("double-account");
Dataset<Integer> num1 = spark.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());
Dataset<Integer> num2 = num1.map((MapFunction<Integer, Integer>) x -> {
longAccumulator.add(x);
doubleAccumulator.add(x);
collectionAccumulator.add(x);
return x;
}, Encoders.INT()).cache();
num2.count();
System.out.println("longAccumulator: " + longAccumulator.value());
System.out.println("doubleAccumulator: " + doubleAccumulator.value());
// 注意,集合中元素的順序是無法保證的,多運行幾次發現每次元素的順序都可能會變化
System.out.println("collectionAccumulator: " + collectionAccumulator.value());
}
}
三、自定義Accumulator
當內置的Accumulator無法滿足要求時,可以繼承AccumulatorV2實現自定義的累加器。
實現自定義累加器的步驟:
1. 繼承AccumulatorV2,實現相關方法
2. 創建自定義Accumulator的實例,然后在SparkContext上注冊它
假設要累加的數非常大,內置的LongAccumulator已經無法滿足需求,下面是一個簡單的例子用來累加BigInteger:
package cc11001100.spark.sharedVariables.accumulators;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.AccumulatorV2;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
/**
* 自定義累加器
*
* @author CC11001100
*/
public class CustomAccumulatorDemo {
// 需要注意的是累加操作不能依賴順序,比如類似於StringAccumulator這種則會得到錯誤的結果
public static class BigIntegerAccumulator extends AccumulatorV2<BigInteger, BigInteger> {
private BigInteger num = BigInteger.ZERO;
public BigIntegerAccumulator() {
}
public BigIntegerAccumulator(BigInteger num) {
this.num = new BigInteger(num.toString());
}
@Override
public boolean isZero() {
return num.compareTo(BigInteger.ZERO) == 0;
}
@Override
public AccumulatorV2<BigInteger, BigInteger> copy() {
return new BigIntegerAccumulator(num);
}
@Override
public void reset() {
num = BigInteger.ZERO;
}
@Override
public void add(BigInteger num) {
this.num = this.num.add(num);
}
@Override
public void merge(AccumulatorV2<BigInteger, BigInteger> other) {
num = num.add(other.value());
}
@Override
public BigInteger value() {
return num;
}
}
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
SparkContext sc = spark.sparkContext();
// 直接new自定義的累加器
BigIntegerAccumulator bigIntegerAccumulator = new BigIntegerAccumulator();
// 然后在SparkContext上注冊一下
sc.register(bigIntegerAccumulator, "bigIntegerAccumulator");
List<BigInteger> numList = Arrays.asList(new BigInteger("9999999999999999999999"), new BigInteger("9999999999999999999999"), new BigInteger("9999999999999999999999"));
Dataset<BigInteger> num = spark.createDataset(numList, Encoders.kryo(BigInteger.class));
Dataset<BigInteger> num2 = num.map((MapFunction<BigInteger, BigInteger>) x -> {
bigIntegerAccumulator.add(x);
return x;
}, Encoders.kryo(BigInteger.class));
num2.count();
System.out.println("bigIntegerAccumulator: " + bigIntegerAccumulator.value());
}
}
思考:內置的累加器LongAccumulator、DoubleAccumulator、CollectionAccumulator和我上面的自定義BigIntegerAccumulator,它們都有一個共同的特點,就是最終的結果不受累加數據順序的影響(對於CollectionAccumulator來說,可以簡單的將結果集看做是一個無序Set),看到網上有博主舉例子StringAccumulator,這個就是一個錯誤的例子,就相當於開了一百個線程,每個線程隨機sleep若干毫秒然后往StringBuffer中追加字符,最后追加出來的字符串是無法被預測的。總結一下就是累加器的最終結果應該不受累加順序的影響,否則就要重新審視一下這個累加器的設計是否合理。
四、使用Accumulator的陷阱
來討論一下使用累加器的一些陷阱,累加器的累加是在Task中進行的,而這些Task就是我們在Dataset上調用的一些算子操作,這些算子操作有Transform的,也有Action的,來探討一下不同類型的算子對Accumulator有什么影響。
package cc11001100.spark.sharedVariables.accumulators;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import java.util.Arrays;
/**
* 累加器使用的陷阱
*
* @author CC11001100
*/
public class AccumulatorTrapDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
SparkContext sc = spark.sparkContext();
LongAccumulator longAccumulator = sc.longAccumulator("long-account");
// ------------------------------- 在transform算子中的錯誤使用 -------------------------------------------
Dataset<Integer> num1 = spark.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());
Dataset<Integer> nums2 = num1.map((MapFunction<Integer, Integer>) x -> {
longAccumulator.add(1);
return x;
}, Encoders.INT());
// 因為沒有Action操作,nums.map並沒有被執行,因此此時廣播變量的值還是0
System.out.println("num2 1: " + longAccumulator.value()); // 0
// 調用一次action操作,num.map得到執行,廣播變量被改變
nums2.count();
System.out.println("num2 2: " + longAccumulator.value()); // 3
// 又調用了一次Action操作,廣播變量所在的map又被執行了一次,所以累加器又被累加了一遍,就悲劇了
nums2.count();
System.out.println("num2 3: " + longAccumulator.value()); // 6
// ------------------------------- 在transform算子中的正確使用 -------------------------------------------
// 累加器不應該被重復使用,或者在合適的時候進行cache斷開與之前Dataset的血緣關系,因為cache了就不必重復計算了
longAccumulator.setValue(0);
Dataset<Integer> nums3 = num1.map((MapFunction<Integer, Integer>) x -> {
longAccumulator.add(1);
return x;
}, Encoders.INT()).cache(); // 注意這個地方進行了cache
// 因為沒有Action操作,nums.map並沒有被執行,因此此時廣播變量的值還是0
System.out.println("num3 1: " + longAccumulator.value()); // 0
// 調用一次action操作,廣播變量被改變
nums3.count();
System.out.println("num3 2: " + longAccumulator.value()); // 3
// 又調用了一次Action操作,因為前一次調用count時num3已經被cache,num2.map不會被再執行一遍,所以這里的值還是3
nums3.count();
System.out.println("num3 3: " + longAccumulator.value()); // 3
// ------------------------------- 在action算子中的使用 -------------------------------------------
longAccumulator.setValue(0);
num1.foreach(x -> {
longAccumulator.add(1);
});
// 因為是Action操作,會被立即執行所以打印的結果是符合預期的
System.out.println("num4: " + longAccumulator.value()); // 3
}
}
五、Accumulator使用的奇淫技巧
累加器並不是只能用來實現加法,也可以用來實現減法,直接把要累加的數值改成負數就可以了:
package cc11001100.spark.sharedVariables.accumulators;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import java.util.Arrays;
/**
* 使用累加器實現減法
*
* @author CC11001100
*/
public class AccumulatorSubtraction {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
Dataset<Integer> nums = spark.createDataset(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), Encoders.INT());
LongAccumulator longAccumulator = spark.sparkContext().longAccumulator("AccumulatorSubtraction");
nums.foreach(x -> {
if (x % 3 == 0) {
longAccumulator.add(-2);
} else {
longAccumulator.add(1);
}
});
System.out.println("longAccumulator: " + longAccumulator.value()); // 2
}
}
相關資料:
1. Accumulators
2. When are accumulators truly reliable?
.
