圖解Spark API


初識spark,需要對其API有熟悉的了解才能方便開發上層應用。本文用圖形的方式直觀表達相關API的工作特點,並提供了解新的API接口使用的方法。例子代碼全部使用python實現。

1. 數據源准備

准備輸入文件:

$ cat /tmp/in
apple
bag bag
cat cat cat

啟動pyspark:

$ ./spark/bin/pyspark

使用textFile創建RDD:

>>> txt = sc.textFile("file:///tmp/in", 2)

查看RDD分區與數據:

>>> txt.glom().collect()
[[u'apple', u'bag bag'], [u'cat cat cat']]

2. transformation

flatMap

處理RDD的每一行,一對多映射。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).collect()
[u'apple', u'bag', u'bag', u'cat', u'cat', u'cat']

示意圖:

map

處理RDD的每一行,一對一映射。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).collect()
[(u'apple', 1), (u'bag', 1), (u'bag', 1), (u'cat', 1), (u'cat', 1), (u'cat', 1)]

示意圖:

filter

處理RDD的每一行,過濾掉不滿足條件的行。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).filter(lambda word: word !='bag').collect()
[u'apple', u'cat', u'cat', u'cat']

示意圖:

mapPartitions

逐個處理每一個partition,使用迭代器it訪問每個partition的行。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).mapPartitions(lambda it: [len(list(it))]).collect()
[3, 3]

示意圖:

mapPartitionsWithIndex

逐個處理每一個partition,使用迭代器it訪問每個partition的行,index保存partition的索引,等價於mapPartitionsWithSplit(過期函數)。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).mapPartitionsWithIndex(lambda index, it: [index]).collect()
[0, 1]

示意圖:

sample

根據采樣因子指定的比例,對數據進行采樣,可以選擇是否用隨機數進行替換,seed用於指定隨機數生成器種子。第一個參數表示是否放回抽樣,第二個參數表示抽樣比例,第三個參數表示隨機數seed。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).sample(False, 0.5, 5).collect()
[u'bag', u'bag', u'cat', u'cat']

示意圖:

union

合並RDD,不去重。

代碼示例:

>>> txt.union(txt).collect()
[u'apple', u'bag bag', u'cat cat cat', u'apple', u'bag bag', u'cat cat cat']

示意圖:

distinct

對RDD去重。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).distinct().collect()
[u'bag', u'apple', u'cat']

示意圖:

groupByKey

在一個(K,V)對的數據集上調用,返回一個(K,Seq[V])對的數據集。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect()
[(u'bag', <pyspark.resultiterable.ResultIterable object at 0x128a150>), (u'apple', <pyspark.resultiterable.ResultIterable object at 0x128a550>), (u'cat', <pyspark.resultiterable.ResultIterable object at 0x13234d0>)]
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect()[0][1].data
[1, 1]

示意圖:

reduceByKey

在一個(K,V)對的數據集上調用時,返回一個(K,V)對的數據集,使用指定的reduce函數,將相同key的值聚合到一起。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).collect()
[(u'bag', 2), (u'apple', 1), (u'cat', 3)]

示意圖:

aggregateByKey

自定義聚合函數,類似groupByKey。在一個(K,V)對的數據集上調用,不過可以返回一個(K,Seq[U])對的數據集。

代碼示例(實現groupByKey的功能):

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).aggregateByKey([], lambda seq, elem: seq + [elem], lambda a, b: a + b).collect()
[(u'bag', [1, 1]), (u'apple', [1]), (u'cat', [1, 1, 1])]

sortByKey

在一個(K,V)對的數據集上調用,K必須實現Ordered接口,返回一個按照Key進行排序的(K,V)對數據集。升序或降序由ascending布爾參數決定。

代碼示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey().collect()
[(u'apple', 1), (u'bag', 2), (u'cat', 3)]

示意圖:

join

在類型為(K,V)和(K,W)類型的數據集上調用時,返回一個相同key對應的所有元素對在一起的(K, (V, W))數據集。

代碼示例:

>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey()
>>> sorted_txt.join(sorted_txt).collect()
[(u'bag', (2, 2)), (u'apple', (1, 1)), (u'cat', (3, 3))]

示意圖:

cogroup

在類型為(K,V)和(K,W)的數據集上調用,返回一個 (K, (Seq[V], Seq[W]))元組的數據集。這個操作也可以稱之為groupwith。

代碼示例:

>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey()
>>> sorted_txt.cogroup(sorted_txt).collect()
[(u'bag', (<pyspark.resultiterable.ResultIterable object at 0x1323790>, <pyspark.resultiterable.ResultIterable object at 0x1323310>)), (u'apple', (<pyspark.resultiterable.ResultIterable object at 0x1323990>, <pyspark.resultiterable.ResultIterable object at 0x1323ad0>)), (u'cat', (<pyspark.resultiterable.ResultIterable object at 0x1323110>, <pyspark.resultiterable.ResultIterable object at 0x13230d0>))]
>>> sorted_txt.cogroup(sorted_txt).collect()[0][1][0].data
[2]

示意圖:

cartesian

笛卡爾積,在類型為 T 和 U 類型的數據集上調用時,返回一個 (T, U)對數據集(兩兩的元素對)。

代碼示例:

>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey()
>>> sorted_txt.cogroup(sorted_txt).collect()
[(u'bag', (<pyspark.resultiterable.ResultIterable object at 0x1323790>, <pyspark.resultiterable.ResultIterable object at 0x1323310>)), (u'apple', (<pyspark.resultiterable.ResultIterable object at 0x1323990>, <pyspark.resultiterable.ResultIterable object at 0x1323ad0>)), (u'cat', (<pyspark.resultiterable.ResultIterable object at 0x1323110>, <pyspark.resultiterable.ResultIterable object at 0x13230d0>))]
>>> sorted_txt.cogroup(sorted_txt).collect()[0][1][0].data
[2]

示意圖:

pipe

處理RDD的每一行作為shell命令輸入,shell命令結果為輸出。

代碼示例:

>>> txt.pipe("awk '{print $1}'").collect()
[u'apple', u'bag', u'cat']

示意圖:

coalesce

減少RDD分區數。

代碼示例:

>>> txt.coalesce(1).collect()
[u'apple', u'bag bag', u'cat cat cat']

示意圖:

repartition

對RDD重新分區,類似於coalesce。

代碼示例:

>>> txt.repartition(1).collect()
[u'apple', u'bag bag', u'cat cat cat']

zip

合並兩個RDD序列為元組,要求序列長度相等。

代碼示例:

>>> txt.zip(txt).collect()
[(u'apple', u'apple'), (u'bag bag', u'bag bag'), (u'cat cat cat', u'cat cat cat')]

示意圖:

3. action

reduce

聚集數據集中的所有元素。

代碼示例:

>>> txt.reduce(lambda a, b: a + " " + b)
u'apple bag bag cat cat cat'

示意圖:

collect

以數組的形式,返回數據集的所有元素。

代碼示例:

>>> txt.collect()
[u'apple', u'bag bag', u'cat cat cat']

count

返回數據集的元素的個數。

代碼示例:

>>> txt.count()
3

first

返回數據集第一個元素。

代碼示例:

>>> txt.first()
u'apple'

take

返回數據集前n個元素。

代碼示例:

>>> txt.take(2)
[u'apple', u'bag bag']

takeSample

采樣返回數據集前n個元素。第一個參數表示是否放回抽樣,第二個參數表示抽樣個數,第三個參數表示隨機數seed。

代碼示例:

>>> txt.takeSample(False, 2, 1)
[u'cat cat cat', u'bag bag']

takeOrdered

排序返回前n個元素。

代碼示例:

>>> txt.takeOrdered(2)
[u'apple', u'bag bag']

saveAsTextFile

將數據集的元素,以textfile的形式,保存到本地文件系統,HDFS或者任何其它hadoop支持的文件系統。

代碼示例:

>>> txt.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).saveAsTextFile("file:///tmp/out")

查看輸出文件:

$cat /tmp/out/part-00001
(u'bag', 2)
(u'apple', 1)
(u'cat', 3)

saveAsSequenceFile

將數據集的元素,以Hadoop sequencefile的格式,保存到指定的目錄下,本地系統,HDFS或者任何其它hadoop支持的文件系統。這個只限於由key-value對組成,並實現了Hadoop的Writable接口,或者隱式的可以轉換為Writable的RDD。

countByKey

對(K,V)類型的RDD有效,返回一個(K,Int)對的Map,表示每一個key對應的元素個數。

代碼示例:

>>> txt.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).countByKey()
defaultdict(<type 'int'>, {u'bag': 2, u'apple': 1, u'cat': 3})

foreach

在數據集的每一個元素上,運行函數func進行更新。這通常用於邊緣效果,例如更新一個累加器,或者和外部存儲系統進行交互。

代碼示例:

>>> def func(line): print line
>>> txt.foreach(lambda line: func(line))
apple
bag bag
cat cat cat

4. 其他

文中未提及的transformation和action函數可以通過如下命令查詢:

>>> dir(txt)
['__add__', '__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__getnewargs__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_computeFractionForSampleSize', '_defaultReducePartitions', '_id', '_jrdd', '_jrdd_deserializer', '_memory_limit', '_pickled', '_reserialize', '_to_java_object_rdd', 'aggregate', 'aggregateByKey', 'cache', 'cartesian', 'checkpoint', 'coalesce', 'cogroup', 'collect', 'collectAsMap', 'combineByKey', 'context', 'count', 'countApprox', 'countApproxDistinct', 'countByKey', 'countByValue', 'ctx', 'distinct', 'filter', 'first', 'flatMap', 'flatMapValues', 'fold', 'foldByKey', 'foreach', 'foreachPartition', 'fullOuterJoin', 'getCheckpointFile', 'getNumPartitions', 'getStorageLevel', 'glom', 'groupBy', 'groupByKey', 'groupWith', 'histogram', 'id', 'intersection', 'isCheckpointed', 'isEmpty', 'is_cached', 'is_checkpointed', 'join', 'keyBy', 'keys', 'leftOuterJoin', 'lookup', 'map', 'mapPartitions', 'mapPartitionsWithIndex', 'mapPartitionsWithSplit', 'mapValues', 'max', 'mean', 'meanApprox', 'min', 'name', 'partitionBy', 'partitioner', 'persist', 'pipe', 'randomSplit', 'reduce', 'reduceByKey', 'reduceByKeyLocally', 'repartition', 'repartitionAndSortWithinPartitions', 'rightOuterJoin', 'sample', 'sampleByKey', 'sampleStdev', 'sampleVariance', 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopDataset', 'saveAsNewAPIHadoopFile', 'saveAsPickleFile', 'saveAsSequenceFile', 'saveAsTextFile', 'setName', 'sortBy', 'sortByKey', 'stats', 'stdev', 'subtract', 'subtractByKey', 'sum', 'sumApprox', 'take', 'takeOrdered', 'takeSample', 'toDF', 'toDebugString', 'toLocalIterator', 'top', 'treeAggregate', 'treeReduce', 'union', 'unpersist', 'values', 'variance', 'zip', 'zipWithIndex', 'zipWithUniqueId']

查詢具體函數的使用文檔:

>>> help(txt.zipWithIndex)
Help on method zipWithIndex in module pyspark.rdd:

zipWithIndex(self) method of pyspark.rdd.RDD instance
    Zips this RDD with its element indices.
    
    The ordering is first based on the partition index and then the
    ordering of items within each partition. So the first item in
    the first partition gets index 0, and the last item in the last
    partition receives the largest index.
    
    This method needs to trigger a spark job when this RDD contains
    more than one partitions.
    
    >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
    [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
(END)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM