Transformation Operators
Basic initialization
Java:
static SparkConf conf = null;
static JavaSparkContext sc = null;
static {
    conf = new SparkConf();
    conf.setMaster("local").setAppName("TestTransformation");
    sc = new JavaSparkContext(conf);
}
Scala:
private val conf: SparkConf = new SparkConf().setAppName("TestTransformation").setMaster("local")
private val sparkContext = new SparkContext(conf)
1. map, flatMap, mapPartitions, mapPartitionsWithIndex
1.1 map
(1) Java 7 version

map is easy to understand: it passes the elements of the source JavaRDD into the call method one by one, and the transformed return values, one per element, form a new JavaRDD.

public static void map(){
    //String[] names = {"張無忌","趙敏","周芷若"};
    List<String> list = Arrays.asList("張無忌", "趙敏", "周芷若");
    System.out.println(list.size());
    JavaRDD<String> listRDD = sc.parallelize(list);
    JavaRDD<String> nameRDD = listRDD.map(new Function<String, String>() {
        @Override
        public String call(String name) throws Exception {
            return "Hello " + name;
        }
    });
    nameRDD.foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
            System.out.println(s);
        }
    });
}
(2) Java 8 version

public static void map(){
    String[] names = {"張無忌", "趙敏", "周芷若"};
    List<String> list = Arrays.asList(names);
    JavaRDD<String> listRDD = sc.parallelize(list);
    JavaRDD<String> nameRDD = listRDD.map(name -> "Hello " + name);
    nameRDD.foreach(name -> System.out.println(name));
}
(3) Scala version

def map(): Unit = {
  val list = List("張無忌", "趙敏", "周芷若")
  val listRDD = sc.parallelize(list)
  val nameRDD = listRDD.map(name => "Hello " + name)
  nameRDD.foreach(name => println(name))
}
(4) Result

(5) Summary

As you can see, the map operator computes over every element of the source JavaRDD. Because elements are passed in one by one, the operation preserves order: the new RDD's elements appear in the same order as the source RDD's. This one-element-at-a-time behavior leads into flatMap.
1.2 flatMap
(1) Java 7 version

Like map, flatMap passes the RDD's elements into the call method one by one. What it offers beyond map is that each input element can produce any number of output elements: the call method returns an iterator, and its contents are flattened into the resulting RDD.

public static void flatMap(){
    List<String> list = Arrays.asList("張無忌 趙敏", "宋青書 周芷若");
    JavaRDD<String> listRDD = sc.parallelize(list);
    JavaRDD<String> nameRDD = listRDD
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            })
            .map(new Function<String, String>() {
                @Override
                public String call(String name) throws Exception {
                    return "Hello " + name;
                }
            });
    nameRDD.foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
            System.out.println(s);
        }
    });
}
(2) Java 8 version

public static void flatMap(){
    List<String> list = Arrays.asList("張無忌 趙敏", "宋青書 周芷若");
    JavaRDD<String> listRDD = sc.parallelize(list);
    JavaRDD<String> nameRDD = listRDD
            .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
            .map(name -> "Hello " + name);
    nameRDD.foreach(name -> System.out.println(name));
}
(3) Scala version

def flatMap(): Unit = {
  val list = List("張無忌 趙敏", "宋青書 周芷若")
  val listRDD = sc.parallelize(list)
  val nameRDD = listRDD.flatMap(line => line.split(" ")).map(name => "Hello " + name)
  nameRDD.foreach(name => println(name))
}
(4) Result

(5) Summary

flatMap is useful whenever one input element needs to expand into several output elements, for example when filling gaps in a source RDD.

Both map and flatMap pass elements in one at a time, so they cannot express operations that need to relate one element to another (for example, computing interest where each month's interest depends on the principal accumulated in the previous month). For that there is mapPartitions: instead of single elements, it hands your function an iterator over an entire partition, and your function returns an iterator whose elements form the corresponding partition of the new RDD. Because the function sees a whole partition at once, it can carry state from one element to the next and implement this kind of logic.
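To illustrate why seeing a whole partition at once matters, here is a plain-Java sketch (no Spark required, so the names and data are illustrative assumptions, not Spark API) of the kind of function mapPartitions accepts: it walks an iterator and carries running state from one element to the next, with a cumulative sum standing in for the accumulated principal in the interest scenario above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionDemo {
    // The shape of a function you would pass to mapPartitions:
    // it receives an iterator over one partition and may keep
    // state across elements -- impossible with plain map.
    static Iterator<Integer> runningTotals(Iterator<Integer> partition) {
        List<Integer> out = new ArrayList<>();
        int total = 0;                  // state carried across elements
        while (partition.hasNext()) {
            total += partition.next();
            out.add(total);
        }
        return out.iterator();
    }

    public static void main(String[] args) {
        // One "partition" of monthly deposits.
        Iterator<Integer> result = runningTotals(Arrays.asList(100, 200, 300).iterator());
        while (result.hasNext()) {
            System.out.println(result.next()); // 100, 300, 600
        }
    }
}
```

Each output element depends on everything seen before it within the partition, which is exactly what per-element map cannot do.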
1.3 mapPartitions
(1) Java 7 version

/**
 * map:
 *     processes one record at a time (file systems, databases, etc.)
 * mapPartitions:
 *     fetches one partition's worth of data at a time (e.g. from HDFS).
 *     Normally mapPartitions is the higher-performance operator,
 *     because processing a partition at a time reduces the number of data fetches.
 *     But if the partitioning is set up badly, a single partition may hold too much data.
 */
public static void mapPartitions(){
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
    // the second argument means this RDD has two partitions
    JavaRDD<Integer> listRDD = sc.parallelize(list, 2);
    listRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, String>() {
        @Override
        public Iterator<String> call(Iterator<Integer> iterator) throws Exception {
            ArrayList<String> array = new ArrayList<>();
            while (iterator.hasNext()){
                array.add("hello " + iterator.next());
            }
            return array.iterator();
        }
    }).foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
            System.out.println(s);
        }
    });
}
(2) Java 8 version

public static void mapPartitions(){
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
    JavaRDD<Integer> listRDD = sc.parallelize(list, 2);
    listRDD.mapPartitions(iterator -> {
        ArrayList<String> array = new ArrayList<>();
        while (iterator.hasNext()){
            array.add("hello " + iterator.next());
        }
        return array.iterator();
    }).foreach(name -> System.out.println(name));
}
(3) Scala version

def mapPartitions(): Unit = {
  val list = List(1, 2, 3, 4, 5, 6)
  val listRDD = sc.parallelize(list, 2)
  listRDD.mapPartitions(iterator => {
    val newList: ListBuffer[String] = ListBuffer()
    while (iterator.hasNext) {
      newList.append("hello " + iterator.next())
    }
    newList.toIterator
  }).foreach(name => println(name))
}
(4) Result
1.4 mapPartitionsWithIndex
mapPartitionsWithIndex also fetches and processes one partition at a time, and in addition tells you the index of the partition being processed.

(1) Java 7 version

public static void mapPartitionsWithIndex(){
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
    JavaRDD<Integer> listRDD = sc.parallelize(list, 2);
    listRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
        @Override
        public Iterator<String> call(Integer index, Iterator<Integer> iterator) throws Exception {
            ArrayList<String> list1 = new ArrayList<>();
            while (iterator.hasNext()){
                list1.add(index + "_" + iterator.next());
            }
            return list1.iterator();
        }
    }, true)
    .foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
            System.out.println(s);
        }
    });
}
(2) Java 8 version

public static void mapPartitionsWithIndex() {
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
    JavaRDD<Integer> listRDD = sc.parallelize(list, 2);
    listRDD.mapPartitionsWithIndex((index, iterator) -> {
        ArrayList<String> list1 = new ArrayList<>();
        while (iterator.hasNext()){
            list1.add(index + "_" + iterator.next());
        }
        return list1.iterator();
    }, true)
    .foreach(str -> System.out.println(str));
}
(3) Scala version

def mapPartitionsWithIndex(): Unit = {
  val list = List(1, 2, 3, 4, 5, 6, 7, 8)
  sc.parallelize(list).mapPartitionsWithIndex((index, iterator) => {
    val listBuffer: ListBuffer[String] = new ListBuffer
    while (iterator.hasNext) {
      listBuffer.append(index + "_" + iterator.next())
    }
    listBuffer.iterator
  }, true)
  .foreach(println(_))
}
(4) Result
2. reduce, reduceByKey
2.1 reduce
reduce merges all the elements of an RDD: the call method receives two elements and returns their merged result, which is then passed back into call together with the next element, and so on, until only a single value remains.
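The merging described above can be traced in plain Java (a sketch independent of Spark; the helper name is ours): start with the first element, repeatedly fold in the next one, and the final accumulator is the reduce result.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceDemo {
    // Mimics reduce((x, y) -> x + y): merge two values at a time,
    // feeding each result back in as the next left operand,
    // until a single value remains.
    static int reduceSum(List<Integer> elements) {
        int acc = elements.get(0);
        for (int i = 1; i < elements.size(); i++) {
            acc = acc + elements.get(i);
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(reduceSum(Arrays.asList(1, 2, 3, 4, 5, 6))); // 21
    }
}
```

In Spark the merge function must be associative (and ideally commutative), because partial results are combined across partitions in no guaranteed order.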
(1) Java 7 version

public static void reduce(){
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
    JavaRDD<Integer> listRDD = sc.parallelize(list);
    Integer result = listRDD.reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) throws Exception {
            return i1 + i2;
        }
    });
    System.out.println(result);
}
(2) Java 8 version

public static void reduce(){
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
    JavaRDD<Integer> listRDD = sc.parallelize(list);
    Integer result = listRDD.reduce((x, y) -> x + y);
    System.out.println(result);
}
(3) Scala version

def reduce(): Unit = {
  val list = List(1, 2, 3, 4, 5, 6)
  val listRDD = sc.parallelize(list)
  val result = listRDD.reduce((x, y) => x + y)
  println(result)
}
(4) Result
2.2 reduceByKey
reduceByKey merges only the values of the K,V pairs in an RDD that share the same key.

(1) Java 7 version

public static void reduceByKey(){
    List<Tuple2<String, Integer>> list = Arrays.asList(
            new Tuple2<String, Integer>("武當", 99),
            new Tuple2<String, Integer>("少林", 97),
            new Tuple2<String, Integer>("武當", 89),
            new Tuple2<String, Integer>("少林", 77)
    );
    JavaPairRDD<String, Integer> listRDD = sc.parallelizePairs(list);
    // reduceByKey groups pairs with the same key and combines their values with the call method
    JavaPairRDD<String, Integer> result = listRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) throws Exception {
            return i1 + i2;
        }
    });
    result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> tuple) throws Exception {
            System.out.println("門派: " + tuple._1 + "->" + tuple._2);
        }
    });
}
(2) Java 8 version

public static void reduceByKey(){
    List<Tuple2<String, Integer>> list = Arrays.asList(
            new Tuple2<String, Integer>("武當", 99),
            new Tuple2<String, Integer>("少林", 97),
            new Tuple2<String, Integer>("武當", 89),
            new Tuple2<String, Integer>("少林", 77)
    );
    JavaPairRDD<String, Integer> listRDD = sc.parallelizePairs(list);
    JavaPairRDD<String, Integer> resultRDD = listRDD.reduceByKey((x, y) -> x + y);
    resultRDD.foreach(tuple -> System.out.println("門派: " + tuple._1 + "->" + tuple._2));
}
(3) Scala version

def reduceByKey(): Unit = {
  val list = List(("武當", 99), ("少林", 97), ("武當", 89), ("少林", 77))
  val mapRDD = sc.parallelize(list)
  val resultRDD = mapRDD.reduceByKey(_ + _)
  resultRDD.foreach(tuple => println("門派: " + tuple._1 + "->" + tuple._2))
}
(4) Result
3. union, join and groupByKey
3.1 union
When you need to combine two RDDs, union and join come into play. union simply concatenates the two RDDs, much like List's addAll method. And just as with List, both union and join require the element types of the two RDDs to match.

(1) Java 7 version

public static void union(){
    final List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
    final List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
    final JavaRDD<Integer> rdd1 = sc.parallelize(list1);
    final JavaRDD<Integer> rdd2 = sc.parallelize(list2);
    rdd1.union(rdd2).foreach(new VoidFunction<Integer>() {
        @Override
        public void call(Integer number) throws Exception {
            System.out.println(number + "");
        }
    });
}
(2) Java 8 version

public static void union(){
    final List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
    final List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
    final JavaRDD<Integer> rdd1 = sc.parallelize(list1);
    final JavaRDD<Integer> rdd2 = sc.parallelize(list2);
    rdd1.union(rdd2).foreach(num -> System.out.println(num));
}
(3) Scala version

def union(): Unit = {
  val list1 = List(1, 2, 3, 4)
  val list2 = List(3, 4, 5, 6)
  val rdd1 = sc.parallelize(list1)
  val rdd2 = sc.parallelize(list2)
  rdd1.union(rdd2).foreach(println(_))
}
(4) Result
3.2 groupByKey
(1) Java 7 version

union merely concatenates two RDDs, whereas join does more. join is similar to the combine step in Hadoop, minus the sorting. Before looking at join, it helps to look at groupByKey, because join can be understood as union combined with groupByKey: groupBy groups the elements of an RDD, using the call method's return value as the group name, and groupByKey, as the name suggests, groups the elements of a pair RDD that share the same key. That is:

public static void groupByKey(){
    List<Tuple2<String, String>> list = Arrays.asList(
            new Tuple2("武當", "張三豐"),
            new Tuple2("峨眉", "滅絕師太"),
            new Tuple2("武當", "宋青書"),
            new Tuple2("峨眉", "周芷若")
    );
    JavaPairRDD<String, String> listRDD = sc.parallelizePairs(list);
    JavaPairRDD<String, Iterable<String>> groupByKeyRDD = listRDD.groupByKey();
    groupByKeyRDD.foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
        @Override
        public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
            String menpai = tuple._1;
            Iterator<String> iterator = tuple._2.iterator();
            String people = "";
            while (iterator.hasNext()){
                people = people + iterator.next() + " ";
            }
            System.out.println("門派:" + menpai + "人員:" + people);
        }
    });
}
(2) Java 8 version

public static void groupByKey(){
    List<Tuple2<String, String>> list = Arrays.asList(
            new Tuple2("武當", "張三豐"),
            new Tuple2("峨眉", "滅絕師太"),
            new Tuple2("武當", "宋青書"),
            new Tuple2("峨眉", "周芷若")
    );
    JavaPairRDD<String, String> listRDD = sc.parallelizePairs(list);
    JavaPairRDD<String, Iterable<String>> groupByKeyRDD = listRDD.groupByKey();
    groupByKeyRDD.foreach(tuple -> {
        String menpai = tuple._1;
        Iterator<String> iterator = tuple._2.iterator();
        String people = "";
        while (iterator.hasNext()){
            people = people + iterator.next() + " ";
        }
        System.out.println("門派:" + menpai + "人員:" + people);
    });
}
(3) Scala version

def groupByKey(): Unit = {
  val list = List(("武當", "張三豐"), ("峨眉", "滅絕師太"), ("武當", "宋青書"), ("峨眉", "周芷若"))
  val listRDD = sc.parallelize(list)
  val groupByKeyRDD = listRDD.groupByKey()
  groupByKeyRDD.foreach(t => {
    val menpai = t._1
    val iterator = t._2.iterator
    var people = ""
    while (iterator.hasNext)
      people = people + iterator.next + " "
    println("門派:" + menpai + "人員:" + people)
  })
}
(4) Result
3.3 join
(1) Java 7 version

join merges two pair RDDs and groups together the elements that share a key; it can be understood as groupByKey combined with union.

public static void join(){
    final List<Tuple2<Integer, String>> names = Arrays.asList(
            new Tuple2<Integer, String>(1, "東方不敗"),
            new Tuple2<Integer, String>(2, "令狐沖"),
            new Tuple2<Integer, String>(3, "林平之")
    );
    final List<Tuple2<Integer, Integer>> scores = Arrays.asList(
            new Tuple2<Integer, Integer>(1, 99),
            new Tuple2<Integer, Integer>(2, 98),
            new Tuple2<Integer, Integer>(3, 97)
    );
    final JavaPairRDD<Integer, String> nemesrdd = sc.parallelizePairs(names);
    final JavaPairRDD<Integer, Integer> scoresrdd = sc.parallelizePairs(scores);
    /**
     * <Integer,            student id
     *  Tuple2<String,      name
     *         Integer>>    score
     */
    final JavaPairRDD<Integer, Tuple2<String, Integer>> joinRDD = nemesrdd.join(scoresrdd);
    // final JavaPairRDD<Integer, Tuple2<Integer, String>> join = scoresrdd.join(nemesrdd);
    joinRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
        @Override
        public void call(Tuple2<Integer, Tuple2<String, Integer>> tuple) throws Exception {
            System.out.println("學號:" + tuple._1 + " 名字:" + tuple._2._1 + " 分數:" + tuple._2._2);
        }
    });
}
(2) Java 8 version

public static void join(){
    final List<Tuple2<Integer, String>> names = Arrays.asList(
            new Tuple2<Integer, String>(1, "東方不敗"),
            new Tuple2<Integer, String>(2, "令狐沖"),
            new Tuple2<Integer, String>(3, "林平之")
    );
    final List<Tuple2<Integer, Integer>> scores = Arrays.asList(
            new Tuple2<Integer, Integer>(1, 99),
            new Tuple2<Integer, Integer>(2, 98),
            new Tuple2<Integer, Integer>(3, 97)
    );
    final JavaPairRDD<Integer, String> nemesrdd = sc.parallelizePairs(names);
    final JavaPairRDD<Integer, Integer> scoresrdd = sc.parallelizePairs(scores);
    final JavaPairRDD<Integer, Tuple2<String, Integer>> joinRDD = nemesrdd.join(scoresrdd);
    joinRDD.foreach(tuple -> System.out.println("學號:" + tuple._1 + " 姓名:" + tuple._2._1 + " 成績:" + tuple._2._2));
}
(3) Scala version

def join(): Unit = {
  val list1 = List((1, "東方不敗"), (2, "令狐沖"), (3, "林平之"))
  val list2 = List((1, 99), (2, 98), (3, 97))
  val list1RDD = sc.parallelize(list1)
  val list2RDD = sc.parallelize(list2)
  val joinRDD = list1RDD.join(list2RDD)
  joinRDD.foreach(t => println("學號:" + t._1 + " 姓名:" + t._2._1 + " 成績:" + t._2._2))
}
(4) Result
4. sample, cartesian
4.1 sample
(1) Java 7 version

public static void sample(){
    ArrayList<Integer> list = new ArrayList<>();
    for (int i = 1; i <= 100; i++){
        list.add(i);
    }
    JavaRDD<Integer> listRDD = sc.parallelize(list);
    /**
     * sample draws a sample from the RDD. It takes three parameters:
     * withReplacement: Boolean
     *     true:  sample with replacement
     *     false: sample without replacement
     * fraction: Double
     *     expected fraction of the data to sample
     * seed: Long
     *     random seed
     */
    JavaRDD<Integer> sampleRDD = listRDD.sample(false, 0.1, 0);
    sampleRDD.foreach(new VoidFunction<Integer>() {
        @Override
        public void call(Integer num) throws Exception {
            System.out.print(num + " ");
        }
    });
}
(2) Java 8 version

public static void sample(){
    ArrayList<Integer> list = new ArrayList<>();
    for (int i = 1; i <= 100; i++){
        list.add(i);
    }
    JavaRDD<Integer> listRDD = sc.parallelize(list);
    JavaRDD<Integer> sampleRDD = listRDD.sample(false, 0.1, 0);
    sampleRDD.foreach(num -> System.out.print(num + " "));
}
(3) Scala version

def sample(): Unit = {
  val list = 1 to 100
  val listRDD = sc.parallelize(list)
  listRDD.sample(false, 0.1, 0).foreach(num => print(num + " "))
}
(4) Result
4.2 cartesian
cartesian computes the Cartesian product of two RDDs.

(1) Java 7 version

public static void cartesian(){
    List<String> list1 = Arrays.asList("A", "B");
    List<Integer> list2 = Arrays.asList(1, 2, 3);
    JavaRDD<String> list1RDD = sc.parallelize(list1);
    JavaRDD<Integer> list2RDD = sc.parallelize(list2);
    list1RDD.cartesian(list2RDD).foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> tuple) throws Exception {
            System.out.println(tuple._1 + "->" + tuple._2);
        }
    });
}
(2) Java 8 version

public static void cartesian(){
    List<String> list1 = Arrays.asList("A", "B");
    List<Integer> list2 = Arrays.asList(1, 2, 3);
    JavaRDD<String> list1RDD = sc.parallelize(list1);
    JavaRDD<Integer> list2RDD = sc.parallelize(list2);
    list1RDD.cartesian(list2RDD).foreach(tuple -> System.out.println(tuple._1 + "->" + tuple._2));
}
(3) Scala version

def cartesian(): Unit = {
  val list1 = List("A", "B")
  val list2 = List(1, 2, 3)
  val list1RDD = sc.parallelize(list1)
  val list2RDD = sc.parallelize(list2)
  list1RDD.cartesian(list2RDD).foreach(t => println(t._1 + "->" + t._2))
}
(4) Result
5. filter, distinct, intersection
5.1 filter
(1) Java 7 version

Select the even numbers:

public static void filter(){
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    JavaRDD<Integer> listRDD = sc.parallelize(list);
    JavaRDD<Integer> filterRDD = listRDD.filter(new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer num) throws Exception {
            return num % 2 == 0;
        }
    });
    filterRDD.foreach(new VoidFunction<Integer>() {
        @Override
        public void call(Integer num) throws Exception {
            System.out.print(num + " ");
        }
    });
}
(2) Java 8 version

public static void filter(){
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    JavaRDD<Integer> listRDD = sc.parallelize(list);
    JavaRDD<Integer> filterRDD = listRDD.filter(num -> num % 2 == 0);
    filterRDD.foreach(num -> System.out.print(num + " "));
}
(3) Scala version

def filter(): Unit = {
  val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  val listRDD = sc.parallelize(list)
  listRDD.filter(num => num % 2 == 0).foreach(print(_))
}
(4) Result
5.2 distinct
(1) Java 7 version

public static void distinct(){
    List<Integer> list = Arrays.asList(1, 1, 2, 2, 3, 3, 4, 5);
    JavaRDD<Integer> listRDD = sc.parallelize(list);
    JavaRDD<Integer> distinctRDD = listRDD.distinct();
    distinctRDD.foreach(new VoidFunction<Integer>() {
        @Override
        public void call(Integer num) throws Exception {
            System.out.println(num);
        }
    });
}
(2) Java 8 version

public static void distinct(){
    List<Integer> list = Arrays.asList(1, 1, 2, 2, 3, 3, 4, 5);
    JavaRDD<Integer> listRDD = sc.parallelize(list);
    listRDD.distinct().foreach(num -> System.out.println(num));
}
(3) Scala version

def distinct(): Unit = {
  val list = List(1, 1, 2, 2, 3, 3, 4, 5)
  sc.parallelize(list).distinct().foreach(println(_))
}
(4) Result
5.3 intersection
(1) Java 7 version

public static void intersection(){
    List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
    List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
    JavaRDD<Integer> list1RDD = sc.parallelize(list1);
    JavaRDD<Integer> list2RDD = sc.parallelize(list2);
    list1RDD.intersection(list2RDD).foreach(new VoidFunction<Integer>() {
        @Override
        public void call(Integer num) throws Exception {
            System.out.println(num);
        }
    });
}
(2) Java 8 version

public static void intersection() {
    List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
    List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
    JavaRDD<Integer> list1RDD = sc.parallelize(list1);
    JavaRDD<Integer> list2RDD = sc.parallelize(list2);
    list1RDD.intersection(list2RDD).foreach(num -> System.out.println(num));
}
(3) Scala version

def intersection(): Unit = {
  val list1 = List(1, 2, 3, 4)
  val list2 = List(3, 4, 5, 6)
  val list1RDD = sc.parallelize(list1)
  val list2RDD = sc.parallelize(list2)
  list1RDD.intersection(list2RDD).foreach(println(_))
}
(4) Result
6. coalesce, repartition, repartitionAndSortWithinPartitions
6.1 coalesce
Reduces the number of partitions (many partitions → fewer).

(1) Java 7 version

public static void coalesce(){
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
    JavaRDD<Integer> listRDD = sc.parallelize(list, 3);
    listRDD.coalesce(1).foreach(new VoidFunction<Integer>() {
        @Override
        public void call(Integer num) throws Exception {
            System.out.print(num);
        }
    });
}
(2) Java 8 version

public static void coalesce() {
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
    JavaRDD<Integer> listRDD = sc.parallelize(list, 3);
    listRDD.coalesce(1).foreach(num -> System.out.println(num));
}
(3) Scala version

def coalesce(): Unit = {
  val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
  sc.parallelize(list, 3).coalesce(1).foreach(println(_))
}
(4) Result
6.2 repartition

Repartitions an RDD; it solves the opposite problem: increasing a small partition count (fewer partitions → more).

(1) Java 7 version

public static void repartition(){
    List<Integer> list = Arrays.asList(1, 2, 3, 4);
    JavaRDD<Integer> listRDD = sc.parallelize(list, 1);
    listRDD.repartition(2).foreach(new VoidFunction<Integer>() {
        @Override
        public void call(Integer num) throws Exception {
            System.out.println(num);
        }
    });
}
(2) Java 8 version

public static void repartition(){
    List<Integer> list = Arrays.asList(1, 2, 3, 4);
    JavaRDD<Integer> listRDD = sc.parallelize(list, 1);
    listRDD.repartition(2).foreach(num -> System.out.println(num));
}
(3) Scala version

def repartition(): Unit = {
  val list = List(1, 2, 3, 4)
  val listRDD = sc.parallelize(list, 1)
  listRDD.repartition(2).foreach(println(_))
}
(4) Result
6.3 repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions is a variant of repartition. Unlike repartition, it redistributes the data according to the given partitioner and sorts the records within each resulting partition, which performs better than calling repartition and then sorting separately.
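As a rough plain-Java sketch of what the operator produces (assumed data and a hand-written modulo rule standing in for a hash partitioner; this is not Spark code): keys are first assigned to partitions, then each partition is sorted independently, so the output is sorted within, but not across, partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PartitionSortDemo {
    // Assign each key to partition (key mod numPartitions),
    // then sort each partition's keys independently.
    static List<List<Integer>> partitionAndSort(List<Integer> keys, int numPartitions) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) partitions.add(new ArrayList<>());
        for (int k : keys) partitions.get(Math.floorMod(k, numPartitions)).add(k);
        for (List<Integer> p : partitions) Collections.sort(p); // sort within each partition only
        return partitions;
    }

    public static void main(String[] args) {
        // Same keys as the Java 8 example below.
        List<List<Integer>> result = partitionAndSort(Arrays.asList(1, 4, 55, 66, 33, 48, 23), 2);
        System.out.println(result); // [[4, 48, 66], [1, 23, 33, 55]]
    }
}
```

In Spark the assignment and the sort both happen inside a single shuffle, which is why this beats repartition followed by a separate sort.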
(1) Java 7 version

public static void repartitionAndSortWithinPartitions(){
    List<Integer> list = Arrays.asList(1, 3, 55, 77, 33, 5, 23);
    JavaRDD<Integer> listRDD = sc.parallelize(list, 1);
    JavaPairRDD<Integer, Integer> pairRDD = listRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer num) throws Exception {
            return new Tuple2<>(num, num);
        }
    });
    JavaPairRDD<Integer, Integer> parationRDD = pairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
        @Override
        public int getPartition(Object key) {
            Integer index = Integer.valueOf(key.toString());
            if (index % 2 == 0) {
                return 0;
            } else {
                return 1;
            }
        }

        @Override
        public int numPartitions() {
            return 2;
        }
    });
    parationRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer, Integer>>, Iterator<String>>() {
        @Override
        public Iterator<String> call(Integer index, Iterator<Tuple2<Integer, Integer>> iterator) throws Exception {
            final ArrayList<String> list1 = new ArrayList<>();
            while (iterator.hasNext()){
                list1.add(index + "_" + iterator.next());
            }
            return list1.iterator();
        }
    }, false).foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
            System.out.println(s);
        }
    });
}
(2) Java 8 version

public static void repartitionAndSortWithinPartitions(){
    List<Integer> list = Arrays.asList(1, 4, 55, 66, 33, 48, 23);
    JavaRDD<Integer> listRDD = sc.parallelize(list, 1);
    JavaPairRDD<Integer, Integer> pairRDD = listRDD.mapToPair(num -> new Tuple2<>(num, num));
    pairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(2))
            .mapPartitionsWithIndex((index, iterator) -> {
                ArrayList<String> list1 = new ArrayList<>();
                while (iterator.hasNext()){
                    list1.add(index + "_" + iterator.next());
                }
                return list1.iterator();
            }, false)
            .foreach(str -> System.out.println(str));
}
(3) Scala version

def repartitionAndSortWithinPartitions(): Unit = {
  val list = List(1, 4, 55, 66, 33, 48, 23)
  val listRDD = sc.parallelize(list, 1)
  listRDD.map(num => (num, num))
    .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    .mapPartitionsWithIndex((index, iterator) => {
      val listBuffer: ListBuffer[String] = new ListBuffer
      while (iterator.hasNext) {
        listBuffer.append(index + "_" + iterator.next())
      }
      listBuffer.iterator
    }, false)
    .foreach(println(_))
}
(4) Result
7. cogroup, sortByKey, aggregateByKey
7.1 cogroup
cogroup takes the K,V elements of two (or more) RDDs and, for each key, gathers the values from each RDD into a separate collection. Unlike reduceByKey, it combines elements with the same key across the input RDDs.

(1) Java 7 version

public static void cogroup(){
    List<Tuple2<Integer, String>> list1 = Arrays.asList(
            new Tuple2<Integer, String>(1, "www"),
            new Tuple2<Integer, String>(2, "bbs")
    );
    List<Tuple2<Integer, String>> list2 = Arrays.asList(
            new Tuple2<Integer, String>(1, "cnblog"),
            new Tuple2<Integer, String>(2, "cnblog"),
            new Tuple2<Integer, String>(3, "very")
    );
    List<Tuple2<Integer, String>> list3 = Arrays.asList(
            new Tuple2<Integer, String>(1, "com"),
            new Tuple2<Integer, String>(2, "com"),
            new Tuple2<Integer, String>(3, "good")
    );
    JavaPairRDD<Integer, String> list1RDD = sc.parallelizePairs(list1);
    JavaPairRDD<Integer, String> list2RDD = sc.parallelizePairs(list2);
    JavaPairRDD<Integer, String> list3RDD = sc.parallelizePairs(list3);
    list1RDD.cogroup(list2RDD, list3RDD).foreach(new VoidFunction<Tuple2<Integer, Tuple3<Iterable<String>, Iterable<String>, Iterable<String>>>>() {
        @Override
        public void call(Tuple2<Integer, Tuple3<Iterable<String>, Iterable<String>, Iterable<String>>> tuple) throws Exception {
            System.out.println(tuple._1 + " " + tuple._2._1() + " " + tuple._2._2() + " " + tuple._2._3());
        }
    });
}
(2) Java 8 version

public static void cogroup(){
    List<Tuple2<Integer, String>> list1 = Arrays.asList(
            new Tuple2<Integer, String>(1, "www"),
            new Tuple2<Integer, String>(2, "bbs")
    );
    List<Tuple2<Integer, String>> list2 = Arrays.asList(
            new Tuple2<Integer, String>(1, "cnblog"),
            new Tuple2<Integer, String>(2, "cnblog"),
            new Tuple2<Integer, String>(3, "very")
    );
    List<Tuple2<Integer, String>> list3 = Arrays.asList(
            new Tuple2<Integer, String>(1, "com"),
            new Tuple2<Integer, String>(2, "com"),
            new Tuple2<Integer, String>(3, "good")
    );
    JavaPairRDD<Integer, String> list1RDD = sc.parallelizePairs(list1);
    JavaPairRDD<Integer, String> list2RDD = sc.parallelizePairs(list2);
    JavaPairRDD<Integer, String> list3RDD = sc.parallelizePairs(list3);
    list1RDD.cogroup(list2RDD, list3RDD).foreach(tuple ->
            System.out.println(tuple._1 + " " + tuple._2._1() + " " + tuple._2._2() + " " + tuple._2._3()));
}
(3) Scala version

def cogroup(): Unit = {
  val list1 = List((1, "www"), (2, "bbs"))
  val list2 = List((1, "cnblog"), (2, "cnblog"), (3, "very"))
  val list3 = List((1, "com"), (2, "com"), (3, "good"))
  val list1RDD = sc.parallelize(list1)
  val list2RDD = sc.parallelize(list2)
  val list3RDD = sc.parallelize(list3)
  list1RDD.cogroup(list2RDD, list3RDD).foreach(tuple =>
    println(tuple._1 + " " + tuple._2._1 + " " + tuple._2._2 + " " + tuple._2._3))
}
(4) Result
7.2 sortByKey
sortByKey operates on an RDD of key-value pairs and sorts it by key. It is implemented in org.apache.spark.rdd.OrderedRDDFunctions, as follows:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K, V)] = {
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
As the implementation shows, it takes two parameters with the same meaning as in sortBy, so they need no further explanation here. The returned RDD is always a ShuffledRDD: sorting the source RDD requires a shuffle, and the result of a shuffle is a ShuffledRDD. The implementation is quite elegant: a RangePartitioner sends keys in the same range to the same partition, and the data inside each partition is then sorted with the standard sort mechanism (applied per partition via mapPartitions), avoiding a large-scale shuffle for the sort itself. The following shows sortByKey in use:
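The range-partitioning idea can be sketched in plain Java (the boundary is chosen by hand here as an assumption; Spark's RangePartitioner samples the data to pick boundaries): keys below the boundary go to partition 0, the rest to partition 1, and once each partition is sorted locally, concatenating the partitions in order already yields a globally sorted sequence, with no further data movement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RangePartitionDemo {
    // Partition by key range (boundary is an assumed split point),
    // sort within each partition, then concatenate in partition order.
    static List<Integer> rangeSort(List<Integer> keys, int boundary) {
        List<Integer> low = new ArrayList<>(), high = new ArrayList<>();
        for (int k : keys) (k < boundary ? low : high).add(k);
        Collections.sort(low);   // local sort, partition 0
        Collections.sort(high);  // local sort, partition 1
        List<Integer> all = new ArrayList<>(low);
        all.addAll(high);        // already globally sorted
        return all;
    }

    public static void main(String[] args) {
        System.out.println(rangeSort(Arrays.asList(99, 96, 66, 98), 90)); // [66, 96, 98, 99]
    }
}
```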
(1) Java 7 version

public static void sortByKey(){
    List<Tuple2<Integer, String>> list = Arrays.asList(
            new Tuple2<>(99, "張三豐"),
            new Tuple2<>(96, "東方不敗"),
            new Tuple2<>(66, "林平之"),
            new Tuple2<>(98, "聶風")
    );
    JavaPairRDD<Integer, String> listRDD = sc.parallelizePairs(list);
    listRDD.sortByKey(false).foreach(new VoidFunction<Tuple2<Integer, String>>() {
        @Override
        public void call(Tuple2<Integer, String> tuple) throws Exception {
            System.out.println(tuple._2 + "->" + tuple._1);
        }
    });
}
(2) Java 8 version

public static void sortByKey(){
    List<Tuple2<Integer, String>> list = Arrays.asList(
            new Tuple2<>(99, "張三豐"),
            new Tuple2<>(96, "東方不敗"),
            new Tuple2<>(66, "林平之"),
            new Tuple2<>(98, "聶風")
    );
    JavaPairRDD<Integer, String> listRDD = sc.parallelizePairs(list);
    listRDD.sortByKey(false).foreach(tuple -> System.out.println(tuple._2 + "->" + tuple._1));
}
(3) Scala version

def sortByKey(): Unit = {
  val list = List((99, "張三豐"), (96, "東方不敗"), (66, "林平之"), (98, "聶風"))
  sc.parallelize(list).sortByKey(false).foreach(tuple => println(tuple._2 + "->" + tuple._1))
}
(4) Result
7.3 aggregateByKey
aggregateByKey aggregates the values of a pair RDD that share the same key, using a neutral "zero" initial value during aggregation. Like aggregate, the type of the aggregated result does not need to match the type of the RDD's values. Because aggregateByKey aggregates values per key, it still returns a pair RDD of keys and aggregated values, whereas aggregate returns a single non-RDD result; this is worth keeping in mind. Several overloads of aggregateByKey are defined, but they all delegate to the same underlying implementation.
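A plain-Java sketch of the semantics (two in-memory lists stand in for partitions; the helper and its data are assumptions, not Spark API): within each partition, seqOp folds each value into the zero value for its key, and combOp then merges the per-partition results across partitions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

public class AggregateByKeyDemo {
    // zeroValue + seqOp within a partition, combOp across partitions.
    static Map<String, Integer> aggregateByKey(
            List<List<Map.Entry<String, Integer>>> partitions,
            int zeroValue, BinaryOperator<Integer> seqOp, BinaryOperator<Integer> combOp) {
        Map<String, Integer> merged = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            Map<String, Integer> perPartition = new HashMap<>();
            for (Map.Entry<String, Integer> kv : partition) {
                int acc = perPartition.getOrDefault(kv.getKey(), zeroValue);
                perPartition.put(kv.getKey(), seqOp.apply(acc, kv.getValue())); // seqOp
            }
            for (Map.Entry<String, Integer> kv : perPartition.entrySet()) {
                merged.merge(kv.getKey(), kv.getValue(), combOp);               // combOp
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Word-count pairs, mirroring the "you,jump" / "i,jump" example below.
        List<List<Map.Entry<String, Integer>>> partitions = Arrays.asList(
                Arrays.asList(Map.entry("you", 1), Map.entry("jump", 1)),
                Arrays.asList(Map.entry("i", 1), Map.entry("jump", 1)));
        System.out.println(aggregateByKey(partitions, 0, Integer::sum, Integer::sum));
        // {i=1, jump=2, you=1}
    }
}
```

With a different zero value and seqOp the per-key result could be, say, a list or a maximum, which is why the result type is free to differ from the value type.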
(1) Java 7 version

public static void aggregateByKey(){
    List<String> list = Arrays.asList("you,jump", "i,jump");
    JavaRDD<String> listRDD = sc.parallelize(list);
    listRDD.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String line) throws Exception {
            return Arrays.asList(line.split(",")).iterator();
        }
    }).mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String word) throws Exception {
            return new Tuple2<>(word, 1);
        }
    }).aggregateByKey(0, new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) throws Exception {
            return i1 + i2;
        }
    }, new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) throws Exception {
            return i1 + i2;
        }
    }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> tuple) throws Exception {
            System.out.println(tuple._1 + "->" + tuple._2);
        }
    });
}
(2) Java 8 version

public static void aggregateByKey() {
    List<String> list = Arrays.asList("you,jump", "i,jump");
    JavaRDD<String> listRDD = sc.parallelize(list);
    listRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator())
            .mapToPair(word -> new Tuple2<>(word, 1))
            .aggregateByKey(0, (x, y) -> x + y, (m, n) -> m + n)
            .foreach(tuple -> System.out.println(tuple._1 + "->" + tuple._2));
}
(3) Scala version

def aggregateByKey(): Unit = {
  val list = List("you,jump", "i,jump")
  sc.parallelize(list)
    .flatMap(_.split(","))
    .map((_, 1))
    .aggregateByKey(0)(_ + _, _ + _)
    .foreach(tuple => println(tuple._1 + "->" + tuple._2))
}
(4) Result