SparkCore算子簡介
SparkCore中的算子可以分為2類:Transformations Operation 和 Action Operation
在Spark的提交過程中,會將RDD及作用於其上的一系列算子(即:RDD及其之間的依賴關系)構建成一個DAG有向無環視圖。當遇到action算子的時候就會觸發一個job的提交,而Driver程序 則會將觸發的job提交給DAGScheduler,由DAGSchedule將job構建成一張DAG
因此,action類算子就是spark application程序分為job的依據,也就是觸發job提交的決定性因素
Spark的RDD空間
- Stage:
stage是對job的划分,遇到shuffle就划分,一個stage有多個tasks,同一個job間的stage具有依賴關系,前者必須結束才能進行后者的計算。
- RDD的創建
(1)通過集合創建RDD,主要用於進行測試,可以在實際部署到集群運行之前,自己使用集合構造測試數據,來測試后面的spark應用的流程
(2) 使用本地文件創建RDD ,在本地臨時性地處理一些存儲了大量數據的文件
(3)HDFS文件創建RDD,主要用於測試大量數據
其實本地創建RDD和HDFS文件創建RDD是一樣的,只是在路徑上,要指明是HDFS
hdfs://spark1:9000/data.txt
scala> val linesLength = sc.textFile("hdfs://spark1:9000/rdd.txt").map(line => line.length).reduce(_+_) linesLength: Int = 9
-
常用的算子
算子 | 描述 |
collect() | 無參,以數組的形式返回RDD中的所有的元素:本質上:是將executor中運算得到的RDD--->拉取到Driver程序中來,形成Scala的集合 |
take(n) | 返回RDD中的前n個元素,無參時,默認為前10個 |
takeOrdered(n, [ordering]) | 和top類似,先排序(升序/降序),再取前n,不過獲取的元素的順序與top相反 |
takeSample(withReplacement, num, [seed]) | 返回一個隨機采樣數組,該數組由從RDD中采樣獲得,可以選擇是否用隨機數來替換不足的部分,seed用於指定隨機數生成器的種子 |
first() | 返回RDD的第一個元素(並不一定是最大的,看RDD是否排序,類似於take(1)) |
top(n) | 返回由RDD前n個最大的元素構成的數組(最大:元素具備可比性,默認取字典順序最大的) |
reduce(func) | 通過func函數來聚集RDD中的所有元素,且要求func必須滿足:1.可交換;2.可並聯。 |
reduceByKeyLocally(func:(V, V)=>V) | 功能與reduceByKey相同,以key為組進行聚合,但是 唯一不同的是:該算子返回的是一個Map[K, V]的集合 |
sum() | 只能作用於純數值形式的RDD,返回元素的總和 |
count() | 無參,()可以省略,返回RDD的元素個數 |
countByValue() | 無參,針對於任意類型的RDD,統計RDD中各種元素值及其出現的次數,返回Map[value,count]集合 |
countByKey() | 無參,針對於PairRDD,返回每種Key對應的元素的個數,返回Map[key, count]形式的Map集合 |
算子 | 描述 |
foreach(func) | 針對於RDD中的每一個元素,運行func進行更新 func 的沒有返回值(Unit) |
foreachPartition(func) | 以Partition為單位進行遍歷,遍歷每個分區。foreachPartition(func: Iterator[T]=>Unit): Unit |
算子 | 描述 |
saveAsTextFile(path) | 將RDD數據集中的元素,以textFile的格式保存到HDFS或者其他文件系統中去。對於每個元素,Spark程序都會調用toString()方法,將元素轉換為文本格式 |
saveAsSequenceFile(path) | 將數據集的元素以Hadoop SequenceFile的格式保存到指定的目錄中,可以使用HDFS或者其他Hadoop支持的文件系統 |
saveAsObjectFile(path) | 用於將RDD中的元素序列化為對象,存儲於磁盤中。對於HDFS,默認采用SequenceFile的格式存儲 |
-
collect算子
collect(): 收集數據,將RDD轉換為Scala的 Array數組 * 本質上:是將executor中運算得到的RDD--->拉取到Driver程序中來,形成Scala的集合
-
take 算子
* take(n): 獲取 RDD的前n個元素 * ---返回前n個元素組成的數組,而不是返回新的RDD(屬於Action類的算子)
-
first 算子
first(): 返回RDD的第一個元素(Scala變量),== take(1) * 並不會排序(區別於top())
-
top 算子
top(n):取 RDD的最大的前 n個元素,返回Array集合,屬於action算子; * * 1) 當為普通單值RDD時:按照元素值的字典順序,取最大的前n個; * * 2) 當為PairRDD時:先按照key值進行降序排序,當KEY值相同時,再按照value降序排序, * 最后取最大的前N個;
-
count算子
* count():統計 RDD的元素個數!
-
countByKey 算子:
* countByKey(): 針對於PairRDD,按照key統計每一種 key的元素的個數 * 統計的是每一種 key的數量(與value無關)
-
countByValue()
* countByValue(): 針對於各種RDD,統計其中每一種唯一的元素 的出現次數!! * 此處的value指的是RDD的元素,並不是k,v中的value; * 與是PairRDD、還是普通單值RDD無關! * ---返回Map集合,屬於action類算子
-
foreach 算子
* foreach(func): 用於遍歷 RDD,將函數func應用於每一個元素。 * -- 無返回值(不會返回新的RDD,也不會返回scala集合) * func必須為沒有返回值的方法(返回值為 Unit類型)
-
foreachPartition 算子
/** * foreachPartition():func中的參數iterator包含了一個分區中的所有元素構成的迭代器; * ---與foreach的效果類似,但是能夠以分區為單位進行處理,對於多數場景的處理效率要高! */
wc案例演示
scala> val file = sc.textFile("/home/hadoop/data/ruozedata.txt").collect file: Array[String] = Array(word ruoze jepson, xiaohai ruoze word, dashu xiaohai jepson, xiaoshiqi) scala> val file = sc.textFile("/home/hadoop/data/ruozedata.txt").flatMap(_.split("\t")).collect file: Array[String] = Array(word, ruoze, jepson, xiaohai, ruoze, word, dashu, xiaohai, jepson, xiaoshiqi) scala> val file = sc.textFile("/home/hadoop/data/ruozedata.txt").flatMap(_.split("\t")).map((_,1)).collect file: Array[(String, Int)] = Array((word,1), (ruoze,1), (jepson,1), (xiaohai,1), (ruoze,1), (word,1), (dashu,1),
(xiaohai,1), (jepson,1), (xiaoshiqi,1)) scala> val file = sc.textFile("/home/hadoop/data/ruozedata.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).collect file: Array[(String, Int)] = Array((word,2), (jepson,2), (ruoze,2), (xiaohai,2), (xiaoshiqi,1), (dashu,1))
scala>val file = sc.textFile("/home/hadoop/data/ruozedata.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).
sortBy(_._2,false).collect
scala> file
res0: Array[(String, Int)] = Array((word,2), (jepson,2), (ruoze,2), (xiaohai,2), (xiaoshiqi,1), (dashu,1))
下一節出常用Transiformation算子
參考博客:https://blog.csdn.net/qq_16759443/article/details/82801332