Spark WordCount的两种方式。
语言:Java
工具:IntelliJ IDEA
项目:Java Maven
pom.xml如下:
<properties>
<!-- Single place to set the Spark release; referenced below as ${spark.version}. -->
<spark.version>1.2.0</spark.version>
</properties>
<dependencies>
<!-- Spark core artifact; the _2.10 suffix of the artifactId is the Scala binary version it was built for. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
第一种方式:比较常规、按部就班的写法。
package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Collections;

/**
 * Word count written step by step: split lines into words, map every word to
 * a {@code (word, 1)} pair, sum the pairs per word with {@code reduceByKey},
 * and print each resulting {@code (word, count)} tuple.
 */
public class WordCount1 {

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT = "D:\\tmp\\words.txt";

    /**
     * Runs the word count on a local Spark master.
     *
     * @param args optional; {@code args[0]} is the path of the text file to
     *             count, defaulting to {@link #DEFAULT_INPUT} when absent
     */
    public static void main(String[] args) {
        String filename = args.length > 0 ? args[0] : DEFAULT_INPUT;

        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount1");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> input = sc.textFile(filename);

            // Split every line into words.
            JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String line) throws Exception {
                    // Trim first and split on whitespace runs so that blank lines and
                    // repeated/leading spaces do not yield "" tokens that would be
                    // counted as a word.
                    String trimmed = line.trim();
                    if (trimmed.isEmpty()) {
                        return Collections.<String>emptyList();
                    }
                    return Arrays.asList(trimmed.split("\\s+"));
                }
            });

            // pairs: word -> (word, 1)
            JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });

            // reduce: add up the 1s of identical words
            JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer x, Integer y) throws Exception {
                    return x + y;
                }
            });

            // output: foreach is the action that actually triggers the job
            counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                @Override
                public void call(Tuple2<String, Integer> tuple2) throws Exception {
                    System.out.println(tuple2);
                }
            });
        } finally {
            // Always release the context, even when the job fails.
            sc.stop();
        }
    }
}
代码输出:
(rose,2) (jack,3)
第二种更为简洁
package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

/**
 * Compact word count: split lines into words and let {@code countByValue}
 * return the number of occurrences of every distinct word as a {@code Map}.
 */
public class WordCount2 {

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT = "D:\\tmp\\words.txt";

    /**
     * Runs the word count on a local Spark master and prints the result map.
     *
     * @param args optional; {@code args[0]} is the path of the text file to
     *             count, defaulting to {@link #DEFAULT_INPUT} when absent
     */
    public static void main(String[] args) {
        String filename = args.length > 0 ? args[0] : DEFAULT_INPUT;

        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount2");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> input = sc.textFile(filename);

            // Split every line into words.
            JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String line) throws Exception {
                    // Trim first and split on whitespace runs so that blank lines and
                    // repeated/leading spaces do not yield "" tokens that would be
                    // counted as a word.
                    String trimmed = line.trim();
                    if (trimmed.isEmpty()) {
                        return Collections.<String>emptyList();
                    }
                    return Arrays.asList(trimmed.split("\\s+"));
                }
            });

            // countByValue is an action: it runs the job and hands the counts back
            // to this program as an ordinary Map.
            Map<String, Long> result = words.countByValue();
            System.out.println(result);
        } finally {
            // Always release the context, even when the job fails.
            sc.stop();
        }
    }
}
代码输出:
{rose=2, jack=3}
通过对比可以发现,第一种方式先通过flatMap、mapToPair、reduceByKey等转化操作得到JavaPairRDD,最后再由行动操作foreach逐条打印Tuple2;而第二种方式在flatMap之后直接调用行动操作countByValue,把统计结果以Map<String,Long>的形式返回到驱动器程序后再输出。
具体有什么区别,或者效率上有啥不同,待后续深入学习。
参考资料:
《Spark快速大数据分析》