目錄
一、介紹
二、連接Spark
三、創建RDD
四、RDD常用的轉換 Transformation
五、RDD 常用的執行動作 Action
二、連接Spark
Spark1.3.0只支持Python2.6或更高的版本(但不支持Python3)。它使用了標准的CPython解釋器,所以諸如NumPy一類的C庫也是可以使用的。
通過Spark目錄下的bin/spark-submit腳本你可以在Python中運行Spark應用。這個腳本會載入Spark的Java/Scala庫然后讓你將應用提交到集群中。你可以執行bin/pyspark來打開Python的交互命令行。
如果你希望訪問HDFS上的數據,你需要為你使用的HDFS版本建立一個PySpark連接。常見的HDFS版本標簽都已經列在了這個第三方發行版頁面。
最后,你需要將一些Spark的類import到你的程序中。加入如下這行:
from pyspark import SparkContext, SparkConf
在一個Spark程序中要做的第一件事就是創建一個SparkContext對象來告訴Spark如何連接一個集群。為了創建SparkContext,你首先需要創建一個SparkConf對象,這個對象會包含你的應用的一些相關信息。
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
appName參數是在集群UI上顯示的你的應用的名稱。master是一個Spark、Mesos或YARN集群的URL,如果你在本地運行那么這個參數應該是特殊的”local”字符串。在實際使用中,當你在集群中運行你的程序,你一般不會把master參數寫死在代碼中,而是通過用spark-submit運行程序來獲得這個參數。但是,在本地測試以及單元測試時,你仍需要自行傳入”local”來運行Spark程序。
三、創建RDD
Spark是以RDD概念為中心運行的。RDD是一個容錯的、可以被並行操作的元素集合。創建一個RDD有兩個方法:在你的驅動程序中並行化一個已經存在的集合;從外部存儲系統中引用一個數據集,這個存儲系統可以是一個共享文件系統,比如HDFS、HBase或任意提供了Hadoop輸入格式的數據來源。
並行化集合
並行化集合是通過在驅動程序中一個現有的迭代器或集合上調用SparkContext的parallelize方法建立的。為了創建一個能夠並行操作的分布數據集,集合中的元素都會被拷貝。比如,以下語句創建了一個包含1到5的並行化集合:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
分布數據集(distData)被建立起來之后,就可以進行並行操作了。比如,我們可以調用disData.reduce(lambda a, b: a+b)
來對元素進行疊加。在后文中我們會描述分布數據集上支持的操作。
並行集合的一個重要參數是將數據集划分成分片的數量。對每一個分片,Spark會在集群中運行一個對應的任務。 典型情況下,集群中的每一個CPU將對應運行2-4個分片。一般情況下,Spark會根據當前集群的情況自行設定分片數量。但是,你也可以通過將第二個參 數傳遞給parallelize方法(比如sc.parallelize(data, 10))來手動確定分片數量。注意:有些代碼中會使用切片(slice,分片的同義詞)這個術語來保持向下兼容性。
一個簡單的示例:
import findspark findspark.init() from pyspark import SparkContext from pyspark import SparkConf conf = SparkConf().setAppName("miniProject").setMaster("local[*]") sc=SparkContext.getOrCreate(conf) rdd=sc.parallelize([1,2,3,4,5]) rdd1=rdd.map(lambda r:r+10) #map對每一個元素操作 print(rdd1.collect())
外部數據集
PySpark可以通過Hadoop支持的外部數據源(包括本地文件系統、HDFS、 Cassandra、HBase、 亞馬遜S3等等)建立分布數據集。Spark支持文本文件、 序列文件以及其他任何 Hadoop輸入格式文件。
通過文本文件創建RDD要使用SparkContext的textFile方法。這個方法會使用一個文件的URI(或本地文件路徑,hdfs://、s3n://這樣的URI等等)然后讀入這個文件建立一個文本行的集合。以下是一個例子:
>>> distFile = sc.textFile("data.txt")
建立完成后distFile上就可以調用數據集操作了。比如,我們可以調用map和reduce操作來疊加所有文本行的長度,代碼如下:
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
在Spark中讀入文件時有幾點要注意:
- 如果使用了本地文件路徑時,要保證在worker節點上這個文件也能夠通過這個路徑訪問。這點可以通過將這個文件拷貝到所有worker上或者使用網絡掛載的共享文件系統來解決。
- 包括textFile在內的所有基於文件的Spark讀入方法,都支持將文件夾、壓縮文件、包含通配符的路徑作為參數。比如,以下代碼都是合法的:
textFile("/my/directory") textFile("/my/directory/*.txt") textFile("/my/directory/*.gz")
- textFile方法也可以傳入第二個可選參數來控制文件的分片數量。默認情況下,Spark會為文件的每一個塊(在HDFS中塊的大小默認是64MB) 創建一個分片。但是你也可以通過傳入一個更大的值來要求Spark建立更多的分片。注意,分片的數量絕不能小於文件塊的數量。
除了文本文件之外,Spark的Python API還支持多種其他數據格式:
- SparkContext.wholeTextFiles能夠讀入包含多個小文本文件的目錄,然后為每一個文件返回一個(文件名,內容)對。這是與textFile方法為每一個文本行返回一條記錄相對應的。
- RDD.saveAsPickleFile和SparkContext.pickleFile支持將RDD以串行化的Python對象格式存儲起來。串行化的過程中會以默認10個一批的數量批量處理。
- 序列文件和其他Hadoop輸入輸出格式。
注意
這個特性目前仍處於試驗階段,被標記為Experimental,目前只適用於高級用戶。這個特性在未來可能會被基於Spark SQL的讀寫支持所取代,因為Spark SQL是更好的方式。
可寫類型支持
PySpark序列文件支持利用Java作為中介載入一個鍵值對RDD,將可寫類型轉化成Java的基本類型,然后使用 Pyrolite將java結果對象串行化。當將一個鍵值對RDD儲存到一個序列文件中時PySpark將會運行上述過程的相反過程。首先將Python對象反串行化成Java對象,然后轉化成可寫類型。以下可寫類型會自動轉換:
| 可寫類型 | Python類型 |
- | Text | unicode str|
- | IntWritable | int |
- | FloatWritable | float |
- | DoubleWritable | float |
- | BooleanWritable | bool |
- | BytesWritable | bytearray |
- | NullWritable | None |
- | MapWritable | dict |
數組是不能自動轉換的。用戶需要在讀寫時指定ArrayWritable的子類型.在讀入的時候,默認的轉換器會把自定義的ArrayWritable子 類型轉化成Java的Object[],之后串行化成Python的元組。為了獲得Python的array.array類型來使用主要類型的數組,用戶 需要自行指定轉換器。
保存和讀取序列文件
和文本文件類似,序列文件可以通過指定路徑來保存與讀取。鍵值類型都可以自行指定,但是對於標准可寫類型可以不指定。
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) >>> rdd.saveAsSequenceFile("path/to/file") >>> sorted(sc.sequenceFile("path/to/file").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')]
保存和讀取其他Hadoop輸入輸出格式
PySpark同樣支持寫入和讀出其他Hadoop輸入輸出格式,包括’新’和’舊’兩種Hadoop MapReduce API。如果有必要,一個Hadoop配置可以以Python字典的形式傳入。以下是一個例子,使用了Elasticsearch ESInputFormat:
$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\ "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
四、RDD常用的轉換 Transformation
RDD支持兩類操作:轉化操作,用於從已有的數據集轉化產生新的數據集;啟動操作,用於在計算結束后向驅動程序返回結果。舉個例子,map
是一個轉化操作,可以將數據集中每一個元素傳給一個函數,同時將計算結果作為一個新的RDD返回。另一方面,reduce
操作是一個啟動操作,能夠使用某些函數來聚集計算RDD中所有的元素,並且向驅動程序返回最終結果(同時還有一個並行的reduceByKey
操作可以返回一個分布數據集)。
在Spark所有的轉化操作都是惰性求值的,就是說它們並不會立刻真的計算出結果。相反,它們僅僅是記錄下了轉換操作的操作對象(比如:一個文件)。只有當一個啟動操作被執行,要向驅動程序返回結果時,轉化操作才會真的開始計算。這樣的設計使得Spark運行更加高效——比如,我們會發覺由map
操作產生的數據集將會在reduce
操作中用到,之后僅僅是返回了reduce
的最終的結果而不是map
產生的龐大數據集。
在默認情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成。但是,你也可以通過調用persist
(或cache
)方法來將RDD持久化到內存中,這樣Spark就可以在下次使用這個數據集時快速獲得。Spark同樣提供了對將RDD持久化到硬盤上或在多個節點間復制的支持。
下面的表格列出了Spark支持的常用轉化操作。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。
轉化操作 | 作用
————| ——
map(func) | 返回一個新的分布數據集,由原數據集元素經func處理后的結果組成
filter(func) | 返回一個新的數據集,由傳給func返回True的原數據集元素組成
flatMap(func) | 與map類似,但是每個傳入元素可能有0或多個返回值,func可以返回一個序列而不是一個值
mapParitions(func) | 類似map,但是RDD的每個分片都會分開獨立運行,所以func的參數和返回值必須都是迭代器
mapParitionsWithIndex(func) | 類似mapParitions,但是func有兩個參數,第一個是分片的序號,第二個是迭代器。返回值還是迭代器
sample(withReplacement, fraction, seed) | 使用提供的隨機數種子取樣,然后替換或不替換
union(otherDataset) | 返回新的數據集,包括原數據集和參數數據集的所有元素
intersection(otherDataset) | 返回新數據集,是兩個集的交集
distinct([numTasks]) | 返回新的集,包括原集中的不重復元素
groupByKey([numTasks]) | 當用於鍵值對RDD時返回(鍵,值迭代器)對的數據集
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 用於鍵值對RDD時返回(K,U)對集,對每一個Key的value進行聚集計算
sortByKey([ascending], [numTasks])用於鍵值對RDD時會返回RDD按鍵的順序排序,升降序由第一個參數決定
join(otherDataset, [numTasks]) | 用於鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDD
cogroup(otherDataset, [numTasks]) | 用於兩個鍵值對RDD時返回(K, (V迭代器, W迭代器))RDD
cartesian(otherDataset) | 用於T和U類型RDD時返回(T, U)對類型鍵值對RDD
pipe(command, [envVars]) | 通過shell命令管道處理每個RDD分片
coalesce(numPartitions) | 把RDD的分片數量降低到參數大小
repartition(numPartitions) | 重新打亂RDD中元素順序並重新分片,數量由參數決定
repartitionAndSortWithinPartitions(partitioner) | 按照參數給定的分片器重新分片,同時每個分片內部按照鍵排序
具體示例:
map
將函數作用於數據集的每一個元素上,生成一個分布式的數據集返回
Return a new RDD by applying a function to each element of this RDD. >>> rdd = sc.parallelize(["b", "a", "c"]) >>> sorted(rdd.map(lambda x: (x, 1)).collect()) [('a', 1), ('b', 1), ('c', 1)]
一個完整的例子:
from <span class='wp_keywordlink_affiliate'><a href="https://www.168seo.cn/tag/pyspark" title="View all posts in pyspark" target="_blank">pyspark</a></span> import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) data = range(10) print(list(data)) r1 = sc.parallelize(data) r2 = r1.map(lambda x:x+1) print(r2.collect()) sc.stop()
結果是:

filter
返回所有 funtion 返回值為True的函數,生成一個分布式的數據集返回
Return a new RDD containing only the elements that satisfy a predicate. >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) >>> rdd.filter(lambda x: x % 2 == 0).collect() [2, 4]
一個完整的例子:
from pyspark import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) data = range(10) print(list(data)) r1 = sc.parallelize(data) r2 = r1.filter(lambda x:x>5) print(r2.collect()) sc.stop()
結果是:
flatMap
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
一個完整的例子:
from pyspark import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) data = ["hello zeropython","hello 168seo.cn"] # print(list(data)) r1 = sc.parallelize(data) r2 = r1.flatMap(lambda x:x.split(" ")) r3 = r1.map(lambda x:x.split(" ")) print(r2.collect()) print(r3.collect()) sc.stop() RDD, and then flattening the results.
結果是:
groupBykey
按照相同key的數據分成一組
from _operator import add from pyspark import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) sc.setLogLevel("WARN") """ Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. """ data = ["hello zeropython","hello 168seo.cn"] # print(list(data)) r1 = sc.parallelize(data) r2 = r1.flatMap(lambda x:x.split(" ")).map(lambda y:(y,1)) print("r2",r2.collect()) r3 = r2.groupByKey() print("r3",r3.collect()) r4 = r3.map(lambda x:{x[0]:list(x[1])}) print("r4",r4.collect()) print(r2.reduceByKey(add).collect()) sc.stop()
結果是:
groupBy運算
groupBy運算可以按照傳入匿名函數的規則,將數據分為多個Array。比如下面的代碼將intRDD分為偶數和奇數:
result = intRDD.groupBy(lambda x : x % 2).collect() print (sorted([(x, sorted(y)) for (x, y) in result]))
輸出為:
[(0, [2]), (1, [1, 3, 5, 5])]
reduceBykey
把相同的key 的數據分發到一起 並進行運算
from _operator import add from pyspark import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) data = ["hello zeropython","hello 168seo.cn"] # print(list(data)) r1 = sc.parallelize(data) r2 = r1.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)) print("r2",r2.collect()) r3 = r2.reduceByKey(lambda x,y:x+y) print("r3",r3.collect()) sc.stop()
結果是:
sortbykey
Sorts this RDD, which is assumed to consist of (key, value) pairs.
from _operator import add from pyspark import SparkConf,SparkContext #配置 conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]") sc = SparkContext(conf=conf) # sc.setLogLevel("FATAL") # sc.setLogLevel("ERROR") sc.setLogLevel("ERROR") data = ["hello zeropython","hwlldsf world","168seo.cn","168seo.cn","hello 168seo.cn"] # print(list(data)) r1 = sc.parallelize(data) r2 = r1.flatMap(lambda x:x.split(" "))\ .map(lambda y:(y,1))\ .reduceByKey(lambda x,y:x+y)\ .sortByKey(lambda x:x[1]) # sortByKey排序根據關鍵詞的值進行排序 # reduceByKey 讓[("a",[1,1,1,1])] 轉換成 [("a",3)] print(r2.collect()) sc.stop()
結果是:
union
1
2
3
4
5
6
7
8
|
"""
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
"""
|
distinct
1
2
3
4
5
6
7
|
"""
Return a new RDD containing the distinct elements in this RDD.
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]
"""
|
join
1
2
3
4
5
|
>>> a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
>>> b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
>>> a.join(b).collect()
[('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))]
|
leftOuterJoin
1
2
3
|
>>> a.leftOuterJoin(b).collect()
[('F', ('f1', None)), ('F', ('f2', None)), ('D', ('d1', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))]
|
rightOuterJoin
1
2
3
|
>>> a.rightOuterJoin(b).collect()
[('E', (None, 'e1')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))]
|
fullOuterJoin
1
2
3
4
|
>>> a.fullOuterJoin(b).collect()
[('F', ('f1', None)), ('F', ('f2', None)), ('D', ('d1', None)), ('E', (None, 'e1')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))]
>>>
|
randomSplit運算
randomSplit 運算將整個集合以隨機數的方式按照比例分為多個RDD,比如按照0.4和0.6的比例將intRDD分為兩個RDD,並輸出:
1
2
3
4
5
6
7
|
intRDD = sc.parallelize([3,1,2,5,5])
stringRDD = sc.parallelize(['Apple','Orange','Grape','Banana','Apple'])
sRDD = intRDD.randomSplit([0.4,0.6])
print (len(sRDD))
print (sRDD[0].collect())
print (sRDD[1].collect())
|
輸出為:
1
2
3
4
|
2
[3, 1]
[2, 5, 5]
|
多個RDD轉換運算
RDD也支持執行多個RDD的運算,這里,我們定義三個RDD:
1
2
3
4
|
intRDD1 = sc.parallelize([3,1,2,5,5])
intRDD2 = sc.parallelize([5,6])
intRDD3 = sc.parallelize([2,7])
|
並集運算
可以使用union函數進行並集運算:
1
2
|
print (intRDD1.union(intRDD2).union(intRDD3).collect())
|
輸出為:
1
2
|
[3, 1, 2, 5, 5, 5, 6, 2, 7]
|
交集運算
可以使用intersection進行交集運算:
1
2
|
print(intRDD1.intersection(intRDD2).collect())
|
兩個集合中只有一個相同元素5,所以輸出為:
1
2
|
[5]
|
差集運算
subtract(減去 去除)
可以使用subtract函數進行差集運算:
1
2
|
print (intRDD1.subtract(intRDD2).collect())
|
由於兩個RDD的重復部分為5,所以輸出為[1,2,3]:
1
2
|
[2, 1, 3]
|
笛卡爾積運算
笛卡爾乘積是指在數學中,兩個集合X和Y的笛卡尓積(Cartesian product),又稱直積,表示為X × Y,第一個對象是X的成員而第二個對象是Y的所有可能有序對的其中一個成員
笛卡爾積又叫笛卡爾乘積,是一個叫笛卡爾的人提出來的。 簡單的說就是兩個集合相乘的結果。
假設集合A={a, b},集合B={0, 1, 2},則兩個集合的笛卡爾積為{(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}。
可以使用cartesian函數進行笛卡爾乘積運算:
1
2
|
print (intRDD1.cartesian(intRDD2).collect())
|
由於兩個RDD分別有5個元素和2個元素,所以返回結果有10各元素:
1
2
|
[(3, 5), (3, 6), (1, 5), (1, 6), (2, 5), (2, 6), (5, 5), (5, 6), (5, 5), (5, 6)]
|
五、RDD 常用的執行動作 Action
下面的表格列出了Spark支持的部分常用啟動操作。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。
啟動操作 | 作用
reduce(func) | 使用func進行聚集計算,func的參數是兩個,返回值一個,兩次func運行應當是完全解耦的,這樣才能正確地並行運算
collect() | 向驅動程序返回數據集的元素組成的數組
count() | 返回數據集元素的數量
first() | 返回數據集的第一個元素
take(n) | 返回前n個元素組成的數組
takeSample(withReplacement, num, [seed]) | 返回一個由原數據集中任意num個元素的suzuki,並且替換之
takeOrder(n, [ordering]) | 返回排序后的前n個元素
saveAsTextFile(path) | 將數據集的元素寫成文本文件
saveAsSequenceFile(path) | 將數據集的元素寫成序列文件,這個API只能用於Java和Scala程序
saveAsObjectFile(path) | 將數據集的元素使用Java的序列化特性寫到文件中,這個API只能用於Java和Scala程序
countByCount() | 只能用於鍵值對RDD,返回一個(K, int) hashmap,返回每個key的出現次數
foreach(func) | 對數據集的每個元素執行func, 通常用於完成一些帶有副作用的函數,比如更新累加器(見下文)或與外部存儲交互等
Action(執行):觸發Spark作業的運行,真正觸發轉換算子的計算
Pyspark rdd 常用的轉換 Transformation Pyspark(二)
https://www.168seo.cn/pyspark/24806.html

1
2
3
|
intRDD = sc.parallelize([3,1,2,5,5])
stringRDD = sc.parallelize(['Apple','Orange','Grape','Banana','Apple'])
|
基本“動作”運算
讀取元素
可以使用下列命令讀取RDD內的元素,這是Actions運算,所以會馬上執行:
1
2
3
4
5
6
7
8
9
|
#取第一條數據
print (intRDD.first())
#取前兩條數據
print (intRDD.take(2))
#升序排列,並取前3條數據
print (intRDD.takeOrdered(3))
#降序排列,並取前3條數據
print (intRDD.takeOrdered(3,lambda x:-x))
|
輸出為:
1
2
3
4
5
|
3
[3, 1]
[1, 2, 3]
[5, 5, 3]
|

統計功能
可以將RDD內的元素進行統計運算:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#統計
print (intRDD.stats())
#最小值
print (intRDD.min())
#最大值
print (intRDD.max())
#標准差
print (intRDD.stdev())
#計數
print (intRDD.count())
#求和
print (intRDD.sum())
#平均
print (intRDD.mean())
|
輸出為:
RDD Key-Value基本“轉換”運算
Spark RDD支持鍵值對運算,Key-Value運算時mapreduce運算的基礎,本節介紹RDD鍵值的基本“轉換”運算。
初始化
我們用元素類型為tuple元組的數組初始化我們的RDD,這里,每個tuple的第一個值將作為鍵,而第二個元素將作為值。
作為值
1
2
|
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
|
得到key和value值
可以使用keys和values函數分別得到RDD的鍵數組和值數組:
1
2
3
|
print (kvRDD1.keys().collect())
print (kvRDD1.values().collect())
|
輸出為:

篩選元素
可以按照鍵進行元素篩選,也可以通過值進行元素篩選,和之前的一樣,使用filter函數,這里要注意的是,雖然RDD中是以鍵值對形式存在,但是本質上還是一個二元組,二元組的第一個值代表鍵,第二個值代表值,所以按照如下的代碼既可以按照鍵進行篩選,我們篩選鍵值小於5的數據:
1
2
|
print (kvRDD1.filter(lambda x:x[0] < 5).collect())
|
輸出為:
1
2
|
[(3, 4), (3, 6), (1, 2)]
|
同樣,將x[0]替換為x[1]就是按照值進行篩選,我們篩選值小於5的數據:
1
2
|
print (kvRDD1.filter(lambda x:x[1] < 5).collect())
|
輸出為:
1
2
|
[(3, 4), (1, 2)]
|
值運算
我們可以使用mapValues方法處理value值,下面的代碼將value值進行了平方處理:
1
2
|
print (kvRDD1.mapValues(lambda x:x**2).collect())
|
輸出為:
1
2
|
[(3, 16), (3, 36), (5, 36), (1, 4)]
|
按照key排序
可以使用sortByKey按照key進行排序,傳入參數的默認值為true,是按照從小到大排序,也可以傳入參數false,表示從大到小排序:
1
2
3
4
|
print (kvRDD1.sortByKey().collect())
print (kvRDD1.sortByKey(True).collect())
print (kvRDD1.sortByKey(False).collect())
|
輸出為:
1
2
3
4
|
[(1, 2), (3, 4), (3, 6), (5, 6)]
[(1, 2), (3, 4), (3, 6), (5, 6)]
[(5, 6), (3, 4), (3, 6), (1, 2)]
|
合並相同key值的數據
使用reduceByKey函數可以對具有相同key值的數據進行合並。比如下面的代碼,由於RDD中存在(3,4)和(3,6)兩條key值均為3的數據,他們將被合為一條數據:
1
2
|
print (kvRDD1.reduceByKey(lambda x,y:x+y).collect())
|
輸出為
1
2
|
[(1, 2), (3, 10), (5, 6)]
|
多個RDD Key-Value“轉換”運算
初始化
首先我們初始化兩個k-v的RDD:
1
2
3
|
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
kvRDD2 = sc.parallelize([(3,8)])
|
內連接運算
join運算可以實現類似數據庫的內連接,將兩個RDD按照相同的key值join起來,kvRDD1與kvRDD2的key值唯一相同的是3,kvRDD1中有兩條key值為3的數據(3,4)和(3,6),而kvRDD2中只有一條key值為3的數據(3,8),所以join的結果是(3,(4,8)) 和(3,(6,8)):
1
2
|
print (kvRDD1.join(kvRDD2).collect())
|
輸出為:
1
2
|
[(3, (4, 8)), (3, (6, 8))]
|
左外連接
使用leftOuterJoin可以實現類似數據庫的左外連接,如果kvRDD1的key值對應不到kvRDD2,就會顯示None
1
2
|
print (kvRDD1.leftOuterJoin(kvRDD2).collect())
|
輸出為:
1
2
|
[(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))]
|
右外連接
使用rightOuterJoin可以實現類似數據庫的右外連接,如果kvRDD2的key值對應不到kvRDD1,就會顯示None
1
2
|
print (kvRDD1.rightOuterJoin(kvRDD2).collect())
|
輸出為:
1
2
|
[(3, (4, 8)), (3, (6, 8))]
|
刪除相同key值數據
使用subtractByKey運算會刪除相同key值得數據:
1
2
|
print (kvRDD1.subtractByKey(kvRDD2).collect())
|
結果為:
1
2
|
[(1, 2), (5, 6)]
|
Key-Value“動作”運算
讀取數據
可以使用下面的幾種方式讀取RDD的數據:
1
2
3
4
5
6
7
8
9
|
#讀取第一條數據
print (kvRDD1.first())
#讀取前兩條數據
print (kvRDD1.take(2))
#讀取第一條數據的key值
print (kvRDD1.first()[0])
#讀取第一條數據的value值
print (kvRDD1.first()[1])
|
輸出為:
1
2
3
4
5
|
(3, 4)
[(3, 4), (3, 6)]
3
4
|
按key值統計:
使用countByKey函數可以統計各個key值對應的數據的條數:
1
2
|
print (kvRDD1.countByKey().collect())
|
輸出為:
1
2
|
defaultdict(<type 'int'>, {1: 1, 3: 2, 5: 1})
|
lookup查找運算
使用lookup函數可以根據輸入的key值來查找對應的Value值:
1
2
|
print (kvRDD1.lookup(3))
|
輸出為:
1
2
|
[4, 6]
|
持久化操作
spark RDD的持久化機制,可以將需要重復運算的RDD存儲在內存中,以便大幅提升運算效率,有兩個主要的函數:
持久化
使用persist函數對RDD進行持久化:
1
2
|
kvRDD1.persist()
|
在持久化的同時我們可以指定持久化存儲等級:

首先我們導入相關函數:
1
2
|
from pyspark.storagelevel import StorageLevel
|
在scala中可以直接使用上述的持久化等級關鍵詞,但是在pyspark中封裝為了一個類,
StorageLevel類,並在初始化時指定一些參數,通過不同的參數組合,可以實現上面的不同存儲等級。StorageLevel類的初始化函數如下:
1
2
3
4
5
6
7
|
def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1):
self.useDisk = useDisk
self.useMemory = useMemory
self.useOffHeap = useOffHeap
self.deserialized = deserialized
self.replication = replication
|
那么不同的存儲等級對應的參數為:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1)
"""
.. note:: The following four storage level constants are deprecated in 2.0, since the records \
will always be serialized in Python.
"""
StorageLevel.MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY
""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY`` instead."""
StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel.MEMORY_ONLY_2
""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY_2`` instead."""
StorageLevel.MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK
""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK`` instead."""
StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel.MEMORY_AND_DISK_2
""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK_2`` instead."""
|
取消持久化
使用unpersist函數對RDD進行持久化:
1
2
|
kvRDD1.unpersist()
|
整理回顧
哇,有關pyspark的RDD的基本操作就是上面這些啦,想要了解更多的盆友們可以參照官網給出的官方文檔:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
今天主要介紹了兩種RDD,基本的RDD和Key-Value形式的RDD,介紹了他們的幾種“轉換”運算和“動作”運算,整理如下:
refer:
https://www.168seo.cn/pyspark/24806.html