Hadoop(三):MapReduce程序(python)


使用python語言進行MapReduce程序開發主要分為兩個步驟,一是編寫程序,二是用Hadoop Streaming命令提交任務。

還是以詞頻統計為例

一、程序開發
1、Mapper

1 for line in sys.stdin:
2     filelds = line.strip.split(' ')
3     for item in fileds:
4         print item+' '+'1'

2、Reducer

 1 import sys
 2 
 3 result={}
 4 for line in  sys.stdin:
 5     kvs = line.strip().split(' ')
 6     k = kvs[0]
 7     v = kvs[1]
 8     if k in result:
 9         result[k]+=1
10     else:
11         result[k] = 1
12 for k,v in result.items():
13     print k+' '+v

....

寫完發現其實只用map就可以處理了...reduce只用cat就好了

3、運行腳本

1)Streaming簡介

  Hadoop的MapReduce和HDFS均采用Java進行實現,默認提供Java編程接口,用戶通過這些編程接口,可以定義map、reduce函數等等。
  但是如果希望使用其他語言編寫map、reduce函數怎么辦呢?
  Hadoop提供了一個框架Streaming,Streaming的原理是用Java實現一個包裝用戶程序的MapReduce程序,該程序負責調用hadoop提供的Java編程接口。

2)運行命令

  /.../bin/hadoop streaming

  -input /..../input

  -output /..../output

  -mapper "mapper.py"

  -reducer "reducer.py"

  -file mapper.py

  -file reducer.py

  -D mapred.job.name ="wordcount"

  -D mapred.reduce.tasks = "1"

3)Streaming常用命令

(1)-input <path>:指定作業輸入,path可以是文件或者目錄,可以使用*通配符,-input選項可以使用多次指定多個文件或目錄作為輸入。

(2)-output <path>:指定作業輸出目錄,path必須不存在,而且執行作業的用戶必須有創建該目錄的權限,-output只能使用一次。

(3)-mapper:指定mapper可執行程序或Java類,必須指定且唯一。

(4)-reducer:指定reducer可執行程序或Java類,必須指定且唯一。

(5)-file, -cacheFile, -cacheArchive:分別用於向計算節點分發本地文件、HDFS文件和HDFS壓縮文件,具體使用方法參考文件分發與打包

(6)numReduceTasks:指定reducer的個數,如果設置-numReduceTasks 0或者-reducer NONE則沒有reducer程序,mapper的輸出直接作為整個作業的輸出。

(7)-jobconf | -D NAME=VALUE:指定作業參數,NAME是參數名,VALUE是參數值,可以指定的參數參考hadoop-default.xml。

   -jobconf mapred.job.name='My Job Name'設置作業名

   -jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW設置作業優先級

   -jobconf mapred.job.map.capacity=M設置同時最多運行M個map任務

   -jobconf mapred.job.reduce.capacity=N設置同時最多運行N個reduce任務

   -jobconf mapred.map.tasks 設置map任務個數

   -jobconf mapred.reduce.tasks 設置reduce任務個數   

   -jobconf mapred.compress.map.output 設置map的輸出是否壓縮

   -jobconf mapred.map.output.compression.codec 設置map的輸出壓縮方式   

   -jobconf mapred.output.compress 設置reduce的輸出是否壓縮

   -jobconf mapred.output.compression.codec 設置reduce的輸出壓縮方式

   -jobconf stream.map.output.field.separator 設置map輸出分隔符

    例子:-D stream.map.output.field.separator=: \  以冒號進行分隔

            -D stream.num.map.output.key.fields=2 \  指定在第二個冒號處進行分隔,也就是第二個冒號之前的作為key,之后的作為value

(8)-combiner:指定combiner Java類,對應的Java類文件打包成jar文件后用-file分發。

(9)-partitioner:指定partitioner Java類,Streaming提供了一些實用的partitioner實現,參考KeyBasedFiledPartitonerIntHashPartitioner

(10)-inputformat, -outputformat:指定inputformat和outputformat Java類,用於讀取輸入數據和寫入輸出數據,分別要實現InputFormat和OutputFormat接口。如果不指定,默認使用TextInputFormat和TextOutputFormat。

(11)cmdenv NAME=VALUE:給mapper和reducer程序傳遞額外的環境變量,NAME是變量名,VALUE是變量值。

(12)-mapdebug, -reducedebug:分別指定mapper和reducer程序失敗時運行的debug程序。

(13)-verbose:指定輸出詳細信息,例如分發哪些文件,實際作業配置參數值等,可以用於調試。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM