寫在前面
相關隨筆:
- Hadoop-1.0.4集群搭建筆記
- 用python + hadoop streaming 編寫分布式程序(一) -- 原理介紹,樣例程序與本地調試
- 用python + hadoop streaming 編寫分布式程序(三) -- 自定義功能
為了方便,這篇文章里的例子均為偽分布式運行,一般來說只要集群配置得當,在偽分布式下能夠運行的程序,在真實集群上也不會有什么問題。
為了更好地模擬集群環境,我們可以在mapred-site.xml中增設reducer和mapper的最大數目(默認為2,實際可用數目大約是CPU核數-1)。
假設你為Hadoop安裝路徑添加的環境變量叫$HADOOP_HOME(如果是$HADOOP_PREFIX,下文看到的命令對應改改就行)
$ vi $HADOOP_HOME/conf/mapred-site.xml
假設這台機子的CPU是4核,那么可以添加下面這幾行
<property> <name>mapred.tasktracker.reduce.tasks.maximum</name> <value>3</value> </property> <property> <name>mapred.tasktracker.map.tasks.maximum</name> <value>3</value> </property>
這里修改了以后只是增加了slot的數量,如果你在執行MR作業時沒有指明需要多少mapper或reducer,不一定會用到這么多,Hadoop會自行分配。
查看文檔
首先需要知道用於streaming的java程序在哪里。在1.0.x的版本中,應該都在$HADOOP_HOME/contrib/streaming/下。比如1.0.4的就在
$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar
里。 首先我們可以先看看這個java程序自帶的文檔。以下以1.0.4版本為例,執行
$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar -info
就會看到一系列自帶的幫助,帶有各種參數的說明和一些使用樣例。
運行命令
用Hadoop Streaming執行python程序的一般步驟是:
-
將輸入文件放到HDFS上,建議使用copyFromLocal而不是put命令,參見Difference between hadoop fs -put and hadoop fs -copyFromLocal
-
一般可以新建一個文件夾用於存放輸入文件,假設叫input
$ hadoop fs -mkdir input
然后用
$ hadoop fs -ls
查看目錄,可以看到出現了一個/user/hadoop/input文件夾。/user/hadoop是默認的用戶文件夾,相當於本地文件系統中的/home/hadoop。
-
再使用
$ hadoop fs -copyFromLocal <PATH TO LOCAL FILE(S)> input/
將本地文件放到input文件夾下。copyFromLocal命令的用法類似於Linux的cp命令,支持使用wildcard。如果出現了預期外的執行結果,可以試試看在使用了wildcard的路徑外加上引號,參見官方FAQ
建議閱讀:HDFS命令文檔
-
-
開始MR作業,以1.0.4版本為例,假設你現在正在放有mapper和reducer兩個腳本的目錄下,而且它們剛好就叫mapper.py和reducer.py,在不需要做其他配置的情況下,執行
$hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \ -mapper mapper.py -file mapper.py\ -reducer reducer.py -file reducer.py \ -input input/* -output output
第一行是告訴Hadoop運行streaming的java程序,接下來的是參數:
- 這里的mapper.py和reducer.py是mapper所在python程序的路徑。為了讓Hadoop將程序分發給其他機器,需要再加一個-file參數用於指明要分發的程序放在哪里。
-
注意這樣寫的前提是這個python程序里有shebang而且添加了執行權限。如果沒有的話,可以改成
-mapper 'python mapper.py'
加上解釋器命令,用引號括住。因為准確來說,mapper后面跟的其實應該是一個命令而不是一個文件名。
-
假如你執行的程序不放在當前目錄下,比如說在當前目錄的src文件夾下,可以這樣寫
-mapper mapper.py -file src/mapper.py\ -reducer reducer.py -file src/reducer.py \
也就是說,-mapper和-reducer后面跟的文件名不需要帶上路徑,而-file后的參數則需要。注意如果你在mapper后的命令用了引號,加上路徑名反而會報錯說找不到這個程序。
- -input和-output后面跟的是HDFS上的路徑名,同樣支持wildcard,這里的input/*指的就是“input文件夾下的所有文件”。注意-output后面跟着的需要是一個不存在於HDFS上的路徑,在產生輸出的時候hadoop會幫你創建這個文件夾,如果已經存在的話就會產生沖突。
-
有時候shebang不一定能用,尤其是在執行環境比較復雜的時候。最保險的寫法可能是:
$hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \ -mapper 'python mapper.py' -file mapper.py\ -reducer 'python reducer.py' -file reducer.py \ -input input/* -output output
這樣寫還有一個好處,就是可以在引號里寫上提供給python程序的命令行參數,以后的教程會提到怎么用。
- 由於mapper和reducer參數跟的實際上是命令,所以如果每台機器上python的環境配置不一樣的話,會用每台機器自己的配置去執行python程序。
運行過程
寫完命令回車,順利的話會開始執行程序。 這里不贅述執行時輸出到終端的內容,可以去這里看看正常運行的時候會給些什么。
利用WebUI監控集群運行情況
一般來說要檢查運行狀況,都是去jobtracker的webUI。如果在master上,用瀏覽器訪問http://localhost:50030即可(如果你在配置hadoop的時候修改了mapred-site.xml的mapred.job.tracker.http.address,請訪問對應的其他地址)
在webUI里你可以看到running jobs, completed jobs和retired jobs。點擊Jobid下的超鏈接,可以看到對應job的執行狀況。進去后如果看到Failed/Killed Task Attempts下非空,你可以點進對應的超鏈接,找到對應的log去進行debug。
得到結果
成功執行完這個任務之后,你用output參數在HDFS上指定的輸出文件夾里就會多出幾個文件
- 一個空白文件_SUCCESS,表明job運行成功,這個文件可以讓其他程序只要查看一下HDFS就能判斷這次job是否成功運行,從而進行相關處理。
- 一個_logs文件夾,顧名思義里面放着任務日志
- part-00000, .... part-xxxxx文件,有多少個reducer后面的數字就會有多大,對應每個reducer的輸出結果。
假如你的輸出很少,比如是一個只有幾行的計數,你可以用
$ hadoop fs -cat <PATH ON HDFS>
直接將輸出打印到終端查看。
假如你的輸出很多,則需要拷貝到本地文件系統來查看。可以使用copyToLocal來獲取整個文件夾(與copyFromLocal一樣,它與get的區別在於會限制目標文件夾一定在本地文件系統上)。如果你不需要_SUCCESS 和_logs,並且想要將所有reducer的輸出合並,可以使用getmerge命令。
比如在上面的例子里,可以用命令
$ hadoop fs -copyToLocal output ./
將output文件夾復制到本地文件系統的當前目錄下,或者用
$ hadoop fs -getmerge output ./
將output下的part-xxxxx合並,放到當前目錄的一個叫output的文件里。
如何串聯多趟MR
如果你有多次任務要執行,下一步需要用上一步的任務做輸入,解決辦法其實很簡單。假設上一步在HDFS的輸出文件夾是output1,那么在下一步的運行命令中,指明
-input output1/part-*
即指定上一次的所有輸出為本次任務的輸入即可。注意這里假設你不需要對上一步的輸出做額外處理。
其他
這篇文章只提到了最簡單的執行Hadoop streaming程序的方法。涉及到一些其他需求,比如需要有多個輸入文件等情況,還需要進一步調整運行命令,會在以后的文章里講到。