1.spark的算子分為轉換算子和Action算子,Action算子將形成一個job,轉換算子RDD轉換成另一個RDD,或者將文件系統的數據轉換成一個RDD
2.Spark的算子介紹地址:http://spark.apache.org/docs/2.3.0/rdd-programming-guide.html
3.Spark操作基本步驟【java版本,其他語言可以根據官網的案例進行學習】
(1)創建配置文件,將集群的運行模式設置好,給作業起一個名字,可以使用set方法其他配置設入。
SparkConf sparkConf = new SparkConf().setAppName("Demo").setMaster("local"); 這里使用的是local的運行模式,起的名字是Demo
(2)創建SparkContext
JavaSparkContext javaContext = new JavaSparkContext(sparkConf);
(3)使用算子,操作數據
JavaRDD<String> javaRdd = sparkContext.textFile("logfile.txt",1); javaRdd = javaRdd.cache();//這一句必須這樣寫,我們在數據計算很費時的時候,將數據緩存 long line = javaRdd.count(); System.out.println(line);
(4)關閉資源
sparkContext.close();
上面以一個求出數據行數的例子,看一下代碼操作的流程。
4.Action算子和介紹和舉例
(1)map算子;將數據讀取使用map進行操作,使用foreach算子計算出 結果。 每一次讀取partition中的一條數據進行分析
案例:將數據乘以10,在輸出,測試算子。
package kw.test.demo; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; /* * 本案例:將數據 值乘以一個數,然后將數據的值返回。 */ public class MapApp { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("MapTest"); JavaSparkContext jsc= new JavaSparkContext(conf); List<Integer> list = Arrays.asList(1,2,3,4,5) ; JavaRDD<Integer> javaRdd = jsc.parallelize(list); JavaRDD<Integer> result = javaRdd.map(new Function<Integer,Integer>() { @Override public Integer call(Integer list) throws Exception { // TODO Auto-generated method stub return list*10; } }); result.foreach(new VoidFunction<Integer>() { @Override public void call(Integer result) throws Exception { // TODO Auto-generated method stub System.out.println(result); } }); jsc.close(); } }
(2)MapPartition:將一整塊的數據放入然后處理,他和map的區別就是,map將一部分數據放入然后計算,MapPartition將一整塊的數據一起放入計算。
如果數據量小的時候,可以是Mappartition中,如果數據量比較大的時候使用Map會比較好,因為可以防止內存溢出。
package kw.test.demo; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.VoidFunction; public class MapPartitionApp { public static void main(String[] args) { /* * 創建配置文件 * 創建出RDD */ SparkConf sparkconf = new SparkConf().setMaster("local").setAppName("mapPartition"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkconf); /* * mapPartition的使用是將一個塊一起放入到算子中操作。 * * 假如說RDD上的數據不是太多的時候,可以使用mapPartition 來操作,如果一個RDD的數據比較多還是使用map好 * 返回了大量數據,容易曹成內存溢出。 */ /* 准備數據集*/ List <String> list= Arrays.asList("kangwang","kang","wang"); JavaRDD<String> javaRDD = javaSparkContext.parallelize(list); final Map<String,Integer> sore = new HashMap<String ,Integer>(); sore.put("kangwang", 0); sore.put("kang", 13); sore.put("wang", 454); JavaRDD<Integer> sRDD= javaRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() { @Override public Iterator<Integer> call(Iterator<String> it) throws Exception { // TODO Auto-generated method stub List list = new ArrayList(); while(it.hasNext()) { String name = it.next(); Integer so = sore.get(name); list.add(so); } Iterator i =list.iterator(); return i; } }); sRDD.foreach(new VoidFunction<Integer>() { @Override public void call(Integer it) throws Exception { // TODO Auto-generated method stub System.out.println("it的值是"+it); } }); } }
(3)MapPartitionWithIndex:
本案例:
查看將數據的分配到具體的快上的信息。
我們可以指定partition的個數,默認是2
parallelize並行集合的時候,指定了並行度,也就是partition的個數是2
具體他們的數據怎樣分,我們並不知道,由spark自己分配
如果想要知道,就可以使用此算子,將數據的值打印出來。
package kw.test.demo; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.VoidFunction; public class MapPartionWithIndex { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("MapPartitionWithIndex").setMaster("local"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); //准備數據 List<String> list =new ArrayList<String>(); list.add("Demo1"); list.add("Demo2"); list.add("Demo3"); list.add("Demo4"); list.add("Demo5"); list.add("Demo6"); list.add("Demo7"); list.add("Demo8"); list.add("Demo9"); list.add("Demo10"); list.add("Demo11"); list.add("Demo12"); //創建RDD,指定map的個數4 JavaRDD<String> javaRDD = javaSparkContext.parallelize(list, 2); JavaRDD<String> javaRDD1 =javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { @Override public Iterator<String> call(Integer index, Iterator<String> it2) throws Exception { // TODO Auto-generated method stub //index是partition的個數 List<String> list = new ArrayList<String>(); while(it2.hasNext()) { String name = it2.next(); String info = "partition是:"+index+"數據的name是:"+name; list.add(info); } return list.iterator(); } }, true); javaRDD1.foreach(new VoidFunction<String>() { @Override public void call(String infos) throws Exception { // TODO Auto-generated method stub System.out.println(infos); } }); } }
(4)coalesce算子,是架構RDD的partition的數量縮減
將一定數量的partition壓縮到更少的partition分區中去
使用的場景,很多時候在filter算子應用之后會優化一下到使用coalesce算子。
filter算子應用到RDD上面,說白了會應用到RDD對應到里面的每個partition上
數據傾斜,換句話說就是有可能的partition里面就剩下了一條數據 建議使用coalesce算子,
從前各個partition中 數據都更加的緊湊就可以減少它的 個數
package kw.test.demo; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.VoidFunction; public class CoalesceOpter { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("coalesceDemo").setMaster("local"); JavaSparkContext javaContext = new JavaSparkContext(sparkConf); List<String> list = Arrays.asList("kw1","djf","kw1","fgf", "djf","kw1","djf","sdsds","kw1","ssdu","djf"); JavaRDD<String>javaRDD = javaContext.parallelize(list,6); JavaRDD<String> info = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { @Override public Iterator<String> call(Integer arg0, Iterator<String> arg1) throws Exception { // TODO Auto-generated method stub List<String> list =new ArrayList<String>(); while(arg1.hasNext()) { String name = arg1.next(); String info = arg0 +"^^^^^……………………………………………………………………"+ name; list.add(info); } return list.iterator(); } }, true); info.foreach(new VoidFunction<String>() { @Override public void call(String arg0) throws Exception { // TODO Auto-generated method stub System.out.println(arg0); } }); info.coalesce(3); JavaRDD<String> javaRDD1 = info.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { @Override public Iterator<String> call(Integer arg0, Iterator<String> arg1) throws Exception { // TODO Auto-generated method stub List<String> list = new ArrayList<String>(); while(arg1.hasNext()) { String name = arg1.next(); String info2 =" " +name +"………………………………" +arg0; list.add(info2); } return list.iterator(); } }, true); javaRDD1.foreach(new VoidFunction<String>() { @Override public void call(String arg0) throws Exception { // TODO Auto-generated method stub System.out.println(arg0); } }); } } *
(5)filter此案例將數據的值過濾出來。使用的是filter算子
package kw.test.demo; import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; /* * 此案例將數據的值過濾出來。使用的是filter算子 */ public class APPFilter { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Filter"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); List<Integer> list = new ArrayList<Integer>(); list.add(2); list.add(26); list.add(23); list.add(24); list.add(256); list.add(278); list.add(2543); list.add(23); list.add(26); JavaRDD<Integer> javaRDD = jsc.parallelize(list,2); //返回值 將返回true的數據返回 JavaRDD<Integer> num= javaRDD.filter(new Function<Integer, Boolean>() { @Override public Boolean call(Integer it) throws Exception { // TODO Auto-generated method stub return it%2==0; } }); num.foreach(new VoidFunction<Integer>() { @Override public void call(Integer arg0) throws Exception { // TODO Auto-generated method stub System.out.println(arg0); } }); } }
spark程序可以在本地運行,也可以在集群中運行,可以大成jar,放到真實的集群環境中運行程序。