Python初次實現MapReduce——WordCount


前言

Hadoop 本身是用 Java 開發的,所以之前的MapReduce代碼小練都是由Java代碼編寫,但是通過Hadoop Streaming,我們可以使用任意語言來編寫程序,讓Hadoop 運行。

本文用Python語言實現了詞頻統計功能,最后通過Hadoop Streaming使其運行在Hadoop上。

 

Python寫MapReduce代碼

使用Python寫MapReduce的“訣竅”是利用Hadoop流的API,通過STDIN(標准輸入)、STDOUT(標准輸出)在Map函數和Reduce函數之間傳遞數據。

我們唯一需要做的是利用Python的sys.stdin讀取輸入數據,並把我們的輸出傳送給sys.stdout。Hadoop流將會幫助我們處理別的任何事情。

 

Map階段:mapper.py

1 #!/usr/bin/env python3
2 import sys
3 for line in sys.stdin:
4     line = line.strip()
5     words = line.split()
6     for word in words:
7         print("%s\t%s" % (word, 1))

 

Reducer階段:reducer.py

 1 #!/usr/bin/env python3
 2 from operator import itemgetter
 3 import sys
 4 
 5 current_word = None
 6 current_count = 0
 7 word = None
 8 
 9 for line in sys.stdin:
10     line = line.strip()
11     word, count = line.split('\t', 1)
12     try:
13         count = int(count)
14     except ValueError:  #count如果不是數字的話,直接忽略掉
15         continue
16     if current_word == word:
17         current_count += count
18     else:
19         if current_word:
20             print("%s\t%s" % (current_word, current_count))
21         current_count = count
22         current_word = word
23 
24 if word == current_word:  #最后一個單詞
25     print("%s\t%s" % (current_word, current_count))

python代碼放在本地即可,不需上傳到HDFS。由於后面需要執行這兩段代碼,所以為它們增加可執行權限,即:

chmod +x mapper.py
chmod +x reducer.py

 

本地測試

用Hadoop Streaming的好處之一就是因為代碼沒有庫的依賴,調試方便,可以脫離Hadoop先在本地用管道模擬調試,所以我們先在本地進行測試。

mapper.py

reducer.py

 

Hadoop運行

數據准備

測試文件in.txt文件內容為:

需要將其上傳至HDFS,上傳命令為:

bin/hadoop -copyFromLocal in.txt in.txt

 

Hadoop Streaming簡介

Hadoop Streaming框架,最大的好處是,讓任何語言編寫的map, reduce程序能夠在hadoop集群上運行,map/reduce程序只要遵循從標准輸入stdin讀,寫出到標准輸出stdout即可。

它通過將其他語言編寫的 mapper 和 reducer 通過參數傳給一個事先寫好的 Java 程序(Hadoop 自帶的 *-streaming.jar),這個 Java 程序會負責創建 MR 作業,另開一個進程來運行 mapper,將得到的輸入通過 stdin 傳給它,再將 mapper 處理后輸出到 stdout 的數據交給 Hadoop,經過 partition 和 sort 之后,再另開進程運行 reducer,同樣通過 stdin/stdout 得到最終結果。因此,我們只需要在其他語言編寫的程序中,通過 stdin 接收數據,再將處理過的數據輸出到 stdout,Hadoop Streaming 就能通過這個 Java 的 wrapper 幫我們解決中間繁瑣的步驟,運行分布式程序。

優點:

1. 可以使用自己喜歡的語言來編寫 MapReduce 程序(不必非得使用 Java)

2. 不需要像寫 Java 的 MR 程序那樣 import 一大堆庫,在代碼里做很多配置,很多東西都抽象到了 stdio 上,代碼量顯著減少。

3. 因為沒有庫的依賴,調試方便,並且可以脫離 Hadoop 先在本地用管道模擬調試。

缺點:

1. 只能通過命令行參數來控制 MapReduce 框架,不像 Java 的程序那樣可以在代碼里使用 API,控制力比較弱。

2. 因為中間隔着一層處理,效率會比較慢。

3. 所以 Hadoop Streaming 比較適合做一些簡單的任務,比如用 Python 寫只有一兩百行的腳本。如果項目比較復雜,或者需要進行比較細致的優化,使用 Streaming 就容易出現一些束手束腳的地方。

 

Hadoop Streaming運行

首先需要找到hadoop-streaming的位置,我的hadoop是2.x版本的,該包的位置在:

 

在執行的過程中遇到了權限不夠的問題:

解決辦法是擴大權限:

為了方便起見,接下來我就把hadoop-streaming-2.9.2.jar放在了/usr/local/hadoop目錄下,所以在下面的命令中大家注意一下。

最后輸入如下命令:

bin/hadoop jar /usr/local/hadoop/hadoop-streaming-2.9.2.jar\
-mapper /usr/local/hadoop/mapper.py\
-file /usr/local/hadoop/mapper.py\
-reducer /usr/local/hadoop/reducer.py\
-file /usr/local/hadoop/reducer.py\
-input in.txt\
-output out

第一行是告訴Hadoop運行Streaming的Jav 程序,后面的mapper.py 和 reducer.py 是 mapper 所對應 Python 程序的路徑。為了讓Hadoop 將程序分發給其他機器,需要再加一個 -file 參數用於指明要分發的程序放在哪里。

 

Python代碼優化

 使用 Python 編寫 Hadoop Streaming 時,在能使用 iterator 的情況下,盡量使用 iterator,避免將 stdin 的輸入大量儲存在內存里,否則會嚴重降低性能。

 

 

參考:

[1] 用python寫MapReduce函數——以WordCount為例

[2] 使用Python實現Hadoop MapReduce程序

[3] Hadoop Streaming詳解

[4] Hadoop Streaming 使用及參數設置

[5] 使用hadoop-streaming初體驗mapreduce

[6] 使用python+hadoop-streaming編寫hadoop處理程序


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM