Spark RDD :Spark API--圖解Spark API


面試題引出:

簡述Spark的寬窄依賴,以及Spark如何划分stage,每個stage又根據什么決定task個數? 

Stage:根據RDD之間的依賴關系的不同將Job划分成不同的Stage,遇到一個寬依賴則划分一個Stage。

Task:Stage是一個TaskSet,將Stage根據分區數划分成一個個的Task。

 請列舉Spark的transformation算子(不少於8個),並簡述功能

1)map(func):返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成.

2)mapPartitions(func):類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]。假設有N個元素,有M個分區,那么map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理所有分區。

3)reduceByKey(func,[numTask]):在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用定的reduce函數,將相同key的值聚合到一起,reduce任務的個數可以通過第二個可選的參數來設置。

4)aggregateByKey (zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U: 在kv對的RDD中,,按key將value進行分組合並,合並時,將每個value和初始值作為seq函數的參數,進行計算,返回的結果作為一個新的kv對,然后再將結果按照key進行合並,最后將每個分組的value傳遞給combine函數進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給combine函數,以此類推),將key與計算結果作為一個新的kv對輸出。

 5)combineByKey(createCombiner: V=>C, mergeValue: (C, V) =>C, mergeCombiners: (C, C) =>C):

對相同K,把V合並成一個集合。

1.createCombiner: combineByKey() 會遍歷分區中的所有元素,因此每個元素的鍵要么還沒有遇到過,要么就和之前的某個元素的鍵相同。如果這是一個新的元素,combineByKey()會使用一個叫作createCombiner()的函數來創建那個鍵對應的累加器的初始值

2.mergeValue: 如果這是一個在處理當前分區之前已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合並

3.mergeCombiners: 由於每個分區都是獨立處理的, 因此對於同一個鍵可以有多個累加器。如果有兩個或者更多的分區都有對應同一個鍵的累加器, 就需要使用用戶提供的 mergeCombiners() 方法將各個分區的結果進行合並。

根據自身情況選擇比較熟悉的算子加以介紹。

4.10.8 請列舉Spark的action算子(不少於6個),並簡述功能

1)reduce:

2)collect:

3)first:

4)take:

5)aggregate:

6)countByKey:

7)foreach:

8)saveAsTextFile:

4.10.9 請列舉會引起Shuffle過程的Spark算子,並簡述功能。

reduceBykey:

groupByKey:

…ByKey:

初識spark,需要對其API有熟悉的了解才能方便開發上層應用。本文用圖形的方式直觀表達相關API的工作特點,並提供了解新的API接口使用的方法。例子代碼全部使用python實現。

 

 

 

 

1. 數據源准備

准備輸入文件:

$ cat /tmp/in apple bag bag cat cat cat

啟動pyspark:

$ ./spark/bin/pyspark

使用textFile創建RDD:

>>> txt = sc.textFile("file:///tmp/in", 2)

查看RDD分區與數據:

>>> txt.glom().collect() [[u'apple', u'bag bag'], [u'cat cat cat']]

2. transformation

flatMap

處理RDD的每一行,一對多映射。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).collect() [u'apple', u'bag', u'bag', u'cat', u'cat', u'cat']

示意圖:

 

 

 

map

處理RDD的每一行,一對一映射。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).collect() [(u'apple', 1), (u'bag', 1), (u'bag', 1), (u'cat', 1), (u'cat', 1), (u'cat', 1)]

示意圖:

 

 

 

filter

處理RDD的每一行,過濾掉不滿足條件的行。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).filter(lambda word: word !='bag').collect() [u'apple', u'cat', u'cat', u'cat']


 

 

mapPartitions

逐個處理每一個partition,使用迭代器it訪問每個partition的行。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).mapPartitions(lambda it: [len(list(it))]).collect() [3, 3]

示意圖:

 

 

mapPartitionsWithIndex

逐個處理每一個partition,使用迭代器it訪問每個partition的行,index保存partition的索引,等價於mapPartitionsWithSplit(過期函數)。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).mapPartitionsWithIndex(lambda index, it: [index]).collect() [0, 1]

示意圖:

 

 

sample

根據采樣因子指定的比例,對數據進行采樣,可以選擇是否用隨機數進行替換,seed用於指定隨機數生成器種子。第一個參數表示是否放回抽樣,第二個參數表示抽樣比例,第三個參數表示隨機數seed。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).sample(False, 0.5, 5).collect() [u'bag', u'bag', u'cat', u'cat']

示意圖:

 

 

union

合並RDD,不去重。

代碼示例:

>>> txt.union(txt).collect() [u'apple', u'bag bag', u'cat cat cat', u'apple', u'bag bag', u'cat cat cat']

示意圖:

 

 

distinct

對RDD去重。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).distinct().collect() [u'bag', u'apple', u'cat']

示意圖:

 

 

groupByKey

在一個(K,V)對的數據集上調用,返回一個(K,Seq[V])對的數據集。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect() [(u'bag', <pyspark.resultiterable.ResultIterable object at 0x128a150>), (u'apple', <pyspark.resultiterable.ResultIterable object at 0x128a550>), (u'cat', <pyspark.resultiterable.ResultIterable object at 0x13234d0>)] >>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect()[0][1].data [1, 1]

示意圖:

 

 

reduceByKey

在一個(K,V)對的數據集上調用時,返回一個(K,V)對的數據集,使用指定的reduce函數,將相同key的值聚合到一起。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).collect() [(u'bag', 2), (u'apple', 1), (u'cat', 3)]

示意圖:

 

 

aggregateByKey

自定義聚合函數,類似groupByKey。在一個(K,V)對的數據集上調用,不過可以返回一個(K,Seq[U])對的數據集。

代碼示例(實現groupByKey的功能):

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).aggregateByKey([], lambda seq, elem: seq + [elem], lambda a, b: a + b).collect() [(u'bag', [1, 1]), (u'apple', [1]), (u'cat', [1, 1, 1])]

sortByKey

在一個(K,V)對的數據集上調用,K必須實現Ordered接口,返回一個按照Key進行排序的(K,V)對數據集。升序或降序由ascending布爾參數決定。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey().collect() [(u'apple', 1), (u'bag', 2), (u'cat', 3)]

示意圖:

 

 

join

在類型為(K,V)和(K,W)類型的數據集上調用時,返回一個相同key對應的所有元素對在一起的(K, (V, W))數據集。

代碼示例:

>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey() >>> sorted_txt.join(sorted_txt).collect() [(u'bag', (2, 2)), (u'apple', (1, 1)), (u'cat', (3, 3))]

示意圖:

 

 

cogroup

在類型為(K,V)和(K,W)的數據集上調用,返回一個 (K, (Seq[V], Seq[W]))元組的數據集。這個操作也可以稱之為groupwith。

代碼示例:

>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey() >>> sorted_txt.cogroup(sorted_txt).collect() [(u'bag', (<pyspark.resultiterable.ResultIterable object at 0x1323790>, <pyspark.resultiterable.ResultIterable object at 0x1323310>)), (u'apple', (<pyspark.resultiterable.ResultIterable object at 0x1323990>, <pyspark.resultiterable.ResultIterable object at 0x1323ad0>)), (u'cat', (<pyspark.resultiterable.ResultIterable object at 0x1323110>, <pyspark.resultiterable.ResultIterable object at 0x13230d0>))] >>> sorted_txt.cogroup(sorted_txt).collect()[0][1][0].data [2]

示意圖:

 

cartesian

笛卡爾積,在類型為 T 和 U 類型的數據集上調用時,返回一個 (T, U)對數據集(兩兩的元素對)。

代碼示例:

>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey() >>> sorted_txt.cogroup(sorted_txt).collect() [(u'bag', (<pyspark.resultiterable.ResultIterable object at 0x1323790>, <pyspark.resultiterable.ResultIterable object at 0x1323310>)), (u'apple', (<pyspark.resultiterable.ResultIterable object at 0x1323990>, <pyspark.resultiterable.ResultIterable object at 0x1323ad0>)), (u'cat', (<pyspark.resultiterable.ResultIterable object at 0x1323110>, <pyspark.resultiterable.ResultIterable object at 0x13230d0>))] >>> sorted_txt.cogroup(sorted_txt).collect()[0][1][0].data [2]

示意圖:

 

 

 

 

pipe

處理RDD的每一行作為shell命令輸入,shell命令結果為輸出。

代碼示例:

>>> txt.pipe("awk '{print $1}'").collect() [u'apple', u'bag', u'cat']

示意圖:

 

 

coalesce

減少RDD分區數。

代碼示例:

>>> txt.coalesce(1).collect() [u'apple', u'bag bag', u'cat cat cat']

示意圖:

 

 

repartition

對RDD重新分區,類似於coalesce。

代碼示例:

>>> txt.repartition(1).collect() [u'apple', u'bag bag', u'cat cat cat']

zip

合並兩個RDD序列為元組,要求序列長度相等。

代碼示例:

>>> txt.zip(txt).collect() [(u'apple', u'apple'), (u'bag bag', u'bag bag'), (u'cat cat cat', u'cat cat cat')]

示意圖:

 

 

3. action

reduce

聚集數據集中的所有元素。

代碼示例:

>>> txt.reduce(lambda a, b: a + " " + b) u'apple bag bag cat cat cat'

示意圖:

 

 

collect

以數組的形式,返回數據集的所有元素。

代碼示例:

>>> txt.collect() [u'apple', u'bag bag', u'cat cat cat']

count

返回數據集的元素的個數。

代碼示例:

>>> txt.count() 3

first

返回數據集第一個元素。

代碼示例:

>>> txt.first() u'apple'

take

返回數據集前n個元素。

代碼示例:

>>> txt.take(2) [u'apple', u'bag bag']

takeSample

采樣返回數據集前n個元素。第一個參數表示是否放回抽樣,第二個參數表示抽樣個數,第三個參數表示隨機數seed。

代碼示例:

>>> txt.takeSample(False, 2, 1) [u'cat cat cat', u'bag bag']

takeOrdered

排序返回前n個元素。

代碼示例:

>>> txt.takeOrdered(2) [u'apple', u'bag bag']

saveAsTextFile

將數據集的元素,以textfile的形式,保存到本地文件系統,HDFS或者任何其它hadoop支持的文件系統。

代碼示例:

>>> txt.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).saveAsTextFile("file:///tmp/out")

查看輸出文件:

$cat /tmp/out/part-00001 (u'bag', 2) (u'apple', 1) (u'cat', 3)

saveAsSequenceFile

將數據集的元素,以Hadoop sequencefile的格式,保存到指定的目錄下,本地系統,HDFS或者任何其它hadoop支持的文件系統。這個只限於由key-value對組成,並實現了Hadoop的Writable接口,或者隱式的可以轉換為Writable的RDD。

countByKey

對(K,V)類型的RDD有效,返回一個(K,Int)對的Map,表示每一個key對應的元素個數。

代碼示例:

>>> txt.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).countByKey() defaultdict(<type 'int'>, {u'bag': 2, u'apple': 1, u'cat': 3})

foreach

在數據集的每一個元素上,運行函數func進行更新。這通常用於邊緣效果,例如更新一個累加器,或者和外部存儲系統進行交互。

代碼示例:

>>> def func(line): print line >>> txt.foreach(lambda line: func(line)) apple bag bag cat cat cat


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM