import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * The mapPartitionsWithIndex operator:
 * like mapPartitions, but the function also receives the index of the partition
 * it is currently processing.
 *
 * The meaning of the second argument of mapPartitions/mapPartitionsWithIndex,
 * preservesPartitioning (boolean, default false): it is an optimization hint.
 * Set it to true when your function does NOT change how records map to partitions
 * (for a pair RDD, when it leaves the keys untouched), so Spark can keep the
 * existing partitioner and avoid an unnecessary shuffle later. If you do not tell
 * Spark, it cannot know the partitioning survived, and the optimization is lost.
 * (A minimal sketch of the effect on a pair RDD follows after this class.)
 *
 * Precedence when choosing the number of partitions: the numSlices argument of
 * parallelize wins, then conf.set("spark.default.parallelism", ...), and finally
 * the N in setMaster("local[N]").
 */
public class MapPartitionsWithIndexOperator {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("mapPartitionsWithIndex");
JavaSparkContext sc = new JavaSparkContext(conf);
List<String> names = Arrays.asList("w1","w2","w3","w4","w5","W6","W7");
        // Convert the list into an RDD split into 2 partitions
JavaRDD<String> nameRDD = sc.parallelize(names,2);
        // Function2's generic parameters: the first is the partition index, the second
        // is the input (an iterator over that partition's elements), and the third is
        // the return type. The trailing true is preservesPartitioning; on a plain
        // (non-pair) RDD there is no partitioner to preserve, so it is harmless here.
        JavaRDD<String> resultRDD = nameRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            @Override
            public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                List<String> nameList = new ArrayList<>();
                while (iterator.hasNext()) {
                    // Prefix every element with the index of the partition it came from
                    nameList.add(index + ":" + iterator.next());
                }
                return nameList.iterator();
            }
        }, true);
        // Change the RDD's partition count; repartition always performs a full shuffle
        JavaRDD<String> repartitionRDD = resultRDD.repartition(4);
        System.err.println(repartitionRDD.partitions().size()); // prints 4
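        // Aside (a minimal sketch, not part of the original flow): if you only need to
        // shrink the partition count, coalesce does it without a full shuffle;
        // repartition(n) is just coalesce(n, shuffle = true) under the hood.
        JavaRDD<String> coalescedRDD = repartitionRDD.coalesce(2);
        System.err.println(coalescedRDD.partitions().size()); // prints 2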
        // In local mode foreach prints to the driver's console; on a real cluster this
        // output would land in each executor's stderr log instead
        repartitionRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.err.println("mapPartitionsWithIndex:"+s);
}
});
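        // Sample output (seven lines; order varies because repartition shuffles, but
        // each element keeps the 0: or 1: prefix of the partition it started in), e.g.:
        //   mapPartitionsWithIndex:0:w1
        //   mapPartitionsWithIndex:1:w4
        //   ...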
        sc.close();
    }
}
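For completeness, here is a minimal sketch of what preservesPartitioning actually buys you. It only matters on a pair RDD that carries a partitioner: when the function leaves the keys untouched and the flag is true, Spark keeps the partitioner; with false it is dropped. The class name PreservesPartitioningDemo and the use of mapPartitionsToPair are my illustration choices, not part of the original example, and the code assumes the Spark 2.x Java API (where PairFlatMapFunction returns an Iterator).

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PreservesPartitioningDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("preservesPartitioning");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // A pair RDD hash-partitioned into 2 partitions, so it carries a partitioner
        JavaPairRDD<String, Integer> pairs = sc
                .parallelizePairs(Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)))
                .partitionBy(new HashPartitioner(2));

        // This function touches only the values, never the keys, so the partitioning
        // really is preserved and we may safely tell Spark so
        PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, String, Integer> doubleValues =
                it -> {
                    List<Tuple2<String, Integer>> out = new ArrayList<>();
                    while (it.hasNext()) {
                        Tuple2<String, Integer> t = it.next();
                        out.add(new Tuple2<>(t._1(), t._2() * 2));
                    }
                    return out.iterator();
                };

        // preservesPartitioning = true: the HashPartitioner is kept (prints true)
        System.err.println(pairs.mapPartitionsToPair(doubleValues, true).partitioner().isPresent());
        // preservesPartitioning = false: the partitioner is dropped (prints false)
        System.err.println(pairs.mapPartitionsToPair(doubleValues, false).partitioner().isPresent());

        sc.close();
    }
}

In practice, when only the values change, mapValues is the simpler choice: it preserves the partitioning automatically, with no flag to remember.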