Spark WordCount Example (Java Version)


First, create a Maven project for Spark; I am using Eclipse here.

1. Write the WordCountApp code

package com.mengyao.spark.java.core;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Spark WordCount program
 * @author mengyao
 *
 */
public class WordCountApp {

    public static void main(String[] args) {
        
        /**
         * 1. Create the SparkConf object, which holds the Spark application's configuration
         */
        SparkConf conf = new SparkConf()
                // Set the name of the Spark application
                .setAppName(WordCountApp.class.getSimpleName());
        /**
         * 2. Create the SparkContext object. Java code uses JavaSparkContext; Scala code uses SparkContext.
         * In Spark, the SparkContext is responsible for connecting to the Spark cluster and for creating RDDs, accumulators, broadcast variables, and so on.
         * The master setting determines which TaskScheduler backend is created (the lower-level scheduler; the higher-level scheduler is the DAGScheduler):
         *         setMaster("local") creates a local scheduler backend;
         *         setMaster("spark://...") creates a SparkDeploySchedulerBackend, whose start method launches a Client that connects to the Spark cluster.
         */
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        /**
         * 3. sc provides the textFile method (a thin wrapper over SparkContext's textFile):
         *         def textFile(path: String): JavaRDD[String] = sc.textFile(path)
         * It reads a text file from HDFS, from a local path available on the cluster nodes, or from any Hadoop-supported file system, and returns a JavaRDD[String] with one element per line of the file.
         */
        JavaRDD<String> lines = sc.textFile("hdfs://soy1:9000/mapreduces/word.txt");

        /**
         * 4. Split each line of text into words.
         * lines calls the flatMap transformation (its argument is a FlatMapFunction implementation) and returns every word of every line.
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>(){
            private static final long serialVersionUID = -3243665984299496473L;
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split("\t"));
            }
        });
        
        /**
         * 5. Mark each word with an initial count of 1.
         * words calls the mapToPair transformation (its argument is a PairFunction implementation; the three type parameters of PairFunction<String, String, Integer> are <input word, Tuple2 key, Tuple2 value>) and returns a new RDD, a JavaPairRDD.
         */
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = -7879847028195817507L;
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        
        /**
         * 6. Count how many times each word occurs.
         * pairs calls the reduceByKey transformation (its argument is a Function2 implementation) to reduce the values of each key, returning a JavaPairRDD in which each Tuple's key is a word and its value is the total count for that word.
         */
        JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = -4171349401750495688L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        
        /**
         * 7. Use the foreach action to trigger execution.
         * In Spark, transformations only describe the computation; the job is not submitted until an action is invoked.
         */
        wordCount.foreach(new VoidFunction<Tuple2<String,Integer>>() {
            private static final long serialVersionUID = -5926812153234798612L;
            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1+":"+wordCount._2);
            }
        });
        
        /**
         * 8. Write the results to a file system.
         *         HDFS:
         *             Using the new Hadoop API (org.apache.hadoop.mapreduce.lib.output.TextOutputFormat):
         *                 wordCount.saveAsNewAPIHadoopFile("hdfs://ns1/spark/wordcount", Text.class, IntWritable.class, TextOutputFormat.class, new Configuration());
         *             Using the old Hadoop API (org.apache.hadoop.mapred.JobConf; org.apache.hadoop.mapred.OutputFormat):
         *                 wordCount.saveAsHadoopFile("hdfs://ns1/spark/wordcount", Text.class, IntWritable.class, OutputFormat.class, new JobConf(new Configuration()));
         *             Using the default text output (saveAsTextFile) to write to HDFS (mind HDFS write permissions; if denied, run: hdfs dfs -chmod -R 777 /spark):
         *                 wordCount.saveAsTextFile("hdfs://soy1:9000/spark/wordCount");
         */
        wordCount.saveAsTextFile("hdfs://soy1:9000/spark/wordCount");
        
        /**
         * 9. Close the SparkContext to end the application.
         */
        sc.close();
        
    }

}
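
For readers on newer Spark releases, here is a minimal lambda-based sketch of the same pipeline. It is not from the original post: it assumes Spark 2.x (where the Java flatMap function returns an Iterator instead of an Iterable) and Java 8, while the POM in step 4 targets Spark 1.3 and Java 6; the local[2] master and the input path are placeholders for quick local testing.

package com.mengyao.spark.java.core;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountLambdaApp {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName(WordCountLambdaApp.class.getSimpleName())
                .setMaster("local[2]"); // placeholder master for local testing

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Placeholder input path; use an HDFS path for cluster runs
        JavaRDD<String> lines = sc.textFile("input/word.txt");

        JavaPairRDD<String, Integer> wordCount = lines
                // Spark 2.x: the flatMap function returns an Iterator
                .flatMap(line -> Arrays.asList(line.split("\t")).iterator())
                // mark each word with an initial count of 1
                .mapToPair(word -> new Tuple2<>(word, 1))
                // sum the counts for each word
                .reduceByKey((v1, v2) -> v1 + v2);

        wordCount.foreach(t -> System.out.println(t._1 + ":" + t._2));

        sc.close();
    }
}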

 

2. Package the application as a jar and upload it to the cluster
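
With the maven-assembly-plugin configured in the POM below (step 4), running mvn clean package should produce spark.java-0.0.1-SNAPSHOT-jar-with-dependencies.jar under target/, which can then be copied to the cluster, for example with scp.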

 

3. Submit the Spark application to the cluster with the spark/bin/spark-submit tool (run mode: yarn-cluster)

bin/spark-submit \
--class com.mengyao.spark.java.core.WordCountApp \
--master yarn-cluster \
--num-executors 3 \
--driver-memory 512m \
--executor-cores 3 \
/usr/local/apps/spark.java-0.0.1-SNAPSHOT-jar-with-dependencies.jar
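
Once the job completes, the counts written by saveAsTextFile appear as part-* files under hdfs://soy1:9000/spark/wordCount and can be inspected with hdfs dfs -cat. Note that the output directory must not already exist when the job runs, otherwise saveAsTextFile will fail.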

 

4. The POM file is as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.mengyao</groupId>
  <artifactId>spark.java</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spark.java</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <junit.version>4.10</junit.version>
    <spark.version>1.3.0</spark.version>
    <hadoop.version>2.4.1</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
  </dependencies>
  
  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/main/test</testSourceDirectory>
    
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>com.mengyao.spark.java.core.WordCountApp</mainClass>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>

    </plugins>
  </build>
</project>
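
Note that the _2.10 suffix on the Spark artifact IDs is the Scala binary version the artifacts are built against; it should match the Scala version of the Spark distribution installed on the cluster.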

 

As an appendix, the following snippet (for newer Spark versions with SparkSession) creates an RDD from SparkSession.sparkContext(), calling the Scala API from Java. Rating and exampleData are defined elsewhere and not shown here.

SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("ALSCase")
        .getOrCreate();

// Create an RDD from SparkSession.sparkContext() (calling the Scala API from Java)
JavaRDD<Rating> ratingsRDD = spark.sparkContext()
        .parallelize(JavaConverters.asScalaBufferConverter(exampleData).asScala().seq(), 1, ClassManifestFactory.classType(String.class))
        .toJavaRDD()
        .map(Rating::parseRating);
System.out.println(ratingsRDD.count());
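
A simpler alternative, sketched below under the assumption of Spark 2.x and with purely illustrative data (the class name and sample strings are mine, not from the post), is to wrap the session's SparkContext in a JavaSparkContext, which accepts a java.util.List directly and avoids the manual JavaConverters/ClassManifestFactory plumbing:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class ParallelizeSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .appName("ParallelizeSketch")
                .getOrCreate();

        // JavaSparkContext wraps the session's Scala SparkContext and exposes
        // a parallelize(List<T>) overload, so no Scala conversions are needed.
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        List<String> exampleData = Arrays.asList("1::10::4.0", "2::20::3.5"); // illustrative data
        JavaRDD<String> linesRDD = jsc.parallelize(exampleData, 1);
        System.out.println(linesRDD.count());

        spark.stop();
    }
}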

