spark算子介紹


1.spark的算子分為轉換算子和Action算子,Action算子將形成一個job,轉換算子RDD轉換成另一個RDD,或者將文件系統的數據轉換成一個RDD

2.Spark的算子介紹地址:http://spark.apache.org/docs/2.3.0/rdd-programming-guide.html

3.Spark操作基本步驟【java版本,其他語言可以根據官網的案例進行學習】

(1)創建配置文件,將集群的運行模式設置好,給作業起一個名字,可以使用set方法其他配置設入。

SparkConf sparkConf = new SparkConf().setAppName("Demo").setMaster("local");
這里使用的是local的運行模式,起的名字是Demo

(2)創建SparkContext

JavaSparkContext javaContext = new JavaSparkContext(sparkConf);

(3)使用算子,操作數據

     JavaRDD<String> javaRdd = sparkContext.textFile("logfile.txt",1);
        javaRdd = javaRdd.cache();//這一句必須這樣寫,我們在數據計算很費時的時候,將數據緩存
        long line = javaRdd.count();
        System.out.println(line);

(4)關閉資源

sparkContext.close();

上面以一個求出數據行數的例子,看一下代碼操作的流程。

4.Action算子和介紹和舉例

(1)map算子;將數據讀取使用map進行操作,使用foreach算子計算出 結果。 每一次讀取partition中的一條數據進行分析

  案例:將數據乘以10,在輸出,測試算子。

package kw.test.demo;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

/*
 * 本案例:將數據 值乘以一個數,然后將數據的值返回。
 */
public class MapApp {
    public static void main(String[] args) {
        
        SparkConf conf = new SparkConf().setMaster("local").setAppName("MapTest");
        JavaSparkContext jsc= new JavaSparkContext(conf);
        List<Integer> list = Arrays.asList(1,2,3,4,5) ; 
        JavaRDD<Integer> javaRdd = jsc.parallelize(list);
        JavaRDD<Integer> result = javaRdd.map(new Function<Integer,Integer>() {
            @Override
            public Integer call(Integer list) throws Exception {
                // TODO Auto-generated method stub
                return list*10;
            }
        });
        result.foreach(new  VoidFunction<Integer>() {
            @Override
            public void call(Integer result) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(result);
            }
        });
        jsc.close();
    }
}

(2)MapPartition:將一整塊的數據放入然后處理,他和map的區別就是,map將一部分數據放入然后計算,MapPartition將一整塊的數據一起放入計算。

如果數據量小的時候,可以是Mappartition中,如果數據量比較大的時候使用Map會比較好,因為可以防止內存溢出。

package kw.test.demo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

public class MapPartitionApp {
    public static void main(String[] args) {
        /*
         * 創建配置文件
         * 創建出RDD
         */
        SparkConf sparkconf = new SparkConf().setMaster("local").setAppName("mapPartition");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkconf);
        /*
         * mapPartition的使用是將一個塊一起放入到算子中操作。
         * 
         * 假如說RDD上的數據不是太多的時候,可以使用mapPartition 來操作,如果一個RDD的數據比較多還是使用map好
         * 返回了大量數據,容易曹成內存溢出。
         */
    /*    准備數據集*/
        List <String> list= Arrays.asList("kangwang","kang","wang");
        JavaRDD<String> javaRDD = javaSparkContext.parallelize(list);
        
        final Map<String,Integer> sore = new HashMap<String ,Integer>();
        sore.put("kangwang", 0);
        sore.put("kang", 13);
        sore.put("wang", 454);
        
        JavaRDD<Integer> sRDD= javaRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {

            @Override
            public Iterator<Integer> call(Iterator<String> it) throws Exception {
                // TODO Auto-generated method stub
                List list = new ArrayList();
                
                while(it.hasNext())
                {
                    String name = it.next();
                    Integer so = sore.get(name);
                    list.add(so);
                }
                Iterator i =list.iterator();
                return  i;
            }
        });        
        sRDD.foreach(new VoidFunction<Integer>() {
            
            @Override
            public void call(Integer it) throws Exception {
                // TODO Auto-generated method stub
                System.out.println("it的值是"+it);
            }
        });
    }
}

(3)MapPartitionWithIndex:

本案例:
  查看將數據的分配到具體的快上的信息。
  我們可以指定partition的個數,默認是2
  parallelize並行集合的時候,指定了並行度,也就是partition的個數是2
  具體他們的數據怎樣分,我們並不知道,由spark自己分配
 如果想要知道,就可以使用此算子,將數據的值打印出來。

package kw.test.demo;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
public class MapPartionWithIndex {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("MapPartitionWithIndex").setMaster("local");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        
        //准備數據
        List<String> list =new ArrayList<String>();
        list.add("Demo1");
        list.add("Demo2");
        list.add("Demo3");
        list.add("Demo4");
        list.add("Demo5");
        list.add("Demo6");
        list.add("Demo7");
        list.add("Demo8");
        list.add("Demo9");
        list.add("Demo10");
        list.add("Demo11");
        list.add("Demo12");
        //創建RDD,指定map的個數4
        JavaRDD<String> javaRDD = javaSparkContext.parallelize(list, 2);
        JavaRDD<String> javaRDD1 =javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

            @Override
            public Iterator<String> call(Integer index, Iterator<String> it2) throws Exception {
                // TODO Auto-generated method stub
                //index是partition的個數
                List<String> list = new ArrayList<String>();
                while(it2.hasNext())
                {
                    String name = it2.next();
                    String info = "partition是:"+index+"數據的name是:"+name;
                    list.add(info);
                }
                return list.iterator();
            }
            
        }, true);
        
        javaRDD1.foreach(new VoidFunction<String>() {
            
            @Override
            public void call(String infos) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(infos);
            }
        });       
    }
}

(4)coalesce算子,是架構RDD的partition的數量縮減

   將一定數量的partition壓縮到更少的partition分區中去 

  使用的場景,很多時候在filter算子應用之后會優化一下到使用coalesce算子。 

 

  filter算子應用到RDD上面,說白了會應用到RDD對應到里面的每個partition上

  數據傾斜,換句話說就是有可能的partition里面就剩下了一條數據 建議使用coalesce算子,

  從前各個partition中 數據都更加的緊湊就可以減少它的 個數

package kw.test.demo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

public class CoalesceOpter {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("coalesceDemo").setMaster("local");
        JavaSparkContext javaContext = new JavaSparkContext(sparkConf);

        List<String> list = Arrays.asList("kw1","djf","kw1","fgf",
                "djf","kw1","djf","sdsds","kw1","ssdu","djf");
        
        JavaRDD<String>javaRDD = javaContext.parallelize(list,6);
        JavaRDD<String> info = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

            @Override
            public Iterator<String> call(Integer arg0, Iterator<String> arg1) throws Exception {
                // TODO Auto-generated method stub
                List<String> list =new ArrayList<String>();
                while(arg1.hasNext())
                {
                    String name = arg1.next();
                    String info = arg0 +"^^^^^……………………………………………………………………"+ name;
                    list.add(info);
                }
                return list.iterator();
            }
        }, true);
        info.foreach(new VoidFunction<String>() {
            
            @Override
            public void call(String arg0) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(arg0);
            }
        });
        info.coalesce(3);
        JavaRDD<String> javaRDD1 = info.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

            @Override
            public Iterator<String> call(Integer arg0, Iterator<String> arg1) throws Exception {
                // TODO Auto-generated method stub
                List<String> list = new ArrayList<String>();
                while(arg1.hasNext())
                {
                    String name = arg1.next();
                    String info2 ="        " +name +"………………………………" +arg0;
                    list.add(info2);
                }
                return list.iterator();
            }
        }, true);
        javaRDD1.foreach(new VoidFunction<String>() {
            
            @Override
            public void call(String arg0) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(arg0);
            }
        });
    }
}
* 

(5)filter此案例將數據的值過濾出來。使用的是filter算子

package kw.test.demo;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

/*
 * 此案例將數據的值過濾出來。使用的是filter算子
 */
public class APPFilter {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Filter");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        
        List<Integer> list = new ArrayList<Integer>();
        list.add(2);
        list.add(26);
        list.add(23);
        list.add(24);
        list.add(256);
        list.add(278);
        list.add(2543);
        list.add(23);
        list.add(26);
        
        JavaRDD<Integer> javaRDD = jsc.parallelize(list,2);
        //返回值  將返回true的數據返回
        JavaRDD<Integer> num= javaRDD.filter(new Function<Integer, Boolean>() {
            
            @Override
            public Boolean call(Integer it) throws Exception {
                // TODO Auto-generated method stub
                return it%2==0;
            }
        });
        num.foreach(new VoidFunction<Integer>() {
            
            @Override
            public void call(Integer arg0) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(arg0);
            }
        });
    }
}

 

 spark程序可以在本地運行,也可以在集群中運行,可以大成jar,放到真實的集群環境中運行程序。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM