本文介紹幾種MapReduce算法設計的技巧,全部內容翻譯自《Data-Intensive Text Processing with MapReduce》。
Local Aggregation
說到Local Aggregation,你可能會想不就是Combiner嗎。實際上在mapper中進行combining比使用真正的combiner高效得多。首先combiner只是作為MapReduce的可選優化方案(就像inline對於C++編譯器是一種可選優化方案一樣),不一定會被執行。其次在mapper中進行combining可以減少很多的I/O操作,提高效率,畢竟mapper的每個結果都個結果都要被寫入磁盤,我們當然希望寫入磁盤的數據越少越好。
回想一下WordCount的例子,在mapper中Emit(word,1),在reducer中對相同的鍵值Emit(word,sum),而combiner采用和reducer一樣的例程。
現在我們要在mapper中就把combiner的事情給做了。
class Mapper method setup H := new AssociativeArray method map(docid a,doc d) foreach term in doc H{term} := H{term}+1 method cleanup foreach term in H Emit(term,value of H{term})
但是上述方法在可擴展性上會遇到瓶頸:我們必須在內存中維持一個關聯數組H,當mapper接收到的數據分片很大,里面有很多word時,H就會很大,甚至超出內存。解決辦法有兩種:
- 設定一個值K,mapper每讀入K條鍵值對,就把H中的內容全部Emit,然后把H清空接着讀入鍵值對。類似於先讀滿一個buffer,再把它flush掉,然后buffer重復利用。
- 在mapper中監控內存的使用率,當達到一定閾值時,就把H的內容flush。
使用Local Aggregation設計算法的正確性
有一個計算平均值的例子。輸入文件中的每一行記錄着(string,count)表示一個單詞出現的次數,在所有的記錄中相同的string會出現多次,現在我們要計算每一個string的平均count。
求和是一種“分配型”的聚合,即
但求平均不是分配型的,即
一種直接的想法是:
class Mapper method map(string word,integer count) Emit(word,count) class Reducer method reduce(string word,integer-list [c1,c2,...]) sum := 0 cnt := 0 foreach integer c in [c1,c2,...] sum := sum+c cnt := cnt+1 avg := sum/cnt Emit(word,avg)
現在我們想使用combiner進行local aggregation。在上面的代碼中,很容易地可以將reducer的部分工作放到combiner中來完成。
class Mapper method map(string word,integer count) Emit(word,count) class Combiner method reduce(string word,integer-list [c1,c2,...]) sum := 0 cnt := 0 foreach integer c in [c1,c2,...] sum := sum+c cnt := cnt+1 Emit(word,pair(sum,cnt)) class Reducer method reduce(string word,pairs[(s1,c1),(s2,c2),...]) sum := 0 cnt := 0 foreach pair(s,c) in pairs[(s1,c1),(s2,c2),...] sum := sum+s cnt := cnt+c avg := sum/cnt Emit(word,avg)
很遺憾上面的代碼是錯誤的,不符合MapReduce的規范,由於Combiner只是可選的優化方案,所以MapReduce要求在沒有Combiner的情況下程序仍能正確的運行。上述代碼mapper的輸出鍵值對和reducer的輸入鍵值對類型不匹配,導致程序無法運行。還好,我們只需要把mapper稍作修改就可以了。
class Mapper method map(string word,integer count) Emit(word,pair(count,1))
進一步,現在我們想把local aggregation從combiner中轉移到mapper中,與wordcount例子中使用的技巧一樣,使用關聯數組!
class Mapper method setup S := new AssociativeArray C := new AssociativeArray method map(string word,integer count) S{word} := S{word}+count C{word} := C{word}+1 method cleanup foreach word in S Emit(word,pair(S{word},C{word}))
Pairs and Stripes
mapper和reducer之間能夠交換的數據僅僅是一個鍵值對,很多時候我們為了使這一個鍵值對中包含更多的信息,就把鍵或值設計得足夠復雜,比如讓它是一個pair或AssociativeArray。在上文中我們已經看到這種應用實例了。
在N-Gram模型中,我們要計算一個方陣A,其規模是N×N,N是語料庫中單詞的個數。A[i][j]表示單詞j在單詞i的領域中出現的次數。
Pairs方式:
class Mapper method map(docid a,doc d) foreach term w in doc foreach term u in Neighbour(w) Emit(pair(w,u),1) class Reducer method reduce(pair p,counts [c1,c2,...]) s := 0 foreach count c in [c1,c2,...] s := s+c Emit(p,s)
Stripes方式:
class Mapper method map(docid a,doc d) foreach term w in doc H := new AssociativeArray foreach term u in Neighbour(w) H{u} := H{u}+1 Emit(w,H) class Reducer method reduce(term w,stripes [H1,H2,...]) F := new AssociativeArray foreach stripe H in [H1,H2,...] sum(F,H) Emit(w,F)
上述pairs和stripe方式都可以很輕松的寫一個combiner來提高效率,並且在stripes方式中combiner發揮的作用會相對大一些。
注意在Stripes方式中,由於要為doc中的每一個term建立一個AssociativeArray,維持這么多的關聯數組可能會導致內存溢出。解決方案在本文的第一小節已經提到了。
stripe方式要比pairs方式快得多,因為pairs方式Emit操作(即寫磁盤操作)太頻繁了。
Computing Relative Frequencies
還說N-Gram這個問題,剛才我們只是計算了單詞u在單詞w的領域內出現的次數,但是個次數多並不能說明u和w的關系就強烈,因為可能是w太平凡了,導致w出現的次數很多,進而導致w和u相伴出現的次數也很多。另外有一對單詞p和q,它們相伴出現的次數比較少並不能說明它們的關系不強烈,可能是因為在語料庫p總共出現的次數也不多。所以我們應該計算“相對頻率”才是合理的:
實際上就是對已有矩陣的每一行求和,再讓該行的每個元素除以這個和。
使用上面的Stripes方式代碼可以很容易地擴展為計算相對頻率的代碼,因為在redcuer中Emit的每一個鍵值對剛好對應矩陣的一行數據,計算矩陣的行和非常方便。
現在我們要在pairs方式中計算相對頻率該怎么做呢?因為mapper輸入的鍵值對是(pair(w,u),1),要計算行和就必須使所相同的w都到一個reducer中,但默認情況下Hadoop只能保證相同的pair(w,u)映射到同一個reducer當中--對於復雜的鍵類型,MapReduce中要把它轉換成bytes再進行Hash。因此我們需要重寫Partitioner,僅對pair的左值(即w)進行散列,這就可以保證相同的w都映射到了同一個reducer。但是注意,映射到同一個reducer的pair的左值可能不只一種。MapReduce保證映射到同一個reducer的鍵值對都是按鍵排序好的,為了使在同一個reducer中,相同的左值是連續在一起的,我們還需要自定義的pair的比較方式:對pair進行比較時,只比較左值。
class Pair method compareTo(Pair obj) rect := this.left.compareTo(obj.left) if(rect == 0) if(this.right == "*") return -1 else if(obj.right == "*") return 1 return rect class Mapper method map(docid a,doc d) foreach term w in doc foreach term u in Neighbour(w) Emit(Pair(w,u),1) Emit(Pair(w,*),1) class Partitioner method getPartition(Pair key) return key.getLeft().hashCode() class Reducer method setup prev := NULL rowsum := 0 method reduce(pair p,counts [c1,c2,...]) foreach count c in [c1,c2,...] s := s+c if(p.left != prev) assert(p.right == "*") prev := p.left rowsum := s else Emit(p,s/rowsum)
上面的代碼中,我們使mapper多輸出了一項Emit(Pair(w,*),1),它的作用是使在reducer中不需要遍歷所有具有相同左值的pair(w,u)就能提前計算出行和--僅依靠pair(w,*)就可計算出行和,這樣在第一次遇到pair(w,u)時就可以計算出它的相對頻率。為了使reducer最先遇到pair(w,*),我們定義在對pair排序時,在左值相同的情況下,右值為“*"者較小。
在一個reducer中出現的鍵值對的順序是這樣子的:
同樣,仍然可以使用combiner進行優化,把local aggregate放在mapper中更好。
說到這里順便扯一句,一般程序員在計算樣本方差時會采用這個公式:
這個公式要求2趟掃描樣本,第1趟掃描是為了計算出X的平均值,第2趟掃描是按照公式來計算。而統計學專業的學生知道計算方差還有另外一個公式:
只需要1趟掃描就可以把ΣXi2和ΣXi都算出來,直接代入公式就可算出方差。
Secondary Sorting
Hadoop只能保證同一個reducer中的健值對是按鍵排好序的,在Goole在MapReduce中提供一個可選的函數--在按鍵排序之后按value排序。考慮這么一種情形:mapper輸出的鍵值對形如
key value
t1 (m1,r1988)
t1 (m2,r1987)
t2 (m1,r1990)
key已經排好序了,t1<t2。我們希望key相同的時候按value中的第2列排序,即希望的結果為
key value
t1 (m2,r1987)
t1 (m1,r1988)
t2 (m1,r1990)
Hadoop是不對value進行排序的,所以我們的解決方法是:把需要排序的valueu部分放到key里面去,即
key value
(t1,r1987) m2
(t1,r1988) m1
(t2,r1990) m1
當然這個時候需要自定義partitioner僅對pair的左值進行散列,且pair的比較規則是先比較左值,左值相等時再比較右值。