Hadoop:使用原生python編寫MapReduce


功能實現

功能:統計文本文件中所有單詞出現的頻率功能。

下面是要統計的文本文件

【/root/hadooptest/input.txt】

foo foo quux labs foo bar quux abc bar see you by test welcome test
abc labs foo me python hadoop ab ac bc bec python

編寫Map代碼

Map代碼,它會從標准輸入(stdin)讀取數據,默認以空格分割單詞,然后按行輸出單詞機器出現頻率到標准輸出(stdout),不過整個Map處理過程並不會統計每個單詞出現的總次數,而是直接輸出“word,1”,以便作為Reduce的輸入進行統計,要求mapper.py具備執行權限。

【/root/hadooptest/mapper.py】

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys

#輸入為標准輸入stdin
for line in sys.stdin:
#刪除開頭和結尾的空行
line = line.strip()
#以默認空格分隔單詞到words列表
words = line.split()
for word in words:
#輸出所有單詞,格式為“單詞,1”以便作為Reduce的輸入
print '%s\t%s' % (word,1)0

編寫Reduce代碼

Reduce代碼,它會從標准輸入(stdin)讀取mapper.py的結果,然后統計每個單詞出現的總次數並輸出到標准輸出(stdout),要求reducer.py同樣具備可執行 權限。

【/root/hadooptest/reducer.py】

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

#獲取標准輸入,即mapper.py的標准輸出
for line in sys.stdin:
#刪除開頭和結尾的空行
line = line.strip()

#解析mapper.py輸出作為程序的輸入,以tab作為分隔符
word,count = line.split('\t',1)

#轉換count從字符型到整型
try:
count = int(count)
except ValueError:
#count非數字時,忽略此行
continue

#要求mapper.py的輸出做排序(sort)操作,以便對連續的word做判斷
if current_word == word:
current_count += count
else:
if current_word:
#輸出當前word統計結果到標准輸出
print '%s\t%s' % (current_word,current_count)
current_count = count
current_word = word

#輸出最后一個word統計
if current_word == word:
print '%s\t%s' % (current_word,current_count)

 

測試代碼

在Hadoop平台運行前進行本地測試

[root@wx ~]# cd /root/hadooptest/
[root@wx hadooptest]# cat input.txt | ./mapper.py 
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1
abc 1
bar 1
see 1
you 1
by 1
test 1
welcome 1
test 1
abc 1
labs 1
foo 1
me 1
python 1
hadoop 1
ab 1
ac 1
bc 1
bec 1
python 1


[root@wx hadooptest]# cat input.txt | ./mapper.py | sort -k1,1 | ./reducer.py 
ab 1
abc 2
ac 1
bar 2
bc 1
bec 1
by 1
foo 4
hadoop 1
labs 2
me 1
python 2
quux 2
see 1
test 2
welcome 1
you 1

 

Hadoop平台運行

在HDFS上創建文本文件存儲目錄,本示例中為/user/root/word

/usr/local/hadoop-2.6.4/bin/hadoop fs -mkdir -p /user/root/word

將輸入文件上傳到HDFS,本例中是/root/hadooptest/input.txt

/usr/local/hadoop-2.6.4/bin/hadoop fs -put /root/hadooptest/input.txt /user/root/word

查看/user/root/word目錄下的文件

/usr/local/hadoop-2.6.4/bin/hadoop fs -ls /user/root/word
#結果:
Found 1 items
-rw-r--r-- 2 root supergroup 118 2016-03-22 13:36 /user/root/word/input.txt

執行MapReduce任務,輸出結果文件制定為/output/word

/usr/local/hadoop-2.6.4/bin/hadoop jar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar -files 'mapper.py,reducer.py' -input /user/root/word -output /output/word -mapper ./mapper.py -reducer ./reducer.py

參數說明:

/usr/local/hadoop-2.6.4/bin/hadoop jar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar \
-input <輸入目錄> \ # 可以指定多個輸入路徑,例如:-input '/user/foo/dir1' -input '/user/foo/dir2'
-inputformat <輸入格式 JavaClassName> \
-output <輸出目錄> \
-outputformat <輸出格式 JavaClassName> \
-mapper <mapper executable or JavaClassName> \
-reducer <reducer executable or JavaClassName> \
-combiner <combiner executable or JavaClassName> \
-partitioner <JavaClassName> \
-cmdenv <name=value> \ # 可以傳遞環境變量,可以當作參數傳入到任務中,可以配置多個
-file <依賴的文件> \ # 配置文件,字典等依賴
-D <name=value> \ # 作業的屬性配置

 

查看生成的分析結果文件清單,其中/output/word/part-00000為分析結果文件

[root@wx hadooptest]# /usr/local/hadoop-2.6.4/bin/hadoop fs -ls /output/word
Found 2 items
-rw-r--r-- 2 root supergroup 0 2016-03-22 13:47 /output/word/_SUCCESS
-rw-r--r-- 2 root supergroup 110 2016-03-22 13:47 /output/word/part-00000

查看結果數據

[root@wx hadooptest]# /usr/local/hadoop-2.6.4/bin/hadoop fs -cat /output/word/part-00000
ab 1
abc 2
ac 1
bar 2
bc 1
bec 1
by 1
foo 4
hadoop 1
labs 2
me 1
python 2
quux 2
see 1
test 2
welcome 1
you 1

 


參考資料:

根據劉天斯《Python自動化運維技術與最佳實踐》整理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM