Developing a Spark Application on Windows 7 (Java Version)


WordCount is the classic introductory demo for learning big data. In this post we develop a Java version of WordCount and submit it to a Spark 3.0.0 environment to run.

 

Version Information

OS: Windows 7

Java: 1.8.0_181

Hadoop: 3.2.1

Spark: 3.0.0-preview2-bin-hadoop3.2

IDE: IntelliJ IDEA 2019.2.4 x64

 

Server Setup

Hadoop: CentOS 7, deploying Hadoop 3.2.1 (pseudo-distributed)

Spark: CentOS 7, installing Spark 3.0.0-preview2-bin-hadoop3.2

 

Sample Source Code Download

Sample code for the Spark word-count application

Application Development

1. Create a new Spark project locally. The pom.xml content is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.phpdragon</groupId>
    <artifactId>spark-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <spark.version>2.4.5</spark.version>
        <spark.scala.version>2.12</spark.scala.version>
    </properties>

    <dependencies>
        <!-- Spark dependency Start -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.github.fommil.netlib</groupId>
            <artifactId>all</artifactId>
            <version>1.1.2</version>
            <type>pom</type>
        </dependency>
        <!-- Spark dependency End -->

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>false</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>com.phpragon.spark.WordCount</mainClass>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

 

2. Write the word-count code:

package com.phpragon.spark;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * @Description: Word count statistics with Spark
 * @author: phpdragon@qq.com
 * @date: 2020/03/30 17:21
 */
@Slf4j
public class WordCount {

    public static void main(String[] args) {
        if (null == args
                || args.length < 3
                || StringUtils.isEmpty(args[0])
                || StringUtils.isEmpty(args[1])
                || StringUtils.isEmpty(args[2])) {
            log.error("invalid params!");
            // abort instead of falling through and hitting an ArrayIndexOutOfBoundsException
            return;
        }

        String hdfsHost = args[0];
        String hdfsPort = args[1];
        String textFileName = args[2];

        // String hdfsHost = "172.16.1.126";
        // String hdfsPort = "9000";
        // String textFileName = "test.txt";

        SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application(Java)");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        String hdfsBasePath = "hdfs://" + hdfsHost + ":" + hdfsPort;

        // HDFS path of the input text file
        String inputPath = hdfsBasePath + "/input/" + textFileName;

        // HDFS path of the output directory
        String outputPath = hdfsBasePath + "/output/" + new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date());

        log.info("input path : {}", inputPath);
        log.info("output path : {}", outputPath);

        log.info("import text");
        // load the text file from HDFS
        JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);

        log.info("do map operation");
        JavaPairRDD<String, Integer> counts = textFile
                // split each line into words and flatten them into one collection
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                // map each word to a (word, 1) pair
                .mapToPair(word -> new Tuple2<>(word, 1))
                // reduce by key, adding up the counts
                .reduceByKey((a, b) -> a + b);

        log.info("do convert");
        // swap key and value, then sort by the new key (the count)
        JavaPairRDD<Integer, String> sorts = counts
                // swap key and value to produce (count, word) pairs
                .mapToPair(tuple2 -> new Tuple2<>(tuple2._2(), tuple2._1()))
                // sort by key in descending order
                .sortByKey(false);

        log.info("take top 10");

        // take the top 10 entries
        List<Tuple2<Integer, String>> top10 = sorts.take(10);

        StringBuilder sbud = new StringBuilder("top 10 word :\n");

        // format them for printing
        for(Tuple2<Integer, String> tuple2 : top10){
            sbud.append(tuple2._2())
                .append("\t")
                .append(tuple2._1())
                .append("\n");
        }

        log.info(sbud.toString());
        System.out.println(sbud.toString());

        log.info("merge and save as file");
        // coalesce into a single partition and save the result as one text file on HDFS
        javaSparkContext.parallelize(top10).coalesce(1).saveAsTextFile(outputPath);

        log.info("close context");

        // close the context
        javaSparkContext.close();
    }
}

 

 

3. Adjust the log level

Spark's own log output is quite verbose and can be distracting. You can restrict it by adjusting the log level: changing log4j.rootCategory=INFO, console to log4j.rootCategory=WARN, console stops Spark from printing INFO-level messages to the console.
If you want more detail instead, change INFO to DEBUG.
The content of log4j.properties is as follows:
log4j.rootLogger=${root.logger}
root.logger=WARN,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
shell.log.level=WARN
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
log4j.logger.org.apache.spark.repl.Main=${shell.log.level}
log4j.logger.org.apache.spark.api.python.PythonGatewayServer=${shell.log.level}

 

This file must be placed somewhere the program will pick it up automatically, for example in the resources directory:
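For reference, in a standard Maven project layout the file goes under src/main/resources, so that it is copied onto the runtime classpath when the project is built:

src/main/resources/log4j.properties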

 

Server-Side Testing

1. On the Hadoop server, create the directories input, output, and spark/history:

/data/server/hadoop/3.2.1/bin/hdfs dfs -mkdir /input 
/data/server/hadoop/3.2.1/bin/hdfs dfs -mkdir /output
/data/server/hadoop/3.2.1/bin/hdfs dfs -mkdir /spark
/data/server/hadoop/3.2.1/bin/hdfs dfs -mkdir /spark/history

 

2. Upload a test text file to the Hadoop service:

/data/server/hadoop/3.2.1/bin/hdfs dfs -put /data/server/hadoop/3.2.1/LICENSE.txt /input/test.txt
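Optionally, confirm the upload before submitting the job:

/data/server/hadoop/3.2.1/bin/hdfs dfs -ls /input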

 

3. Compile and package the code, then upload the resulting spark-example-1.0-SNAPSHOT.jar to the Spark server (a packaging sketch follows the submit command below). Run the command below; its last three parameters are the arguments to the Java main method, and the WordCount source shows how they are used:

/home/data/server/spark/3.0.0-preview2-bin-hadoop3.2/bin/spark-submit \
--master spark://172.16.1.126:7077 \
--class com.phpragon.spark.WordCount \
--executor-memory 512m \
--total-executor-cores 2 \
./spark-example-1.0-SNAPSHOT.jar \
172.16.1.126 \
9000 \
test.txt
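For reference, the jar used above can be built locally with Maven before uploading; a minimal sketch, assuming Maven is on the PATH (the scp destination is an assumption and should match wherever you run spark-submit from):

mvn clean package
scp target/spark-example-1.0-SNAPSHOT.jar root@172.16.1.126:/home/data/server/spark/3.0.0-preview2-bin-hadoop3.2/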

Execution result:

 

4. On the Hadoop server, list the output directory; a new subdirectory, 20200330_172721, has been created under /output:

[root@localhost spark]# hdfs dfs -ls /output
Found 1 items
drwxr-xr-x   - Administrator supergroup          0 2020-03-30 05:27 /output/20200330_172721

 

5. List the subdirectory; it contains two files:

[root@localhost spark]# hdfs dfs -ls /output/20200330_172721
Found 2 items
-rw-r--r-- 3 Administrator supergroup 0 2020-03-30 05:27 /output/20200330_172721/_SUCCESS
-rw-r--r-- 3 Administrator supergroup 93 2020-03-30 05:27 /output/20200330_172721/part-00000

 

The /output/20200330_172721/part-00000 file shown above is the output. Use the cat command to view its contents:

[root@localhost spark]# hdfs dfs -cat /output/20200330_172721/part-00000
(4149,)
(1208,the)
(702,of)
(512,or)
(481,to)
(409,and)
(308,this)
(305,in)
(277,a)
(251,OR)

This matches the console output shown earlier.

 

6. On the Spark web UI you can see the details of the job that just ran:

 

At this point the first Spark application has been developed and run. In real-world development, however, you cannot compile, package, and submit for every change; that is far too slow, so this workflow is not recommended for day-to-day coding.

 

Local Debugging

1. Add the setMaster("local[*]") call shown below to switch to local mode:

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark WordCount Application(Java)");

 

2. Right-click and run; the following error is reported:

20/03/30 16:35:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/03/30 16:35:57 ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

This happens because we are developing on Windows without an actual local Hadoop or Spark installation.

 

Solution: you do not need to actually install Hadoop; the program can even keep running if you ignore the error. To silence it, download winutils.exe from https://pan.baidu.com/s/1YZDqd_MkOgnfQT3YM-V3aQ (extraction code: xi44).

Place it in a directory of your choice; here it is in D:\Server\hadoop\3.2.1\bin. Note that the null in the error's null\bin\winutils.exe means HADOOP_HOME is not set, so HADOOP_HOME should also point at the directory containing that bin folder (here D:\Server\hadoop\3.2.1):
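Alternatively, the location can be set in code instead of via an environment variable, which avoids the reboot; a minimal sketch to place at the very top of WordCount.main (the path is an assumption and must be the folder that contains bin\winutils.exe):

        // tell Hadoop's Shell utilities where winutils.exe lives (Windows only);
        // must run before any Hadoop/Spark class is initialized
        System.setProperty("hadoop.home.dir", "D:\\Server\\hadoop\\3.2.1");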

 

After restarting the computer, right-click and run the main method again:
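One more note for local runs: when main is launched from the IDE, the three program arguments still have to be supplied in the run configuration, matching the commented-out defaults in the source (host, port, and file name are whatever your environment uses):

172.16.1.126 9000 test.txt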

 

 

PS:

Official manual

Detailed walkthrough of developing a first Spark application (Java)

Programming guide: explanation and practice

Spark spark-submit submission modes

https://www.cnblogs.com/dhName/p/10579045.html

 

