1.概述
Hadoop Streaming提供了一個便於進行MapReduce編程的工具包,使用它可以基於一些可執行命令、腳本語言或其他編程語言來實現Mapper和 Reducer,從而充分利用Hadoop並行計算框架的優勢和能力,來處理大數據。需要注意的是,Streaming方式是基於Unix系統的標准輸入 輸出來進行MapReduce Job的運行,它區別與Pipes的地方主要是通信協議,Pipes使用的是Socket通信,是對使用C++語言來實現MapReduce Job並通過Socket通信來與Hadopp平台通信,完成Job的執行。任何支持標准輸入輸出特性的編程語言都可以使用Streaming方式來實現MapReduce Job,基本原理就是輸入從Unix系統標准輸入,輸出使用Unix系統的標准輸出。
2.Hadoop Streaming原理
mapper和reducer會從標准輸入中讀取用戶數據,一行一行處理后發送給標准輸出。Streaming工具會創建MapReduce作業,發送給各個tasktracker,同時監控整個作業的執行過程。
如果一個文件(可執行或者腳本)作為mapper,mapper初始化時,每一個mapper任務會把該文件作為一個單獨進程啟動,mapper任 務運行時,它把輸入切分成行並把每一行提供給可執行文件進程的標准輸入。 同時,mapper收集可執行文件進程標准輸出的內容,並把收到的每一行內容轉化成key/value對,作為mapper的輸出。 默認情況下,一行中第一個tab之前的部分作為key,之后的(不包括tab)作為value。如果沒有tab,整行作為key值,value值為null。
對於reducer,類似。以上是Map/Reduce框架和streaming mapper/reducer之間的基本通信協議。
3.Hadoop Streaming用法
Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar
options:
(1)-input:輸入文件路徑
(2)-output:輸出文件路徑
(3)-mapper:用戶自己寫的mapper程序,可以是可執行文件或者腳本
(4)-reducer:用戶自己寫的reducer程序,可以是可執行文件或者腳本
(5)-file:打包文件到提交的作業中,可以是mapper或者reducer要用的輸入文件,如配置文件,字典等。
(6)-partitioner:用戶自定義的partitioner程序
(7)-combiner:用戶自定義的combiner程序(必須用java實現)
(8)-D:作業的一些屬性(以前用的是-jonconf),具體有:
1)mapred.map.tasks:map task數目
2)mapred.reduce.tasks:reduce task數目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數據的分隔符,默認均為\t。
4)stream.num.map.output.key.fields:指定map task輸出記錄中key所占的域數目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數據的分隔符,默認均為\t。
6)stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所占的域數目
另外,Hadoop本身還自帶一些好用的Mapper和Reducer:
4.使用示例
使用Python編寫MapReduce代碼的技巧就在於我們使用了 HadoopStreaming 來幫助我們在Map 和 Reduce間傳遞數據通過STDIN (標准輸入)和STDOUT (標准輸出).我們僅僅使用Python的sys.stdin來輸入數據,使用sys.stdout輸出數據,這樣做是因為 HadoopStreaming會幫我們辦好其他事。這是真的,別不相信!
舉例
將下列的代碼保存在/usr/local/hadoop/mapper.py中,他將從STDIN讀取數據並將單詞成行分隔開,生成一個列表映射單詞與發生次數的關系:注意:要確保這個腳本有足夠權限(chmod +x mapper.py)。
#!/usr/bin/env python import sys # input comes from STDIN (standard input) for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # split the line into words words = line.split() # increase counters for word in words: # write the results to STDOUT (standard output); # what we output here will be the input for the # Reduce step, i.e. the input for reducer.py # # tab-delimited; the trivial word count is 1 print '%s\t%s' % (word, 1)
將代碼存儲在/usr/local/hadoop/reducer.py 中,這個腳本的作用是從mapper.py 的STDIN中讀取結果,然后計算每個單詞出現次數的總和,並輸出結果到STDOUT。同樣,要注意腳本權限:chmod +x reducer.py
#!/usr/bin/env python from operator import itemgetter import sys current_word = None current_count = 0 word = None # input comes from STDIN for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # parse the input we got from mapper.py word, count = line.split('\t', 1) # convert count (currently a string) to int try: count = int(count) except ValueError: # count was not a number, so silently # ignore/discard this line continue # this IF-switch only works because Hadoop sorts map output # by key (here: word) before it is passed to the reducer if current_word == word: current_count += count else: if current_word: # write result to STDOUT print '%s\t%s' % (current_word, current_count) current_count = count current_word = word # do not forget to output the last word if needed! if current_word == word: print '%s\t%s' % (current_word, current_count)
測試結果:
hadoop@derekUbun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py foo 1 foo 1 quux 1 labs 1 foo 1 bar 1 quux 1 hadoop@derekUbun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" |./mapper.py | sort |./reducer.py bar 1 foo 3 labs 1 quux 2
實例
需求:這里面只是個小練習,沒有多高深,簡單的不能再簡單,只是一個小實例,做個拋磚的作用。
寫一個mapreduce streaming程序(可使用任意語言,這里我們用python),將數據轉換成“key=value”的格式,其中,key包括“ip”、“time”、“path”三個,
比如,175.44.30.93 - - [29/Sep/2013:00:10:16 +0800] "GET /structure/heap/ HTTP/1.1" 200 22539 "-" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1;)"
轉化為:ip=175.44.30.93|time=29/Sep/2013:00:10:16|path=/structure/heap/ 其中,不同key/value之間用“|”分割。
具體步驟:
1.將日志文件上傳到hdfs上 hadoop fs -put 文件 目的地
2.編程程序,這個比較簡單,我覺得只用mapper就能實現,我就只寫了一個mapper。
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import sys 5 6 for line in sys.stdin: #接受系統的標准輸入 7 line = line.strip() 8 lists = line.split() 9 print 'ip=%s|time=%s|path=%s' %(lists[0],lists[3].strip('[]'),lists[6])#處理成想要的結果
3.測試程序執行命令
hadoop jar /home/biedong/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar -mapper /home/biedong/test/mapper1.py -input /home/zuoye/access.log -output /home/zuoye/book-output
執行報錯:提示找不多執行程序, 比如“Caused by: java.io.IOException: Cannot run program “/user/hadoop/Mapper”: error=2, No such file or directory”:
解決辦法:可在提交作業時,采用-file選項指定這些文件, 比如上面例子中,可以使用“-file Mapper -file Reducer” 或者 “-file Mapper.py -file Reducer.py”, 這樣,Hadoop會將這兩個文件自動分發到各個節點上。
hadoop jar /home/biedong/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar -mapper /home/biedong/test/mapper1.py -file /home/biedong/test/mapper1.py -input /home/zuoye/access.log -output /home/zuoye/book-output
執行完成后在hdfs上的結果:文件輸出正常,結果也正常1904條。
4.加個reducer吧,這個比較簡單,因為mapper已經處理好了,我直接接受mapper的輸入,完了直接打印出來。
#!/usr/bin/env python # -*- coding: utf-8 -*- import sys for line in sys.stdin: print line
問題是:多出一個空行
原因查找:默認情況下,Streaming使用\t分離記錄中得鍵和值,當沒有\t時,整個記錄被視為鍵,值為空白文本。 在mapper輸出的時候會自動在尾行加上\t 因此在reducer接受后,會把數據直接按照\t拆分成k和v兩個,只是k是mapper的數據行,v是空白,如果咱們直接輸出結果的話,就會有空白行。