Spark常用RDD操作總結


aggregate

  • 函數原型:aggregate(zeroValue, seqOp, combOp)
  • seqOp相當於Map
  • combOp相當於Reduce
  • zeroValue是seqOp每一個partion的初始值,是一個元組,默認為0。
  • 計算列表中總數:
sc.parallelize( [[123],[4,5,6],[7,8,9]] )\
.aggregate(0, lambda: x,y: sum(y)+x, lambda x,y: x+y)
# [('world', 1), ('hello', 2), ('morning', 1)]

seqOp的輸入值為x,y,其中x為初始值或中間值(如果當前partion有多個列表那就有中間值了,即上一個seqOp返回的值),而y就是第一個輸入,比如:[1,2,3],[4,5,6],[7,8,9]。。。 combOp的輸入值也為x,y,其中y為初始值或中間值(超過2個partion時肯定會產生中間值),x為輸入值。比如1+2+3=6,4+5+6=15,7+8+9=24。那6,15,24都會作為輸入值計算。當然此處的combOp在調用過程中也並非是串行的挨個把6,15,24加起來,中間也會有先匯總再求和的過程。但對用戶來說此處是透明的。 我們看到aggregate中把每一個物理上的輸入行作為一個計算單位輸入並輸出,他比較適合計算總數,行數等類似與列中所蘊含值無關的統計維度。


aggregateByKey

  • 函數原型:aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None)
  • 參數與aggregate相同
  • 根據key進行合並

  • 上例稍加改動可以完成一個wordcounts

sc.parallelize(["hello world", "hello morning"])\
.flatMap(lambda line: line.split())\
.map(lambda letter: (letter, 1)).aggregateByKey(0, lambda x,y: y+x, lambda x,y: x+y)\
.collect()
# [(1, 1), (1, 2), (2, 1), (2, 2)]

cartesian

  • 返回兩個rdd的笛卡兒積
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize([3, 4, 5])
rdd1.catesian(rdd2).cellect()
# [(1, 1), (1, 2), (2, 1), (2, 2)]

glom

  • 將一個一維橫向列表,划分為多個塊
sc.parallelize([1,2,3,4,5], 1).collect()
# [1, 2, 3, 4, 5]
sc.parallelize([1,2,3,4,5], 1).glom().collect()
# [[1, 2, 3, 4, 5]]
sc.parallelize([1,2,3,4,5], 2).glom().collect()
# [[1, 2], [3, 4, 5]]

coalesce

  • 將多個塊組合成n個大的列表
sc.parallelize([1,2,3,4,5], 3).coalesce(2).glom().collect()
# [[1], [2, 3, 4, 5]]
sc.parallelize([1,2,3,4,5], 3).coalesce(2).collect()
# [1, 2, 3, 4, 5]
sc.parallelize([1,2,3,4,5], 3).glom().collect()
# [[1], [2, 3], [4, 5]]

cogroup

  • 函數原型:cogroup(other, numPartitions=None)

  • 按key聚合后,求兩個RDD的並集。

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
# [('a', ([1], [2])), ('b', ([4], []))]

collectAsMap

  • 將rdd數據按KV對形式返回
sc.parallelize([(1,2), (3,4)]).collectAsMap()
# {1: 2, 3: 4}
sc.parallelize([(1, (2, 6666)), (3, 4)]).collectAsMap()
# {1: (2, 6666), 3: 4}

combineByKey

  • 函數原型:combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None)

  • 根據key進行


count

  • 返回rdd中元素的數目
sc.parallelize([2,3,4]).count()
# 3

countByKey

  • 按key聚合后計數
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.countByKey().items()
# [('a', 2), ('b', 1)]

countByValue

  • 按value聚合后再計數
sc.parallelize(["hello", "world", "hello", "china", "hello"]).countByValue().items()
# [('world', 1), ('china', 1), ('hello', 3)]

countApprox

  • countApprox(timeout, confidence=0.95) 貌似在公司版本中還未提供 count的一個升級版(實驗中),當超過timeout時,返回一個未完成的結果。
rdd = sc.parallelize(range(1000), 10)
rdd.countApprox(1000, 1.0)
# 1000

distinct

  • distinct(numPartitions=None) 返回rdd中unique的元素
sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()
# [1, 2, 3]

filter

  • 過濾一個RDD中,其每一行必須瞞住filter的條件
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x%2==0).collect()
# [2, 4]

first

  • 返回rdd中的第一個元素
sc.parallelize([2, 3, 4]).first()

flatMap

  • flatMap(f, preservesPartitioning=False) 返回rdd中的所有元素,並把flatMap中返回的列表拉平。
rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
# [1, 1, 1, 2, 2, 3]

flatMapValues

  • 同flatMap,但按照key進行flat,並最終拉平。
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
def f(x): return x
x.flatMapValues(f).collect()
# [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

fold

  • fold(zeroValue, op) 聚合RDD的每一個分區,最后再合並計算,每一個函數默認值為"zeroValue"。 op(t1,t2)函數可以更改t1並且將更改后的t1作為返回值返回以減少對象內存占用。切記不可個性t2的值。
def add(x,y): return x+y
sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
# 15


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM