
fold 操作 區別 與 co 1.mapValus 2.flatMapValues 3.comineByKey 4.foldByKey 5.reduceByKey 6.groupByKey 7.sortByKey 8.cogroup 9.join 10.LeftOutJoin 11.RightOutJoin 1.map(func) 2.flatMap(func) 3.mapPartitions(func) 4.mapPartitionsWithIndex(func) 5.simple(withReplacement,fraction,seed) 6.union(ortherDataset) 7.intersection(otherDataset) 8.distinct([numTasks]) 9.cartesian(otherDataset) 10.coalesce(numPartitions,shuffle) 11.repartition(numPartition) 12.glom() 13.randomSplit(weight:Array[Double],seed)
RDD簡介 在集群背后,有一個非常重要的分布式數據架構,即彈性分布式數據集(Resilient Distributed Dataset,RDD)。RDD是Spark的最基本抽象,是對分布式內存的抽象使用,實現了以操作本地集合的方式來操作分布式數據集的抽象實現。RDD是Spark最核心的東西,它表示已被分區,不可變的並能夠被並行操作的數據集合,不同的數據集格式對應不同的RDD實現。RDD必須是可序列化的。RDD可以cache到內存中,每次對RDD數據集的操作之后的結果,都可以存放到內存中,下一個操作可以直接從內存中輸入,省去了MapReduce大量的磁盤IO操作。這對於迭代運算比較常見的機器學習算法, 交互式數據挖掘來說,效率提升比較大。 (1)RDD的特點 1)創建:只能通過轉換 ( transformation ,如map/filter/groupBy/join 等,區別於動作 action) 從兩種數據源中創建 RDD 1 )穩定存儲中的數據; 2 )其他 RDD。 2)只讀:狀態不可變,不能修改。 3)分區:支持使 RDD 中的元素根據那個 key 來分區 ( partitioning ) ,保存到多個結點上。還原時只會重新計算丟失分區的數據,而不會影響整個系統。 4)路徑:在 RDD 中叫世族或血統 ( lineage ) ,即 RDD 有充足的信息關於它是如何從其他 RDD 產生而來的。 5)持久化:支持將會被重用的 RDD 緩存 ( 如 in-memory 或溢出到磁盤 )。 6)延遲計算: Spark 也會延遲計算 RDD ,使其能夠將轉換管道化 (pipeline transformation)。 7)操作:豐富的轉換(transformation)和動作 ( action ) , count/reduce/collect/save 等。 執行了多少次transformation操作,RDD都不會真正執行運算(記錄lineage),只有當action操作被執行時,運算才會觸發。 (2)RDD的好處 1)RDD只能從持久存儲或通過Transformations操作產生,相比於分布式共享內存(DSM)可以更高效實現容錯,對於丟失部分數據分區只需根據它的lineage就可重新計算出來,而不需要做特定的Checkpoint。 2)RDD的不變性,可以實現類Hadoop MapReduce的推測式執行。 3)RDD的數據分區特性,可以通過數據的本地性來提高性能,這不Hadoop MapReduce是一樣的。 4)RDD都是可序列化的,在內存不足時可自動降級為磁盤存儲,把RDD存儲於磁盤上,這時性能會有大的下降但不會差於現在的MapReduce。 5)批量操作:任務能夠根據數據本地性 (data locality) 被分配,從而提高性能。 (3)RDD的內部屬性 通過RDD的內部屬性,用戶可以獲取相應的元數據信息。通過這些信息可以支持更復雜的算法或優化。 1)分區列表:通過分區列表可以找到一個RDD中包含的所有分區及其所在地址。 2)計算每個分片的函數:通過函數可以對每個數據塊進行RDD需要進行的用戶自定義函數運算。 3)對父RDD的依賴列表,依賴還具體分為寬依賴和窄依賴,但並不是所有的RDD都有依賴。 4)可選:key-value型的RDD是根據哈希來分區的,類似於mapreduce當中的Paritioner接口,控制key分到哪個reduce。 5)可選:每一個分片的優先計算位置(preferred locations),比如HDFS的block的所在位置應該是優先計算的位置。(存儲的是一個表,可以將處理的分區“本地化”) [java] view plain copy //只計算一次 protected def getPartitions: Array[Partition] //對一個分片進行計算,得出一個可遍歷的結果 def compute(split: Partition, context: TaskContext): Iterator[T] //只計算一次,計算RDD對父RDD的依賴 protected def getDependencies: Seq[Dependency[_]] = deps //可選的,分區的方法,針對第4點,類似於mapreduce當中的Paritioner接口,控制key分到哪個reduce @transient val partitioner: Option[Partitioner] = None //可選的,指定優先位置,輸入參數是split分片,輸出結果是一組優先的節點位置 protected def getPreferredLocations(split: Partition): Seq[String] = Nil (4)RDD的存儲與分區 1)用戶可以選擇不同的存儲級別存儲RDD以便重用。 2)當前RDD默認是存儲於內存,但當內存不足時,RDD會spill到disk。 3)RDD在需要進行分區把數據分布於集群中時會根據每條記錄Key進行分區(如Hash 分區),以此保證兩個數據集在Join時能高效。 RDD根據useDisk、useMemory、useOffHeap、deserialized、replication參數的組合定義了以下存儲級別: [java] view plain copy //存儲等級定義: val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) (5)RDD的容錯機制 RDD的容錯機制實現分布式數據集容錯方法有兩種:數據檢查點和記錄更新,RDD采用記錄更新的方式:記錄所有更新點的成本很高。所以,RDD只支持粗顆粒變換,即只記錄單個塊(分區)上執行的單個操作,然后創建某個RDD的變換序列(血統 lineage)存儲下來;變換序列指,每個RDD都包含了它是如何由其他RDD變換過來的以及如何重建某一塊數據的信息。因此RDD的容錯機制又稱“血統”容錯。 要實現這種“血統”容錯機制,最大的難題就是如何表達父RDD和子RDD之間的依賴關系。實際上依賴關系可以分兩種,窄依賴和寬依賴。窄依賴:子RDD中的每個數據塊只依賴於父RDD中對應的有限個固定的數據塊;寬依賴:子RDD中的一個數據塊可以依賴於父RDD中的所有數據塊。例如:map變換,子RDD中的數據塊只依賴於父RDD中對應的一個數據塊;groupByKey變換,子RDD中的數據塊會依賴於多塊父RDD中的數據塊,因為一個key可能分布於父RDD的任何一個數據塊中, 將依賴關系分類的兩個特性:第一,窄依賴可以在某個計算節點上直接通過計算父RDD的某塊數據計算得到子RDD對應的某塊數據;寬依賴則要等到父RDD所有數據都計算完成之后,並且父RDD的計算結果進行hash並傳到對應節點上之后才能計算子RDD。第二,數據丟失時,對於窄依賴只需要重新計算丟失的那一塊數據來恢復;對於寬依賴則要將祖先RDD中的所有數據塊全部重新計算來恢復。所以在“血統”鏈特別是有寬依賴的時候,需要在適當的時機設置數據檢查點。也是這兩個特性要求對於不同依賴關系要采取不同的任務調度機制和容錯恢復機制。 (6)Spark計算工作流 圖1-5中描述了Spark的輸入、運行轉換、輸出。在運行轉換中通過算子對RDD進行轉換。算子是RDD中定義的函數,可以對RDD中的數據進行轉換和操作。 ·輸入:在Spark程序運行中,數據從外部數據空間(例如,HDFS、Scala集合或數據)輸入到Spark,數據就進入了Spark運行時數據空間,會轉化為Spark中的數據塊,通過BlockManager進行管理。 ·運行:在Spark數據輸入形成RDD后,便可以通過變換算子fliter等,對數據操作並將RDD轉化為新的RDD,通過行動(Action)算子,觸發Spark提交作業。如果數據需要復用,可以通過Cache算子,將數據緩存到內存。 ·輸出:程序運行結束數據會輸出Spark運行時空間,存儲到分布式存儲中(如saveAsTextFile輸出到HDFS)或Scala數據或集合中(collect輸出到Scala集合,count返回Scala Int型數據)。 Spark的核心數據模型是RDD,但RDD是個抽象類,具體由各子類實現,如MappedRDD、ShuffledRDD等子類。Spark將常用的大數據操作都轉化成為RDD的子類。 RDD編程模型 來看一段代碼:textFile算子從HDFS讀取日志文件,返回“file”(RDD);filter算子篩出帶“ERROR”的行,賦給 “errors”(新RDD);cache算子把它緩存下來以備未來使用;count算子返回“errors”的行數。RDD看起來與Scala集合類型 沒有太大差別,但它們的數據和運行模型大相迥異。 上圖給出了RDD數據模型,並將上例中用到的四個算子映射到四種算子類型。Spark程序工作在兩個空間中:Spark RDD空間和Scala原生數據空間。在原生數據空間里,數據表現為標量(scalar,即Scala基本類型,用橘色小方塊表示)、集合類型(藍色虛線 框)和持久存儲(紅色圓柱)。 下圖描述了Spark運行過程中通過算子對RDD進行轉換, 算子是RDD中定義的函數,可以對RDD中的數據進行轉換和操作。 圖1 兩個空間的切換,四類不同的RDD算子 輸入算子(橘色箭頭)將Scala集合類型或存儲中的數據吸入RDD空間,轉為RDD(藍色實線框)。輸入算子的輸入大致有兩類:一類針對 Scala集合類型,如parallelize;另一類針對存儲數據,如上例中的textFile。輸入算子的輸出就是Spark空間的RDD。 因為函數語義,RDD經過變換(transformation)算子(藍色箭頭)生成新的RDD。變換算子的輸入和輸出都是RDD。RDD會被划分 成很多的分區 (partition)分布到集群的多個節點中,圖1用藍色小方塊代表分區。注意,分區是個邏輯概念,變換前后的新舊分區在物理上可能是同一塊內存或存 儲。這是很重要的優化,以防止函數式不變性導致的內存需求無限擴張。有些RDD是計算的中間結果,其分區並不一定有相應的內存或存儲與之對應,如果需要 (如以備未來使用),可以調用緩存算子(例子中的cache算子,灰色箭頭表示)將分區物化(materialize)存下來(灰色方塊)。 一部分變換算子視RDD的元素為簡單元素,分為如下幾類: 輸入輸出一對一(element-wise)的算子,且結果RDD的分區結構不變,主要是map、flatMap(map后展平為一維RDD); 輸入輸出一對一,但結果RDD的分區結構發生了變化,如union(兩個RDD合為一個)、coalesce(分區減少); 從輸入中選擇部分元素的算子,如filter、distinct(去除冗余元素)、subtract(本RDD有、它RDD無的元素留下來)和sample(采樣)。 另一部分變換算子針對Key-Value集合,又分為: 對單個RDD做element-wise運算,如mapValues(保持源RDD的分區方式,這與map不同); 對單個RDD重排,如sort、partitionBy(實現一致性的分區划分,這個對數據本地性優化很重要,后面會講); 對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey; 對兩個RDD基於key進行join和重組,如join、cogroup。 后三類操作都涉及重排,稱為shuffle類操作。 從RDD到RDD的變換算子序列,一直在RDD空間發生。這里很重要的設計是lazy evaluation:計算並不實際發生,只是不斷地記錄到元數據。元數據的結構是DAG(有向無環圖),其中每一個“頂點”是RDD(包括生產該RDD 的算子),從父RDD到子RDD有“邊”,表示RDD間的依賴性。Spark給元數據DAG取了個很酷的名字,Lineage(世系)。這個 Lineage也是前面容錯設計中所說的日志更新。 Lineage一直增長,直到遇上行動(action)算子(圖1中的綠色箭頭),這時 就要evaluate了,把剛才累積的所有算子一次性執行。行動算子的輸入是RDD(以及該RDD在Lineage上依賴的所有RDD),輸出是執行后生 成的原生數據,可能是Scala標量、集合類型的數據或存儲。當一個算子的輸出是上述類型時,該算子必然是行動算子,其效果則是從RDD空間返回原生數據空間。 RDD運行邏輯 如圖所示,在Spark應用中,整個執行流程在邏輯上運算之間會形成有向無環圖。Action算子觸發之后會將所有累積的算子形成一個有向無環圖,然后由調度器調度該圖上的任務進行運算。Spark的調度方式與MapReduce有所不同。Spark根據RDD之間不同的依賴關系切分形成不同的階段(Stage),一個階段包含一系列函數進行流水線執行。圖中的A、B、C、D、E、F、G,分別代表不同的RDD,RDD內的一個方框代表一個數據塊。數據從HDFS輸入Spark,形成RDD A和RDD C,RDD C上執行map操作,轉換為RDD D,RDD B和RDD F進行join操作轉換為G,而在B到G的過程中又會進行Shuffle。最后RDD G通過函數saveAsSequenceFile輸出保存到HDFS中。 RDD依賴關系 RDD的依賴關系如下圖所示: 窄依賴 (narrowdependencies) 和寬依賴 (widedependencies) 。窄依賴是指 父 RDD 的每個分區都只被子 RDD 的一個分區所使用,例如map、filter。相應的,那么寬依賴就是指父 RDD 的分區被多個子 RDD 的分區所依賴,例如groupByKey、reduceByKey等操作。如果父RDD的一個Partition被一個子RDD的Partition所使用就是窄依賴,否則的話就是寬依賴。 這種划分有兩個用處。首先,窄依賴支持在一個結點上管道化執行。例如基於一對一的關系,可以在 filter 之后執行 map 。其次,窄依賴支持更高效的故障還原。因為對於窄依賴,只有丟失的父 RDD 的分區需要重新計算。而對於寬依賴,一個結點的故障可能導致來自所有父 RDD 的分區丟失,因此就需要完全重新執行。因此對於寬依賴,Spark 會在持有各個父分區的結點上,將中間數據持久化來簡化故障還原,就像 MapReduce 會持久化 map 的輸出一樣。 特別說明:對於join操作有兩種情況,如果join操作的使用每個partition僅僅和已知的Partition進行join,此時的join操作就是窄依賴;其他情況的join操作就是寬依賴;因為是確定的Partition數量的依賴關系,所以就是窄依賴,得出一個推論,窄依賴不僅包含一對一的窄依賴,還包含一對固定個數的窄依賴(也就是說對父RDD的依賴的Partition的數量不會隨着RDD數據規模的改變而改變) 如何划分Stage如下圖所示: Stage划分的依據就是寬依賴,什么時候產生寬依賴呢?例如reduceByKey,groupByKey等Action。 1.從后往前推理,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到Stage中; 2.每個Stage里面的Task的數量是由該Stage中最后一個RDD的Partition數量決定的; 3.最后一個Stage里面的任務的類型是ResultTask,前面所有其他Stage里面的任務類型都是ShuffleMapTask; 4.代表當前Stage的算子一定是該Stage的最后一個計算步驟; 補充:Hadoop中的MapReduce操作中的Mapper和Reducer在Spark中基本等量算子是:map、reduceByKey;在一個Stage內部,首先是算子合並,也就是所謂的函數式編程的執行的時候最終進行函數的展開從而把一個Stage內部的多個算子合並成為一個大算子(其內部包含了當前Stage中所有算子對數據的計算邏輯);其次是由於Transformation操作的Lazy特性!!在具體算子交給集群的Executor計算之前,首先會通過Spark Framework(DAGScheduler)進行算子的優化。 RDD如何操作 (1)RDD的創建方式 1)從Hadoop文件系統(或與Hadoop兼容的其他持久化存儲系統,如Hive、Cassandra、HBase)輸入(例如HDFS)創建。 2)從父RDD轉換得到新RDD。 3)通過parallelize或makeRDD將單機數據創建為分布式RDD。 (2)RDD的兩種操作算子 對於RDD可以有兩種操作算子:轉換(Transformation)與行動(Action)。 1)轉換(Transformation):Transformation操作是延遲計算的,也就是說從一個RDD轉換生成另一個RDD的轉換操作不是馬上執行,需要等到有Action操作的時候才會真正觸發運算。 2)行動(Action):Action算子會觸發Spark提交作業(Job),並將數據輸出Spark系統。 1.Transformation具體內容: 2.Action具體內容: 總結 相比MapReduce,Spark提供了更加優化和復雜的執行流。讀者還可以深入了解Spark的運行機制與Spark算子,這樣能更加直觀地了解API的使用。Spark提供了更加豐富的函數式算子,這樣就為Spark上層組件的開發奠定了堅實的基礎。后續文章將詳細介紹Spark算子源代碼及示例。
最近在閱讀源碼,發現這篇博客內容非常好,有助於快速理解代碼。 1、什么是RDD? 上一章講了Spark提交作業的過程,這一章我們要講RDD。簡單的講,RDD就是Spark的input,知道input是啥吧,就是輸入的數據。 RDD的全名是Resilient Distributed Dataset,意思是容錯的分布式數據集,每一個RDD都會有5個特征: 1、有一個分片列表。就是能被切分,和hadoop一樣的,能夠切分的數據才能並行計算。 2、有一個函數計算每一個分片,這里指的是下面會提到的compute函數。 3、對其他的RDD的依賴列表,依賴還具體分為寬依賴和窄依賴,但並不是所有的RDD都有依賴。 4、可選:key-value型的RDD是根據哈希來分區的,類似於mapreduce當中的Paritioner接口,控制key分到哪個reduce。 5、可選:每一個分片的優先計算位置(preferred locations),比如HDFS的block的所在位置應該是優先計算的位置。 對應着上面這幾點,我們在RDD里面能找到這4個方法和1個屬性,別着急,下面我們會慢慢展開說這5個東東。 //只計算一次 protected def getPartitions: Array[Partition] //對一個分片進行計算,得出一個可遍歷的結果 def compute(split: Partition, context: TaskContext): Iterator[T] //只計算一次,計算RDD對父RDD的依賴 protected def getDependencies: Seq[Dependency[_]] = deps //可選的,分區的方法,針對第4點,類似於mapreduce當中的Paritioner接口,控制key分到哪個reduce @transient val partitioner: Option[Partitioner] = None //可選的,指定優先位置,輸入參數是split分片,輸出結果是一組優先的節點位置 protected def getPreferredLocations(split: Partition): Seq[String] = Nil 2、多種RDD之間的轉換 下面用一個實例講解一下吧,就拿我們常用的一段代碼來講吧,然后會把我們常用的RDD都會講到。 val hdfsFile = sc.textFile(args(1)) val flatMapRdd = hdfsFile.flatMap(s => s.split(" ")) val filterRdd = flatMapRdd.filter(_.length == 2) val mapRdd = filterRdd.map(word => (word, 1)) val reduce = mapRdd.reduceByKey(_ + _) 這里涉及到很多個RDD,textFile是一個HadoopRDD經過map后的MappredRDD,經過flatMap是一個FlatMappedRDD,經過filter方法之后生成了一個FilteredRDD,經過map函數之后,變成一個MappedRDD,通過隱式轉換成 PairRDD,最后經過reduceByKey。 我們首先看textFile的這個方法,進入SparkContext這個方法,找到它。 def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString) } 看它的輸入參數,path,TextInputFormat,LongWritable,Text,同志們聯想到什么?寫過mapreduce的童鞋都應該知道哈。 1、hdfs的地址 2、InputFormat的類型 3、Mapper的第一個類型 4、Mapper的第二類型 這就不難理解為什么立馬就對hadoopFile后面加了一個map方法,取pair的第二個參數了,最后在shell里面我們看到它是一個MappredRDD了。 那么現在如果大家要用的不是textFile,而是一個別的hadoop文件類型,大家會不會使用hadoopFile來得到自己要得到的類型呢,不要告訴我不會哈,不會的趕緊回去復習mapreduce。 言歸正傳,默認的defaultMinPartitions的2太小了,我們用的時候還是設置大一點吧。 2.1 HadoopRDD 我們繼續追殺下去,看看hadoopFile方法,里面我們看到它做了3個操作。 1、把hadoop的配置文件保存到廣播變量里。 2、設置路徑的方法 3、new了一個HadoopRDD返回 好,我們接下去看看HadoopRDD這個類吧,我們重點看看它的getPartitions、compute、getPreferredLocations。 先看getPartitions,它的核心代碼如下: val inputSplits = inputFormat.getSplits(jobConf, minPartitions) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } 它調用的是inputFormat自帶的getSplits方法來計算分片,然后把分片HadoopPartition包裝到到array里面返回。 這里順便順帶提一下,因為1.0又出來一個NewHadoopRDD,它使用的是mapreduce新api的inputformat,getSplits就不要有minPartitions了,別的邏輯都是一樣的,只是使用的類有點區別。 我們接下來看compute方法,它的輸入值是一個Partition,返回是一個Iterator[(K, V)]類型的數據,這里面我們只需要關注2點即可。 1、把Partition轉成HadoopPartition,然后通過InputSplit創建一個RecordReader 2、重寫Iterator的getNext方法,通過創建的reader調用next方法讀取下一個值。 // 轉換成HadoopPartition val split = theSplit.asInstanceOf[HadoopPartition] logInfo("Input split: " + split.inputSplit) var reader: RecordReader[K, V] = null val jobConf = getJobConf() val inputFormat = getInputFormat(jobConf) context.stageId, theSplit.index, context.attemptId.toInt, jobConf) // 通過Inputform的getRecordReader來創建這個InputSpit的Reader reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) // 調用Reader的next方法 val key: K = reader.createKey() val value: V = reader.createValue() override def getNext() = { try { finished = !reader.next(key, value) } catch { case eof: EOFException => finished = true } (key, value) } 從這里我們可以看得出來compute方法是通過分片來獲得Iterator接口,以遍歷分片的數據。 getPreferredLocations方法就更簡單了,直接調用InputSplit的getLocations方法獲得所在的位置。 2.2 依賴 下面我們看RDD里面的map方法 def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) 直接new了一個MappedRDD,還把匿名函數f處理了再傳進去,我們繼續追殺到MappedRDD。 private[spark] class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U) extends RDD[U](prev) { override def getPartitions: Array[Partition] = firstParent[T].partitions override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).map(f) } MappedRDD把getPartitions和compute給重寫了,而且都用到了firstParent[T],這個firstParent是何須人也?我們可以先點擊進入RDD[U](prev)這個構造函數里面去。 def this(@transient oneParent: RDD[_]) = this(oneParent.context , List(new OneToOneDependency(oneParent))) 就這樣你會發現它把RDD復制給了deps,HadoopRDD成了MappedRDD的父依賴了,這個OneToOneDependency是一個窄依賴,子RDD直接依賴於父RDD,繼續看firstParent。 protected[spark] def firstParent[U: ClassTag] = { dependencies.head.rdd.asInstanceOf[RDD[U]] } 由此我們可以得出兩個結論: 1、getPartitions直接沿用了父RDD的分片信息 2、compute函數是在父RDD遍歷每一行數據時套一個匿名函數f進行處理 好吧,現在我們可以理解compute函數真正是在干嘛的了 它的兩個顯著作用: 1、在沒有依賴的條件下,根據分片的信息生成遍歷數據的Iterable接口 2、在有前置依賴的條件下,在父RDD的Iterable接口上給遍歷每個元素的時候再套上一個方法 我們看看點擊進入map(f)的方法進去看一下 def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] { def hasNext = self.hasNext def next() = f(self.next()) } 看黃色的位置,看它的next函數,不得不說,寫得真的很妙! 我們接着看RDD的flatMap方法,你會發現它和map函數幾乎沒什么區別,只是RDD變成了FlatMappedRDD,但是flatMap和map的效果還是差別挺大的。 比如((1,2),(3,4)), 如果是調用了flatMap函數,我們訪問到的就是(1,2,3,4)4個元素;如果是map的話,我們訪問到的就是(1,2),(3,4)兩個元素。 有興趣的可以去看看FlatMappedRDD和FilteredRDD這里就不講了,和MappedRDD類似。 2.3 reduceByKey 前面的RDD轉換都簡單,可是到了reduceByKey可就不簡單了哦,因為這里有一個同相同key的內容聚合的一個過程,所以它是最復雜的那一類。 那reduceByKey這個方法在哪里呢,它在PairRDDFunctions里面,這是個隱式轉換,所以比較隱蔽哦,你在RDD里面是找不到的。 def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { combineByKey[V]((v: V) => v, func, func, partitioner) } 它調用的是combineByKey方法,過程過程蠻復雜的,折疊起來,喜歡看的人看看吧。 def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] = { val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) if (self.partitioner == Some(partitioner)) { // 一般的RDD的partitioner是None,這個條件不成立,即使成立只需要對這個數據做一次按key合並value的操作即可 self.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else if (mapSideCombine) { // 默認是走的這個方法,需要map端的combinber. val combined = self.mapPartitionsWithContext((context, iter) => { aggregator.combineValuesByKey(iter, context) }, preservesPartitioning = true) val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner) .setSerializer(serializer) partitioned.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context)) }, preservesPartitioning = true) } else { // 不需要map端的combine,直接就來shuffle val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializer) values.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } } 按照一個比較標准的流程來看的話,應該是走的中間的這條路徑,它干了三件事: 1、給每個分片的數據在外面套一個combineValuesByKey方法的MapPartitionsRDD。 2、用MapPartitionsRDD來new了一個ShuffledRDD出來。 3、對ShuffledRDD做一次combineCombinersByKey。 下面我們先看MapPartitionsRDD,我把和別的RDD有別的兩行給拿出來了,很明顯的區別,f方法是套在iterator的外邊,這樣才能對iterator的所有數據做一個合並。 override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None override def compute(split: Partition, context: TaskContext) = f(context, split.index, firstParent[T].iterator(split, context)) } 接下來我們看Aggregator的combineValuesByKey的方法吧。 def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = { // 是否使用外部排序,是由參數spark.shuffle.spill,默認是true if (!externalSorting) { val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } // 用map來去重,用update方法來更新值,如果沒值的時候,返回值,如果有值的時候,通過mergeValue方法來合並 // mergeValue方法就是我們在reduceByKey里面寫的那個匿名函數,在這里就是(_ + _) while (iter.hasNext) { kv = iter.next() combiners.changeValue(kv._1, update) } combiners.iterator } else { // 用了一個外部排序的map來去重,就不停的往里面插入值即可,基本原理和上面的差不多,區別在於需要外部排序 val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) while (iter.hasNext) { val (k, v) = iter.next() combiners.insert(k, v) } combiners.iterator } 這個就是一個很典型的按照key來做合並的方法了,我們繼續看ShuffledRDD吧。 ShuffledRDD和之前的RDD很明顯的特征是 1、它的依賴傳了一個Nil(空列表)進去,表示它沒有依賴。 2、它的compute計算方式比較特別,這個在之后的文章說,過程比較復雜。 3、它的分片默認是采用HashPartitioner,數量和前面的RDD的分片數量一樣,也可以不一樣,我們可以在reduceByKey的時候多傳一個分片數量即可。 在new完ShuffledRDD之后又來了一遍mapPartitionsWithContext,不過調用的匿名函數變成了combineCombinersByKey。 combineCombinersByKey和combineValuesByKey的邏輯基本相同,只是輸入輸出的類型有區別。combineCombinersByKey只是做單純的合並,不會對輸入輸出的類型進行改變,combineValuesByKey會把iter[K, V]的V值變成iter[K, C]。 case class Aggregator[K, V, C] ( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) ...... } 這個方法會根據我們傳進去的匿名方法的參數的類型做一個自動轉換。 到這里,作業都沒有真正執行,只是將RDD各種嵌套,我們通過RDD的id和類型的變化觀測到這一點,RDD[1]->RDD[2]->RDD[3]...... 3、其它RDD 平常我們除了從hdfs上面取數據之后,我們還可能從數據庫里面取數據,那怎么辦呢?沒關系,有個JdbcRDD! val rdd = new JdbcRDD( sc, () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") }, "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", 1, 100, 3, (r: ResultSet) => { r.getInt(1) } ).cache() 前幾個參數大家都懂,我們重點說一下后面1, 100, 3是咋回事? 在這個JdbcRDD里面它默認我們是會按照一個long類型的字段對數據進行切分,(1,100)分別是最小值和最大值,3是分片的數量。 比如我們要一次查ID為1-1000,000的的用戶,分成10個分片,我們就填(1, 1000,000, 10)即可,在sql語句里面還必須有"? <= ID AND ID <= ?"的句式,別嘗試着自己造句哦! 最后是怎么處理ResultSet的方法,自己愛怎么處理怎么處理去吧。不過確實覺着用得不方便的可以自己重寫一個RDD。 小結: 這一章重點介紹了各種RDD那5個特征,以及RDD之間的轉換,希望大家可以對RDD有更深入的了解,下一章我們將要講作業的運行過程,敬請關注! 岑玉海 轉載請注明出處,謝謝!