Spark本地運行的幾個實例代碼(Java實現)
初學spark,用Java寫了幾個本地運行的spark實例代碼,來記錄一下已學的spark常用的算子的使用和處理邏輯,不涉及分布式集群。相關內容僅為自己的個人理解,如有錯誤還請指出。
實例一:詞頻數統計
問題描述
統計一個文本文件中的每個單詞的出現次數,數據格式:
過程分析
首先通過textFile()函數將文件讀入JavaRDD,然后通過flatMap算子將每一行的數據進行分割,得到多個String,一行數據分割得到的多個String以Iterator
代碼
public class SparkWordCount {
public static void main(String[] args){
SparkConf conf = new SparkConf();
//添加這一行則在本地運行,不添加這一行則默認在集群執行
conf.setMaster("local");
conf.setAppName("WordCount");
//基本的初始化
JavaSparkContext sc=new JavaSparkContext(conf);
//創建String類型的RDD,並從本地文件中讀取數據
JavaRDD<String> fileRDD = sc.textFile("src/main/files/words.txt");//通過文件讀入創建RDD
//flatMap()算子用來分割操作,將原RDD中的數據分成一個個片段
//new FlatMapFunction<String, String>中的兩個String分別表示輸入和輸出類型
JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
//通過Iterator迭代器可以將分割后的多個數據元素全部返回輸出
return Arrays.asList(line.split("\\s+")).iterator();
}
});
//mapToPair()算子是用來對分割后的一個個片段結果添加計數標志的,如出現次數1,該函數用來創建並返回pair類型的RDD. new PairFunction<String, String, Integer>中分別是輸入類型String和輸出類型<String, Integer>.
JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<>(word, 1); //Tuple2是spark的二元數組類型,Java中沒有
}
});
//reduceByKey()算子是根據key來聚合,reduce階段.new Function2<Integer, Integer, Integer>中分別是用來聚合的兩個輸入類型Integer,Integer和聚合后的輸出類型Integer.
JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
wordCountRDD.saveAsTextFile("E:\\result7");
}
}
運行結果
實例二:統計平均年齡
問題描述
java實現spark統計100萬人口的平均年齡以及每個年齡的出現次數,數據格式為“序號 年齡”
數據生成代碼:
//生成年齡數據,格式“序號 年齡”
private static void makeAgeData() throws IOException {
File newFile = new File("src/main/files/peopleAges.txt");
if (newFile.exists()){
System.out.println("文件已存在!");
return;
}
newFile.createNewFile();
FileWriter fw = new FileWriter(newFile,true);
Random rand = new Random();
for (int i=1;i<=1000000;i++){
fw.append(i+" "+(rand.nextInt(100)+1)+"\n");
fw.flush();
}
fw.close();
}
過程分析
首先通過textFile()函數將文件數據讀入RDD中。然后使用mapToPair()算子將每一行數據中的年齡作為key並對每一個年齡添加計數標記1作為value,接着使用reduceByKey算子對相同年齡值的數據進行聚合。
求平均年齡時首先要求出年齡和,這也是reduce聚合操作。但是要注意reduce算子只能接收單個數據元素組成的RDD作為輸入,不能接收pair類型的RDD,所以對源文件讀出的RDD先通過map算子輸出只有年齡值數據的RDD,然后進行reduce()。
代碼
public class AvgAge {
public static void main(String[] args){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("AvgAge");
JavaSparkContext sc = new JavaSparkContext(conf);
//剛從文件讀出來的RDD已經是一行一行的字符串,所以可以直接進行mapToPair
JavaRDD<String> fileRDD = sc.textFile("src/main/files/peopleAges.txt");
JavaPairRDD<Integer, Integer> ageOneRdd = fileRDD.mapToPair(new PairFunction<String, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(String s) throws Exception {
return new Tuple2<>(Integer.parseInt(s.split("\\s+")[1]),1);
}
});
//使用reduceByKey算子對具有相同年齡值的數據進行聚合,獲取每個年齡值的人數
JavaPairRDD<Integer, Integer> ageCountRDD = ageOneRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
});
//求平均年齡
//先通過map算子取出每個年齡值作為一個RDD。
//reduce()函數的輸入RDD不能是pair,只能是單個數據組成的RDD
Integer ageSum = fileRDD.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return Integer.parseInt(s.split("\\s+")[1]);
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer s, Integer s2) throws Exception {
return s+s2;
}
});
System.out.println("平均年齡:"+ageSum/fileRDD.count());
ageCountRDD.saveAsTextFile("src/main/files/ageAnalysis");
}
運行結果
案例三:統計身高最值
問題描述
統計男女人數,並分別計算出男性和女性的最高和最低身高,數據格式“序號 M/F 身高”
數據生成代碼
//生成性別身高數據,格式“序號 性別(M/F) 身高”
private static void makeHeightData() throws IOException {
File newFile = new File("src/main/files/heightData.txt");
if (newFile.exists()){
System.out.println("文件已存在!");
return;
}
newFile.createNewFile();
FileWriter fw = new FileWriter(newFile,true);
Random rand = new Random();
for (int i=1;i<=50000;i++){
fw.append(i+" M "+(rand.nextInt(100)+100)+"\n");
fw.append(i+" F "+(rand.nextInt(80)+100)+"\n");
fw.flush();
}
fw.close();
}
過程分析
首先通過textFile()函數將文件數據讀入RDD。然后使用filter算子分別過濾出男性和女性數據,接着用map算子分割出身高值並將其轉化成Integer類型,這樣才能用於數字排序,然后使用sortBy算子排序。sortBy算子可以直接對RDD中的數據排序,不用區分key還是value。
代碼
public class HeightMaxMin {
public static void main(String[] args){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("HeightAnalysis");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> fileRDD = sc.textFile("src/main/files/heightData.txt");
//使用filter算子分別過濾出男性和女性數據
JavaRDD<String> maleRDD = fileRDD.filter(new Function<String, Boolean>() {
@Override
//如果這行數據符合過濾條件則返回true
public Boolean call(String s) throws Exception {
return s.contains("M");
}
});
JavaRDD<String> femaleRDD = fileRDD.filter(new Function<String, Boolean>() {
@Override
//如果這行數據符合過濾條件則返回true
public Boolean call(String s) throws Exception {
return s.contains("F");
}
});
//使用map算子分割出身高並轉化為整數類型,這樣才能用排序
JavaRDD<Integer> maleHeightRDD = maleRDD.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return Integer.parseInt(s.split("\\s+")[2]);
}
});
JavaRDD<Integer> femaleHeightRDD = femaleRDD.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return Integer.parseInt(s.split("\\s+")[2]);
}
});
//使用sortBy算子排序
JavaRDD<Integer> sortmaleHeightRDD = (JavaRDD<Integer>) maleHeightRDD.sortBy(new Function<Integer, Object>() {
@Override
//返回的是排序的內容,即對輸入RDD中的哪一部分進行排序就輸出哪一部分
public Object call(Integer integer) throws Exception {
return integer;
}
}, false,10);//false降序true升序,10是partition分區數,因為沒有用集群所以也不太明白這個分區數具體指什么
JavaRDD<Integer> sortfemaleHeightRDD = (JavaRDD<Integer>) femaleHeightRDD.sortBy(new Function<Integer, Object>() {
@Override
public Object call(Integer integer) throws Exception {
return integer;
}
}, false,10);//第二個參數true/false是正序逆序,最后一個參數10是分區數
//first()函數返回排名第一的數據
System.out.println("男性: "+sortmaleHeightRDD.count()+" "+sortmaleHeightRDD.first());
System.out.println("女性: "+sortfemaleHeightRDD.count()+" "+sortfemaleHeightRDD.first());
}
}
運行結果
案例四:統計單詞頻率
問題描述
統計一段文本里出現頻率最高的前k個詞,注意單詞不分大小寫。
過程分析
首先從文件讀入數據到RDD,然后使用flatMap算子對每一行的數據按照空格進行分割,並將所有的字母都轉為小寫,接着使用mapToPair算子對每一個單詞添加計數標記1,然后使用reduceByKey算子對單詞進行reduce聚合,為了根據key來排序,聚合后再使用mapToPair算子將得到的pair里面的key和value調換一下位置。然后使用sortByKey算子根據key來進行排序,最后使用take算子取出排名前5的數據。
代碼
public class wordTopK {
public static void main(String[] args){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("wordTopK");
JavaSparkContext sc=new JavaSparkContext(conf);
JavaRDD<String> fileRDD = sc.textFile("D:\\summer_study\\ppt\\hive.txt");
//使用flatmap算子對每一行數據按空格分隔,並將所有的字母都轉為小寫
//注意這里不用map是因為map只能輸出一個數據元素,而flatMap可以在輸入元素后添加任意多元素來輸出,
// 比如分割后的多個元素組成Iterator來輸出,但是Iterator里的每一個元素依然是獨立的RDD。
JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.toLowerCase().split("\\s+")).iterator();
}
});
//使用maoToPair算子給每一個word加上計數標記1
JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s,1);
}
});
//使用reduceByKey算子對word進行reduce聚合,為了根據key來排序,聚合后再將得到的pair里面的key和value調換一下
JavaPairRDD<Integer, String> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
}).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return new Tuple2<>(stringIntegerTuple2._2,stringIntegerTuple2._1);
}
});
//使用sortByKey算子排序,false降序,true升序
JavaPairRDD<Integer,String> sortWordCountRDD = wordCountRDD.sortByKey(false);
//使用take算子取出前5
List<Tuple2<Integer,String>> result = sortWordCountRDD.take(5);
System.out.println("結果:"+result.toString());
}
}
運行結果
一些總結:
- RDD就是spark中專用的一種數據格式,代表一種抽象數據類型,spark中的數據都是存在不同類型的RDD中,如JavaRDD<String>,JavaPairRDD<String,Integer>等。
- textFile讀文件生成的RDD可以理解成源文件中一行一行的數據,每一行的數據就是一個JavaRDD
. - flatMap算子和map算子的區別:map算子就是將源JavaRDD的一個一個元素的傳入call方法,並經過算法后一個一個的返回從而生成一個新的JavaRDD,注意call返回的數據只能是單個數據元素。 flatMap與map一樣,是將RDD中的元素依次的傳入call方法,他比map多的功能是能在任何一個傳入call方法的元素后面添加任意多元素,而能達到這一點,正是因為其進行傳參是依次進行的。比如分割后的多個元素組成Iterator來輸出,但是Iterator里的每一個元素依然是獨立的RDD,這個Iterator只能由flatMap算子輸出,map算子不可以,因為它只能輸出單個數據元素。
運行環境
ide:Idea
jdk:1.8.0_121
spark:2.1.1
附maven配置:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ffxn</groupId>
<artifactId>ffxn</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
</project>
注:以上僅為個人學習記錄,spark初學菜鳥,如有錯誤,敬請提出。
參考:
Spark 入門實戰之最好的實例
Spark學習之路