spark常见的transformation和action算子


RDD:RDD分区数,若从HDFS创建RDD,RDD的分区就是和文件块一一对应,若是集合并行化形式创建,RDD分区数可以指定,一般默认值是CPU的核数。

task:task数量就是和分区数量对应。

这个全:https://www.cnblogs.com/frankdeng/p/9301672.html

一、transformation算子:

(1)map(func):将函数应用于RDD中的每一个元素,将返回值构成新的RDD。输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。

rdd.map(x=>x+1)

如:{1,2,3,3}    结果为 {2,3,4,4}

hadoop fs -cat /tmp/lxw1234/1.txt
hello world
hello spark
hello hive

//读取HDFS文件到RDD
scala> var data = sc.textFile("/tmp/lxw1234/1.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21
//使用map算子
scala> var mapresult = data.map(line => line.split("\\s+"))
mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23
//结果
scala> mapresult.collect
res0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))

 

(2)flatMap(func):比map多一步合并操作,首先将数组元素进行映射,然后合并压平所有的数组。

//使用flatMap算子
scala> var flatmapresult = data.flatMap(line => line.split("\\s+"))
flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23
//结果
scala> flatmapresult.collect
res1: Array[String] = Array(hello, world, hello, spark, hello, hive)

参考博客:https://www.cnblogs.com/devin-ou/p/8028305.html

 

(3)mapPartitions(func):函数中传入的参数是迭代器,迭代器里面保存的是一个分区里面的数据。

/**

* makeRDD方法的第一个参数代表的是RDD中的 元素

* 第二个参数:RDD的分区数

* rdd[Int]

*/

val rdd = sc.makeRDD(1 to 10,3)

/**

* mapPartitions这个算子遍历的单位是partition

* 会将一个partition的数据量全部加载到一个集合里面

*/

val mapPartitonsRDD = rdd.mapPartitions(iterator=>{

val list = new ListBuffer[Int]()

//创建一个数据库连接

while(iterator.hasNext){

val num = iterator.next()

list.+=(num+100)

}

//批量插入数据库

list.iterator

}, false)

/**

* 想要执行,必须有action类的算子

* collect算子会将集群中计算的结果回收到Driver端,慎用

*/

val resultArr = mapPartitonsRDD.collect()

resultArr.foreach { println }

map和mapPartition的异同:

  mapPartition  function一次处理一个分区的数据,性能比较高;

  map的function一次只处理一条数据。

  如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。

SparkSql或DataFrame默认会对程序进行mapPartition的优化。

参考博客:https://blog.csdn.net/wuxintdrh/article/details/80278479

 

(4)distinct:对RDD中的元素进行去重操作。

scala> data.flatMap(line => line.split("\\s+")).collect
res61: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)
 
scala> data.flatMap(line => line.split("\\s+")).distinct.collect
res62: Array[String] = Array(hive, hello, world, spark, hi)

 

(5)reduceByKey(func,[numTask]):找到相同的key,对其进行聚合,聚合的规则由func指定。

reduce任务的数量可以由numTask指定

goodsSaleRDD.reduceByKey((x,y) => x+y)

 参考博客:https://www.jianshu.com/p/af175e66ce99

 

(6)groupByKey():对相同的key进行分组。

 

(7)aggregateByKey(zeroValue: U,  numPartitions: Int)(seqOp: (U, V) => U,  combOp: (U, U) => U)

第一个参数代表着 初始值

第二个参数是中间聚合,在每个分区内部按照key执行聚合操作。这个分两步,第一步先将每个value和初始值作为函数参数进行计算,返回的结果作为新的kv对。然后在对结果再带入到函数中计算。

第三个参数是最终聚合,对中间聚合结果进行最终聚合。

 

例如:一个RDD有两个分区,

patition1:(1,1) (1,2) (2,1)

patition2:(2,3)(2,4)(1,7)

首先,在每个patition中将value和初始值三带入到seqFunc函数中,得到中间结果kv:

patition1:(1,3) (1,3) (2,3)

patition2:(2,3)(2,4)(1,7)

再将中间结果kv带入到seqFunc函数中,按照key进行聚合

patition1:(1,3)(2,3)

patition2:(2,4)(1,7)

最后,进行整体聚合,将上一步结果带入combFunc

(1,10)(2,7)

def seqFunc(a,b):
    print "seqFunc:%s,%s" %(a,b)
    return max(a,b) #取最大值
def combFunc(a,b):
    print "combFunc:%s,%s" %(a ,b)
    return a + b #累加起来
'''
    aggregateByKey这个算子内部肯定有分组
'''
aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc)

参考博客:https://blog.csdn.net/qq_35440040/article/details/82691794 这个写的挺乱,但有地方可以参考

 

(8)combineByKey ( createCombiner: V=>C,  mergeValue: (C, V) =>C,  mergeCombiners: (C,C) =>C )   :

主要分为三步,第一步,对value进行初始化处理;第二步,在分区内部对(key,value)进行处理,第三步,所有分区间对(key,value)进行处理。

https://www.jianshu.com/p/b77a6294f31c

参考博客:https://www.jianshu.com/p/b77a6294f31c

 

(9)sortBy():排序操作

 

二、action算子

基本RDD的action操作

1、reduce():接收一个函数作为参数,这个函数操作两个相同元素类型的RDD并返回一个同样类型的新元素。

val sum=rdd.reduce( (x,y) => x+y )

 

2、aggregate(zeroValue)(seqOp, combOp):期待返回的类型的初始值。然后通过一个函数把RDD中的元素合并起来并放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。

val result = input.aggregate((0,0))(
                    (acc, value) => (acc._1 + value, acc._2+1),
                    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))     
val avg = result._1 / result._2.toDouble       

 

2、collect():以普通集合或者值的形式,把数据返回驱动器程序,它会将整个RDD的内容返回。通常在单元测试中使用,由于需要将数据复制到驱动器进程中,collect()要求所有的数据都必须能一同放入单台机器的内存中。

rdd.collect()

结果:{1,2,3,3}

  Spark的collect方法,是Action类型的一个算子,会从远程集群拉取数据到driver端。最后,将大量数据汇集到一个driver节点上,将数据用数组存放,占用了jvm堆内存,非常用意造成内存溢出,只用作小型数据的观察。

如何避免使用collect:

若需要遍历RDD中元素,可以使用foreach语句;
若需要打印RDD中元素,可用take语句,返回数据集前n个元素,data.take(1000).foreach(println),这点官方文档里有说明;
若需要查看其中内容,可用saveAsTextFile方法;
总之,单机环境下使用collect问题并不大,但分布式环境下尽量规避,如有其他需要,手动编写代码实现相应功能就好。


参考博客:https://blog.csdn.net/chaoshengmingyue/article/details/82021746

 

3、first:返回RDD中的第一个元素,不排序。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
 
scala> rdd1.first
res14: (String, String) = (A,1)


4、take():返回RDD中的n个元素,并尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合。

rdd.take(2)

结果:{1,2}

 

5、foreach(fuc):对RDD中的每个元素使用给定的函数。

rdd.foreach(func)

 

6、saveAsTextFile:saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。

复制代码
var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS
hadoop fs -ls /tmp/lxw1234.com
Found 2 items
-rw-r--r--   2 lxw1234 supergroup        0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
-rw-r--r--   2 lxw1234 supergroup        21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000
 
hadoop fs -cat /tmp/lxw1234.com/part-00000
1
2
3
4
5
复制代码

 

Pair RDD的action操作:

1、countByKey ():对每个键对应的元素分别计数

rdd.countBykey()

结果:{(1,1),(3,2)}

 

2、collectAsMap():将结果以映射的形式返回,以便查询。

rdd.collectAsMap()

结果:Map{(1,2),(3,6)}

 

3、lookup(key)返回给定键对应的所有值

rdd.lookup(3)

结果:[4, 6]

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM