Scala version:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object WordCountDemo {
  def main(args: Array[String]): Unit = {
    // set the log level to reduce console noise
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf = new SparkConf().setAppName("WordCountDemo").setMaster("local")
    val sc = new SparkContext(conf)
    sc.textFile("hdfs://hadoop001:9000/in/word")
      .flatMap(_.split(" "))      // split each line into words
      .map((_, 1))                // pair each word with the count 1
      .reduceByKey(_ + _)         // sum the counts per word
      .collect()                  // pull the results back to the driver
      .foreach(println)
    sc.stop()
  }
}
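This version reads its input from HDFS at hdfs://hadoop001:9000/in/word. For a quick test without a cluster, sc.textFile also accepts a local path (for example file:///tmp/word, a hypothetical path), and the master can stay at local.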
Java version:
import java.util.Arrays;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCountDemo {
    public static void main(String[] args) {
        // set the log level to reduce console noise
        Logger.getLogger("org").setLevel(Level.WARN);
        // configuration
        SparkConf conf = new SparkConf().setAppName("WordCountDemo").setMaster("local");
        // Spark context
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // create the input RDD
        JavaRDD<String> linesRDD = jsc.textFile("hdfs://hadoop001:9000/in/word");
        // flatMap: split each line into words
        JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) throws Exception {
                String[] words = line.split(" ");
                return Arrays.asList(words);
            }
        });
        // mapToPair: pair each word with the count 1
        JavaPairRDD<String, Integer> tupleRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        // reduceByKey: sum the counts per word
        JavaPairRDD<String, Integer> resultRDD = tupleRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // collect the results back to the driver
        List<Tuple2<String, Integer>> result = resultRDD.collect();
        // print each (word, count) pair
        for (Tuple2<String, Integer> tuple2 : result) {
            System.out.println(tuple2._1 + "=" + tuple2._2);
        }
        jsc.stop();
    }
}
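The anonymous-class callbacks above follow the Spark 1.x Java API, where FlatMapFunction.call returns an Iterable. On Spark 2.x the callback must return an Iterator, and with Java 8 the same pipeline can be written with lambdas. A minimal sketch of that variant (same placeholder host and input path as above; the class name WordCountLambdaDemo is illustrative):

import java.util.Arrays;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountLambdaDemo {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.WARN);
        SparkConf conf = new SparkConf().setAppName("WordCountLambdaDemo").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile("hdfs://hadoop001:9000/in/word");
        // split each line into words; in Spark 2.x the flatMap callback returns an Iterator
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // pair each word with the count 1
        JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
        // sum the counts per word
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey((v1, v2) -> v1 + v2);
        // collect back to the driver and print each (word, count) pair
        for (Tuple2<String, Integer> t : counts.collect()) {
            System.out.println(t._1 + "=" + t._2);
        }
        jsc.stop();
    }
}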