Spark算子篇 --Spark算子之aggregateByKey詳解


一。基本介紹

rdd.aggregateByKey(3, seqFunc, combFunc) 其中第一個函數是初始值

3代表每次分完組之后的每個組的初始值。

seqFunc代表combine的聚合邏輯

每一個mapTask的結果的聚合成為combine

combFunc reduce端大聚合的邏輯

ps:aggregateByKey默認分組

二。代碼

from pyspark import SparkConf,SparkContext
from __builtin__ import str
conf = SparkConf().setMaster("local").setAppName("AggregateByKey")
sc = SparkContext(conf = conf)

rdd = sc.parallelize([(1,1),(1,2),(2,1),(2,3),(2,4),(1,7)],2)

def f(index,items):
    print "partitionId:%d" %index
    for val in items:
        print val
    return items
    
rdd.mapPartitionsWithIndex(f, False).count()


def seqFunc(a,b):
    print "seqFunc:%s,%s" %(a,b)
    return max(a,b) #取最大值
def combFunc(a,b):
    print "combFunc:%s,%s" %(a ,b)
    return a + b #累加起來
'''
    aggregateByKey這個算子內部肯定有分組
'''
aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc)
rest = aggregateRDD.collectAsMap()
for k,v in rest.items():
    print k,v

sc.stop()

 

三。詳細邏輯

PS:

seqFunc函數 combine篇。

3是每個分組的最大值,所以把3傳進來,在combine函數中也就是seqFunc中第一次調用 3代表a,b即1,max(a,b)即3 第二次再調用則max(3.1)中的最大值3即輸入值,2即b值 所以結果則為(1,3)

底下類似。combine函數調用的次數與分組內的數據個數一致。

 

combFunc函數 reduce聚合

在reduce端大聚合拉完數據后也是先分組,然后再調用combFunc函數

四。結果

持續更新中。。。。,歡迎大家關注我的公眾號LHWorld.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM