Spark算子篇 --Spark算子之combineByKey詳解


一。概念

rdd.combineByKey(lambda x:"%d_" %x, lambda a,b:"%s@%s" %(a,b), lambda a,b:"%s$%s" %(a,b))
三個參數(都是函數)
第一個參數:給定一個初始值,用函數生成初始值。
第二個參數:combinbe聚合邏輯。
第三個參數:reduce端聚合邏輯。

二。代碼

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
conf = SparkConf().setMaster("local").setAppName("CombineByKey")
sc = SparkContext(conf = conf)
rdd = sc.parallelize([("A",1),("B",2),("B",3),("B",4),("B",5),("C",1),("A",2)], 2)
def f(index,items):
    print "partitionId:%d" %index
    for val in items:
        print val
    return items
rdd.mapPartitionsWithIndex(f).count()

combinerRDD = rdd.combineByKey(lambda x:"%d_" %x, lambda a,b:"%s@%s" %(a,b), lambda a,b:"%s$%s" %(a,b))
combinerRDD.foreach(p)
groupByKeyRDD.foreach(p)

sc.stop()

三。解釋

第一個函數作用於每一個組的第一個元素上,將其變為初始值

第二個函數:一開始a是初始值,b是分組內的元素值,比如A[1_],因為沒有b值所以不能調用combine函數,第二組因為函數內元素值是[2_,3]調用combine函數后為2_@3,以此類推

第三個函數:reduce端大聚合,把相同的key的數據拉取到一個節點上,然后分組。

 

四。結果

 五。拓展

1.用combinebykey實現groupbykey的邏輯

1.1 combinebykey的三個參數

第一個應該返回一個列表,初始值

第二個函數中的a依賴於第一個函數的返回值

第三個函數的a,b依賴於第二個函數的返回值

1.2 解釋:

 

1.3 代碼:

def mergeValue(list1,b):
    list1.append(b)
    return list1
   
def mergeCombiners(list1,list2):
    list1.extend(list2)
    return list1
   
groupByKeyRDD = rdd.combineByKey(lambda a:[a],mergeValue,mergeCombiners)

 

1.4結果

 

2.使用combineBykey把相同的key和對應的邏輯相加起來

代碼:

reduceByKeyRDD = rdd.combineByKey(lambda a:a,lambda a,b:a+b,lambda a,b:a+b)

結果:

 

持續更新中。。。。,歡迎大家關注我的公眾號LHWorld.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM