比如我的內存中存在如下的以key-value形式的數據集(RDD):
hello:1 sparkSubmit:1 red:1 sparkSubmit:1 hello:2 hello:1 hello:4 red:1 red:1 red:1 ... ...
reduceByKey的作用對象是(key, value)形式的RDD,而reduce有減少、壓縮之意,reduceByKey的作用就是對相同key的數據進行處理,最終每個key只保留一條記錄。
保留一條記錄通常有兩種結果。一種是只保留我們希望的信息,比如每個key出現的次數。第二種是把value聚合在一起形成列表,這樣后續可以對value做進一步的操作
以上面的數據集為例,在spark中比如是word:RDD[(String, Int)] 兩個字段分別是word、單個單詞在不同文件中出現的次數,現在我們需要統計每個單詞出現的總次數。
我們可以這樣寫:
val word = rdd1.reduceByKey((x,y) => x+y)
對上述的寫法簡化一下:
val word= rdd1.reduceByKey(_+_)
reduceByKey
reduceByKey會尋找相同key的數據,當找到這樣的兩條記錄時會對其value(分別記為x,y)做(x,y) => x+y
的處理,即只保留求和之后的數據作為value。反復執行這個操作直至每個key只留下一條記錄。
如果覺得簡化后的寫法比較難易理解,先看不簡化之前的寫法是這樣理解的,以上面的數據集為例,從左到右第一個hello這個key對應的值是1,這個1就是不簡化寫法之前的x,然后繼續找從左到右第五個又是hello,那么第二個找到的helloKey對應的value是2,這個2就是不簡化寫法之前的y。然后reduceByKey就對當前找到的這兩個相同的key的value做一個加法,然后得到一個新的key-value,這個新的的key-value的key就是hello而value就是相加以后的結果3,然后繼續找第三個key為hello的單詞,找到以后和剛才相加以后得到的新的hello:3繼續相加,此時你可以把之前相加得到的3作為x,把找到的第三個key為hello的單詞對應的value作為y,然后繼續相加,再得到第二個新的key-vlaue,這個第二個新的key-vlaue的key為hello,value為4,然后這個第二個新的vlaue4又作為x,然后在繼續找其他的相同的key,找到后以此類推。
而簡化以后的寫法第一個_下划線就代表x,第二個_下划線代表y。然后原理就是上面寫的