Spark Core
一、什么是Spark?(官網:http://spark.apache.org)
1、什么是Spark?

我的翻譯:Spark是一個針對大規模數據處理的快速通用引擎。
Spark是一種快速、通用、可擴展的大數據分析引擎,2009年誕生於加州大學伯克利分校AMPLab,2010年開源,2013年6月成為Apache孵化項目,2014年2月成為Apache頂級項目。目前,Spark生態系統已經發展成為一個包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目,Spark是基於內存計算的大數據並行計算框架。Spark基於內存計算,提高了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,允許用戶將Spark部署在大量廉價硬件之上,形成集群。Spark得到了眾多大數據公司的支持,這些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優酷土豆。當前百度的Spark已應用於鳳巢、大搜索、直達號、百度大數據等業務;阿里利用GraphX構建了大規模的圖計算和圖挖掘系統,實現了很多生產系統的推薦算法;騰訊Spark集群達到8000台的規模,是當前已知的世界上最大的Spark集群。
2、為什么要學習Spark?
(*)Hadoop的MapReduce計算模型存在的問題:
學習過Hadoop的MapReduce的學員都知道,MapReduce的核心是Shuffle(洗牌)。在整個Shuffle的過程中,至少會產生6次的I/O。下圖是我們在講MapReduce的時候,畫的Shuffle的過程。
中間結果輸出:基於MapReduce的計算引擎通常會將中間結果輸出到磁盤上,進行存儲和容錯。另外,當一些查詢(如:Hive)翻譯到MapReduce任務時,往往會產生多個Stage(階段),而這些串聯的Stage又依賴於底層文件系統(如HDFS)來存儲每一個Stage的輸出結果,而I/O的效率往往較低,從而影響了MapReduce的運行速度。
(*)Spark的最大特點:基於內存
Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生態系統,以彌補MapReduce的不足。
3、Spark的特點:快、易用、通用、兼容性
(*)快
與Hadoop的MapReduce相比,Spark基於內存的運算速度要快100倍以上,即使,Spark基於硬盤的運算也要快10倍。Spark實現了高效的DAG執行引擎,從而可以通過內存來高效處理數據流。
(*)易用
Spark支持Java、Python和Scala的API,還支持超過80種高級算法,使用戶可以快速構建不同的應用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在這些shell中使用Spark集群來驗證解決問題的方法。

(*)通用
Spark提供了統一的解決方案。Spark可以用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不同類型的處理都可以在同一個應用中無縫使用。Spark統一的解決方案非常具有吸引力,畢竟任何公司都想用統一的平台去處理遇到的問題,減少開發和維護的人力成本和部署平台的物力成本。
另外Spark還可以很好的融入Hadoop的體系結構中可以直接操作HDFS,並提供Hive on Spark、Pig on Spark的框架集成Hadoop。
(*)兼容性
Spark可以非常方便地與其他的開源產品進行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調度器,器,並且可以處理所有Hadoop支持的數據,包括HDFS、HBase和Cassandra等。這對於已經部署Hadoop集群的用戶特別重要,因為不需要做任何數據遷移就可以使用Spark的強大處理能力。Spark也可以不依賴於第三方的資源管理和調度器,它實現了Standalone作為其內置的資源管理和調度框架,這樣進一步降低了Spark的使用門檻,使得所有人都可以非常容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集群的工具。
二、Spark的體系結構與安裝部署
1、Spark集群的體系結構
官方的一張圖:
我自己的一張圖:
2、Spark的安裝與部署
Spark的安裝部署方式有以下幾種模式:
Standalone
YARN
Mesos
Amazon EC2
(*)Spark Standalone偽分布的部署
l 配置文件:conf/spark-env.sh
export JAVA_HOME=/root/training/jdk1.7.0_75
export SPARK_MASTER_HOST=spark81
export SPARK_MASTER_PORT=7077
下面的可以不寫,默認
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
l 配置文件:conf/slave
spark81
(*)Spark Standalone全分布的部署
l 配置文件:conf/spark-env.sh
export JAVA_HOME=/root/training/jdk1.7.0_75
export SPARK_MASTER_HOST=spark82
export SPARK_MASTER_PORT=7077
下面的可以不寫,默認
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
l 配置文件:conf/slave
spark83
spark84
(*)啟動Spark集群:start-all.sh(會和hadoop的start-all.sh有沖突,可以設置他們的環境變量為不同的名字)
3、Spark HA的實現
(*)基於文件系統的單點恢復
主要用於開發或測試環境。當spark提供目錄保存spark Application和worker的注冊信息,並將他們的恢復狀態寫入該目錄中,這時,一旦Master發生故障,就可以通過重新啟動Master進程(sbin/start-master.sh),恢復已運行的spark Application和worker的注冊信息。
基於文件系統的單點恢復,主要是在spark-en.sh里對SPARK_DAEMON_JAVA_OPTS設置
| 配置參數 |
參考值 |
| spark.deploy.recoveryMode |
設置為FILESYSTEM開啟單點恢復功能,默認值:NONE |
| spark.deploy.recoveryDirectory |
Spark 保存恢復狀態的目錄 |
參考:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"
測試:
1、在spark82上啟動Spark集群
2、在spark83上啟動spark shell
MASTER=spark://spark82:7077 spark-shell
3、在spark82上停止master
stop-master.sh
4、觀察spark83上的輸出:
5、在spark82上重啟master
start-master.sh
(*)基於Zookeeper的Standby Masters
ZooKeeper提供了一個Leader Election機制,利用這個機制可以保證雖然集群存在多個Master,但是只有一個是Active的,其他的都是Standby。當Active的Master出現故障時,另外的一個Standby Master會被選舉出來。由於集群的信息,包括Worker, Driver和Application的信息都已經持久化到ZooKeeper,因此在切換的過程中只會影響新Job的提交,對於正在進行的Job沒有任何的影響。加入ZooKeeper的集群整體架構如下圖所示。
|
|
參考值 |
| spark.deploy.recoveryMode |
設置為ZOOKEEPER開啟單點恢復功能,默認值:NONE |
| spark.deploy.zookeeper.url |
ZooKeeper集群的地址 |
| spark.deploy.zookeeper.dir |
Spark信息在ZK中的保存目錄,默認:/spark |
l 參考:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181 -Dspark.deploy.zookeeper.dir=/spark"
l 另外:每個節點上,需要將以下兩行注釋掉。
l ZooKeeper中保存的信息

三、執行Spark Demo程序
1、執行Spark Example程序
(*)示例程序:$SPARK_HOME/examples/jars/spark-examples_2.11-2.1.0.jar
(*)所有的示例程序:$EXAMPLE_HOME/examples/src/main
有Java、Scala等等等
(*)Demo:蒙特卡羅求PI
命令:
spark-submit --master spark://spark81:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 100
2、使用Spark Shell
spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶可以在該命令行下用scala編寫spark程序。
(*)啟動Spark Shell:spark-shell
也可以使用以下參數:
參數說明:
--master spark://spark81:7077 指定Master的地址
--executor-memory 2g 指定每個worker可用內存為2G
--total-executor-cores 2 指定整個集群使用的cup核數為2個
例如:
spark-shell --master spark://spark81:7077 --executor-memory 2g --total-executor-cores 2
(*)注意:
如果啟動spark shell時沒有指定master地址,但是也可以正常啟動spark shell和執行spark shell中的程序,其實是啟動了spark的local模式,該模式僅在本機啟動一個進程,沒有與集群建立聯系。
請注意local模式和集群模式的日志區別:
(*)在Spark Shell中編寫WordCount程序
程序如下:
sc.textFile("hdfs://192.168.88.111:9000/data/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")
說明:
sc是SparkContext對象,該對象時提交spark程序的入口
textFile("hdfs://192.168.88.111:9000/data/data.txt")是hdfs中讀取數據
flatMap(_.split(" "))先map在壓平
map((_,1))將單詞和1構成元組
reduceByKey(_+_)按照key進行reduce,並將value累加
saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")將結果寫入到hdfs中
3、在IDEA中編寫WordCount程序
(*)需要的jar包:$SPARK_HOME/jars/*.jar
(*)創建Scala Project,並創建Scala Object、或者Java Class
(*)書寫源代碼,並打成jar包,上傳到Linux
==========================Scala版本==========================
(*)運行程序:
spark-submit --master spark://spark81:7077 --class mydemo.WordCount jars/wc.jar hdfs://192.168.88.111:9000/data/data.txt hdfs://192.168.88.111:9000/output/spark/wc1
====================Java版本(直接輸出在屏幕)====================
(*)運行程序:
spark-submit --master spark://spark81:7077 --class mydemo.JavaWordCount jars/wc.jar hdfs://192.168.88.111:9000/data/data.txt
四、Spark運行機制及原理分析
1、WordCount執行的流程分析
需要看源碼一步步看。
2、Spark提交任務的流程

3.Spark工作機制


五、Spark的算子
1、RDD基礎
- 什么是RDD?
RDD(Resilient Distributed Dataset)叫做彈性分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可並行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,后續的查詢能夠重用工作集,這極大地提升了查詢速度。
- RDD的屬性(源碼中的一段話)

² 一組分片(Partition),即數據集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數目。
² 一個計算每個分區的函數。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不需要保存每次計算的結果。
² RDD之間的依賴關系。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前后依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。
² 一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另外一個是基於范圍的RangePartitioner。只有對於於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
² 一個列表,存儲存取每個Partition的優先位置(preferred location)。對於一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。
RDD的創建方式
- 通過外部的數據文件創建,如HDFS
val rdd1 = sc.textFile(“hdfs://192.168.88.111:9000/data/data.txt”)
- 通過sc.parallelize進行創建
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
- RDD的類型:Transformation和Action
RDD 的基本原理
2、Transformation
RDD中的所有轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。
| 轉換 |
含義 |
| map(func) |
返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成 |
| filter(func) |
返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成 |
| flatMap(func) |
類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素) |
| mapPartitions(func) |
類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
| mapPartitionsWithIndex(func) |
類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U] |
| sample(withReplacement, fraction, seed) |
根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
| union(otherDataset) |
對源RDD和參數RDD求並集后返回一個新的RDD |
| intersection(otherDataset) |
對源RDD和參數RDD求交集后返回一個新的RDD |
| distinct([numTasks])) |
對源RDD進行去重后返回一個新的RDD |
| groupByKey([numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
| reduceByKey(func, [numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置 |
| aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]) |
|
| sortByKey([ascending], [numTasks]) |
在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
| sortBy(func,[ascending], [numTasks]) |
與sortByKey類似,但是更靈活 |
| join(otherDataset, [numTasks]) |
在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD |
| cogroup(otherDataset, [numTasks]) |
在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD |
| cartesian(otherDataset) |
笛卡爾積 |
| pipe(command, [envVars]) |
|
| coalesce(numPartitions) |
|
| repartition(numPartitions) |
|
| repartitionAndSortWithinPartitions(partitioner) |
|
3、Action
| 動作 |
含義 |
| reduce(func) |
通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可並聯的 |
| collect() |
在驅動程序中,以數組的形式返回數據集的所有元素 |
| count() |
返回RDD的元素個數 |
| first() |
返回RDD的第一個元素(類似於take(1)) |
| take(n) |
返回一個由數據集的前n個元素組成的數組 |
| takeSample(withReplacement,num, [seed]) |
返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
| takeOrdered(n, [ordering]) |
|
| saveAsTextFile(path) |
將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本 |
| saveAsSequenceFile(path) |
將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。 |
| saveAsObjectFile(path) |
|
| countByKey() |
針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。 |
| foreach(func) |
在數據集的每一個元素上,運行函數func進行更新。 |
4、RDD的緩存機制
RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用。
通過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
緩存有可能丟失,或者存儲存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。
Demo示例:
通過UI進行監控:
5、RDD的Checkpoint(檢查點)機制:容錯機制
檢查點(本質是通過將RDD寫入Disk做檢查點)是為了通過lineage(血統)做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷。
設置checkpoint的目錄,可以是本地的文件夾、也可以是HDFS。一般是在具有容錯能力,高可靠的文件系統上(比如HDFS, S3等)設置一個檢查點路徑,用於保存檢查點數據。
分別舉例說明:
l 本地目錄
注意:這種模式,需要將spark-shell運行在本地模式上

l HDFS的目錄
注意:這種模式,需要將spark-shell運行在集群模式上
l 源碼中的一段話
6、RDD的依賴關系和Spark任務中的Stage
l RDD的依賴關系
RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
- 窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用
總結:窄依賴我們形象的比喻為獨生子女
- 寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition
總結:窄依賴我們形象的比喻為超生
l Spark任務中的Stage
DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關系的不同將DAG划分成不同的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計算,因此寬依賴是划分Stage的依據。
7、RDD基礎練習
- 練習1:
//通過並行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//對rdd1里的每一個元素乘2然后排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//過濾出大於等於十的元素
val rdd3 = rdd2.filter(_ >= 10)
//將元素以數組的方式在客戶端顯示
rdd3.collect
- 練習2:
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//將rdd1里面的每一個元素先切分在壓平
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect
- 練習3:
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求並集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect
- 練習4:
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求jion
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求並集
val rdd4 = rdd1 union rdd2
//按key進行分組
rdd4.groupByKey
rdd4.collect
- 練習5:
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup與groupByKey的區別
rdd3.collect
- 練習6:
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect
- 練習7:
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key進行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
六、Spark RDD的高級算子
1、mapPartitionsWithIndex
把每個partition中的分區號和對應的值拿出來
接收一個函數參數:
l 第一個參數:分區號
l 第二個參數:分區中的元素
示例:將每個分區中的元素和分區號打印出來。
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
創建一個函數返回RDD中的每個分區號和元素:
def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={
iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator
}
調用:rdd1.mapPartitionsWithIndex(func1).collect
2、aggregate
先對局部聚合,再對全局聚合
示例:val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
查看每個分區中的元素:
將每個分區中的最大值求和,注意:初始值是0;
如果初始值時候10,則結果為:30
如果是求和,注意:初始值是0:
如果初始值是10,則結果是:45
一個字符串的例子:
val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
修改一下剛才的查看分區元素的函數
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
兩個分區中的元素:
[partID:0, val: a], [partID:0, val: b], [partID:0, val: c],
[partID:1, val: d], [partID:1, val: e], [partID:1, val: f]
運行結果:
更復雜一點的例子
val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
結果可能是:”24”,也可能是:”42”
val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
結果是:”10”,也可能是”01”,
原因:注意有個初始值””,其長度0,然后0.toString變成字符串
val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
結果是:”11”,原因同上。
3、aggregateByKey
准備數據:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
兩個分區中的元素:
示例:
將每個分區中的動物最多的個數求和
scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res69: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
將每種動物個數求和
scala> pairRDD.aggregateByKey(0)(_+_, _ + _).collect
res71: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
這個例子也可以使用:reduceByKey
scala> pairRDD.reduceByKey(_+_).collect
res73: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
4、coalesce與repartition
都是將RDD中的分區進行重分區。
區別是:coalesce默認不會進行shuffle(false);而repartition會進行shuffle(true),即:會將數據真正通過網絡進行重分區。
示例:
def func4(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
下面兩句話是等價的:
val rdd2 = rdd1.repartition(3)
val rdd3 = rdd1.coalesce(3,true) --->如果是false,查看RDD的length依然是2
5、其他高級算子
參考:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
