一、map
- map:對RDD中每個元素都執行一個指定函數從而形成一個新的RDD
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf = conf)
def func(x):
return x*2
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
mapRdd2 = rdd.map(func)
print(mapRdd2.collect())# [2, 4, 6, 8, 10]
map依賴圖關系如下,紅框代表整個數據集,黑框代表一個RDD分區,里面是每個分區的數據集
- flatMap:與map類似,但是每一個輸入元素會被映射成0個或多個元素,最后達到扁平化效果
data = [[1,2],[3],[4],[5]]
rdd = sc.parallelize(data)
print(rdd.collect()) # [[1, 2], [3], [4], [5]]
flatMapRdd = rdd.flatMap(lambda x: x)
print(flatMapRdd.collect())# [1, 2, 3, 4, 5]
flatMap依賴關系圖如下
map和flatMap對比
rdd = sc.parallelize([("A",1),("B",2),("C",3)])
flatMaprdd = rdd.flatMap(lambda x:x)
print(flatMaprdd.collect()) # ['A', 1, 'B', 2, 'C', 3]
maprdd = rdd.map(lambda x:x)
print(maprdd.collect()) # [('A', 1), ('B', 2), ('C', 3)]
- mapPartitions:是map的一個變種,map對每個元素執行指定函數,mapPartitions對每個分區數據執行指定函數
rdd = sc.parallelize([1, 2, 3, 4],2)
def f(iterator):
yield sum(iterator)
print(rdd.mapPartitions(f).collect()) # [3, 7] 兩個分區,第一個分區為 [1,2],第二個分區為[3,4]
其實mapPartitions可以當成map來用:如果涉及到連接數據庫的操作,可以在mapParition所用的函數里建立連接。然后得到的結果保存到列表中返回。
from pyspark import SparkContext
sc = SparkContext("local", "First App")
rdd1 = sc.parallelize([("a",1), ("b",1), ("c",2), ("d",3),("e",6),("f",7),("g",8)])
rdd2 = sc.parallelize([("a",2), ("b",3), ("c",4), ("d",5)])
rdd4 = sc.parallelize([("a",2), ("b",3), ("c",4), ("d",5)])
rdd3 = rdd2.union(rdd1).union(rdd4)
print(rdd3.collect())
rdd5 = rdd2.union(rdd1).union(rdd4).repartition(3)
print(rdd5.getNumPartitions())
def myfunc(x):
res = []
for item in x:
res.append(item[0])
return res
rdd6 = rdd5.mapPartitions(myfunc) # 傳入列表,然后返回列表。最終rdd6里面就是一個列表
print(rdd6.collect())
二、filter
1.filter:按照條件進行過濾
rdd = sc.parallelize([("a",2), ("b",3), ("c",4), ("d",5)])
# 條件為True的元素留下,舍棄為False的元素
rdd = rdd.filter(lambda x: x[1]>=5)
對於已經排序好的rdd,配合zipWithIndex(),可以使用 filter()來獲取前N個數據組成的RDD,而不是take()或者top()這些行動算子。
三、sort
1.sortBy:排序
def sortBy(self, keyfunc, ascending=True, numPartitions=None):
"""
Sorts this RDD by the given keyfunc
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
"""
return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
源碼中,默認是正序排列,一般業務需要倒敘排列,參數為False,即大的數在前面。
排序條件可以設置多個,當第一個相同時,以此按照后續條件進行排序
from pyspark import SparkContext
sc = SparkContext("local", "First App")
rdd = sc.parallelize([(1,"a",1),(1,"b",2),(3,"d",4),(3,"c",3)])
rdd = rdd.sortBy(lambda x:(x[0],x[1],x[2]),False)
print(rdd.collect())
2.sortByKey
針對 key-value數據,根據key進行排序
list = ["14", "134", "1244"]
rdd = sc.parallelize(list)
pairRDD = rdd.map(lambda word: (word, len(word)))
# x是key,x[1] 是key對應字符串中索引為1的字母
aa = pairRDD.sortByKey(keyfunc=lambda x: x[1])
aa.foreach(print)
# ('1244', 4)
# ('134', 3)
# ('14', 2)
#注意上下的結果區別
list = ["14", "134", "1244"]
rdd = sc.parallelize(list)
pairRDD = rdd.map(lambda word: (word, len(word)))
aa = pairRDD.sortBy(keyfunc=lambda x: x[1])
aa.foreach(print)
# ('14', 2)
# ('134', 3)
# ('1244', 4)
四、zip
1、zip
兩個RDD具有相同個數的分區,並且每個分區內的個數相等
例子:
x=sc.parallelize(range(5),2)
y=sc.parallelize(range(1000,1005),2)
a=x.zip(y).glom().collect()
print(a)
a=x.zip(y).collect()
print(a)
2、zipWithIndex()
給RDD的每個元素加上索引。排序后的RDD加上元素對應的順序序號
# zipWithIndex()的結果為[((1,2),0),((1,3),1)]
sortBy(lambda x: (x[0], x[1]), False).zipWithIndex().map(lambda x: (x[0][0],x[1]+1))
3、zipWithUniqueId
返回k-v,與分區有關系
k, n+k, 2n+k,
n為分區總數,下例 n=2
k其屬於第幾個分區,從0開始計數。下例 k=0,1
對於k=0的分區:\(0+0*2,0+1*2,0+2*2\)
rdd=sc.parallelize(list('123456'),2)
print(rdd.glom().collect()) # [['1', '2', '3'], ['4', '5', '6']]
a=rdd.zipWithUniqueId().glom().collect()
print(a) # [[('1', 0), ('2', 2), ('3', 4)], [('4', 1), ('5', 3), ('6', 5)]]