一:RDD簡介
(一)RDD概念
RDD(Resilient Distributed DataSet),彈性分布式數據集,是Spark中最基本,也是最重要的數據抽象,它代表一個不可變、可分區、里面的元素可並行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知度調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,后續的查詢能重用工作集,這極大地提升了查詢速度。因為有RDD,所以Spark才支持分布式的計算。RDD由分區組成。
(二)RDD的五個特性
(1)一組分片(Partition),即數據集的基本組成單位。---RDD會被分片處理,用於並行計算
對於RDD來說,每個分片都會被一個計算任務處理,並決定並行計算的粒度。
用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。
默認值就是程序所分配到的CPU Core的數目。
(2)一個計算每個分區的函數。---一個對每個split(數據分區)進行計算的函數,也稱為RDD的算子
Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。
compute函數會對迭代器進行復合,不需要保存每次計算的結果。
(3)RDD之間的依賴關系。(DAG有向無環圖調度構造依賴關系)
RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前后依賴關系。
在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。
(4)一個Partitioner(分區器),即RDD的分片函數。---用來對RDD的數據做手動分區
當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另外一個是基於范圍的RangePartitioner。
只有對於於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。
Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
(5)一個列表,存儲存取每個分片(Partition)的優先位置(preferred location)。
對於一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。
按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。
(三)RDD原理
(四)wordcount程序RDD執行流程
二:RDD創建方式
(一)通過讀取文件生成
由外部存儲系統的數據集創建,包括本地的文件系統,還有所有Hadoop支持的數據集,比如HDFS、Cassandra、HBase等
val file = sc.textFile("/spark/input/c.txt")
(二)通過並行化的方式創建RDD
val array = Array(1,2,3,4,5) val rdd = sc.parallelize(array)
(三)其他方式
讀取數據庫等等其他的操作。也可以生成RDD。
RDD可以通過其他的RDD轉換而來的。
三:RDD編程API
Spark支持兩個類型(算子)操作:Transformation和Action
(一)Transformation---不會觸發計算,延時加載(計算)
主要做的是就是將一個已有的RDD生成另外一個RDD。Transformation具有lazy特性(延遲加載)。Transformation算子的代碼不會真正被執行。只有當我們的程序里面遇到一個action算子的時候,代碼才會真正的被執行。這種設計讓Spark更加有效率地運行。
轉換 |
含義 |
map(func) |
返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成 |
filter(func) |
返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成 |
flatMap(func) |
類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素) |
mapPartitions(func) |
類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) |
類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是 (Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) |
根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) |
對源RDD和參數RDD求並集后返回一個新的RDD |
intersection(otherDataset) |
對源RDD和參數RDD求交集后返回一個新的RDD |
distinct([numTasks])) |
對源RDD進行去重后返回一個新的RDD |
groupByKey([numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
先按分區聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(_+_,_+_) 對k/y的RDD進行操作 |
sortByKey([ascending], [numTasks]) |
在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) |
與sortByKey類似,但是更靈活 第一個參數是根據什么排序 第二個是怎么排序 false倒序 第三個排序后分區數 默認與原RDD一樣 |
join(otherDataset, [numTasks]) |
在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD 相當於內連接(求交集) |
cogroup(otherDataset, [numTasks]) |
在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD |
cartesian(otherDataset) |
兩個RDD的笛卡爾積 的成很多個K/V |
pipe(command, [envVars]) |
調用外部程序 |
coalesce(numPartitions) |
重新分區 第一個參數是要分多少區,第二個參數是否shuffle 默認false 少分區變多分區 true 多分區變少分區 false |
repartition(numPartitions) |
重新分區 必須shuffle 參數是要分多少區 少變多 |
repartitionAndSortWithinPartitions(partitioner) |
重新分區+排序 比先分區再排序效率高 對K/V的RDD進行操作 |
foldByKey(zeroValue)(seqOp) |
該函數用於K/V做折疊,合並處理 ,與aggregate類似 第一個括號的參數應用於每個V值 第二括號函數是聚合例如:_+_ |
combineByKey |
合並相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) |
partitionBy(partitioner) |
對RDD進行分區 partitioner是分區器 例如new HashPartition(2 |
cache |
RDD緩存,可以避免重復計算從而減少時間,區別:cache內部調用了persist算子,cache默認就一個緩存級別MEMORY-ONLY ,而persist則可以選擇緩存級別 |
persist |
|
Subtract(rdd) |
返回前rdd元素不在后rdd的rdd |
leftOuterJoin |
leftOuterJoin類似於SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。 |
rightOuterJoin |
rightOuterJoin類似於SQL中的有外關聯right outer join,返回結果以參數中的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可 |
subtractByKey |
substractByKey和基本轉換操作中的subtract類似只不過這里是針對K的,返回在主RDD中出現,並且不在otherRDD中出現的元素 |
(二)Action:直接觸發計算
觸發代碼的運行,我們一段spark代碼里面至少需要有一個action操作。
動作 |
含義 |
reduce(func) |
通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可並聯的 |
collect() |
在驅動程序中,以數組的形式返回數據集的所有元素 |
count() |
返回RDD的元素個數 |
first() |
返回RDD的第一個元素(類似於take(1)) |
take(n) |
返回一個由數據集的前n個元素組成的數組 |
takeSample(withReplacement,num, [seed]) |
返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(n, [ordering]) |
|
saveAsTextFile(path) |
將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本 |
saveAsSequenceFile(path) |
將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。 |
saveAsObjectFile(path) |
|
countByKey() |
針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。 |
foreach(func) |
在數據集的每一個元素上,運行函數func進行更新。 |
aggregate |
先對分區進行操作,在總體操作 |
reduceByKeyLocally |
|
lookup |
|
top |
|
fold |
|
foreachPartition |
|
(三)RDD算子示例
1.創建一個Int類型的RDD---parallelize
val rdd1 = sc.parallelize(Array(5,1,75,32,647,23,5))
2.對每個元素乘以2(map),並且排序(sortBy)---Transformation延遲計算
val rdd2 = rdd1.map(_*2).sortBy(x=>x,true) 第一個參數是匿名函數,第二個true表示升序,false降序
注意:x=>x是匿名函數,傳參是x,省略了類型,因為從數據中可以知道是Int --- (x:Int)=>{x} --- x=>x 返回值是x 根據返回值的大小進行排序
3.以數組的形式返回數據集的所有元素(collect)---Action立即執行
rdd2.collect
4.過濾掉大於100的數(filter)---Transformation延遲計算
val rdd3 = rdd2.filter(_<100)
5.flatMap嵌套展開,並執行內部函數---Transformation延遲計算
val rdd5 = rdd4.flatMap(_.split(" "))
6. 集合運算---Transformation延遲計算
並集:
val rdd8 = rdd6.union(rdd7)
交集:
val rdd9 = rdd6.intersection(rdd7)
7.分組操作groupByKey--- Transformation延遲計算
8.更多transformation和action算子示例,見:https://www.cnblogs.com/qingyunzong/p/8922135.html
四:RDD緩存機制---Transformation延遲計算
RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用。
(一)案例實驗
在Spark shell中讀入文件,並進行文件行的計數,然后我們對數據進行緩存后再執行計算,注意,這里執行了count之后才會觸發計算,將數據緩存下來,當再次執行count的時候,就會用到緩存的結果
第二次執行效率有所提高
五:RDD寬依賴和窄依賴
(一)寬依賴和窄依賴概念
每組依賴中左邊為父RDD,右邊為子RDD
窄依賴:是指每個父RDD的一個Partition最多被子RDD的一個Partition所使用,例如map、filter、union等操作都會產生窄依賴;(獨生子女)
寬依賴:是指一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都會產生寬依賴;(超生)
需要特別說明的是對join操作有兩種情況:
(1)圖中左半部分join:如果兩個RDD在進行join操作時,一個RDD的partition僅僅和另一個RDD中已知個數的Partition進行join,那么這種類型的join操作就是窄依賴,例如圖1中左半部分的join操作(join with inputs co-partitioned);---多個子RDD,對於每個子RDD都滿足窄依賴
(2)圖中右半部分join:其它情況的join操作就是寬依賴,例如圖1中右半部分的join操作(join with inputs not co-partitioned),由於是需要父RDD的所有partition進行join的轉換,這就涉及到了shuffle,因此這種類型的join操作也是寬依賴。
總之:沒有依賴關系的stage是可以並行執行的,DAG根據寬依賴來划分stage,每個寬依賴的處理均會是一個stage的划分點。同一個stage中的多個操作會在一個task中完成。因為子RDD的分區僅依賴於父RDD的一個分區,因此這些步驟可以串行執行。
(二)依賴關系下的數據流視圖
在spark中,會根據RDD之間的依賴關系將DAG圖(有向無環圖)划分為不同的階段,
對於窄依賴,由於partition依賴關系的確定性,partition的轉換處理就可以在同一個線程里完成,窄依賴就被spark划分到同一個stage中,
而對於寬依賴,只能等父RDD shuffle處理完成后,下一個stage才能開始接下來的計算。
因此spark划分stage的整體思路是:
從左往右推,遇到寬依賴就斷開,划分為一個stage;
遇到窄依賴就將這個RDD加入該stage中。
因此在圖2中RDD C,RDD D,RDD E,RDDF被構建在一個stage中,RDD A被構建在一個單獨的Stage中,而RDD B和RDD G又被構建在同一個stage中。
在spark中,Task的類型分為2種:ShuffleMapTask和ResultTask;
簡單來說,DAG的最后一個階段會為每個結果的partition生成一個ResultTask,即每個Stage里面的Task的數量是由該Stage中最后一個RDD的Partition的數量所決定的!而其余所有階段都會生成ShuffleMapTask;之所以稱之為ShuffleMapTask是因為它需要將自己的計算結果通過shuffle到下一個stage中;也就是說上圖中的stage1和stage2相當於mapreduce中的Mapper,而ResultTask所代表的stage3就相當於mapreduce中的reducer。
在之前動手操作了一個wordcount程序,因此可知,Hadoop中MapReduce操作中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不過區別在於:Hadoop中的MapReduce天生就是排序的;而reduceByKey只是根據Key進行reduce,但spark除了這兩個算子還有其他的算子;因此從這個意義上來說,Spark比Hadoop的計算算子更為豐富。
六:RDD的Checkpoint(檢查點)機制:容錯機制
(一)檢查點是輔助RDD的lineage(血統)進行容錯的管理。
任務后面的步驟依賴於前面的步驟,如果一旦出現錯誤,則無法往后繼續計算,就必須從頭開始,這樣必然會使得效率降低。整個計算的周期就叫lineage,lineage越長,出錯的概率越大。所以我們可以設置一個檢查點,如果后面的計算過程出錯,則不需要從頭重新進行計算,則可以從檢查點的地方再次開始計算。
(二)RDD的檢查點有兩種類型
本地目錄(即檢查點的信息保存在本地的文件夾,一般用於開發和測試),HDFS目錄(保存在HDFS,多用於生產環境)
本地目錄:需要把spark-shell運行在本地模式上
sc.setCheckpointDir("/root/temp/sparkcheckpoint")
HDFS目錄:需要把spark-shell運行在集群模式上
sc.setCheckpointDir("hdfs://ns1/sparkcheckpoint")
七:Spark程序編寫---WordCount
(一)配置eclipse環境
https://blog.csdn.net/xummgg/article/details/50651867
(二)使用scala語言編寫---本地模式
package com.dt.spark import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD object WordCount { def main(args:Array[String]):Unit={ val conf = new SparkConf() //設置運行模式為本地運行,不然默認是集群模式 conf.setMaster("local") //設置任務名 conf.setAppName("WordCount") //設置SparkContext,是SparkCore的程序入口 val sc = new SparkContext(conf) //讀取文件,生成RDD val file:RDD[String] = sc.textFile("E:\\storm-kafka\\a.txt") //將每一行數據進行嵌套展開、分割 val words:RDD[String] = file.flatMap(_.split(",")) //進行單詞映射 val wordOne:RDD[(String,Int)] = words.map((_,1)) //進行單詞計數 val wordCount:RDD[(String,Int)] = wordOne.reduceByKey(_+_) //按照單詞出現次數,降序排序 val wordSort:RDD[(String,Int)] = wordCount.sortBy(x=>x._2, false) //將結果保存 //wordSort.foreach(wordNumberPair => println(wordNumberPair._1 + " : " +wordNumberPair._2))//在命令行中打印該結果 wordSort.saveAsTextFile("E:\\Output") sc.stop() } }
(二)使用scala語言編寫---集群模式
package com.dt.spark import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD object WordCount { def main(args:Array[String]):Unit={ val conf = new SparkConf() //設置運行模式為本地運行,不然默認是集群模式 //conf.setMaster("local") //默認是集群模式 master,是一個Spark、Mesos或者Yarn集群的URL,或者是local[*]; //設置任務名 conf.setAppName("WordCount") //appName,是用來在Spark UI上顯示的應用名稱; //設置SparkContext,是SparkCore的程序入口 val sc = new SparkContext(conf) //讀取文件,生成RDD val file:RDD[String] = sc.textFile("/spark/input/c.txt") //hdfs文件系統 //將每一行數據進行嵌套展開、分割 val words:RDD[String] = file.flatMap(_.split(" ")) //進行單詞映射 val wordOne:RDD[(String,Int)] = words.map((_,1)) //進行單詞計數 val wordCount:RDD[(String,Int)] = wordOne.reduceByKey(_+_) //按照單詞出現次數,降序排序 val wordSort:RDD[(String,Int)] = wordCount.sortBy(x=>x._2, false) //將結果顯示 wordSort.collect().foreach(wordNumberPair => println(wordNumberPair._1 + " : " +wordNumberPair._2))//在命令行中打印該結果 //wordSort.saveAsTextFile("E:\\Output") sc.stop() } }
導出jar包,進行job提交。需要指定url
[hadoop@hadoopH1 ~]$ spark-submit --class com.dt.spark.WordCount --master spark://hadoopH2:7077 --executor-memory 500m --total-executor-cores 1 ./wc.jar
(三)使用Java7編寫
package cn.spark.wc; import java.util.Arrays; import java.util.Iterator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; public class WordCount { public static void main(String[] args) { //初始化sparkcontext SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("WordCount"); JavaSparkContext sc = new JavaSparkContext(conf); //讀取文件,生成RDD JavaRDD<String> fileRDD = sc.textFile("E:\\storm-kafka\\a.txt"); //將每一行數據按照","分割 JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String line) throws Exception { // TODO Auto-generated method stub return Arrays.asList(line.split(",")).iterator(); //返回列表迭代器 } }); //進行數據映射 JavaPairRDD<String,Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() { //第一個是傳入參數:單詞,后面兩個參數:是返回的元組參數 @Override public Tuple2<String, Integer> call(String word) throws Exception { // TODO Auto-generated method stub return new Tuple2<>(word,1); } }); //進行reduce操作 JavaPairRDD<String,Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { //前兩個是傳入參數,后一個類型是返回參數 @Override public Integer call(Integer i1, Integer i2) throws Exception { // TODO Auto-generated method stub return i1 + i2; } }); //下面進行排序操作sortByKey,由於是按照key進行排序,所以我們需要先將原來的元組<string,Integer>,變為<Integer,string>。---使用maptopair JavaPairRDD<Integer,String> countWordRDD = wordCountRDD.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception { // TODO Auto-generated method stub return new Tuple2<>(t._2,t._1); } }); //開始使用sortByKey進行排序 JavaPairRDD<Integer,String> sortCWRDD = countWordRDD.sortByKey(false); //默認升序,false降序 //將我們處理的數據從<Integer,String>轉換為<String,Integer> JavaPairRDD<String,Integer> resultRDD = sortCWRDD.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception { // TODO Auto-generated method stub return new Tuple2<>(t._2,t._1); } }); //結果輸出 resultRDD.saveAsTextFile("E:\\storm-kafka\\res"); } }
補充:運行常見錯誤(一)無法運行,出現(launch error)
配置項目運行時參數:
-Dspark.master=local
補充:運行常見錯誤(二)無法運行,出現(unsupported major.minor version 52.0)運行環境錯誤
項目編譯得到的class文件的版本高於運行環境中jre的版本號,高版本JDK編譯的class不能在低版本的jvm虛擬機下運行,否則就會報這類錯,因此無法運行!49,50,51,52是Java編譯器內部的版本號,版本對應信息如下:
Unsupported major.minor version 52.0 對應於 JDK1.8(JRE1.8) Unsupported major.minor version 51.0 對應於 JDK1.7(JRE1.7) Unsupported major.minor version 50.0 對應於 JDK1.6(JRE1.6) Unsupported major.minor version 49.0 對應於 JDK1.5(JRE1.5)
因此出現問題的原因就是:編譯產生的class文件是jdk1.8版本的,而jvm環境低於1.8,因此報錯:
修改Java運行環境:
運行結果查看:
(四)使用Java8編寫---lambda表達式
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; public class SparkWordCountWithJava8 { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("WortCount"); conf.setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> fileRDD = sc.textFile("E:\\storm-kafka\\a.txt"); JavaRDD<String> wordRdd = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator()); JavaPairRDD<String, Integer> wordOneRDD = wordRdd.mapToPair(word -> new Tuple2<>(word, 1)); JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey((x, y) -> x + y); JavaPairRDD<Integer, String> count2WordRDD = wordCountRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)); JavaPairRDD<Integer, String> sortRDD = count2WordRDD.sortByKey(false); JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)); resultRDD.saveAsTextFile("E:\\storm-kafka\\res"); }