1 Hadoop Streaming
概述:提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和 Reducer,
从而充分利用Hadoop并行计算框架的优势和能力,来处理大数据。
一般部署完hadoop之后都会存在hadoop-streaming包,为了以后使用方便我们把它cp到当前目录下
[hadoop@master ~]$ pwd /home/hadoop [hadoop@master ~]$ ls /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar [hadoop@master ~]$ cp /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar ./
2 运行hadoop streaming来求取学生平均值
测试文件如下:
[hadoop@master ~]$ cat chinese.txt Zhao 98 Qian 9 Sun 67 Li 23 [hadoop@master ~]$ cat english.txt Zhao 93 Qian 42 Sun 87 Li 54 [hadoop@master ~]$ cat math.txt Zhao 38 Qian 45 Sun 23 Li 43
2.1 编译测试python程序
[hadoop@master ~]$ cat mapper.py #!/usr/bin/python # -*- coding: utf-8 -*- import sys for line in sys.stdin: line = line.strip() words = line.split() print '%s\t%s' % (words[0],words[1]) [hadoop@master ~]$ cat reducer.py #!/usr/bin/python # -*- coding: utf-8 -*- import sys count=0 i=0 sum=0 for line in sys.stdin: line = line.strip() name,score = line.split('\t', 1) if i==0: current_name=name i=1 try: score = int(score) except ValueError: continue if current_name == name: count += 1 sum +=score else: print '%s\t%s' % (current_name,sum/count) current_name=name sum=score count=1 print '%s\t%s' % (current_name,sum/count)
2.2 测试下python脚本是否正确
[hadoop@master ~]$ cat chinese.txt english.txt | ./mapper.py | sort -t ' ' -k 1 | ./reducer.py Li 38 Qian 25 Sun 77 Zhao 9
2.3 将测试文件上传到HDFS中
[hadoop@master ~]$ ls data/ chinese.txt english.txt math.txt [hadoop@master ~]$ hdfs dfs -put data/* /input/score
2.4 输出测试:
hadoop jar hadoop-streaming-2.7.3.jar -input /input/score/* -output /output/score/ -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py
解释:
(1)-input:输入文件路径
(2)-output:输出文件路径
(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本
(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本
(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。
这个一般是必须有的,因为mapper和reducer函数都是写在本地的文件中,因此需要将文件上传到集群中才能被执行