前言
Hadoop 本身是用 Java 開發的,所以之前的MapReduce代碼小練都是由Java代碼編寫,但是通過Hadoop Streaming,我們可以使用任意語言來編寫程序,讓Hadoop 運行。
本文用Python語言實現了詞頻統計功能,最后通過Hadoop Streaming使其運行在Hadoop上。
Python寫MapReduce代碼
使用Python寫MapReduce的“訣竅”是利用Hadoop流的API,通過STDIN(標准輸入)、STDOUT(標准輸出)在Map函數和Reduce函數之間傳遞數據。
我們唯一需要做的是利用Python的sys.stdin讀取輸入數據,並把我們的輸出傳送給sys.stdout。Hadoop流將會幫助我們處理別的任何事情。
Map階段:mapper.py
1 #!/usr/bin/env python3 2 import sys 3 for line in sys.stdin: 4 line = line.strip() 5 words = line.split() 6 for word in words: 7 print("%s\t%s" % (word, 1))
Reducer階段:reducer.py
1 #!/usr/bin/env python3 2 from operator import itemgetter 3 import sys 4 5 current_word = None 6 current_count = 0 7 word = None 8 9 for line in sys.stdin: 10 line = line.strip() 11 word, count = line.split('\t', 1) 12 try: 13 count = int(count) 14 except ValueError: #count如果不是數字的話,直接忽略掉 15 continue 16 if current_word == word: 17 current_count += count 18 else: 19 if current_word: 20 print("%s\t%s" % (current_word, current_count)) 21 current_count = count 22 current_word = word 23 24 if word == current_word: #最后一個單詞 25 print("%s\t%s" % (current_word, current_count))
python代碼放在本地即可,不需上傳到HDFS。由於后面需要執行這兩段代碼,所以為它們增加可執行權限,即:
chmod +x mapper.py
chmod +x reducer.py
本地測試
用Hadoop Streaming的好處之一就是因為代碼沒有庫的依賴,調試方便,可以脫離Hadoop先在本地用管道模擬調試,所以我們先在本地進行測試。
mapper.py
reducer.py
Hadoop運行
數據准備
測試文件in.txt文件內容為:
需要將其上傳至HDFS,上傳命令為:
bin/hadoop -copyFromLocal in.txt in.txt
Hadoop Streaming簡介
Hadoop Streaming框架,最大的好處是,讓任何語言編寫的map, reduce程序能夠在hadoop集群上運行,map/reduce程序只要遵循從標准輸入stdin讀,寫出到標准輸出stdout即可。
它通過將其他語言編寫的 mapper 和 reducer 通過參數傳給一個事先寫好的 Java 程序(Hadoop 自帶的 *-streaming.jar),這個 Java 程序會負責創建 MR 作業,另開一個進程來運行 mapper,將得到的輸入通過 stdin 傳給它,再將 mapper 處理后輸出到 stdout 的數據交給 Hadoop,經過 partition 和 sort 之后,再另開進程運行 reducer,同樣通過 stdin/stdout 得到最終結果。因此,我們只需要在其他語言編寫的程序中,通過 stdin 接收數據,再將處理過的數據輸出到 stdout,Hadoop Streaming 就能通過這個 Java 的 wrapper 幫我們解決中間繁瑣的步驟,運行分布式程序。
優點:
1. 可以使用自己喜歡的語言來編寫 MapReduce 程序(不必非得使用 Java)
2. 不需要像寫 Java 的 MR 程序那樣 import 一大堆庫,在代碼里做很多配置,很多東西都抽象到了 stdio 上,代碼量顯著減少。
3. 因為沒有庫的依賴,調試方便,並且可以脫離 Hadoop 先在本地用管道模擬調試。
缺點:
1. 只能通過命令行參數來控制 MapReduce 框架,不像 Java 的程序那樣可以在代碼里使用 API,控制力比較弱。
2. 因為中間隔着一層處理,效率會比較慢。
3. 所以 Hadoop Streaming 比較適合做一些簡單的任務,比如用 Python 寫只有一兩百行的腳本。如果項目比較復雜,或者需要進行比較細致的優化,使用 Streaming 就容易出現一些束手束腳的地方。
Hadoop Streaming運行
首先需要找到hadoop-streaming的位置,我的hadoop是2.x版本的,該包的位置在:
在執行的過程中遇到了權限不夠的問題:
解決辦法是擴大權限:
為了方便起見,接下來我就把hadoop-streaming-2.9.2.jar放在了/usr/local/hadoop目錄下,所以在下面的命令中大家注意一下。
最后輸入如下命令:
bin/hadoop jar /usr/local/hadoop/hadoop-streaming-2.9.2.jar\ -mapper /usr/local/hadoop/mapper.py\ -file /usr/local/hadoop/mapper.py\ -reducer /usr/local/hadoop/reducer.py\ -file /usr/local/hadoop/reducer.py\ -input in.txt\ -output out
第一行是告訴Hadoop運行Streaming的Jav 程序,后面的mapper.py 和 reducer.py 是 mapper 所對應 Python 程序的路徑。為了讓Hadoop 將程序分發給其他機器,需要再加一個 -file 參數用於指明要分發的程序放在哪里。
Python代碼優化
使用 Python 編寫 Hadoop Streaming 時,在能使用 iterator 的情況下,盡量使用 iterator,避免將 stdin 的輸入大量儲存在內存里,否則會嚴重降低性能。
參考:
[1] 用python寫MapReduce函數——以WordCount為例
[2] 使用Python實現Hadoop MapReduce程序