Spark本地運行的幾個實例代碼(Java實現)


Spark本地運行的幾個實例代碼(Java實現)

初學spark,用Java寫了幾個本地運行的spark實例代碼,來記錄一下已學的spark常用的算子的使用和處理邏輯,不涉及分布式集群。相關內容僅為自己的個人理解,如有錯誤還請指出。

實例一:詞頻數統計

問題描述

統計一個文本文件中的每個單詞的出現次數,數據格式:
words.txt

過程分析

首先通過textFile()函數將文件讀入JavaRDD,然后通過flatMap算子將每一行的數據進行分割,得到多個String,一行數據分割得到的多個String以Iterator 的迭代器格式返回,返回之后的Iterator中的每一個String都會作為一個RDD。接着通過mapToPair算子給每一個word添加計數標記1(代表出現1次),該算子返回一個鍵值對RDD。最后通過reduceByKey算子根據相同的key對RDD進行reduce聚合操作,進行統計計數。

代碼

public class SparkWordCount {
    public static void main(String[] args){
        SparkConf conf = new SparkConf();
        //添加這一行則在本地運行,不添加這一行則默認在集群執行
        conf.setMaster("local");
        conf.setAppName("WordCount");
        //基本的初始化
        JavaSparkContext sc=new JavaSparkContext(conf);
        //創建String類型的RDD,並從本地文件中讀取數據
        JavaRDD<String> fileRDD = sc.textFile("src/main/files/words.txt");//通過文件讀入創建RDD

        //flatMap()算子用來分割操作,將原RDD中的數據分成一個個片段
        //new FlatMapFunction<String, String>中的兩個String分別表示輸入和輸出類型
        JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
            	//通過Iterator迭代器可以將分割后的多個數據元素全部返回輸出
                return Arrays.asList(line.split("\\s+")).iterator();
            }
        });

        //mapToPair()算子是用來對分割后的一個個片段結果添加計數標志的,如出現次數1,該函數用來創建並返回pair類型的RDD. new PairFunction<String, String, Integer>中分別是輸入類型String和輸出類型<String, Integer>.
        JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1); //Tuple2是spark的二元數組類型,Java中沒有
            }
        });

        //reduceByKey()算子是根據key來聚合,reduce階段.new Function2<Integer, Integer, Integer>中分別是用來聚合的兩個輸入類型Integer,Integer和聚合后的輸出類型Integer.
        JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        wordCountRDD.saveAsTextFile("E:\\result7");
    }

}

運行結果

在這里插入圖片描述

實例二:統計平均年齡

問題描述

java實現spark統計100萬人口的平均年齡以及每個年齡的出現次數,數據格式為“序號 年齡”
peopleAges.txt
數據生成代碼:

//生成年齡數據,格式“序號  年齡”
    private static void makeAgeData() throws IOException {
        File newFile = new File("src/main/files/peopleAges.txt");
        if (newFile.exists()){
            System.out.println("文件已存在!");
            return;
        }
        newFile.createNewFile();
        FileWriter fw = new FileWriter(newFile,true);
        Random rand = new Random();
        for (int i=1;i<=1000000;i++){
            fw.append(i+"  "+(rand.nextInt(100)+1)+"\n");
            fw.flush();
        }
        fw.close();
    }

過程分析

首先通過textFile()函數將文件數據讀入RDD中。然后使用mapToPair()算子將每一行數據中的年齡作為key並對每一個年齡添加計數標記1作為value,接着使用reduceByKey算子對相同年齡值的數據進行聚合。
求平均年齡時首先要求出年齡和,這也是reduce聚合操作。但是要注意reduce算子只能接收單個數據元素組成的RDD作為輸入,不能接收pair類型的RDD,所以對源文件讀出的RDD先通過map算子輸出只有年齡值數據的RDD,然后進行reduce()。

代碼

public class AvgAge {
    public static void main(String[] args){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("AvgAge");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //剛從文件讀出來的RDD已經是一行一行的字符串,所以可以直接進行mapToPair
        JavaRDD<String> fileRDD = sc.textFile("src/main/files/peopleAges.txt");
        JavaPairRDD<Integer, Integer> ageOneRdd = fileRDD.mapToPair(new PairFunction<String, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(String s) throws Exception {
                return new Tuple2<>(Integer.parseInt(s.split("\\s+")[1]),1);
            }
        });

		//使用reduceByKey算子對具有相同年齡值的數據進行聚合,獲取每個年齡值的人數
        JavaPairRDD<Integer, Integer> ageCountRDD = ageOneRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        });

        //求平均年齡
        //先通過map算子取出每個年齡值作為一個RDD。
        //reduce()函數的輸入RDD不能是pair,只能是單個數據組成的RDD
        Integer ageSum = fileRDD.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return Integer.parseInt(s.split("\\s+")[1]);
            }
        }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer s, Integer s2) throws Exception {
                return s+s2;
            }
        });
        System.out.println("平均年齡:"+ageSum/fileRDD.count());

        ageCountRDD.saveAsTextFile("src/main/files/ageAnalysis");

    }

運行結果

在這里插入圖片描述

案例三:統計身高最值

問題描述

統計男女人數,並分別計算出男性和女性的最高和最低身高,數據格式“序號 M/F 身高”
在這里插入圖片描述
數據生成代碼

	//生成性別身高數據,格式“序號  性別(M/F) 身高”
    private static void makeHeightData() throws IOException {
        File newFile = new File("src/main/files/heightData.txt");
        if (newFile.exists()){
            System.out.println("文件已存在!");
            return;
        }
        newFile.createNewFile();
        FileWriter fw = new FileWriter(newFile,true);
        Random rand = new Random();
        for (int i=1;i<=50000;i++){
            fw.append(i+"  M  "+(rand.nextInt(100)+100)+"\n");
            fw.append(i+"  F  "+(rand.nextInt(80)+100)+"\n");
            fw.flush();
        }
        fw.close();
    }

過程分析

首先通過textFile()函數將文件數據讀入RDD。然后使用filter算子分別過濾出男性和女性數據,接着用map算子分割出身高值並將其轉化成Integer類型,這樣才能用於數字排序,然后使用sortBy算子排序。sortBy算子可以直接對RDD中的數據排序,不用區分key還是value。

代碼

public class HeightMaxMin {
    public static void main(String[] args){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("HeightAnalysis");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> fileRDD = sc.textFile("src/main/files/heightData.txt");
        //使用filter算子分別過濾出男性和女性數據
        JavaRDD<String> maleRDD = fileRDD.filter(new Function<String, Boolean>() {
            @Override
            //如果這行數據符合過濾條件則返回true
            public Boolean call(String s) throws Exception {
                return s.contains("M");
            }
        });
        JavaRDD<String> femaleRDD = fileRDD.filter(new Function<String, Boolean>() {
            @Override
            //如果這行數據符合過濾條件則返回true
            public Boolean call(String s) throws Exception {
                return s.contains("F");
            }
        });

        //使用map算子分割出身高並轉化為整數類型,這樣才能用排序
        JavaRDD<Integer> maleHeightRDD = maleRDD.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return Integer.parseInt(s.split("\\s+")[2]);
            }
        });
        JavaRDD<Integer> femaleHeightRDD = femaleRDD.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return Integer.parseInt(s.split("\\s+")[2]);
            }
        });

        //使用sortBy算子排序
        JavaRDD<Integer> sortmaleHeightRDD = (JavaRDD<Integer>) maleHeightRDD.sortBy(new Function<Integer, Object>() {
            @Override
            //返回的是排序的內容,即對輸入RDD中的哪一部分進行排序就輸出哪一部分
            public Object call(Integer integer) throws Exception {
                return integer;
            }
        }, false,10);//false降序true升序,10是partition分區數,因為沒有用集群所以也不太明白這個分區數具體指什么
        JavaRDD<Integer> sortfemaleHeightRDD = (JavaRDD<Integer>) femaleHeightRDD.sortBy(new Function<Integer, Object>() {
            @Override
            public Object call(Integer integer) throws Exception {
                return integer;
            }
        }, false,10);//第二個參數true/false是正序逆序,最后一個參數10是分區數

		//first()函數返回排名第一的數據
        System.out.println("男性: "+sortmaleHeightRDD.count()+"  "+sortmaleHeightRDD.first());
        System.out.println("女性: "+sortfemaleHeightRDD.count()+"  "+sortfemaleHeightRDD.first());

    }
}

運行結果

在這里插入圖片描述
在這里插入圖片描述

案例四:統計單詞頻率

問題描述

統計一段文本里出現頻率最高的前k個詞,注意單詞不分大小寫。

過程分析

首先從文件讀入數據到RDD,然后使用flatMap算子對每一行的數據按照空格進行分割,並將所有的字母都轉為小寫,接着使用mapToPair算子對每一個單詞添加計數標記1,然后使用reduceByKey算子對單詞進行reduce聚合,為了根據key來排序,聚合后再使用mapToPair算子將得到的pair里面的key和value調換一下位置。然后使用sortByKey算子根據key來進行排序,最后使用take算子取出排名前5的數據。

代碼

public class wordTopK {
    public static void main(String[] args){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("wordTopK");
        JavaSparkContext sc=new JavaSparkContext(conf);

        JavaRDD<String> fileRDD = sc.textFile("D:\\summer_study\\ppt\\hive.txt");
        //使用flatmap算子對每一行數據按空格分隔,並將所有的字母都轉為小寫
        //注意這里不用map是因為map只能輸出一個數據元素,而flatMap可以在輸入元素后添加任意多元素來輸出,
        // 比如分割后的多個元素組成Iterator來輸出,但是Iterator里的每一個元素依然是獨立的RDD。
        JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.toLowerCase().split("\\s+")).iterator();
            }
        });

        //使用maoToPair算子給每一個word加上計數標記1
        JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s,1);
            }
        });

        //使用reduceByKey算子對word進行reduce聚合,為了根據key來排序,聚合后再將得到的pair里面的key和value調換一下
        JavaPairRDD<Integer, String> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<>(stringIntegerTuple2._2,stringIntegerTuple2._1);
            }
        });

        //使用sortByKey算子排序,false降序,true升序
        JavaPairRDD<Integer,String> sortWordCountRDD = wordCountRDD.sortByKey(false);

        //使用take算子取出前5
        List<Tuple2<Integer,String>> result = sortWordCountRDD.take(5);
        System.out.println("結果:"+result.toString());
    }
}

運行結果

在這里插入圖片描述

一些總結:

  1. RDD就是spark中專用的一種數據格式,代表一種抽象數據類型,spark中的數據都是存在不同類型的RDD中,如JavaRDD<String>,JavaPairRDD<String,Integer>等。
  2. textFile讀文件生成的RDD可以理解成源文件中一行一行的數據,每一行的數據就是一個JavaRDD .
  3. flatMap算子和map算子的區別:map算子就是將源JavaRDD的一個一個元素的傳入call方法,並經過算法后一個一個的返回從而生成一個新的JavaRDD,注意call返回的數據只能是單個數據元素。 flatMap與map一樣,是將RDD中的元素依次的傳入call方法,他比map多的功能是能在任何一個傳入call方法的元素后面添加任意多元素,而能達到這一點,正是因為其進行傳參是依次進行的。比如分割后的多個元素組成Iterator來輸出,但是Iterator里的每一個元素依然是獨立的RDD,這個Iterator只能由flatMap算子輸出,map算子不可以,因為它只能輸出單個數據元素。

運行環境

ide:Idea
jdk:1.8.0_121
spark:2.1.1
附maven配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ffxn</groupId>
    <artifactId>ffxn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
            <!--<scope>provided</scope>-->
        </dependency>

    </dependencies>

</project>

注:以上僅為個人學習記錄,spark初學菜鳥,如有錯誤,敬請提出。
參考:
Spark 入門實戰之最好的實例
Spark學習之路


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM