spark的RDD操作
在上一節Spark經典的單詞統計中,了解了幾個RDD操作,包括flatMap,map,reduceByKey,以及后面簡化的方案,countByValue。那么這一節將介紹更多常用的RDD操作,並且為每一種RDD我們分解來看其運作的情況。
spark的flatMap
flatMap,有着一對多的表現,輸入一輸出多。並且會將每一個輸入對應的多個輸出整合成一個大的集合,當然不用擔心這個集合會超出內存的范圍,因為spark會自覺地將過多的內容溢寫到磁盤。當然如果對運行的機器的內存有着足夠的信心,也可以將內容存儲到內存中。
為了更好地理解flatMap,我們將舉一個例子來說明。當然和往常一樣,會准備好例子對應的數據文本,文本名稱為uv.txt,該文本和示例程序可以從github上下載。以下會用三種語言:scala、java、python去描述,同時在java中會對比采用java和java8來實現各個例子。其中java和scala程序在github能直接下載,而python則暫時不提供,后續會補上。
scala實現
import org.apache.spark.{SparkConf, SparkContext}
object SparkFlatMap {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkFlatMap") val sc = new SparkContext(conf) //設置數據路徑 val textData = sc.textFile("./uv.txt") //輸出處理前總行數 println("before:"+textData.count()+"行") //輸出處理前第一行數據 println("first line:"+textData.first()) //進行flatMap處理 val flatData = textData.flatMap(line => line.split(" ")) //輸出處理后總行數 println("after:"+flatData.count()) //輸出處理后第一行數據 println("first line:"+flatData.first()) //將結果保存在flatResultScala文件夾中 flatData.saveAsTextFile("./flatResultScala") } } 復制代碼
java實現
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.Arrays;
import java.util.Iterator;
public class SparkFlatMapJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava"); JavaSparkContext sc = new JavaSparkContext(conf); //java實現 flatMapJava(sc); //java8實現 flatMapJava8(sc); } public static void flatMapJava(JavaSparkContext sc){ //設置數據路徑 JavaRDD<String> textData = sc.textFile("./uv.txt"); //輸出處理前總行數 System.out.println("before:"+textData.count()+"行"); //輸出處理前第一行數據 System.out.println("first line:"+textData.first()+"行"); //進行flatMap處理 JavaRDD<String> flatData = textData.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); //輸出處理后總行數 System.out.println("after:"+flatData.count()+"行"); //輸出處理后第一行數據 System.out.println("first line:"+flatData.first()+"行"); //將結果保存在flatResultScala文件夾中 flatData.saveAsTextFile("./flatResultJava"); } public static void flatMapJava8(JavaSparkContext sc){ sc.textFile("./uv.txt") .flatMap(line -> Arrays.asList(line.split(" ")).iterator()) .saveAsTextFile("./flatResultJava8"); } } 復制代碼
python實現
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("FlatMapPython") sc = SparkContext(conf=conf) textData = sc.textFile("./uv.txt") print("before:"+str(textData.count())+"行") print("first line"+textData.first()) flatData = textData.flatMap(lambda line:line.split(" ")) print("after:"+str(flatData.count())+"行") print("first line"+flatData.first()) flatData.saveAsTextFile("./resultFlatMap") 復制代碼
運行任意程序,得到相同結果
before:86400行
first line:2015-08-24_00:00:00 55311 buy
after:259200
first line:2015-08-24_00:00:00
復制代碼
查看文件

很顯然每一行都按照空格拆分成了三行,因此總行數是拆分前的三倍,第一行的內容只剩下原第一行的第一個數據,時間。這樣flatMap的作用就很明顯了。
spark的map
用同樣的方法來展示map操作,與flatMap不同的是,map通常是一對一,即輸入一個,對應輸出一個。但是輸出的結果可以是一個元組,一個元組則可能包含多個數據,但是一個元組是一個整體,因此算是一個元素。這里注意到在輸出的結果是元組時,scala和python能夠很正常處理,而在java中則有一點不同。
scala實現
import org.apache.spark.{SparkConf, SparkContext}
object SparkMap {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkFlatMap") val sc = new SparkContext(conf) val textData = sc.textFile("./uv.txt") //得到一個最后一個操作值,前面的時間和次數舍棄 val mapData1 = textData.map(line => line.split(" ")(2)) println(mapData1.count()) println(mapData1.first()) mapData1.saveAsTextFile("./resultMapScala") //得到一個最后兩個值,前面的時間舍棄 val mapData2 = textData.map(line => (line.split(" ")(1),line.split(" ")(2))) println(mapData2.count()) println(mapData2.first()) //將所有值存到元組中去 val mapData3 = textData.map(line => (line.split(" ")(1),line.split(" ")(1),line.split(" ")(2))) println(mapData3.count()) println(mapData3.first()) } } 復制代碼
java實現
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.codehaus.janino.Java;
import scala.Tuple2;
import scala.Tuple3;
public class SparkMapJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava"); JavaSparkContext sc = new JavaSparkContext(conf); //java實現 mapJava(sc); //java8實現 mapJava8(sc); } public static void mapJava(JavaSparkContext sc){ JavaRDD<String> txtData = sc.textFile("./uv.txt"); //保留最后一個值 JavaRDD<String> mapData1 = txtData.map(new Function<String, String>() { @Override public String call(String s) throws Exception { return s.split(" ")[2]; } }); System.out.println(mapData1.count()); System.out.println(mapData1.first()); //保留最后兩個值 JavaRDD<Tuple2<String,String>> mapData2 = txtData.map(new Function<String, Tuple2<String,String>>() { @Override public Tuple2<String,String> call(String s) throws Exception { return new Tuple2<>(s.split(" ")[1],s.split(" ")[2]); } }); System.out.println(mapData2.count()); System.out.println(mapData2.first()); //保留最后三個值 JavaRDD<Tuple3<String,String,String>> mapData3 = txtData.map(new Function<String, Tuple3<String,String,String>>() { @Override public Tuple3<String,String,String> call(String s) throws Exception { return new Tuple3<>(s.split(" ")[0],s.split(" ")[1],s.split(" ")[2]); } }); System.out.println(mapData2.count()); System.out.println(mapData2.first()); } public static void mapJava8(JavaSparkContext sc){ JavaRDD<String> mapData1 = sc.textFile("./uv.txt").map(line -> line.split(" ")[2]); System.out.println(mapData1.count()); System.out.println(mapData1.first()); JavaRDD<Tuple2<String,String>> mapData2 = sc.textFile("./uv.txt").map(line -> new Tuple2<String, String>(line.split(" ")[1],line.split(" ")[2])); System.out.println(mapData2.count()); System.out.println(mapData2.first()); JavaRDD<Tuple3<String,String,String>> mapData3 = sc.textFile("./uv.txt").map(line -> new Tuple3<String, String, String>(line.split(" ")[0],line.split(" ")[1],line.split(" ")[2])); System.out.println(mapData3.count()); System.out.println(mapData3.first()); } } 復制代碼
python實現
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("FlatMapPython") sc = SparkContext(conf=conf) textData = sc.textFile("./uv.txt") mapData1 = textData.map(lambda line : line.split(" ")[2]) print(mapData1.count()) print(mapData1.first()) mapData2 = textData.map(lambda line : (line.split(" ")[1],line.split(" ")[2])) print(mapData2.count()) print(mapData2.first()) mapData3 = textData.map(lambda line : (line.split(" ")[0],line.split(" ")[1],line.split(" ")[2])) print(mapData3.count()) print(mapData3.first()) 復制代碼
運行任意程序,得到相同結果
86400
buy
86400
(55311,buy)
86400
(55311,55311,buy)
復制代碼
Java中獨有的mapToPair
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public class SparkMapToPair {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava"); JavaSparkContext sc = new JavaSparkContext(conf); mapToPairJava(sc); mapToPairJava8(sc); } public static void mapToPairJava(JavaSparkContext sc){ JavaPairRDD<String,String> pairRDD = sc.textFile("./uv.txt").mapToPair(new PairFunction<String, String, String>() { @Override public Tuple2<String, String> call(String s) throws Exception { return new Tuple2<>(s.split(" ")[1],s.split(" ")[2]); } }); System.out.println(pairRDD.count()); System.out.println(pairRDD.first()); } public static void mapToPairJava8(JavaSparkContext sc){ JavaPairRDD<String,String> pairRDD = sc.textFile("./uv.txt").mapToPair(line -> new Tuple2<>(line.split(" ")[1],line.split(" ")[2])); System.out.println(pairRDD.count()); System.out.println(pairRDD.first()); } } 復制代碼
運行得到結果
86400
(55311,buy)
復制代碼
顯然我們發現這個結果,和用map處理保留后兩個的結果是一致的。靈活使用map、flatMap、mapToPair將非常重要,后面還將有運用多種操作去處理復雜的數據。以上所有程序的代碼都能夠在GitHub上下載