import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * The groupByKey([numTasks]) operator:
 * groups the elements of a pair RDD by key, so each key is paired with an Iterable of all its values.
 * The parallelism (numTasks) is optional; when omitted it defaults to the parent RDD's,
 * i.e. the result has as many partitions as the parent RDD.
 * It shuffles all of the RDD's data, routing records to partitions by key before grouping them.
 */
public class GroupByKeyOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("groupbykey");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, String>> list = Arrays.asList(
                new Tuple2<>("W1", "1"),
                new Tuple2<>("W2", "2"),
                new Tuple2<>("W3", "3"),
                new Tuple2<>("W2", "22"),
                new Tuple2<>("W1", "11")
        );

        // Parent RDD with 2 partitions.
        JavaPairRDD<String, String> listRdd = sc.parallelizePairs(list, 2);

        // groupByKey(3) shuffles the data into 3 partitions, producing one Iterable per key.
        JavaPairRDD<String, Iterable<String>> result = listRdd.groupByKey(3);

        result.foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
            @Override
            public void call(Tuple2<String, Iterable<String>> stringIterableTuple2) throws Exception {
                // Printed to stderr so the output stands out from Spark's own log lines.
                System.err.println(stringIterableTuple2._1 + ":" + stringIterableTuple2._2);
            }
        });

        sc.close();
    }
}
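With the sample data above, the job prints one line per key; the value is the grouped Iterable, which in practice is Spark's internal CompactBuffer, so a line may look like W1:CompactBuffer(1, 11) (the order of lines and of values varies between runs). To check the parallelism rule from the comment, here is a minimal sketch, assuming the same sc and listRdd as in main (getNumPartitions() is available since Spark 1.6):

// Without numTasks, groupByKey keeps the parent RDD's partition count (2 here).
System.err.println(listRdd.groupByKey().getNumPartitions());   // prints 2

// With an explicit numTasks, the shuffle produces that many partitions.
System.err.println(listRdd.groupByKey(3).getNumPartitions());  // prints 3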