Spark系列:Python版Spark編程指南


目錄

一、介紹

二、連接Spark

三、創建RDD

四、RDD常用的轉換 Transformation

五、RDD 常用的執行動作 Action

 

二、連接Spark

Spark1.3.0只支持Python2.6或更高的版本(但不支持Python3)。它使用了標准的CPython解釋器,所以諸如NumPy一類的C庫也是可以使用的。

通過Spark目錄下的bin/spark-submit腳本你可以在Python中運行Spark應用。這個腳本會載入Spark的Java/Scala庫然后讓你將應用提交到集群中。你可以執行bin/pyspark來打開Python的交互命令行。

如果你希望訪問HDFS上的數據,你需要為你使用的HDFS版本建立一個PySpark連接。常見的HDFS版本標簽都已經列在了這個第三方發行版頁面。

最后,你需要將一些Spark的類import到你的程序中。加入如下這行:

from pyspark import SparkContext, SparkConf

在一個Spark程序中要做的第一件事就是創建一個SparkContext對象來告訴Spark如何連接一個集群。為了創建SparkContext,你首先需要創建一個SparkConf對象,這個對象會包含你的應用的一些相關信息。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

appName參數是在集群UI上顯示的你的應用的名稱。master是一個Spark、Mesos或YARN集群的URL,如果你在本地運行那么這個參數應該是特殊的”local”字符串。在實際使用中,當你在集群中運行你的程序,你一般不會把master參數寫死在代碼中,而是通過用spark-submit運行程序來獲得這個參數。但是,在本地測試以及單元測試時,你仍需要自行傳入”local”來運行Spark程序。

 

三、創建RDD

Spark是以RDD概念為中心運行的。RDD是一個容錯的、可以被並行操作的元素集合。創建一個RDD有兩個方法:在你的驅動程序中並行化一個已經存在的集合;從外部存儲系統中引用一個數據集,這個存儲系統可以是一個共享文件系統,比如HDFS、HBase或任意提供了Hadoop輸入格式的數據來源。

並行化集合

並行化集合是通過在驅動程序中一個現有的迭代器或集合上調用SparkContext的parallelize方法建立的。為了創建一個能夠並行操作的分布數據集,集合中的元素都會被拷貝。比如,以下語句創建了一個包含1到5的並行化集合:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

分布數據集(distData)被建立起來之后,就可以進行並行操作了。比如,我們可以調用disData.reduce(lambda a, b: a+b)來對元素進行疊加。在后文中我們會描述分布數據集上支持的操作。

並行集合的一個重要參數是將數據集划分成分片的數量。對每一個分片,Spark會在集群中運行一個對應的任務。 典型情況下,集群中的每一個CPU將對應運行2-4個分片。一般情況下,Spark會根據當前集群的情況自行設定分片數量。但是,你也可以通過將第二個參 數傳遞給parallelize方法(比如sc.parallelize(data, 10))來手動確定分片數量。注意:有些代碼中會使用切片(slice,分片的同義詞)這個術語來保持向下兼容性。

一個簡單的示例:

import findspark
findspark.init()
from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
rdd=sc.parallelize([1,2,3,4,5])
rdd1=rdd.map(lambda r:r+10) #map對每一個元素操作
print(rdd1.collect())

 

外部數據集

PySpark可以通過Hadoop支持的外部數據源(包括本地文件系統、HDFS、 Cassandra、HBase、 亞馬遜S3等等)建立分布數據集。Spark支持文本文件、 序列文件以及其他任何 Hadoop輸入格式文件。

通過文本文件創建RDD要使用SparkContext的textFile方法。這個方法會使用一個文件的URI(或本地文件路徑,hdfs://、s3n://這樣的URI等等)然后讀入這個文件建立一個文本行的集合。以下是一個例子:

>>> distFile = sc.textFile("data.txt")

建立完成后distFile上就可以調用數據集操作了。比如,我們可以調用map和reduce操作來疊加所有文本行的長度,代碼如下:

distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

在Spark中讀入文件時有幾點要注意:

  • 如果使用了本地文件路徑時,要保證在worker節點上這個文件也能夠通過這個路徑訪問。這點可以通過將這個文件拷貝到所有worker上或者使用網絡掛載的共享文件系統來解決。
  • 包括textFile在內的所有基於文件的Spark讀入方法,都支持將文件夾、壓縮文件、包含通配符的路徑作為參數。比如,以下代碼都是合法的:
textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")
  • textFile方法也可以傳入第二個可選參數來控制文件的分片數量。默認情況下,Spark會為文件的每一個塊(在HDFS中塊的大小默認是64MB) 創建一個分片。但是你也可以通過傳入一個更大的值來要求Spark建立更多的分片。注意,分片的數量絕不能小於文件塊的數量。

除了文本文件之外,Spark的Python API還支持多種其他數據格式:

  • SparkContext.wholeTextFiles能夠讀入包含多個小文本文件的目錄,然后為每一個文件返回一個(文件名,內容)對。這是與textFile方法為每一個文本行返回一條記錄相對應的。
  • RDD.saveAsPickleFile和SparkContext.pickleFile支持將RDD以串行化的Python對象格式存儲起來。串行化的過程中會以默認10個一批的數量批量處理。
  • 序列文件和其他Hadoop輸入輸出格式。
注意

這個特性目前仍處於試驗階段,被標記為Experimental,目前只適用於高級用戶。這個特性在未來可能會被基於Spark SQL的讀寫支持所取代,因為Spark SQL是更好的方式。

可寫類型支持

PySpark序列文件支持利用Java作為中介載入一個鍵值對RDD,將可寫類型轉化成Java的基本類型,然后使用 Pyrolite將java結果對象串行化。當將一個鍵值對RDD儲存到一個序列文件中時PySpark將會運行上述過程的相反過程。首先將Python對象反串行化成Java對象,然后轉化成可寫類型。以下可寫類型會自動轉換:

| 可寫類型 | Python類型 |

  • | Text | unicode str|
  • | IntWritable | int |
  • | FloatWritable | float |
  • | DoubleWritable | float |
  • | BooleanWritable | bool |
  • | BytesWritable | bytearray |
  • | NullWritable | None |
  • | MapWritable | dict |

數組是不能自動轉換的。用戶需要在讀寫時指定ArrayWritable的子類型.在讀入的時候,默認的轉換器會把自定義的ArrayWritable子 類型轉化成Java的Object[],之后串行化成Python的元組。為了獲得Python的array.array類型來使用主要類型的數組,用戶 需要自行指定轉換器。

保存和讀取序列文件

和文本文件類似,序列文件可以通過指定路徑來保存與讀取。鍵值類型都可以自行指定,但是對於標准可寫類型可以不指定。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
保存和讀取其他Hadoop輸入輸出格式

PySpark同樣支持寫入和讀出其他Hadoop輸入輸出格式,包括’新’和’舊’兩種Hadoop MapReduce API。如果有必要,一個Hadoop配置可以以Python字典的形式傳入。以下是一個例子,使用了Elasticsearch ESInputFormat:

$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
    "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()         # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

 

四、RDD常用的轉換 Transformation

 RDD支持兩類操作:轉化操作,用於從已有的數據集轉化產生新的數據集;啟動操作,用於在計算結束后向驅動程序返回結果。舉個例子,map是一個轉化操作,可以將數據集中每一個元素傳給一個函數,同時將計算結果作為一個新的RDD返回。另一方面,reduce操作是一個啟動操作,能夠使用某些函數來聚集計算RDD中所有的元素,並且向驅動程序返回最終結果(同時還有一個並行的reduceByKey操作可以返回一個分布數據集)。

在Spark所有的轉化操作都是惰性求值的,就是說它們並不會立刻真的計算出結果。相反,它們僅僅是記錄下了轉換操作的操作對象(比如:一個文件)。只有當一個啟動操作被執行,要向驅動程序返回結果時,轉化操作才會真的開始計算。這樣的設計使得Spark運行更加高效——比如,我們會發覺由map操作產生的數據集將會在reduce操作中用到,之后僅僅是返回了reduce的最終的結果而不是map產生的龐大數據集。

在默認情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成。但是,你也可以通過調用persist(或cache)方法來將RDD持久化到內存中,這樣Spark就可以在下次使用這個數據集時快速獲得。Spark同樣提供了對將RDD持久化到硬盤上或在多個節點間復制的支持。

下面的表格列出了Spark支持的常用轉化操作。欲知細節,請查閱RDD API文檔(ScalaJavaPython)和鍵值對RDD函數文檔(ScalaJava)。

轉化操作 | 作用
————| ——
map(func) | 返回一個新的分布數據集,由原數據集元素經func處理后的結果組成
filter(func) | 返回一個新的數據集,由傳給func返回True的原數據集元素組成
flatMap(func) | 與map類似,但是每個傳入元素可能有0或多個返回值,func可以返回一個序列而不是一個值
mapParitions(func) | 類似map,但是RDD的每個分片都會分開獨立運行,所以func的參數和返回值必須都是迭代器
mapParitionsWithIndex(func) | 類似mapParitions,但是func有兩個參數,第一個是分片的序號,第二個是迭代器。返回值還是迭代器
sample(withReplacement, fraction, seed) | 使用提供的隨機數種子取樣,然后替換或不替換
union(otherDataset) | 返回新的數據集,包括原數據集和參數數據集的所有元素
intersection(otherDataset) | 返回新數據集,是兩個集的交集
distinct([numTasks]) | 返回新的集,包括原集中的不重復元素
groupByKey([numTasks]) | 當用於鍵值對RDD時返回(鍵,值迭代器)對的數據集
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 用於鍵值對RDD時返回(K,U)對集,對每一個Key的value進行聚集計算
sortByKey([ascending], [numTasks])用於鍵值對RDD時會返回RDD按鍵的順序排序,升降序由第一個參數決定
join(otherDataset, [numTasks]) | 用於鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDD
cogroup(otherDataset, [numTasks]) | 用於兩個鍵值對RDD時返回(K, (V迭代器, W迭代器))RDD
cartesian(otherDataset) | 用於T和U類型RDD時返回(T, U)對類型鍵值對RDD
pipe(command, [envVars]) | 通過shell命令管道處理每個RDD分片
coalesce(numPartitions) | 把RDD的分片數量降低到參數大小
repartition(numPartitions) | 重新打亂RDD中元素順序並重新分片,數量由參數決定
repartitionAndSortWithinPartitions(partitioner) | 按照參數給定的分片器重新分片,同時每個分片內部按照鍵排序

具體示例:

map

將函數作用於數據集的每一個元素上,生成一個分布式的數據集返回

Return a new RDD by applying a function to each element of this RDD.

>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]

一個完整的例子:

from <span class='wp_keywordlink_affiliate'><a href="https://www.168seo.cn/tag/pyspark" title="View all posts in pyspark" target="_blank">pyspark</a></span> import SparkConf,SparkContext

#配置
conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]")
sc = SparkContext(conf=conf)
data = range(10)

print(list(data))
r1 = sc.parallelize(data)

r2 = r1.map(lambda x:x+1)

print(r2.collect())
sc.stop()

結果是:

Pyspark rdd 常用的轉換 Transformation  Pyspark(二)-Python 技術分享 Java技術分享 Python 爬蟲技術_微信公眾號:zeropython—昊天博客

filter

返回所有 funtion 返回值為True的函數,生成一個分布式的數據集返回

flatMap

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

一個完整的例子:

from pyspark import SparkConf,SparkContext

#配置
conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]")
sc = SparkContext(conf=conf)

data = ["hello zeropython","hello 168seo.cn"]

# print(list(data))
r1 = sc.parallelize(data)

r2 = r1.flatMap(lambda x:x.split(" "))
r3 = r1.map(lambda x:x.split(" "))

print(r2.collect())
print(r3.collect())


sc.stop()
RDD, and then flattening the results.

結果是:

 

 

 

groupBykey

按照相同key的數據分成一組

from _operator import add
 
from pyspark import SparkConf,SparkContext
 
#配置
conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
"""
  Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.
"""
data = ["hello zeropython","hello 168seo.cn"]
 
# print(list(data))
r1 = sc.parallelize(data)
 
r2 = r1.flatMap(lambda x:x.split(" ")).map(lambda y:(y,1))
print("r2",r2.collect())
r3 = r2.groupByKey()
print("r3",r3.collect())
 
r4 = r3.map(lambda x:{x[0]:list(x[1])})
 
print("r4",r4.collect())
 
 
print(r2.reduceByKey(add).collect())
 
sc.stop()
 

結果是:

 

 

 groupBy運算
groupBy運算可以按照傳入匿名函數的規則,將數據分為多個Array。比如下面的代碼將intRDD分為偶數和奇數:

result = intRDD.groupBy(lambda x : x % 2).collect()
print (sorted([(x, sorted(y)) for (x, y) in result]))

輸出為:

[(0, [2]), (1, [1, 3, 5, 5])]

reduceBykey

把相同的key 的數據分發到一起 並進行運算

from _operator import add
 
from pyspark import SparkConf,SparkContext
 
#配置
conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]")
sc = SparkContext(conf=conf)
 
data = ["hello zeropython","hello 168seo.cn"]
 
# print(list(data))
r1 = sc.parallelize(data)
 
r2 = r1.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1))
 
print("r2",r2.collect())
r3 = r2.reduceByKey(lambda x,y:x+y)
 
print("r3",r3.collect())
 
sc.stop()
 

結果是:

 

 

 

 

sortbykey

Sorts this RDD, which is assumed to consist of (key, value) pairs.

from _operator import add
 
from pyspark import SparkConf,SparkContext
 
#配置
conf = SparkConf() #.setAppName("spark demo ").setMaster("local[2]")
sc = SparkContext(conf=conf)
# sc.setLogLevel("FATAL")
# sc.setLogLevel("ERROR")
sc.setLogLevel("ERROR")
data = ["hello zeropython","hwlldsf world","168seo.cn","168seo.cn","hello 168seo.cn"]
 
# print(list(data))
r1 = sc.parallelize(data)
 
r2 = r1.flatMap(lambda x:x.split(" "))\
    .map(lambda y:(y,1))\
    .reduceByKey(lambda x,y:x+y)\
    .sortByKey(lambda x:x[1])
    # sortByKey排序根據關鍵詞的值進行排序
    # reduceByKey 讓[("a",[1,1,1,1])] 轉換成 [("a",3)]
 
print(r2.collect())
 
sc.stop()

結果是:

 

 

union

 

 

distinct

 

 

join

 

 

leftOuterJoin

 

 

rightOuterJoin

 

 

fullOuterJoin

 

 

randomSplit運算

randomSplit 運算將整個集合以隨機數的方式按照比例分為多個RDD,比如按照0.4和0.6的比例將intRDD分為兩個RDD,並輸出:

 

 

輸出為:

 

多個RDD轉換運算

RDD也支持執行多個RDD的運算,這里,我們定義三個RDD:

 

 

並集運算

可以使用union函數進行並集運算:

 

 

輸出為:

 

 

交集運算

可以使用intersection進行交集運算:

 

 

兩個集合中只有一個相同元素5,所以輸出為:

 

 

差集運算

subtract(減去 去除)
可以使用subtract函數進行差集運算:

 

 

由於兩個RDD的重復部分為5,所以輸出為[1,2,3]:

 

 

笛卡爾積運算

笛卡爾乘積是指在數學中,兩個集合X和Y的笛卡尓積(Cartesian product),又稱直積,表示為X × Y,第一個對象是X的成員而第二個對象是Y的所有可能有序對的其中一個成員

笛卡爾積又叫笛卡爾乘積,是一個叫笛卡爾的人提出來的。 簡單的說就是兩個集合相乘的結果。
假設集合A={a, b},集合B={0, 1, 2},則兩個集合的笛卡爾積為{(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}。

可以使用cartesian函數進行笛卡爾乘積運算:

 

 

由於兩個RDD分別有5個元素和2個元素,所以返回結果有10各元素:

 

 五、RDD 常用的執行動作 Action

下面的表格列出了Spark支持的部分常用啟動操作。欲知細節,請查閱RDD API文檔(ScalaJavaPython)和鍵值對RDD函數文檔(ScalaJava)。
啟動操作 | 作用
reduce(func) | 使用func進行聚集計算,func的參數是兩個,返回值一個,兩次func運行應當是完全解耦的,這樣才能正確地並行運算
collect() | 向驅動程序返回數據集的元素組成的數組
count() | 返回數據集元素的數量
first() | 返回數據集的第一個元素
take(n) | 返回前n個元素組成的數組
takeSample(withReplacement, num, [seed]) | 返回一個由原數據集中任意num個元素的suzuki,並且替換之
takeOrder(n, [ordering]) | 返回排序后的前n個元素
saveAsTextFile(path) | 將數據集的元素寫成文本文件
saveAsSequenceFile(path) | 將數據集的元素寫成序列文件,這個API只能用於Java和Scala程序
saveAsObjectFile(path) | 將數據集的元素使用Java的序列化特性寫到文件中,這個API只能用於Java和Scala程序
countByCount() | 只能用於鍵值對RDD,返回一個(K, int) hashmap,返回每個key的出現次數
foreach(func) | 對數據集的每個元素執行func, 通常用於完成一些帶有副作用的函數,比如更新累加器(見下文)或與外部存儲交互等

Action(執行):觸發Spark作業的運行,真正觸發轉換算子的計算

Pyspark rdd 常用的轉換 Transformation Pyspark(二)

https://www.168seo.cn/pyspark/24806.html

Pyspark rdd 常用的執行動作 Action  Pyspark(三)-Python 技術分享 Java技術分享 Python 爬蟲技術_微信公眾號:zeropython—昊天博客

 

 

基本“動作”運算

讀取元素

可以使用下列命令讀取RDD內的元素,這是Actions運算,所以會馬上執行:

 

 

輸出為:

 

 

Pyspark rdd 常用的執行動作 Action  Pyspark(三)-Python 技術分享 Java技術分享 Python 爬蟲技術_微信公眾號:zeropython—昊天博客

統計功能

可以將RDD內的元素進行統計運算:

 

 

輸出為:
Pyspark rdd 常用的執行動作 Action  Pyspark(三)-Python 技術分享 Java技術分享 Python 爬蟲技術_微信公眾號:zeropython—昊天博客

RDD Key-Value基本“轉換”運算

Spark RDD支持鍵值對運算,Key-Value運算時mapreduce運算的基礎,本節介紹RDD鍵值的基本“轉換”運算。

初始化

我們用元素類型為tuple元組的數組初始化我們的RDD,這里,每個tuple的第一個值將作為鍵,而第二個元素將作為值。
作為值

 

 

得到key和value值
可以使用keys和values函數分別得到RDD的鍵數組和值數組:

 

 

輸出為:

Pyspark rdd 常用的執行動作 Action  Pyspark(三)-Python 技術分享 Java技術分享 Python 爬蟲技術_微信公眾號:zeropython—昊天博客

篩選元素

可以按照鍵進行元素篩選,也可以通過值進行元素篩選,和之前的一樣,使用filter函數,這里要注意的是,雖然RDD中是以鍵值對形式存在,但是本質上還是一個二元組,二元組的第一個值代表鍵,第二個值代表值,所以按照如下的代碼既可以按照鍵進行篩選,我們篩選鍵值小於5的數據:

 

 

輸出為:

 

 

同樣,將x[0]替換為x[1]就是按照值進行篩選,我們篩選值小於5的數據:

 

 

輸出為:

 

 

值運算

我們可以使用mapValues方法處理value值,下面的代碼將value值進行了平方處理:

 

 

輸出為:

 

 

按照key排序

可以使用sortByKey按照key進行排序,傳入參數的默認值為true,是按照從小到大排序,也可以傳入參數false,表示從大到小排序:

 

 

輸出為:

 

 

合並相同key值的數據

使用reduceByKey函數可以對具有相同key值的數據進行合並。比如下面的代碼,由於RDD中存在(3,4)和(3,6)兩條key值均為3的數據,他們將被合為一條數據:

 

 

輸出為

 

 

多個RDD Key-Value“轉換”運算

初始化
首先我們初始化兩個k-v的RDD:

 

 

內連接運算

join運算可以實現類似數據庫的內連接,將兩個RDD按照相同的key值join起來,kvRDD1與kvRDD2的key值唯一相同的是3,kvRDD1中有兩條key值為3的數據(3,4)和(3,6),而kvRDD2中只有一條key值為3的數據(3,8),所以join的結果是(3,(4,8)) 和(3,(6,8)):

 

 

輸出為:

 

 

左外連接

使用leftOuterJoin可以實現類似數據庫的左外連接,如果kvRDD1的key值對應不到kvRDD2,就會顯示None

 

 

輸出為:

 

 

右外連接
使用rightOuterJoin可以實現類似數據庫的右外連接,如果kvRDD2的key值對應不到kvRDD1,就會顯示None

 

 

輸出為:

 

 

刪除相同key值數據

使用subtractByKey運算會刪除相同key值得數據:

 

 

結果為:

 

 

Key-Value“動作”運算

讀取數據
可以使用下面的幾種方式讀取RDD的數據:

 

 

輸出為:

 

 

按key值統計:

使用countByKey函數可以統計各個key值對應的數據的條數:

 

 

輸出為:

 

 

lookup查找運算

使用lookup函數可以根據輸入的key值來查找對應的Value值:

 

 

輸出為:

 

 

持久化操作

spark RDD的持久化機制,可以將需要重復運算的RDD存儲在內存中,以便大幅提升運算效率,有兩個主要的函數:

持久化

使用persist函數對RDD進行持久化:

 

 

在持久化的同時我們可以指定持久化存儲等級:

Pyspark rdd 常用的執行動作 Action  Pyspark(三)-Python 技術分享 Java技術分享 Python 爬蟲技術_微信公眾號:zeropython—昊天博客

首先我們導入相關函數:

 

 

在scala中可以直接使用上述的持久化等級關鍵詞,但是在pyspark中封裝為了一個類,
StorageLevel類,並在初始化時指定一些參數,通過不同的參數組合,可以實現上面的不同存儲等級。StorageLevel類的初始化函數如下:

 

 

那么不同的存儲等級對應的參數為:

 

 

取消持久化

使用unpersist函數對RDD進行持久化:

 

 

整理回顧
哇,有關pyspark的RDD的基本操作就是上面這些啦,想要了解更多的盆友們可以參照官網給出的官方文檔:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

今天主要介紹了兩種RDD,基本的RDD和Key-Value形式的RDD,介紹了他們的幾種“轉換”運算和“動作”運算,整理如下:
Pyspark rdd 常用的執行動作 Action  Pyspark(三)-Python 技術分享 Java技術分享 Python 爬蟲技術_微信公眾號:zeropython—昊天博客

 

 

 

refer:

https://www.168seo.cn/pyspark/24806.html

https://www.168seo.cn/pyspark/24809.html

https://www.csdn.net/article/2015-04-24/2824552


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM