使用Spark 對以下內容進行詞頻統計 (使用Java語言)
hello world
hello java
hello cnblogs
代碼如下:
/** * Spark基於Java Api的詞頻統計 */ public class WordCountByJava { public static void main(String[] args) { // 初始化 SparkConf setAppName:設置應用名稱 setMaster:設置運行模式 SparkConf conf = new SparkConf().setAppName("WORDCOUNT").setMaster("local"); // 初始化 SparkContext對象 JavaSparkContext jsc = new JavaSparkContext(conf); // 使用SparkContext對象讀取文件,存為JavaRdd JavaRDD<String> dataRdd = jsc.textFile("G:\\test\\wc\\a.txt"); // 使用flatMap函數對原始Rdd進行轉換 按空格進行拆分,保存為集合 JavaRDD<String> flatMapRdd = dataRdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterator<String> call(String s) throws Exception { // 拆分字符串 為一個數組 String[] word = s.split(" "); // 把數組轉換成List集合 List<String> list = Arrays.asList(word); // 把list集合轉換成Iterator集合 Iterator<String> it = list.iterator(); return it; } }); // 使用mapToPair進行map操作 形如: (word,1) JavaPairRDD<String, Integer> mapRdd = flatMapRdd.mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); // 使用reduceByKey進行單詞統計 返回 (word,CountSum) JavaPairRDD<String, Integer> res = mapRdd.reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer a, Integer b) throws Exception { return a + b; } }); // 把最后的 rdd輸出 res.foreach(new VoidFunction<Tuple2<String, Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> tuple2) throws Exception { System.out.println(tuple2._1+" "+tuple2._2); } }); } }
