1, 安裝mrjob
pip install mrjob
pip的安裝看上一篇文章。
2,代碼測試
mrjob安裝完之后,就可以直接用了。如果hadoop已經配置好,不需要額外的配置東西(HADOOP_HOME這個環境變量要配置好),基於mrjob的程序就可以直接在hadoop平台上運行了。
mrjob提供了幾種代碼運行的方式,1)本地測試,就是直接在本地運行代碼 2)在本地模擬hadoop的運行 3)在hadoop集群上運行 等等。下面先看一下本地運行的情況。
來自官網的一段代碼:
from mrjob.job import MRJob class MRWordCounter(MRJob): def mapper(self, key, line): for word in line.split(): yield word, 1 def reducer(self, word, occurrences): yield word, sum(occurrences) if __name__ == '__main__': MRWordCounter.run()
本地運行:python MRWrodCounter.py -r inline <input> output
這會把結果輸出到Output里面。
發現還有一種用法:python MRWrodCounter.py -r inline input1 這樣可以直接打印到屏幕,此時可以設置多個輸入如:python MRWordCounter.py -r inline input1 input2 input3。
應用python MRWordCounter.py -r inline input1 input2 input3 > out 命令可以將處理多個文件的結果輸出到out里面。
本地模擬hadoop運行:python MRWordCounter -r local <input> output
這個會把結果輸出到output里面,這個output必須寫。
hadoop集群上運行:python MRWordCounter -r hadoop <input> output
3,mrjob的用法
mrjob的用法在它的官方文檔里面寫的很全面,這里寫到的知識其中最基本的部分。
首先,分析上面的代碼。
一個map-reduce任務最簡單的寫法就是覆蓋MRJob的mapper,combiner,reducer函數。默認的配置下,輸入到mapper的key是個None. mapper產生(word,1),這個具體是通過JSON在各個任務中傳輸。所以,你的python要支持JSON。再看,combiner和reducer,這個兩個的key沒問題,需要注意的是value部分,根據官方文檔上描述,這個value是個iterator of the numbers,所以sum這個函數用在這里是很合理的。最終reduce的輸出還是key,value對,中間用tab鍵分隔。
mrjob還提供了定義多個步驟的功能,覆蓋steps()函數即可完成,下面代碼展示了這一過程:
from mrjob.job import MRJob class MRDoubleWordFreqCount(MRJob): """Word frequency count job with an extra step to double all the values""" def get_words(self, _, line): for word in line.split(): yield word.lower(), 1 def sum_words(self, word, counts): yield word, sum(counts) def double_counts(self, word, counts): yield word, counts * 2 def steps(self): return [self.mr(mapper=self.get_words, combiner=self.sum_words, reducer=self.sum_words), self.mr(mapper=self.double_counts)] if __name__=='__main__': MRDoubleWordFreqCount.run()
這個mrjob用起來很方便的,可以很容易的測試代碼,也可以很快捷的進行開發。