- 工作方式 單機 分布式
- 內存緩存 單機緩存 persist() or cache()將轉換的RDDs保存在內存
- df可變性 pandas 是可變的 spark_df中RDDs是不可變的 所以DF不可變
- 創建
- https://www.qedev.com/bigdata/170633.html 詳細對比
- 1
RDD數據結構的常用函數
- 創建RDD
- 1 是textFile加載本地或者集群文件系統中的數據,
- 2 用parallelize方法將Driver中的數據結構並行化成RDD。
- 常用Action操作
- 1 collect 將數據匯集到Driver,數據過大時有超內存風險
- 2 take 將前若干個數據匯集到Driver,比collect安全
- 3 takeSample 可以隨機取若干個到Driver,第一個參數設置是否放回抽樣
- 4 first 取第一個數據
- 5 count 查看RDD元素數量
- 6 reduce 利用二元函數對數據進行規約
- 7 foreach 對每一個元素執行某種操作,不生成新的RDD
- 8 countByKey 對Pair RDD按key統計數量
- 9 saveAsTextFile 保存rdd成text文件到本地
- 常用Transformation操作
- 1 Transformation 轉換操作具有懶惰執行的特性,它只指定新的RDD和其父RDD的依賴關系,只有當Action操作觸發到該依賴的時候,它才被計算。
- 2 map 操作對每個元素進行一個映射轉換
- 3 filter 應用過濾條件過濾掉一些數據
- 4 flatMap 操作執行將每個元素生成一個Array后壓平
- 5 sample 對原rdd在每個分區按照比例進行抽樣,第一個參數設置是否可以重復抽樣
- 6 distinct 去重
- 7 subtract 找到屬於前一個rdd而不屬於后一個rdd的元素
- 8 union 合並數據
- 9 intersection 求交集
- 10 cartesian 笛卡爾積
- 11 sortBy 按照某種方式進行排序
- 12 zip 按照拉鏈方式連接兩個RDD,效果類似python的zip函數 需要兩個RDD具有相同的分區,每個分區元素數量相同4
- 13 zipWithIndex 將RDD和一個從0開始的遞增序列按照拉鏈方式連接。
- 常用PairrDD的轉換操作 PairRDD指的是數據為長度為2的tuple類似(k,v)結構的數據類型的RDD,其每個數據的第一個元素被當做key,第二個元素被當做value.
- 1 reduceByKey對相同的key對應的values應用二元歸並操作
- 2 groupByKey將相同的key對應的values收集成一個Iterator 迭代器
- 3 sortByKey按照key排序,可以指定是否降序
- 4 join相當於根據key進行內連接
- 5 rightOuterJoin相當於關系表的右連接
- 6 leftOuterJoin相當於關系表的左連接
- 7 cogroup相當於對兩個輸入分別goupByKey然后再對結果進行groupByKey
- 8 subtractByKey去除x中那些key也在y中的元素
- 9 oldByKey的操作和reduceByKey類似,但是要提供一個初始值
- 緩存操作
- 共享變量
- 分區操作
# 打印版本
import pyspark
print(pyspark.__version__)
3.2.0
# 創建會話 https://www.codingdict.com/article/8885
# 參數配置
conf = pyspark.SparkConf().setAppName("rdd_tutorial")
#主函數
sc=pyspark.SparkContext(conf=conf)
# 創建RDD
# 本地加載數據 https://www.cnblogs.com/ivan1026/p/9047726.html
file="./test.txt"
rdd=sc.textFile(file,3)
#RDD類型的數據轉化為數組 https://blog.csdn.net/chaoshengmingyue/article/details/82021746
rdd.collect()
['hello world,',
"hello spark',",
'spark love jupyter,',
'spark love pandas,',
'spark love sql']
#parallelize將Driver中的數據結構生成RDD,第二個參數指定分區數,即將數據放多幾個分布式端 數據和分區
# 並行集合的一個重要參數是slices,表示數據集切分的份數。Spark將會在集群上為每一份數據起一個任務
rdd = sc.parallelize(range(1,11),2)
rdd.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 常用action操作
# Action操作將觸發基於RDD依賴關系的計算
rdd=sc.parallelize(range(10))
#collect操作將數據匯集到Driver,數據過大時有超內存風險
all_data = rdd.collect()
all_data
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# 有序去前n個數據
rdd.take(4)
[0, 1, 2, 3]
# 隨機取,第一個設置是否放回 2 num 3 隨機種子
rdd.takeSample(False,7,1)
[6, 8, 9, 7, 5, 3, 0]
# 查看某函數用法
rdd.takeSample?
[1;31mSignature:[0m [0mrdd[0m[1;33m.[0m[0mtakeSample[0m[1;33m([0m[0mwithReplacement[0m[1;33m,[0m [0mnum[0m[1;33m,[0m [0mseed[0m[1;33m=[0m[1;32mNone[0m[1;33m)[0m[1;33m[0m[1;33m[0m[0m
[1;31mDocstring:[0m
Return a fixed-size sampled subset of this RDD.
Notes
-----
This method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
Examples
--------
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
20
>>> len(rdd.takeSample(False, 5, 2))
5
>>> len(rdd.takeSample(False, 15, 3))
10
[1;31mFile:[0m g:\anaconda\ana2\lib\site-packages\pyspark\rdd.py
[1;31mType:[0m method
rdd.first()
0
rdd.count()
10
#foreach對每一個元素執行某種操作,不生成新的RDD
#累加器用法詳見共享變量
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)
45
pairRdd = sc.parallelize([(1,1),(1,4),(1,1),(2,16)])
pairRdd.countByKey()
defaultdict(int, {1: 3, 2: 1})
pairRdd.countByValue()
defaultdict(int, {(1, 1): 2, (1, 4): 1, (2, 16): 1})
rdd.collect()
[0, 1, 2, 3, 4]
#saveAsTextFile保存rdd成text文件到本地 理論上可以保存成功
text_file = "./rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-40-d076c4237135> in <module>
----> 1 rdd.saveAsTextFile(text_file)
G:\anaconda\ana2\lib\site-packages\pyspark\rdd.py in saveAsTextFile(self, path, compressionCodecClass)
1826 keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
1827 else:
-> 1828 keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
1829
1830 # Pair functions
G:\anaconda\ana2\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1307
1308 answer = self.gateway_client.send_command(command)
-> 1309 return_value = get_return_value(
1310 answer, self.gateway_client, self.target_id, self.name)
1311
G:\anaconda\ana2\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o331.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/G:/VScode/pyspark_learning/rdd.txt already exists
at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:298)
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1578)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1578)
at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1564)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1564)
at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:551)
at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:550)
at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
# 常用Transformation操作
rdd=sc.parallelize(range(20))
rdd.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
# map
rdd.map(lambda x:x+2).collect()
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
#filter
rdd.filter(lambda x:x%2==1).collect()
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
# flatMap
#flatMap操作執行將每個元素生成一個Array后壓平
rdd = sc.parallelize(["hello world","hello China"])
rdd.map(lambda x:x.split(" ")).collect()
[['hello', 'world'], ['hello', 'China']]
rdd=rdd.flatMap(lambda x:x.split(" ")).collect()
#sample
rdd=sc.parallelize(range(20))
rdd.sample(False,5,0).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()
[1, 2, 3, 4, 5]
# 屬於A不屬於B的元素 補集
a = sc.parallelize(range(10))
b = sc.parallelize(range(5,15))
a.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
b.collect()
[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
#補集
a.subtract(b).collect()
[0, 1, 2, 3, 4]
# 並集
a.union(b).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
# 交集
a.intersection(b).collect()
[5, 6, 7, 8, 9]
# 笛卡爾積
boys = sc.parallelize(["LiLei","Tom"])
girls = sc.parallelize(["HanMeiMei","Lily"])
boys.cartesian(girls).collect()
[('LiLei', 'HanMeiMei'),
('LiLei', 'Lily'),
('Tom', 'HanMeiMei'),
('Tom', 'Lily')]
a.sortBy(lambda x:x,ascending=False).collect()
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
rdd = sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
rdd.sortBy(lambda x:x[2]).collect()
[(4, 1, 1), (3, 2, 2), (1, 2, 3)]
rdd.sortBy?
[1;31mSignature:[0m [0mrdd[0m[1;33m.[0m[0msortBy[0m[1;33m([0m[0mkeyfunc[0m[1;33m,[0m [0mascending[0m[1;33m=[0m[1;32mTrue[0m[1;33m,[0m [0mnumPartitions[0m[1;33m=[0m[1;32mNone[0m[1;33m)[0m[1;33m[0m[1;33m[0m[0m
[1;31mDocstring:[0m
Sorts this RDD by the given keyfunc
Examples
--------
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
[1;31mFile:[0m g:\anaconda\ana2\lib\site-packages\pyspark\rdd.py
[1;31mType:[0m method
# zip
rdd_name = sc.parallelize(["LiLei","Hanmeimei","Lily"])
rdd_age = sc.parallelize([19,18,20])
rdd_zip = rdd_name.zip(rdd_age)
print(rdd_zip.collect())
[('LiLei', 19), ('Hanmeimei', 18), ('Lily', 20)]
#zip withindex
#將RDD和一個從0開始的遞增序列按照拉鏈方式連接。
rdd_name = sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())
[('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]
# 常用PairRDD的轉換操作
rdd = sc.parallelize([("hello",1),("world",2),
("hello",3),("world",5)])
rdd.collect()
[('hello', 1), ('world', 2), ('hello', 3), ('world', 5)]
rdd.reduceByKey(lambda x,y: x*y).collect()
[('hello', 3), ('world', 10)]
rdd.groupByKey().collect()
[('hello', <pyspark.resultiterable.ResultIterable at 0x1ab3f6892b0>),
('world', <pyspark.resultiterable.ResultIterable at 0x1ab3f689100>)]
# join 內連接
age = sc.parallelize([("LiLei",18),
("HanMeiMei",16),("Jim",20)])
gender = sc.parallelize([("LiLei","male"),
("HanMeiMei","female"),("Lucy","female")])
age.join(gender).collect()
[('HanMeiMei', (16, 'female')), ('LiLei', (18, 'male'))]
#leftOuterJoin和rightOuterJoin
age.rightOuterJoin(gender).collect()
[('HanMeiMei', (16, 'female')),
('LiLei', (18, 'male')),
('Lucy', (None, 'female'))]
#leftOuterJoin和rightOuterJoin
age.leftOuterJoin(gender).collect()
[('HanMeiMei', (16, 'female')), ('LiLei', (18, 'male')), ('Jim', (20, None))]
#cogroup connect_group相當於對兩個輸入分別goupByKey然后再對結果進行groupByKey
x = sc.parallelize([("a",1),("b",2),("a",3)])
y = sc.parallelize([("a",2),("b",3),("b",5)])
result = x.cogroup(y).collect()
result
[('b',
(<pyspark.resultiterable.ResultIterable at 0x1ab3f832250>,
<pyspark.resultiterable.ResultIterable at 0x1ab3f832310>)),
('a',
(<pyspark.resultiterable.ResultIterable at 0x1ab3f8215b0>,
<pyspark.resultiterable.ResultIterable at 0x1ab3f8286d0>))]
list(result[0][1])
[<pyspark.resultiterable.ResultIterable at 0x1ab3f832250>,
<pyspark.resultiterable.ResultIterable at 0x1ab3f832310>]
#subtractByKey去除x中那些key也在y中的元素
x = sc.parallelize([("a",1),("b",2),("c",3)])
y = sc.parallelize([("a",2),("b",(1,2))])
x.subtractByKey(y).collect()
[('c', 3)]
#foldByKey的操作和reduceByKey類似,但是要提供一個初始值
x = sc.parallelize([("a",1),("b",2),("a",3),("b",5)],1)
x.foldByKey(1,lambda x,y:x*y).collect()
[('a', 3), ('b', 10)]
五,緩存操作
如果一個rdd被多個任務用作中間量,那么對其進行cache緩存到內存中對加快計算會非常有幫助。
聲明對一個rdd進行cache后,該rdd不會被立即緩存,而是等到它第一次被計算出來時才進行緩存。
可以使用persist明確指定存儲級別,常用的存儲級別是MEMORY_ONLY和EMORY_AND_DISK。
如果一個RDD后面不再用到,可以用unpersist釋放緩存,unpersist是立即執行的。
緩存數據不會切斷血緣依賴關系,這是因為緩存數據某些分區所在的節點有可能會有故障,例如內存溢出或者節點損壞。
這時候可以根據血緣關系重新計算這個分區的數據。
#cache緩存到內存中,使用存儲級別 MEMORY_ONLY。
#MEMORY_ONLY意味着如果內存存儲不下,放棄存儲其余部分,需要時重新計算。
a = sc.parallelize(range(10000),5)
a.cache()
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a
print(mean_a)
4999.5
#persist緩存到內存或磁盤中,默認使用存儲級別MEMORY_AND_DISK
#MEMORY_AND_DISK意味着如果內存存儲不下,其余部分存儲到磁盤中。
#persist可以指定其它存儲級別,cache相當於persist(MEMORY_ONLY)
from pyspark.storagelevel import StorageLevel
a = sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a
a.unpersist() #立即釋放緩存
print(mean_a)
4999.5
六,共享變量
當spark集群在許多節點上運行一個函數時,默認情況下會把這個函數涉及到的對象在每個節點生成一個副本。
但是,有時候需要在不同節點或者節點和Driver之間共享變量。
Spark提供兩種類型的共享變量,廣播變量和累加器。
廣播變量是不可變變量,實現在不同節點不同任務之間共享數據。
廣播變量在每個機器上緩存一個只讀的變量,而不是為每個task生成一個副本,可以減少數據的傳輸。
累加器主要是不同節點和Driver之間共享變量,只能實現計數或者累加功能。
累加器的值只有在Driver上是可讀的,在節點上不可見。
# 廣播變量 broadcast 不可變,在所有節點可讀
rdd = sc.parallelize(range(10))
broads = sc.broadcast(100)
print(rdd.map(lambda x:x+broads.value).collect())
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
print(broads.value)
100
#累加器 只能在Driver上可讀,在其它節點只能進行累加
total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)
rdd.foreach(lambda x:total.add(x))
total.value
45
# 計算數據的平均值
rdd = sc.parallelize([1.1,2.1,3.1,4.1])
total = sc.accumulator(0)
count = sc.accumulator(0)
def func(x):
total.add(x)
count.add(1)
rdd.foreach(func)
total.value/count.value
2.5999999999999996
七,分區操作
分區操作包括改變分區操作,以及針對分區執行的一些轉換操作。
glom:將一個分區內的數據轉換為一個列表作為一行。
coalesce:shuffle可選,默認為False情況下窄依賴,不能增加分區。repartition和partitionBy調用它實現。
repartition:按隨機數進行shuffle,相同key不一定在同一個分區
partitionBy:按key進行shuffle,相同key放入同一個分區
HashPartitioner:默認分區器,根據key的hash值進行分區,相同的key進入同一分區,效率較高,key不可為Array.
RangePartitioner:只在排序相關函數中使用,除相同的key進入同一分區,相鄰的key也會進入同一分區,key必須可排序。
TaskContext: 獲取當前分區id方法 TaskContext.get.partitionId
mapPartitions:每次處理分區內的一批數據,適合需要分批處理數據的情況,比如將數據插入某個表,每批數據只需要開啟一次數據庫連接,大大減少了連接開支
mapPartitionsWithIndex:類似mapPartitions,提供了分區索引,輸入參數為(i,Iterator)
foreachPartition:類似foreach,但每次提供一個Partition的一批數據
#glom將一個分區內的數據轉換為一個列表作為一行。
a = sc.parallelize(range(10),4)
# a 10個數據存儲在n個分區上
b = a.glom()
b.collect()
[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
a.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
#coalesce 默認shuffle為False,不能增加分區,只能減少分區
#如果要增加分區,要設置shuffle = true
#parallelize等許多操作可以指定分區數
a = sc.parallelize(range(10),3)
print(a.getNumPartitions())
print(a.glom().collect())
3
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
b = a.coalesce(2)
b.getNumPartitions()
2
#repartition按隨機數進行shuffle,相同key不一定在一個分區,可以增加分區
#repartition實際上調用coalesce實現,設置了shuffle = True
a = sc.parallelize(range(10),3)
c = a.repartition(4)
print(c.glom().collect())
[[6, 7, 8, 9], [3, 4, 5], [], [0, 1, 2]]
c = a.repartition(2)
print(c.glom().collect())
[[0, 1, 2, 6, 7, 8, 9], [3, 4, 5]]
#partitionBy按key進行shuffle,相同key一定在一個分區
a = sc.parallelize([("a",1),("a",1),("a",2),("c",3)])
c = a.partitionBy(2)
print(c.glom().collect())
[[('c', 3)], [('a', 1), ('a', 1), ('a', 2)]]
#mapPartitions可以對每個分區分別執行操作
#每次處理分區內的一批數據,適合需要按批處理數據的情況
#例如將數據寫入數據庫時,可以極大的減少連接次數。
#mapPartitions的輸入分區內數據組成的Iterator,其輸出也需要是一個Iterator
#以下例子查看每個分區內的數據,相當於用mapPartitions實現了glom的功能。
a = sc.parallelize(range(10),2)
a.mapPartitions(lambda it:iter([list(it)])).collect()
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
#mapPartitionsWithIndex可以獲取兩個參數
#即分區id和每個分區內的數據組成的Iterator
a = sc.parallelize(range(11),2)
def func(pid,it):
s = sum(it)
return(iter([str(pid) + "|" + str(s)]))
[str(pid) + "|" + str]
b = a.mapPartitionsWithIndex(func)
b.collect()
['0|10', '1|45']
#利用TaskContext可以獲取當前每個元素的分區
from pyspark.taskcontext import TaskContext
a = sc.parallelize(range(5),3)
c = a.map(lambda x:(TaskContext.get().partitionId(),x))
c.collect()
[(0, 0), (1, 1), (1, 2), (2, 3), (2, 4)]
#foreachPartition對每個分區分別執行操作
#范例:求每個分區內最大值的和
total = sc.accumulator(0.0)
a = sc.parallelize(range(1,101),3)
def func(it):
total.add(max(it))
a.foreachPartition(func)
total.value
199.0
#aggregate是一個Action操作
#aggregate比較復雜,先對每個分區執行一個函數,再對每個分區結果執行一個合並函數。
#例子:求元素之和以及元素個數
#三個參數,第一個參數為初始值,第二個為分區執行函數,第三個為結果合並執行函數。
rdd = sc.parallelize(range(1,21),3)
def inner_func(t,x):
return((t[0]+x,t[1]+1))
def outer_func(p,q):
return((p[0]+q[0],p[1]+q[1]))
rdd.aggregate((0,0),inner_func,outer_func)
(210, 20)
#aggregateByKey的操作和aggregate類似,但是會對每個key分別進行操作
#第一個參數為初始值,第二個參數為分區內歸並函數,第三個參數為分區間歸並函數
a = sc.parallelize([("a",1),("b",1),("c",2),
("a",2),("b",3)],3)
b = a.aggregateByKey(0,lambda x,y:max(x,y),
lambda x,y:max(x,y))
b.collect()
[('b', 3), ('a', 2), ('c', 2)]