Two Ways to Write WordCount in Spark



Language: Java

Tool: IntelliJ IDEA

Project: Java Maven

The relevant parts of pom.xml:

<properties>
    <spark.version>1.2.0</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

The first way is the conventional, step-by-step approach:

package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

public class WordCount1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String filename = "D:\\tmp\\words.txt";
        JavaRDD<String> input = sc.textFile(filename);

        // split each line into words
        JavaRDD<String> lines = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // pairs: map each word to a (word, 1) tuple
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // reduce: sum the counts for each word
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });

        // output: print each (word, count) tuple
        counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> tuple2) throws Exception {
                System.out.println(tuple2);
            }
        });

        sc.stop();
    }
}

Output:

(rose,2)
(jack,3)
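To see what the flatMap → mapToPair → reduceByKey chain computes, here is the same logic as a plain local Java sketch (no Spark, standard library only), with a hypothetical two-line input standing in for words.txt:

```java
import java.util.HashMap;
import java.util.Map;

public class LocalWordCount1 {
    public static void main(String[] args) {
        // hypothetical input standing in for the contents of words.txt
        String[] lines = {"jack rose jack", "jack rose"};

        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {                      // flatMap: split each line into words
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);     // mapToPair + reduceByKey: (word, 1) merged by summing
            }
        }
        System.out.println(counts);                      // print the final word counts
    }
}
```

Spark does the same pair-and-merge work, but partitioned across the cluster rather than in one loop.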

 

The second way is more concise:

package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;
import java.util.Map;

public class WordCount2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount2");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String filename = "D:\\tmp\\words.txt";
        JavaRDD<String> input = sc.textFile(filename);

        // split each line into words
        JavaRDD<String> lines = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // countByValue() is an action: it counts each distinct word
        // and returns the result to the driver as a Map
        Map<String, Long> result = lines.countByValue();
        System.out.println(result);

        sc.stop();
    }
}

Output:

{rose=2, jack=3}
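For intuition, countByValue() behaves like local group-and-count. Here is a sketch of the equivalent using Java 8 streams from the standard library, with a hypothetical input string standing in for the file:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LocalWordCount2 {
    public static void main(String[] args) {
        // hypothetical input standing in for words.txt
        String text = "jack rose jack jack rose";

        // groupingBy + counting plays the role of countByValue():
        // one expression yielding a Map<String, Long>
        Map<String, Long> result = Arrays.stream(text.split(" "))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        System.out.println(result);
    }
}
```

As with countByValue(), the whole result materializes in one place, which is fine only when the set of distinct words is small.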

Comparing the two: the first approach is a chain of transformations (flatMap, mapToPair, reduceByKey), with a foreach action printing Tuple2 objects at the end; the second replaces the pair-and-reduce steps with a single action, countByValue(), which returns a Map<String, Long> directly.

具體有什么區別,或者效率上有啥不同,待后續深入學習。

 

References:

Learning Spark (Chinese edition: 《Spark快速大數據分析》)

 

