Spark 基本函數學習筆記一¶
spark的函數主要分兩類,Transformations和Actions。
Transformations為一些數據轉換類函數,actions為一些行動類函數:
- 轉換:轉換的返回值是一個新的RDD集合,而不是單個值。調用一個變換方法,
不會有任何求值計算,它只獲取一個RDD作為參數,然后返回一個新的RDD。 - 行動:行動操作計算並返回一個新的值。當在一個RDD對象上調用行動函數時,
會在這一時刻計算全部的數據處理查詢並返回結果值。
這里介紹pyspark中常用函數功能以及代碼示例。
官方文檔鏈接:http://spark.apache.org/docs/2.3.3/api/python/pyspark.html#pyspark.RDD
文檔github鏈接:Spark基本函數學習
RDD下面的Transformations函數,這些函數適用於RDD集合操作:
- map(func)
- flatMap(func)
- mapPartitions(func)
- mapPartitionsWithIndex(func)
- foreach(f)
- foreachPartition(f)
- filter(func)
- sample()
- union()
- intersection()
- distinct()
- groupBy()
- groupByKey()
- reduce
- reduceByKey()
- aggregate
- aggregateByKey()
- sortBy
- sortByKey()
- join()
- cogroup()
- cartesian()
- coalesce()
- Pipe()
- Repartition()
- rePartitionAndSortWithinPartitions()
from pyspark.sql import SparkSession
import numpy as np
from pyspark import SparkContext
spark = SparkSession.Builder().getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize(range(10))
rdd.collect()
map(func)轉換¶
map(func) 與python的map函數功能一樣,都是對每個元素執行func函數的計算。
返回一個新的數據集,數據集的每個元素都是經過func函數處理的
我們這里以對每個元素乘以10計算示例
# 使用lambda 函數
temp = rdd.map(lambda x: x*10)
temp.collect()
# 使用自定義函數
def multi_10(x):
return x * 10
temp = rdd.map(multi_10)
temp.collect()
flatMap(func)¶
類似於map(func), 但是不同的是map對每個元素處理完后返回與原數據集相同元素數量的數據集,而flatMap返回的元素數不一定和原數據集相同
rdd = sc.parallelize([[1,2],[2,3],[3,4]])
d = rdd.flatMap(lambda x: x)
d.collect()
mapPartitions(func)¶
mapPartitions是map的一個變種。
map的輸入函數是應用於RDD中每個元素,而mapPartitions的輸入函數是應用於每個分區,
也就是把每個分區中的內容作為整體來處理的。
rdd = sc.parallelize([1,2,3,4,5], 3)
def f(iterator):
yield sum(iterator)
rdd.mapPartitions(f).collect()
glom()函數就是要顯示出RDD對象的分區情況
可以看到rdd分了三個區,每個區的數據為: [[1], [2, 3], [4, 5]]
所以上面的例子中mapPartitions計算sum的結果是三個,
每個分區求和結果是[1,5,9]
rdd.glom().collect()
mapPartitionsWithIndex(func)¶
與mapPartition相比,mapPartitionWithIndex能夠保留分區索引,函數的傳入參數也是分區索引和iterator構成的鍵值對。
def f1(partitionIndex,iterator):
yield (partitionIndex,sum(iterator))
def f2(partitionIndex,iterator):
yield sum(iterator)
rf1 = rdd.mapPartitionsWithIndex(f1)
rf2 = rdd.mapPartitionsWithIndex(f2)
# f1 的返回值可以保留分區索引
print(rf1.glom().collect())
print(rf2.glom().collect())
rdd = sc.parallelize([1,2,3,4,5])
res = rdd.foreach(lambda x: x*2)
print(res) # 打印結果為None
rdd.collect() # 輸出為 [1, 2, 3, 4, 5]
# 打印元素,如果使用jupyter,在啟動頁面可以看到打印
def f(x):
print(x)
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
foreachPartition(f)¶
與foreach一樣,只是操作的對象為RDD的每個數據分區
def f(iterator):
for x in iterator:
print(x)
sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
filter(func)函數¶
返回一個新的數據集,這個數據集中的元素是通過func函數篩選后返回為true的元素
簡單的說就是,對數據集中的每個元素進行篩選,如果符合條件則返回true,不符合返回false,
最后將返回為true的元素組成新的數據集返回
# 選擇偶數
d = rdd.filter(lambda x: x % 2 ==0)
d.collect()
# 整除3
def three(x):
return x % 3 == 0
d = rdd.filter(three)
d.collect()
rdd.sample(False, 0.4, 40).collect()
union¶
union(otherDataset)是數據合並,返回一個新的數據集,由原數據集和otherDataset聯合而成。
rdd1 = rdd.map(lambda x: x + 10)
rdd.union(rdd1).collect()
intersection()¶
返回兩個數據集的交集
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
rdd1.intersection(rdd2).collect()
distinct()¶
返回數據集中不同值的列表,即去除數據集中重復的元素
rdd = sc.parallelize([1,2,3,4,1,2,3])
rdd.distinct().collect()
groupBy¶
給定一個分組條件,返回分組后的key value數據集
rdd = sc.parallelize(range(10))
# 按照模2結果來分組
res = rdd.groupBy(lambda x: x % 2).collect()
[(k, sorted(v)) for k,v in res]
# 按照模3結果來分組
res = rdd.groupBy(lambda x: x % 3).collect()
[(k, sorted(v)) for k,v in res]
rdd = sc.parallelize([('a', 1), ('a',1), ('b',1), ('b',1), ('b',1), ('c',1),('c',1),('c',1)])
rdd.collect()
# 分組后求值的和
rdd.groupByKey().mapValues(sum).collect()
from operator import add
# 累計求和
sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
reduceByKey¶
功能與reduce函數一樣。不過輸入的數據為key value格式,按照key分組進行函數操作。
在WordCount例子中,使用reduceByKey來統計單詞的個數。 鏈接:https://www.cnblogs.com/StitchSun/p/10535822.html
rdd = sc.parallelize([('a', 1), ('a',1), ('b',1), ('b',1), ('b',1), ('c',1),('c',1),('c',1)])
rdd.reduceByKey(add).collect()
aggregate()¶
函數原型:aggregate(zeroValue, seqOp, combOp)
aggregate函數操作比較復雜,有兩個函數。seqOp函數是對每個分區的元素與zoroValue進行計算,
然后由combOp函數對所有分區的結果進行計算。
將初始值和第一個分區中的第一個元素傳遞給seq函數進行計算,然后將計算結果和第二個元素傳遞給seq函數,直到計算到最后一個值。
第二個分區中也是同理操作。
最后將初始值、所有分區的結果經過combine函數進行計算(先將前兩個結果進行計算,將返回結果和下一個結果傳給combine函數,以此類推),並返回最終結果。
data = sc.parallelize((1,2,3,4,5,6),2)
# 如果使用jupyter,打印結果在jupyter頁面可以看到
def seq(a,b):
print('seqOp:'+str(a)+"\t"+str(b))
return min(a,b)
def combine(a,b):
print('comOp:'+str(a)+"\t"+str(b))
return a+b
# 例子解析:
# 先在每個分區中元素中兩兩操作,找最小的元素。
# 計算完成后,由combine函數計算兩兩分區結果的和
# 計算步驟1:初始值為3,與分區[1,2,3]元素一一進行seq最小值運算,得到結果為 1
# 計算步驟2:初始值為3,與分區[4,5,6]元素一一進行seq最小值運算,得到結果為 3
# 計算步驟3:初始值為3,與分區結果1,分區結果3進行combine相加運算,得到結果為 3 + 1 + 3, 結果為7
print(data.glom().collect())
data.aggregate(3,seq, combine)
aggregateByKey()¶
函數原型 aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=<function portable_hash>)
在kv對的RDD中,按key將value進行分組合並,合並時,將每個value和初始值作為seq函數的參數,進行計算,
返回的結果作為一個新的kv對,然后再將結果按照key進行合並,
最后將每個分組的value傳遞給combine函數進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給combine函數,以此類推),
將key與計算結果作為一個新的kv對輸出
data = sc.parallelize([(1,3),(1,2),(1,4),(2,3)])
def seqFunc(a, b):
print("seqFunc:%s,%s" %(a,b))
return max(a, b) #取最大值
def combFunc(a, b):
print("combFunc:%s,%s" %(a ,b))
return a + b #累加起來
# aggregateByKey這個算子內部有分組
# 這里numPartitions值為4,將數據分為四個區,seqFunc計算結果為 (1,3),(1,3),(1,4)和(2,3)
# 然后按照key分組進行comFunc計算,得到結果[(1, 10), (2,3)]
data.aggregateByKey(3, seqFunc, combFunc, 4).collect()
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
x0 = sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
x1 = sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
print(x0)
print(x1)
sortByKey¶
函數原型 sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.>)
根據key進行排序,輸入的數據必須為key value格式
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortByKey().collect()
join()¶
兩個數據集按照key內連接,返回數據集中有相同key的元素組成的新的數據集,
數據集A中的元素與數據集B中相同key元素一一組合,生成新的數據集
格式為(key, (value1, value2))
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
x.join(y).collect()
cogroup()¶
將多個RDD中同一個Key對應的Value組合到一起。如果RDD中沒有對應的key,則會生成一個空值
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
for k, v in x.cogroup(y).collect():
print(k, tuple(map(list, v)))
rdd = sc.parallelize([1, 2])
rdd2 = sc.parallelize([3, 4, 5])
rdd.cartesian(rdd2).collect()
coalesce()¶
按照給定的數量重新分區數據集
old = sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
print(old)
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(2).glom().collect()
pipe¶
對rdd數據集的每個元素,調用外部程序
# cat 文件內容
# 先在目錄下創建一個for_test.txt文件,然后來讀取文件內容
sc.parallelize(['1', '2', '', '3']).pipe('cat for_test.txt').collect()
repartition¶
數據集重新分區
# 創建默認為四個分區的數據集
rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
print(rdd.glom().collect())
# 重新分為兩個分區
rdd.repartition(2).glom().collect()
rePartitionAndSortWithinPartitions()¶
根據給定的分區程序對RDD進行重新分區,並在每個生成的分區內按鍵對記錄進行排序。
這比調用重新分區,然后在每個分區內進行排序更有效率,因為它可以將排序壓入洗牌機器。
應用場景:
- 如果需要重分區,並且想要對分區中的數據進行升序排序。
- 提高性能,替換repartition和sortBy
rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, True)
rdd2.glom().collect()