Hadoop Streaming提供了一個便於進行MapReduce編程的工具包,使用它可以基於一些可執行命令、腳本語言或其他編程語言來實現Mapper和 Reducer,從而充分利用Hadoop並行計算框架的優勢和能力,來處理大數據
好吧我承認以上這句是抄的以下是原創干貨
首先部署hadoop環境,這點可以參考 http://www.powerxing.com/install-hadoop-in-centos/
好吧原創從下一行開始
部署hadoop完成后,需要下載hadoop-streaming包,這個可以到http://www.java2s.com/Code/JarDownload/hadoop-streaming/hadoop-streaming-0.23.6.jar.zip去下載,或者訪問http://www.java2s.com/Code/JarDownload/hadoop-streaming/選擇最新版本,千萬不要選擇source否則后果自負,選擇編譯好的jar包即可,放到/usr/local/hadoop目錄下備用
接下來是選擇大數據統計的樣本,我在阿里的天池大數據競賽網站下載了母嬰類購買統計數據,記錄了900+個萌萌噠小baby的購買用戶名、出生日期和性別信息,天池的地址https://tianchi.shuju.aliyun.com/datalab/index.htm
數據是一個csv文件,結構如下:
用戶名,出生日期,性別(0女,1男,2不願意透露性別)
比如:415971,20121111,0(數據已經脫敏處理)
下面我們來試着統計每年的男女嬰人數
接下來開始寫mapper程序mapper.py,由於hadoop-streaming是基於Unix Pipe的,數據會從標准輸入sys.stdin輸入,所以輸入就寫sys.stdin
#!/usr/bin/python # -*- coding: utf-8 -*- import sys for line in sys.stdin: line = line.strip() data = line.split(',') if len(data)<3: continue user_id = data[0] birthyear = data[1][0:4] gender = data[2] print >>sys.stdout,"%s\t%s"%(birthyear,gender)
一個很簡單的程序,看不懂的請自行提高姿勢水平
下面是reduce程序,這里大家需要注意一下,map到reduce的期間,hadoop會自動給map出的key排序,所以到reduce中是一個已經排序的鍵值對,這簡化了我們的編程工作
我是有洪荒之力的reducer.py,和外面的哪些妖艷賤貨不一樣
#!/usr/bin/python # -*- coding: utf-8 -*- import sys gender_totle = {'0':0,'1':0,'2':0} prev_key = False for line in sys.stdin:#map的時候map中的key會被排序 line = line.strip() data = line.split('\t') birthyear = data[0] curr_key = birthyear gender = data[1] #尋找邊界,輸出結果 if prev_key and curr_key !=prev_key:#不是第一次,並且找到了邊界 print >>sys.stdout,"%s year has female %s and male %s"%(prev_key,gender_totle['0'],gender_totle['1'])#先輸出上一次統計的結果 prev_key = curr_key gender_totle['0'] = 0 gender_totle['1'] = 0 gender_totle['2'] = 0#清零 gender_totle[gender] +=1#開始計數 else: prev_key = curr_key gender_totle[gender] += 1 #輸出最后一行 if prev_key: print >>sys.stdout,"%s year has female %s and male %s"%(prev_key,gender_totle['0'],gender_totle['1'])
接下來就是將樣本和mapper reducer上傳到hdfs中並執行了,這也是我踩坑的地方
可以先這樣測試下python腳本是否正確
cat sample.csv | ./mapper.py | sort -t ' ' -k 1 | ./reducer.py
首先要在hdfs中創建相應的目錄,為了方便,我將一部分hadoop命令做了別名
alias stop-dfs='/usr/local/hadoop/sbin/stop-dfs.sh' alias start-dfs='/usr/local/hadoop/sbin/start-dfs.sh' alias dfs='/usr/local/hadoop/bin/hdfs dfs' echo "alias stop-dfs='/usr/local/hadoop/sbin/stop-dfs.sh'" >> /etc/profile echo "alias start-dfs='/usr/local/hadoop/sbin/start-dfs.sh'" >> /etc/profile echo "alias dfs='/usr/local/hadoop/bin/hdfs dfs'" >> /etc/profile
啟動hadoop后,先創建一個用戶目錄
dfs -mkdir -p /user/root
將樣本上傳到此目錄中
dfs -put ./sample.csv /user/root
當然也可以這樣處理更加規范,這兩者的差別一會兒會說
dfs -mkdir -p /user/root/input dfs -put ./sample.csv /user/root/input
接下來將mapper.py和reducer.py上傳到服務器上,切換到上傳以上兩個文件的目錄
然后就可以執行了,執行命令如下
hadoop jar /usr/local/hadoop/hadoop-streaming-0.23.6.jar \ -D mapred.job.name="testhadoop" \ -D mapred.job.queue.name=testhadoopqueue \ -D mapred.map.tasks=50 \ -D mapred.min.split.size=1073741824 \ -D mapred.reduce.tasks=10 \ -D stream.num.map.output.key.fields=1 \ -D num.key.fields.for.partition=1 \ -input sample.csv \ -output output-streaming \ -mapper mapper.py \ -reducer reducer.py \ -file mapper.py \ -file reducer.py \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
如果是將sample.csv放到input下,這個命令就應該這么寫,不過反正我也沒試過,出錯了不關我的事
hadoop jar /usr/local/hadoop/hadoop-streaming-0.23.6.jar \ -D mapred.job.name="testhadoop" \ -D mapred.job.queue.name=testhadoopqueue \ -D mapred.map.tasks=50 \ -D mapred.min.split.size=1073741824 \ -D mapred.reduce.tasks=10 \ -D stream.num.map.output.key.fields=1 \ -D num.key.fields.for.partition=1 \ -input input/sample.csv \ -output output-streaming \ -mapper mapper.py \ -reducer reducer.py \ -file mapper.py \ -file reducer.py \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
命令的解釋如下
(1)-input:輸入文件路徑
(2)-output:輸出文件路徑
(3)-mapper:用戶自己寫的mapper程序,可以是可執行文件或者腳本
(4)-reducer:用戶自己寫的reducer程序,可以是可執行文件或者腳本
(5)-file:打包文件到提交的作業中,可以是mapper或者reducer要用的輸入文件,如配置文件,字典等。
這個一般是必須有的,因為mapper和reducer函數都是寫在本地的文件中,因此需要將文件上傳到集群中才能被執行
(6)-partitioner:用戶自定義的partitioner程序
(7)-D:作業的一些屬性(以前用的是-jonconf),具體有:
1)mapred.map.tasks:map task數目
設置的數目與實際運行的值並不一定相同,若輸入文件含有M個part,而此處設置的map_task數目超過M,那么實際運行map_task仍然是M
2)mapred.reduce.tasks:reduce task數目 不設置的話,默認值就為1
3)num.key.fields.for.partition=N:shuffle階段將數據集的前N列作為Key;所以對於wordcount程序,map輸出為“word 1”,shuffle是以word作為Key,因此這里N=1
(8)-D stream.num.map.output.key.fields=1 這個是指在reduce之前將數據按前1列做排序,一般情況下可以去掉
接下來就是激動人心的一刻了,要非常用力地跪着按下enter鍵
如果有報錯output-streaming already exists就用命令dfs -rm -R /user/root/output-streaming 然后跳起來按下enter鍵
即使出現奇怪的刷屏也不要驚奇恩媽媽是這么教我的
如果出現以下字樣就是成功了
16/08/18 18:35:20 INFO mapreduce.Job: map 100% reduce 100% 16/08/18 18:35:20 INFO mapreduce.Job: Job job_local926114196_0001 completed successfully
之后使用如下命令將結果取回本地,使用cat命令就能查看
dfs -get /user/root/output-streaming/* ./output-streaming cat ./output-streaming/*
很慚愧,只做了一點微小的工作