pyspark基礎入門


  • 工作方式 單機 分布式
  • 內存緩存 單機緩存 persist() or cache()將轉換的RDDs保存在內存
  • df可變性 pandas 是可變的 spark_df中RDDs是不可變的 所以DF不可變
  • 創建
  • https://www.qedev.com/bigdata/170633.html 詳細對比

  • 1

RDD數據結構的常用函數

  • 創建RDD
    • 1 是textFile加載本地或者集群文件系統中的數據,
    • 2 用parallelize方法將Driver中的數據結構並行化成RDD。
  • 常用Action操作
    • 1 collect 將數據匯集到Driver,數據過大時有超內存風險
    • 2 take 將前若干個數據匯集到Driver,比collect安全
    • 3 takeSample 可以隨機取若干個到Driver,第一個參數設置是否放回抽樣
    • 4 first 取第一個數據
    • 5 count 查看RDD元素數量
    • 6 reduce 利用二元函數對數據進行規約
    • 7 foreach 對每一個元素執行某種操作,不生成新的RDD
    • 8 countByKey 對Pair RDD按key統計數量
    • 9 saveAsTextFile 保存rdd成text文件到本地
  • 常用Transformation操作
    • 1 Transformation 轉換操作具有懶惰執行的特性,它只指定新的RDD和其父RDD的依賴關系,只有當Action操作觸發到該依賴的時候,它才被計算。
    • 2 map 操作對每個元素進行一個映射轉換
    • 3 filter 應用過濾條件過濾掉一些數據
    • 4 flatMap 操作執行將每個元素生成一個Array后壓平
    • 5 sample 對原rdd在每個分區按照比例進行抽樣,第一個參數設置是否可以重復抽樣
    • 6 distinct 去重
    • 7 subtract 找到屬於前一個rdd而不屬於后一個rdd的元素
    • 8 union 合並數據
    • 9 intersection 求交集
    • 10 cartesian 笛卡爾積
    • 11 sortBy 按照某種方式進行排序
    • 12 zip 按照拉鏈方式連接兩個RDD,效果類似python的zip函數 需要兩個RDD具有相同的分區,每個分區元素數量相同4
    • 13 zipWithIndex 將RDD和一個從0開始的遞增序列按照拉鏈方式連接。
  • 常用PairrDD的轉換操作 PairRDD指的是數據為長度為2的tuple類似(k,v)結構的數據類型的RDD,其每個數據的第一個元素被當做key,第二個元素被當做value.
    • 1 reduceByKey對相同的key對應的values應用二元歸並操作
    • 2 groupByKey將相同的key對應的values收集成一個Iterator 迭代器
    • 3 sortByKey按照key排序,可以指定是否降序
    • 4 join相當於根據key進行內連接
    • 5 rightOuterJoin相當於關系表的右連接
    • 6 leftOuterJoin相當於關系表的左連接
    • 7 cogroup相當於對兩個輸入分別goupByKey然后再對結果進行groupByKey
    • 8 subtractByKey去除x中那些key也在y中的元素
    • 9 oldByKey的操作和reduceByKey類似,但是要提供一個初始值
  • 緩存操作
  • 共享變量
  • 分區操作
# 打印版本
import pyspark
print(pyspark.__version__)
3.2.0
# 創建會話 https://www.codingdict.com/article/8885
# 參數配置
conf = pyspark.SparkConf().setAppName("rdd_tutorial")
#主函數
sc=pyspark.SparkContext(conf=conf)
# 創建RDD
# 本地加載數據 https://www.cnblogs.com/ivan1026/p/9047726.html
file="./test.txt"
rdd=sc.textFile(file,3)
#RDD類型的數據轉化為數組 https://blog.csdn.net/chaoshengmingyue/article/details/82021746
rdd.collect()
['hello world,',
 "hello spark',",
 'spark love jupyter,',
 'spark love pandas,',
 'spark love sql']
#parallelize將Driver中的數據結構生成RDD,第二個參數指定分區數,即將數據放多幾個分布式端 數據和分區
# 並行集合的一個重要參數是slices,表示數據集切分的份數。Spark將會在集群上為每一份數據起一個任務
rdd = sc.parallelize(range(1,11),2)
rdd.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 常用action操作
# Action操作將觸發基於RDD依賴關系的計算
rdd=sc.parallelize(range(10))
#collect操作將數據匯集到Driver,數據過大時有超內存風險
all_data = rdd.collect()
all_data
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# 有序去前n個數據
rdd.take(4)
[0, 1, 2, 3]

# 隨機取,第一個設置是否放回 2 num 3 隨機種子
rdd.takeSample(False,7,1)
[6, 8, 9, 7, 5, 3, 0]
# 查看某函數用法
rdd.takeSample?
Signature: rdd.takeSample(withReplacement, num, seed=None)
Docstring:
Return a fixed-size sampled subset of this RDD.

Notes
-----
This method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.

Examples
--------
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
20
>>> len(rdd.takeSample(False, 5, 2))
5
>>> len(rdd.takeSample(False, 15, 3))
10
File:      g:\anaconda\ana2\lib\site-packages\pyspark\rdd.py
Type:      method
rdd.first()
0
rdd.count()
10
#foreach對每一個元素執行某種操作,不生成新的RDD
#累加器用法詳見共享變量
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)
45
pairRdd = sc.parallelize([(1,1),(1,4),(1,1),(2,16)]) 
pairRdd.countByKey()
defaultdict(int, {1: 3, 2: 1})
pairRdd.countByValue()
defaultdict(int, {(1, 1): 2, (1, 4): 1, (2, 16): 1})
rdd.collect()
[0, 1, 2, 3, 4]
#saveAsTextFile保存rdd成text文件到本地 理論上可以保存成功
text_file = "./rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)
---------------------------------------------------------------------------

Py4JJavaError                             Traceback (most recent call last)

<ipython-input-40-d076c4237135> in <module>
----> 1 rdd.saveAsTextFile(text_file)


G:\anaconda\ana2\lib\site-packages\pyspark\rdd.py in saveAsTextFile(self, path, compressionCodecClass)
   1826             keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
   1827         else:
-> 1828             keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
   1829 
   1830     # Pair functions


G:\anaconda\ana2\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 


G:\anaconda\ana2\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)


Py4JJavaError: An error occurred while calling o331.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/G:/VScode/pyspark_learning/rdd.txt already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:298)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1578)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1578)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1564)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:551)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
# 常用Transformation操作
rdd=sc.parallelize(range(20))
rdd.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
# map
rdd.map(lambda x:x+2).collect()
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
#filter 
rdd.filter(lambda x:x%2==1).collect()
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
# flatMap
#flatMap操作執行將每個元素生成一個Array后壓平
rdd = sc.parallelize(["hello world","hello China"])
rdd.map(lambda x:x.split(" ")).collect()
[['hello', 'world'], ['hello', 'China']]
rdd=rdd.flatMap(lambda x:x.split(" ")).collect()
#sample
rdd=sc.parallelize(range(20))
rdd.sample(False,5,0).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()
[1, 2, 3, 4, 5]
# 屬於A不屬於B的元素 補集
a = sc.parallelize(range(10))
b = sc.parallelize(range(5,15))
a.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
b.collect()
[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
#補集
a.subtract(b).collect()
[0, 1, 2, 3, 4]
# 並集
a.union(b).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
# 交集
a.intersection(b).collect()
[5, 6, 7, 8, 9]
# 笛卡爾積
boys = sc.parallelize(["LiLei","Tom"])
girls = sc.parallelize(["HanMeiMei","Lily"])
boys.cartesian(girls).collect()
[('LiLei', 'HanMeiMei'),
 ('LiLei', 'Lily'),
 ('Tom', 'HanMeiMei'),
 ('Tom', 'Lily')]
a.sortBy(lambda x:x,ascending=False).collect()
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
rdd = sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
rdd.sortBy(lambda x:x[2]).collect()
[(4, 1, 1), (3, 2, 2), (1, 2, 3)]
rdd.sortBy?
Signature: rdd.sortBy(keyfunc, ascending=True, numPartitions=None)
Docstring:
Sorts this RDD by the given keyfunc

Examples
--------
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
File:      g:\anaconda\ana2\lib\site-packages\pyspark\rdd.py
Type:      method
# zip
rdd_name = sc.parallelize(["LiLei","Hanmeimei","Lily"])
rdd_age = sc.parallelize([19,18,20])

rdd_zip = rdd_name.zip(rdd_age)
print(rdd_zip.collect())
[('LiLei', 19), ('Hanmeimei', 18), ('Lily', 20)]
#zip withindex
#將RDD和一個從0開始的遞增序列按照拉鏈方式連接。
rdd_name =  sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())
[('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]
# 常用PairRDD的轉換操作
rdd = sc.parallelize([("hello",1),("world",2),
                               ("hello",3),("world",5)])
rdd.collect()
[('hello', 1), ('world', 2), ('hello', 3), ('world', 5)]
rdd.reduceByKey(lambda x,y: x*y).collect()
[('hello', 3), ('world', 10)]
rdd.groupByKey().collect()
[('hello', <pyspark.resultiterable.ResultIterable at 0x1ab3f6892b0>),
 ('world', <pyspark.resultiterable.ResultIterable at 0x1ab3f689100>)]
# join 內連接
age = sc.parallelize([("LiLei",18),
                        ("HanMeiMei",16),("Jim",20)])
gender = sc.parallelize([("LiLei","male"),
                        ("HanMeiMei","female"),("Lucy","female")])
age.join(gender).collect()
[('HanMeiMei', (16, 'female')), ('LiLei', (18, 'male'))]
#leftOuterJoin和rightOuterJoin
age.rightOuterJoin(gender).collect()
[('HanMeiMei', (16, 'female')),
 ('LiLei', (18, 'male')),
 ('Lucy', (None, 'female'))]
#leftOuterJoin和rightOuterJoin
age.leftOuterJoin(gender).collect()
[('HanMeiMei', (16, 'female')), ('LiLei', (18, 'male')), ('Jim', (20, None))]
#cogroup connect_group相當於對兩個輸入分別goupByKey然后再對結果進行groupByKey

x = sc.parallelize([("a",1),("b",2),("a",3)])
y = sc.parallelize([("a",2),("b",3),("b",5)])

result = x.cogroup(y).collect()
result
[('b',
  (<pyspark.resultiterable.ResultIterable at 0x1ab3f832250>,
   <pyspark.resultiterable.ResultIterable at 0x1ab3f832310>)),
 ('a',
  (<pyspark.resultiterable.ResultIterable at 0x1ab3f8215b0>,
   <pyspark.resultiterable.ResultIterable at 0x1ab3f8286d0>))]
list(result[0][1])
[<pyspark.resultiterable.ResultIterable at 0x1ab3f832250>,
 <pyspark.resultiterable.ResultIterable at 0x1ab3f832310>]
#subtractByKey去除x中那些key也在y中的元素

x = sc.parallelize([("a",1),("b",2),("c",3)])
y = sc.parallelize([("a",2),("b",(1,2))])

x.subtractByKey(y).collect()
[('c', 3)]
#foldByKey的操作和reduceByKey類似,但是要提供一個初始值
x = sc.parallelize([("a",1),("b",2),("a",3),("b",5)],1)
x.foldByKey(1,lambda x,y:x*y).collect()
[('a', 3), ('b', 10)]

五,緩存操作

如果一個rdd被多個任務用作中間量,那么對其進行cache緩存到內存中對加快計算會非常有幫助。

聲明對一個rdd進行cache后,該rdd不會被立即緩存,而是等到它第一次被計算出來時才進行緩存。

可以使用persist明確指定存儲級別,常用的存儲級別是MEMORY_ONLY和EMORY_AND_DISK。

如果一個RDD后面不再用到,可以用unpersist釋放緩存,unpersist是立即執行的。

緩存數據不會切斷血緣依賴關系,這是因為緩存數據某些分區所在的節點有可能會有故障,例如內存溢出或者節點損壞。

這時候可以根據血緣關系重新計算這個分區的數據。

#cache緩存到內存中,使用存儲級別 MEMORY_ONLY。
#MEMORY_ONLY意味着如果內存存儲不下,放棄存儲其余部分,需要時重新計算。
a = sc.parallelize(range(10000),5)
a.cache()
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a

print(mean_a)


4999.5
#persist緩存到內存或磁盤中,默認使用存儲級別MEMORY_AND_DISK
#MEMORY_AND_DISK意味着如果內存存儲不下,其余部分存儲到磁盤中。
#persist可以指定其它存儲級別,cache相當於persist(MEMORY_ONLY)
from  pyspark.storagelevel import StorageLevel
a = sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a

a.unpersist() #立即釋放緩存
print(mean_a)

4999.5

六,共享變量

當spark集群在許多節點上運行一個函數時,默認情況下會把這個函數涉及到的對象在每個節點生成一個副本。

但是,有時候需要在不同節點或者節點和Driver之間共享變量。

Spark提供兩種類型的共享變量,廣播變量和累加器。

廣播變量是不可變變量,實現在不同節點不同任務之間共享數據。

廣播變量在每個機器上緩存一個只讀的變量,而不是為每個task生成一個副本,可以減少數據的傳輸。

累加器主要是不同節點和Driver之間共享變量,只能實現計數或者累加功能。

累加器的值只有在Driver上是可讀的,在節點上不可見。

# 廣播變量 broadcast 不可變,在所有節點可讀
rdd = sc.parallelize(range(10))
broads = sc.broadcast(100)
print(rdd.map(lambda x:x+broads.value).collect())
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
print(broads.value)
100
#累加器 只能在Driver上可讀,在其它節點只能進行累加

total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)

rdd.foreach(lambda x:total.add(x))
total.value

45
# 計算數據的平均值
rdd = sc.parallelize([1.1,2.1,3.1,4.1])
total = sc.accumulator(0)
count = sc.accumulator(0)

def func(x):
    total.add(x)
    count.add(1)
    
rdd.foreach(func)

total.value/count.value

2.5999999999999996

七,分區操作

分區操作包括改變分區操作,以及針對分區執行的一些轉換操作。

glom:將一個分區內的數據轉換為一個列表作為一行。

coalesce:shuffle可選,默認為False情況下窄依賴,不能增加分區。repartition和partitionBy調用它實現。

repartition:按隨機數進行shuffle,相同key不一定在同一個分區

partitionBy:按key進行shuffle,相同key放入同一個分區

HashPartitioner:默認分區器,根據key的hash值進行分區,相同的key進入同一分區,效率較高,key不可為Array.

RangePartitioner:只在排序相關函數中使用,除相同的key進入同一分區,相鄰的key也會進入同一分區,key必須可排序。

TaskContext: 獲取當前分區id方法 TaskContext.get.partitionId

mapPartitions:每次處理分區內的一批數據,適合需要分批處理數據的情況,比如將數據插入某個表,每批數據只需要開啟一次數據庫連接,大大減少了連接開支

mapPartitionsWithIndex:類似mapPartitions,提供了分區索引,輸入參數為(i,Iterator)

foreachPartition:類似foreach,但每次提供一個Partition的一批數據

#glom將一個分區內的數據轉換為一個列表作為一行。
a = sc.parallelize(range(10),4)
# a 10個數據存儲在n個分區上
b = a.glom()
b.collect() 

[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
a.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
#coalesce 默認shuffle為False,不能增加分區,只能減少分區
#如果要增加分區,要設置shuffle = true
#parallelize等許多操作可以指定分區數
a = sc.parallelize(range(10),3)  
print(a.getNumPartitions())
print(a.glom().collect())
3
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
b = a.coalesce(2) 
b.getNumPartitions()
2
#repartition按隨機數進行shuffle,相同key不一定在一個分區,可以增加分區
#repartition實際上調用coalesce實現,設置了shuffle = True
a = sc.parallelize(range(10),3)  
c = a.repartition(4) 
print(c.glom().collect())
[[6, 7, 8, 9], [3, 4, 5], [], [0, 1, 2]]
c = a.repartition(2) 
print(c.glom().collect())
[[0, 1, 2, 6, 7, 8, 9], [3, 4, 5]]
#partitionBy按key進行shuffle,相同key一定在一個分區
a = sc.parallelize([("a",1),("a",1),("a",2),("c",3)])  
c = a.partitionBy(2)
print(c.glom().collect())
[[('c', 3)], [('a', 1), ('a', 1), ('a', 2)]]
#mapPartitions可以對每個分區分別執行操作
#每次處理分區內的一批數據,適合需要按批處理數據的情況
#例如將數據寫入數據庫時,可以極大的減少連接次數。
#mapPartitions的輸入分區內數據組成的Iterator,其輸出也需要是一個Iterator
#以下例子查看每個分區內的數據,相當於用mapPartitions實現了glom的功能。
a = sc.parallelize(range(10),2)
a.mapPartitions(lambda it:iter([list(it)])).collect()

[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
#mapPartitionsWithIndex可以獲取兩個參數
#即分區id和每個分區內的數據組成的Iterator
a = sc.parallelize(range(11),2)

def func(pid,it):
    s = sum(it)
    return(iter([str(pid) + "|" + str(s)]))
    [str(pid) + "|" + str]
b = a.mapPartitionsWithIndex(func)
b.collect()

['0|10', '1|45']
#利用TaskContext可以獲取當前每個元素的分區
from pyspark.taskcontext import TaskContext
a = sc.parallelize(range(5),3)
c = a.map(lambda x:(TaskContext.get().partitionId(),x))
c.collect()


[(0, 0), (1, 1), (1, 2), (2, 3), (2, 4)]
#foreachPartition對每個分區分別執行操作
#范例:求每個分區內最大值的和
total = sc.accumulator(0.0)

a = sc.parallelize(range(1,101),3)

def func(it):
    total.add(max(it))
    
a.foreachPartition(func)
total.value

199.0
#aggregate是一個Action操作
#aggregate比較復雜,先對每個分區執行一個函數,再對每個分區結果執行一個合並函數。
#例子:求元素之和以及元素個數
#三個參數,第一個參數為初始值,第二個為分區執行函數,第三個為結果合並執行函數。
rdd = sc.parallelize(range(1,21),3)
def inner_func(t,x):
    return((t[0]+x,t[1]+1))

def outer_func(p,q):
    return((p[0]+q[0],p[1]+q[1]))

rdd.aggregate((0,0),inner_func,outer_func)


(210, 20)
#aggregateByKey的操作和aggregate類似,但是會對每個key分別進行操作
#第一個參數為初始值,第二個參數為分區內歸並函數,第三個參數為分區間歸並函數

a = sc.parallelize([("a",1),("b",1),("c",2),
                             ("a",2),("b",3)],3)
b = a.aggregateByKey(0,lambda x,y:max(x,y),
                            lambda x,y:max(x,y))
b.collect()

[('b', 3), ('a', 2), ('c', 2)]


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM