使用python+hadoop-streaming編寫hadoop處理程序


Hadoop Streaming提供了一個便於進行MapReduce編程的工具包,使用它可以基於一些可執行命令、腳本語言或其他編程語言來實現Mapper和 Reducer,從而充分利用Hadoop並行計算框架的優勢和能力,來處理大數據

好吧我承認以上這句是抄的以下是原創干貨

首先部署hadoop環境,這點可以參考 http://www.powerxing.com/install-hadoop-in-centos/

好吧原創從下一行開始

部署hadoop完成后,需要下載hadoop-streaming包,這個可以到http://www.java2s.com/Code/JarDownload/hadoop-streaming/hadoop-streaming-0.23.6.jar.zip去下載,或者訪問http://www.java2s.com/Code/JarDownload/hadoop-streaming/選擇最新版本,千萬不要選擇source否則后果自負,選擇編譯好的jar包即可,放到/usr/local/hadoop目錄下備用

接下來是選擇大數據統計的樣本,我在阿里的天池大數據競賽網站下載了母嬰類購買統計數據,記錄了900+個萌萌噠小baby的購買用戶名、出生日期和性別信息,天池的地址https://tianchi.shuju.aliyun.com/datalab/index.htm

數據是一個csv文件,結構如下:

用戶名,出生日期,性別(0女,1男,2不願意透露性別)

比如:415971,20121111,0(數據已經脫敏處理)

下面我們來試着統計每年的男女嬰人數

接下來開始寫mapper程序mapper.py,由於hadoop-streaming是基於Unix Pipe的,數據會從標准輸入sys.stdin輸入,所以輸入就寫sys.stdin

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys

for line in sys.stdin:
    line = line.strip()
    data = line.split(',')
    if len(data)<3:
        continue
    user_id = data[0]
    birthyear = data[1][0:4]
    gender = data[2]
    print >>sys.stdout,"%s\t%s"%(birthyear,gender)

一個很簡單的程序,看不懂的請自行提高姿勢水平

下面是reduce程序,這里大家需要注意一下,map到reduce的期間,hadoop會自動給map出的key排序,所以到reduce中是一個已經排序的鍵值對,這簡化了我們的編程工作

我是有洪荒之力的reducer.py,和外面的哪些妖艷賤貨不一樣

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys

gender_totle = {'0':0,'1':0,'2':0}
prev_key = False
for line in sys.stdin:#map的時候map中的key會被排序
    line = line.strip()    
    data = line.split('\t')
    birthyear = data[0]
    curr_key = birthyear
    gender = data[1]
    
    #尋找邊界,輸出結果
    if prev_key and curr_key !=prev_key:#不是第一次,並且找到了邊界
        print >>sys.stdout,"%s year has female %s and male %s"%(prev_key,gender_totle['0'],gender_totle['1'])#先輸出上一次統計的結果
        prev_key = curr_key
        gender_totle['0'] = 0
        gender_totle['1'] = 0
        gender_totle['2'] = 0#清零
        gender_totle[gender] +=1#開始計數
    else:
        prev_key = curr_key
        gender_totle[gender] += 1
#輸出最后一行
if prev_key:
    print >>sys.stdout,"%s year has female %s and male %s"%(prev_key,gender_totle['0'],gender_totle['1'])

接下來就是將樣本和mapper reducer上傳到hdfs中並執行了,這也是我踩坑的地方

可以先這樣測試下python腳本是否正確

cat sample.csv | ./mapper.py | sort -t ' ' -k 1 | ./reducer.py 

 

首先要在hdfs中創建相應的目錄,為了方便,我將一部分hadoop命令做了別名

alias stop-dfs='/usr/local/hadoop/sbin/stop-dfs.sh'
alias start-dfs='/usr/local/hadoop/sbin/start-dfs.sh'
alias dfs='/usr/local/hadoop/bin/hdfs dfs'
echo "alias stop-dfs='/usr/local/hadoop/sbin/stop-dfs.sh'" >> /etc/profile
echo "alias start-dfs='/usr/local/hadoop/sbin/start-dfs.sh'" >> /etc/profile
echo "alias dfs='/usr/local/hadoop/bin/hdfs dfs'" >> /etc/profile

 

啟動hadoop后,先創建一個用戶目錄

dfs -mkdir -p /user/root

將樣本上傳到此目錄中

dfs -put ./sample.csv /user/root

當然也可以這樣處理更加規范,這兩者的差別一會兒會說

dfs -mkdir -p /user/root/input
dfs -put ./sample.csv /user/root/input

接下來將mapper.py和reducer.py上傳到服務器上,切換到上傳以上兩個文件的目錄

然后就可以執行了,執行命令如下

hadoop jar /usr/local/hadoop/hadoop-streaming-0.23.6.jar \
-D mapred.job.name="testhadoop" \
-D mapred.job.queue.name=testhadoopqueue \
-D mapred.map.tasks=50 \
-D mapred.min.split.size=1073741824 \
-D mapred.reduce.tasks=10 \
-D stream.num.map.output.key.fields=1 \
-D num.key.fields.for.partition=1 \
-input sample.csv \
-output output-streaming \
-mapper mapper.py \
-reducer reducer.py \
-file mapper.py \
-file reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner  

如果是將sample.csv放到input下,這個命令就應該這么寫,不過反正我也沒試過,出錯了不關我的事

hadoop jar /usr/local/hadoop/hadoop-streaming-0.23.6.jar \
-D mapred.job.name="testhadoop" \
-D mapred.job.queue.name=testhadoopqueue \
-D mapred.map.tasks=50 \
-D mapred.min.split.size=1073741824 \
-D mapred.reduce.tasks=10 \
-D stream.num.map.output.key.fields=1 \
-D num.key.fields.for.partition=1 \
-input input/sample.csv \
-output output-streaming \
-mapper mapper.py \
-reducer reducer.py \
-file mapper.py \
-file reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner  

命令的解釋如下

 

(1)-input:輸入文件路徑
(2)-output:輸出文件路徑
(3)-mapper:用戶自己寫的mapper程序,可以是可執行文件或者腳本
(4)-reducer:用戶自己寫的reducer程序,可以是可執行文件或者腳本
(5)-file:打包文件到提交的作業中,可以是mapper或者reducer要用的輸入文件,如配置文件,字典等。
         這個一般是必須有的,因為mapper和reducer函數都是寫在本地的文件中,因此需要將文件上傳到集群中才能被執行
(6)-partitioner:用戶自定義的partitioner程序
(7)-D:作業的一些屬性(以前用的是-jonconf),具體有:
              1)mapred.map.tasks:map task數目  
              設置的數目與實際運行的值並不一定相同,若輸入文件含有M個part,而此處設置的map_task數目超過M,那么實際運行map_task仍然是M
              2)mapred.reduce.tasks:reduce task數目  不設置的話,默認值就為1
              3)num.key.fields.for.partition=N:shuffle階段將數據集的前N列作為Key;所以對於wordcount程序,map輸出為“word  1”,shuffle是以word作為Key,因此這里N=1
(8)-D stream.num.map.output.key.fields=1 這個是指在reduce之前將數據按前1列做排序,一般情況下可以去掉

 

接下來就是激動人心的一刻了,要非常用力地跪着按下enter鍵

如果有報錯output-streaming already exists就用命令dfs -rm -R /user/root/output-streaming 然后跳起來按下enter鍵

即使出現奇怪的刷屏也不要驚奇恩媽媽是這么教我的

如果出現以下字樣就是成功了

16/08/18 18:35:20 INFO mapreduce.Job:  map 100% reduce 100%
16/08/18 18:35:20 INFO mapreduce.Job: Job job_local926114196_0001 completed successfully

 之后使用如下命令將結果取回本地,使用cat命令就能查看

dfs -get /user/root/output-streaming/* ./output-streaming
cat ./output-streaming/*

 很慚愧,只做了一點微小的工作


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM