一、前述
Action類算子也是一類算子(函數),叫做行動算子,如foreach、collect、count等。Transformation類算子是延遲執行的,Action類算子則會觸發執行。一個Application(也就是我們編寫的一個應用程序)中有幾個Action類算子被執行,就會有幾個job運行。
二、具體
原始數據集:

1、count
返回數據集中的元素個數,計算完成後會將結果回收到Driver端。對於由文本文件創建的RDD,元素個數即行數。
package com.spark.spark.actions;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Demonstrates the {@code count} action.
 *
 * <p>{@code count} returns the number of elements in the data set and brings
 * that number back to the driver. For an RDD read with {@code textFile} the
 * elements are lines, so the printed value is the line count of the file.
 */
public class Operator_count {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        // Application name matches the operator demonstrated by this class.
        conf.setAppName("count");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = jsc.textFile("./words.txt");
            long count = lines.count();
            System.out.println(count);
        } finally {
            // Release the context even when the job fails (e.g. the input file is missing).
            jsc.stop();
        }
    }
}
結果:返回行數即元素數

2、take(n)
first=take(1) 返回數據集中的第一個元素。
返回一個包含數據集前n個元素的集合(Scala API返回Array,Java API返回List)。take會先掃描一個partition,若取到的元素不足n個,再繼續掃描更多的partition,因此一次take可能觸發多個job,但不一定是每個partition各觸發一個。
package com.spark.spark.actions;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Demonstrates the {@code take} and {@code first} actions.
 *
 * <p>{@code take(n)} returns the first {@code n} elements of the data set as a
 * list on the driver; {@code first()} returns the first element only.
 *
 * @author root
 */
public class Operator_takeAndFirst {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("take");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        List<String> letters = Arrays.asList("a", "b", "c", "d");
        JavaRDD<String> letterRDD = jsc.parallelize(letters);

        List<String> firstTwo = letterRDD.take(2);
        // first() is still invoked as in the original example; its result is not printed.
        String head = letterRDD.first();

        // Print only the elements returned by take(2), one per line.
        for (int i = 0; i < firstTwo.size(); i++) {
            System.out.println(firstTwo.get(i));
        }

        jsc.stop();
    }
}
結果:

3、foreach
循環遍歷數據集中的每個元素,運行相應的邏輯。
4、collect
將計算結果回收到Driver端。當數據量很大時就不要回收了,會造成oom.
一般在使用過濾算子或者其他只返回少量數據的算子之後,才使用collect將結果回收到Driver端。
package com.spark.spark.actions; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; /** * collect * 將計算的結果作為集合拉回到driver端,一般在使用過濾算子或者一些能返回少量數據集的算子后,將結果回收到Driver端打印顯示。 * */ public class Operator_collect { public static void main(String[] args) { /** * SparkConf對象中主要設置Spark運行的環境參數。 * 1.運行模式 * 2.設置Application name * 3.運行的資源需求 */ SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("collect"); /** * JavaSparkContext對象是spark運行的上下文,是通往集群的唯一通道。 */ JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile("./words.txt"); JavaRDD<String> resultRDD = lines.filter(new Function<String, Boolean>() { /** * */ private static final long serialVersionUID = 1L; @Override public Boolean call(String line) throws Exception { return !line.contains("hadoop"); } }); List<String> collect = resultRDD.collect(); for(String s :collect){ System.out.println(s); } jsc.stop(); } }
結果:

- countByKey
作用到K,V格式的RDD上,根據Key計數相同Key的數據集元素。(也就是個數)
java代碼:
package com.spark.spark.actions;

import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**
 * Demonstrates the {@code countByKey} action.
 *
 * <p>Applies to key/value RDDs: counts how many elements share each key and
 * returns the counts to the driver as a {@code Map<K, Object>}.
 *
 * @author root
 */
public class Operator_countByKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("countByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Key 4 appears twice; every other key appears once.
            JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<Integer, String>(1, "a"),
                    new Tuple2<Integer, String>(2, "b"),
                    new Tuple2<Integer, String>(3, "c"),
                    new Tuple2<Integer, String>(4, "d"),
                    new Tuple2<Integer, String>(4, "e")
            ));

            Map<Integer, Object> countByKey = parallelizePairs.countByKey();
            for (Entry<Integer, Object> entry : countByKey.entrySet()) {
                // The comma separates key from count, matching the countByValue example's format.
                System.out.println("key:" + entry.getKey() + ",value:" + entry.getValue());
            }
        } finally {
            // Stop the context like the other examples do, even if the job fails.
            sc.stop();
        }
    }
}
結果:

- countByValue
根據數據集每個元素相同的內容來計數。返回相同內容的元素對應的條數。
java代碼:
package com.spark.spark.actions;

import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**
 * Demonstrates the {@code countByValue} action.
 *
 * <p>Counts identical elements of the data set and returns, for each distinct
 * element, how many times it occurs. On a pair RDD the whole
 * {@code (key, value)} tuple is the element being compared.
 *
 * @author root
 */
public class Operator_countByValue {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        // Application name matches the operator demonstrated by this class.
        conf.setMaster("local").setAppName("countByValue");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // (4,"d") appears twice; all other tuples are distinct.
            JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<Integer, String>(1, "a"),
                    new Tuple2<Integer, String>(2, "b"),
                    new Tuple2<Integer, String>(2, "c"),
                    new Tuple2<Integer, String>(3, "c"),
                    new Tuple2<Integer, String>(4, "d"),
                    new Tuple2<Integer, String>(4, "d")
            ));

            Map<Tuple2<Integer, String>, Long> countByValue = parallelizePairs.countByValue();
            for (Entry<Tuple2<Integer, String>, Long> entry : countByValue.entrySet()) {
                System.out.println("key:" + entry.getKey() + ",value:" + entry.getValue());
            }
        } finally {
            // Stop the context like the other examples do, even if the job fails.
            sc.stop();
        }
    }
}
scala代碼:
package com.bjsxt.spark.actions
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * countByValue
 *
 * Counts identical elements of the data set and returns, for every distinct
 * element, the number of times it occurs.
 */
object Operator_countByValue {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("countByValue")
    val sc = new SparkContext(sparkConf)

    val letters = sc.makeRDD(List("a", "a", "b"))
    // countByValue is an action: its result is a local map of element -> count, not an RDD.
    val occurrences = letters.countByValue()
    for (entry <- occurrences) println(entry)

    sc.stop()
  }
}
代碼結果:
java:

scala:

- reduce
根據聚合邏輯聚合數據集中的每個元素。(reduce需要傳入具體的聚合函數:先按該函數對每個分區內的數據進行聚合,再將各分區的結果合併成最終結果返回Driver端。該函數應滿足交換律和結合律,結果才不受分區方式影響。)
java代碼:
package com.spark.spark.actions; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; /** * reduce * * 根據聚合邏輯聚合數據集中的每個元素。 * @author root * */ public class Operator_reduce { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("reduce"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3,4,5)); Integer reduceResult = parallelize.reduce(new Function2<Integer, Integer, Integer>() { /** * */ private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); System.out.println(reduceResult); sc.stop(); } }
scala代碼:
package com.bjsxt.spark.actions
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * reduce
 *
 * Aggregates every element of the data set into one value using the supplied
 * aggregation function (addition in this example).
 */
object Operator_reduce {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("reduce")
    val sc = new SparkContext(sparkConf)

    val numbers = sc.makeRDD(Array(1, 2))
    // Sum the elements; the single result is returned to the driver and printed.
    val total = numbers.reduce((left, right) => left + right)
    println(total)

    sc.stop()
  }
}
結果:
java:

scala:

