1 Hadoop Streaming
概述:提供了一個便於進行MapReduce編程的工具包,使用它可以基於一些可執行命令、腳本語言或其他編程語言來實現Mapper和 Reducer,
從而充分利用Hadoop並行計算框架的優勢和能力,來處理大數據。
一般部署完hadoop之后都會存在hadoop-streaming包,為了以后使用方便我們把它cp到當前目錄下
[hadoop@master ~]$ pwd /home/hadoop [hadoop@master ~]$ ls /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar [hadoop@master ~]$ cp /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar ./
2 運行hadoop streaming來求取學生平均值
測試文件如下:
[hadoop@master ~]$ cat chinese.txt Zhao 98 Qian 9 Sun 67 Li 23 [hadoop@master ~]$ cat english.txt Zhao 93 Qian 42 Sun 87 Li 54 [hadoop@master ~]$ cat math.txt Zhao 38 Qian 45 Sun 23 Li 43
2.1 編譯測試python程序
[hadoop@master ~]$ cat mapper.py #!/usr/bin/python # -*- coding: utf-8 -*- import sys for line in sys.stdin: line = line.strip() words = line.split() print '%s\t%s' % (words[0],words[1]) [hadoop@master ~]$ cat reducer.py #!/usr/bin/python # -*- coding: utf-8 -*- import sys count=0 i=0 sum=0 for line in sys.stdin: line = line.strip() name,score = line.split('\t', 1) if i==0: current_name=name i=1 try: score = int(score) except ValueError: continue if current_name == name: count += 1 sum +=score else: print '%s\t%s' % (current_name,sum/count) current_name=name sum=score count=1 print '%s\t%s' % (current_name,sum/count)
2.2 測試下python腳本是否正確
[hadoop@master ~]$ cat chinese.txt english.txt | ./mapper.py | sort -t ' ' -k 1 | ./reducer.py Li 38 Qian 25 Sun 77 Zhao 9
2.3 將測試文件上傳到HDFS中
[hadoop@master ~]$ ls data/ chinese.txt english.txt math.txt [hadoop@master ~]$ hdfs dfs -put data/* /input/score
2.4 輸出測試:
hadoop jar hadoop-streaming-2.7.3.jar -input /input/score/* -output /output/score/ -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py
解釋:
(1)-input:輸入文件路徑
(2)-output:輸出文件路徑
(3)-mapper:用戶自己寫的mapper程序,可以是可執行文件或者腳本
(4)-reducer:用戶自己寫的reducer程序,可以是可執行文件或者腳本
(5)-file:打包文件到提交的作業中,可以是mapper或者reducer要用的輸入文件,如配置文件,字典等。
這個一般是必須有的,因為mapper和reducer函數都是寫在本地的文件中,因此需要將文件上傳到集群中才能被執行
