用python + hadoop streaming 編寫分布式程序(二) -- 在集群上運行與監控


寫在前面

相關隨筆:

 

為了方便,這篇文章里的例子均為偽分布式運行,一般來說只要集群配置得當,在偽分布式下能夠運行的程序,在真實集群上也不會有什么問題。

為了更好地模擬集群環境,我們可以在mapred-site.xml中增設reducer和mapper的最大數目(默認為2,實際可用數目大約是CPU核數-1)。

假設你為Hadoop安裝路徑添加的環境變量叫$HADOOP_HOME(如果是$HADOOP_PREFIX,下文看到的命令對應改改就行)

$ vi $HADOOP_HOME/conf/mapred-site.xml

假設這台機子的CPU是4核,那么可以添加下面這幾行

<property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>3</value>
</property>
<property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>3</value>
</property>

這里修改了以后只是增加了slot的數量,如果你在執行MR作業時沒有指明需要多少mapper或reducer,不一定會用到這么多,Hadoop會自行分配。

查看文檔

首先需要知道用於streaming的java程序在哪里。在1.0.x的版本中,應該都在$HADOOP_HOME/contrib/streaming/下。比如1.0.4的就在

$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar

里。 首先我們可以先看看這個java程序自帶的文檔。以下以1.0.4版本為例,執行

$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar -info

就會看到一系列自帶的幫助,帶有各種參數的說明和一些使用樣例。

運行命令

用Hadoop Streaming執行python程序的一般步驟是:

  1. 將輸入文件放到HDFS上,建議使用copyFromLocal而不是put命令,參見Difference between hadoop fs -put and hadoop fs -copyFromLocal

    1. 一般可以新建一個文件夾用於存放輸入文件,假設叫input

      $ hadoop fs -mkdir input
      

      然后用

      $ hadoop fs -ls
      

      查看目錄,可以看到出現了一個/user/hadoop/input文件夾。/user/hadoop是默認的用戶文件夾,相當於本地文件系統中的/home/hadoop。

    2. 再使用

      $ hadoop fs -copyFromLocal <PATH TO LOCAL FILE(S)> input/
      

      將本地文件放到input文件夾下。copyFromLocal命令的用法類似於Linux的cp命令,支持使用wildcard。如果出現了預期外的執行結果,可以試試看在使用了wildcard的路徑外加上引號,參見官方FAQ

    建議閱讀:HDFS命令文檔

  2. 開始MR作業,以1.0.4版本為例,假設你現在正在放有mapper和reducer兩個腳本的目錄下,而且它們剛好就叫mapper.py和reducer.py,在不需要做其他配置的情況下,執行

    $hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \
    -mapper mapper.py   -file mapper.py\
    -reducer reducer.py  -file reducer.py \
    -input input/* -output output
    

    第一行是告訴Hadoop運行streaming的java程序,接下來的是參數:

    • 這里的mapper.py和reducer.py是mapper所在python程序的路徑。為了讓Hadoop將程序分發給其他機器,需要再加一個-file參數用於指明要分發的程序放在哪里。
    • 注意這樣寫的前提是這個python程序里有shebang而且添加了執行權限。如果沒有的話,可以改成

      -mapper 'python mapper.py'
      

      加上解釋器命令,用引號括住。因為准確來說,mapper后面跟的其實應該是一個命令而不是一個文件名。

    • 假如你執行的程序不放在當前目錄下,比如說在當前目錄的src文件夾下,可以這樣寫

      -mapper mapper.py   -file src/mapper.py\
      -reducer reducer.py  -file src/reducer.py \
      

      也就是說,-mapper和-reducer后面跟的文件名不需要帶上路徑,而-file后的參數則需要。注意如果你在mapper后的命令用了引號,加上路徑名反而會報錯說找不到這個程序。

    • -input和-output后面跟的是HDFS上的路徑名,同樣支持wildcard,這里的input/*指的就是“input文件夾下的所有文件”。注意-output后面跟着的需要是一個不存在於HDFS上的路徑,在產生輸出的時候hadoop會幫你創建這個文件夾,如果已經存在的話就會產生沖突。
    • 有時候shebang不一定能用,尤其是在執行環境比較復雜的時候。最保險的寫法可能是:

      $hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \
      -mapper 'python mapper.py'   -file mapper.py\
      -reducer 'python reducer.py'  -file reducer.py \
      -input input/* -output output
      

      這樣寫還有一個好處,就是可以在引號里寫上提供給python程序的命令行參數,以后的教程會提到怎么用。

    • 由於mapper和reducer參數跟的實際上是命令,所以如果每台機器上python的環境配置不一樣的話,會用每台機器自己的配置去執行python程序。

運行過程

寫完命令回車,順利的話會開始執行程序。 這里不贅述執行時輸出到終端的內容,可以去這里看看正常運行的時候會給些什么。

利用WebUI監控集群運行情況

一般來說要檢查運行狀況,都是去jobtracker的webUI。如果在master上,用瀏覽器訪問http://localhost:50030即可(如果你在配置hadoop的時候修改了mapred-site.xml的mapred.job.tracker.http.address,請訪問對應的其他地址)

在webUI里你可以看到running jobs, completed jobs和retired jobs。點擊Jobid下的超鏈接,可以看到對應job的執行狀況。進去后如果看到Failed/Killed Task Attempts下非空,你可以點進對應的超鏈接,找到對應的log去進行debug。

得到結果

成功執行完這個任務之后,你用output參數在HDFS上指定的輸出文件夾里就會多出幾個文件

  • 一個空白文件_SUCCESS,表明job運行成功,這個文件可以讓其他程序只要查看一下HDFS就能判斷這次job是否成功運行,從而進行相關處理。
  • 一個_logs文件夾,顧名思義里面放着任務日志
  • part-00000, .... part-xxxxx文件,有多少個reducer后面的數字就會有多大,對應每個reducer的輸出結果。

假如你的輸出很少,比如是一個只有幾行的計數,你可以用

$ hadoop fs -cat <PATH ON HDFS>

直接將輸出打印到終端查看。

假如你的輸出很多,則需要拷貝到本地文件系統來查看。可以使用copyToLocal來獲取整個文件夾(與copyFromLocal一樣,它與get的區別在於會限制目標文件夾一定在本地文件系統上)。如果你不需要_SUCCESS 和_logs,並且想要將所有reducer的輸出合並,可以使用getmerge命令。

比如在上面的例子里,可以用命令

$ hadoop fs -copyToLocal output ./

將output文件夾復制到本地文件系統的當前目錄下,或者用

$ hadoop fs -getmerge output ./

將output下的part-xxxxx合並,放到當前目錄的一個叫output的文件里。

如何串聯多趟MR

如果你有多次任務要執行,下一步需要用上一步的任務做輸入,解決辦法其實很簡單。假設上一步在HDFS的輸出文件夾是output1,那么在下一步的運行命令中,指明

-input output1/part-*

即指定上一次的所有輸出為本次任務的輸入即可。注意這里假設你不需要對上一步的輸出做額外處理。

其他

這篇文章只提到了最簡單的執行Hadoop streaming程序的方法。涉及到一些其他需求,比如需要有多個輸入文件等情況,還需要進一步調整運行命令,會在以后的文章里講到。

推薦閱讀


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM