功能實現
功能:統計文本文件中所有單詞出現的頻率功能。
下面是要統計的文本文件
【/root/hadooptest/input.txt】
foo foo quux labs foo bar quux abc bar see you by test welcome test
abc labs foo me python hadoop ab ac bc bec python
編寫Map代碼
Map代碼,它會從標准輸入(stdin)讀取數據,默認以空格分割單詞,然后按行輸出單詞機器出現頻率到標准輸出(stdout),不過整個Map處理過程並不會統計每個單詞出現的總次數,而是直接輸出“word,1”,以便作為Reduce的輸入進行統計,要求mapper.py具備執行權限。
【/root/hadooptest/mapper.py】
#!/usr/bin/env python # -*- coding:utf-8 -*- import sys #輸入為標准輸入stdin for line in sys.stdin: #刪除開頭和結尾的空行 line = line.strip() #以默認空格分隔單詞到words列表 words = line.split() for word in words: #輸出所有單詞,格式為“單詞,1”以便作為Reduce的輸入 print '%s\t%s' % (word,1)0
編寫Reduce代碼
Reduce代碼,它會從標准輸入(stdin)讀取mapper.py的結果,然后統計每個單詞出現的總次數並輸出到標准輸出(stdout),要求reducer.py同樣具備可執行 權限。
【/root/hadooptest/reducer.py】
#!/usr/bin/env python # -*- coding:utf-8 -*- from operator import itemgetter import sys current_word = None current_count = 0 word = None #獲取標准輸入,即mapper.py的標准輸出 for line in sys.stdin: #刪除開頭和結尾的空行 line = line.strip() #解析mapper.py輸出作為程序的輸入,以tab作為分隔符 word,count = line.split('\t',1) #轉換count從字符型到整型 try: count = int(count) except ValueError: #count非數字時,忽略此行 continue #要求mapper.py的輸出做排序(sort)操作,以便對連續的word做判斷 if current_word == word: current_count += count else: if current_word: #輸出當前word統計結果到標准輸出 print '%s\t%s' % (current_word,current_count) current_count = count current_word = word #輸出最后一個word統計 if current_word == word: print '%s\t%s' % (current_word,current_count)
測試代碼
在Hadoop平台運行前進行本地測試
[root@wx ~]# cd /root/hadooptest/ [root@wx hadooptest]# cat input.txt | ./mapper.py foo 1 foo 1 quux 1 labs 1 foo 1 bar 1 quux 1 abc 1 bar 1 see 1 you 1 by 1 test 1 welcome 1 test 1 abc 1 labs 1 foo 1 me 1 python 1 hadoop 1 ab 1 ac 1 bc 1 bec 1 python 1 [root@wx hadooptest]# cat input.txt | ./mapper.py | sort -k1,1 | ./reducer.py ab 1 abc 2 ac 1 bar 2 bc 1 bec 1 by 1 foo 4 hadoop 1 labs 2 me 1 python 2 quux 2 see 1 test 2 welcome 1 you 1
Hadoop平台運行
在HDFS上創建文本文件存儲目錄,本示例中為/user/root/word
/usr/local/hadoop-2.6.4/bin/hadoop fs -mkdir -p /user/root/word
將輸入文件上傳到HDFS,本例中是/root/hadooptest/input.txt
/usr/local/hadoop-2.6.4/bin/hadoop fs -put /root/hadooptest/input.txt /user/root/word
查看/user/root/word目錄下的文件
/usr/local/hadoop-2.6.4/bin/hadoop fs -ls /user/root/word #結果: Found 1 items -rw-r--r-- 2 root supergroup 118 2016-03-22 13:36 /user/root/word/input.txt
執行MapReduce任務,輸出結果文件制定為/output/word
/usr/local/hadoop-2.6.4/bin/hadoop jar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar -files 'mapper.py,reducer.py' -input /user/root/word -output /output/word -mapper ./mapper.py -reducer ./reducer.py
參數說明:
/usr/local/hadoop-2.6.4/bin/hadoop jar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar \ -input <輸入目錄> \ # 可以指定多個輸入路徑,例如:-input '/user/foo/dir1' -input '/user/foo/dir2' -inputformat <輸入格式 JavaClassName> \ -output <輸出目錄> \ -outputformat <輸出格式 JavaClassName> \ -mapper <mapper executable or JavaClassName> \ -reducer <reducer executable or JavaClassName> \ -combiner <combiner executable or JavaClassName> \ -partitioner <JavaClassName> \ -cmdenv <name=value> \ # 可以傳遞環境變量,可以當作參數傳入到任務中,可以配置多個 -file <依賴的文件> \ # 配置文件,字典等依賴 -D <name=value> \ # 作業的屬性配置
查看生成的分析結果文件清單,其中/output/word/part-00000為分析結果文件
[root@wx hadooptest]# /usr/local/hadoop-2.6.4/bin/hadoop fs -ls /output/word Found 2 items -rw-r--r-- 2 root supergroup 0 2016-03-22 13:47 /output/word/_SUCCESS -rw-r--r-- 2 root supergroup 110 2016-03-22 13:47 /output/word/part-00000
查看結果數據
[root@wx hadooptest]# /usr/local/hadoop-2.6.4/bin/hadoop fs -cat /output/word/part-00000 ab 1 abc 2 ac 1 bar 2 bc 1 bec 1 by 1 foo 4 hadoop 1 labs 2 me 1 python 2 quux 2 see 1 test 2 welcome 1 you 1
參考資料:
根據劉天斯《Python自動化運維技術與最佳實踐》整理
