這是參照《機器學習實戰》中第15章“大數據與MapReduce”的內容,因為作者寫作時hadoop版本和現在的版本相差很大,所以在Hadoop上運行python寫的MapReduce程序時出現了很多問題,因此希望能夠分享一些過程中的經驗,但願大家能夠避開同樣的坑。文章內容分為以下幾個部分:(本文的代碼和用到的數據集可以在這里下載)
1.代碼分析
2.運行步驟
3.問題解決
1.代碼分析
問題描述:在一個海量數據上分布式計算均值和方差的MapReduce作業。
設有一組數字,這組數字的均值和方差如下:

每個部分的{count(元素個數)、sum1/count、sum2/count},然后在reduce端將所有map端傳入的sum1加起來在除以總個數n得到均值mean;將所有的sum2加起來除以n再減去均值mean的平方,就得到了方差var.
數據格式如下:一行包含一個數字,保存在inputFile.txt中

Map端的代碼如下:(保存在Mapper.py文件中)
#!/usr/bin/env python #coding=utf-8 import sys from numpy import mat, mean, power def read_input(file): for line in file: yield line.rstrip()#rstrip()去除字符串右邊的空格 input = read_input(sys.stdin)#依次讀取每行的數據 input = [float(line) for line in input] #將每行轉換成float型 numInputs = len(input) input = mat(input) sqInput = power(input,2) #輸出數據個數,均值,以及平方和的均值,以'\t'隔開 print "%d\t%f\t%f" % (numInputs, mean(input), mean(sqInput))
這里補充說明一下,在read_input()函數中,為何要使用yield?這里使用的是海量數據集,如果直接對文件對象調用 read() 方法,會導致不可預測的內存占用。好的方法是利用固定長度的緩沖區來不斷讀取文件內容。通過 yield,我們不再需要編寫讀文件的迭代類,就可以輕松實現文件讀取。
下面是Reduce端的代碼(保存在Reducer.py文件中),它接收map端的輸出,並將數據合並成全局的均值,並計算得到方差。
#!/usr/bin/env python #coding=utf-8 import sys from numpy import mat, mean, power def read_input(file): for line in file: yield line.rstrip() input = read_input(sys.stdin) #讀取map端的輸出,共有三個字段,按照'\t'分隔開來 mapperOut = [line.split('\t') for line in input] cumVal=0.0 cumSumSq=0.0 cumN=0.0 for instance in mapperOut: nj = float(instance[0])#第一個字段是數據個數 cumN += nj cumVal += nj*float(instance[1])#第二個字段是一個map輸出的均值,均值乘以數據個數就是數據總和 cumSumSq += nj*float(instance[2])#第三個字段是一個map輸出的平方和的均值,乘以元素個數就是所有元素的平方和 mean = cumVal/cumN#得到所有元素的均值 var = (cumSumSq/cumN-mean*mean)#得到所有元素的方差 print "%d\t%f\t%f" % (cumN, mean, var)
2.運行步驟
我使用的環境是:
|   Centos 64 Python 2.6 Hadoop 2.2.0  |  
          
2.1 本地運行
在運行之前,首先在本地運行一下,看是否能通過。
首先將以上Mapper.py和Reducer.py文件,以及數據文件inputFile.txt放在同一個文件夾中(我這里是桌面的文件夾:/home/hadoop/Desktop/python_doc中),然后輸入命令:chmod +x文件名,修改其權限變成可執行文件。
然后輸入以下命令:
[hadoop@hadoop1 python_doc]$ python Mapper.py < inputFile.txt | python Reducer.py 
        
出現上述結果表示運行通過。
2.2 hadoop上運行
1.啟動HDFS,進入HADOOP_HOME目錄(也就是hadoop的安裝目錄,我的是/app/hadoop/hadoop):
[hadoop@hadoop1 python_doc]$cd $HADOOP_HOME/sbin [hadoop@hadoop1 sbin]$./start-dfs.sh
2.驗證HDFS是否啟動,在MasterNode上輸入以下命令,將會出現:NameNode、SecondaryNameNode和DataNode
[hadoop@hadoop1 sbin]$./jps 
        
3.在HDFS下創建一個存放“輸入數據“的文件夾
[hadoop@hadoop1 Desktop]$ hadoop fs -mkdir /user/hadoop/mr-input 
        注意:這里不需要創建“輸出數據“的文件夾,否則會出現以下錯誤:
ERROR security.UserGroupInformation: PriviledgedActionException as:hadoop (auth:SIMPLE) cause:org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hadoop1:9000/user/hadoop/mr-ouput15 already exists 
        4.將數據文件inputFile.txt復制到HDFS:
[hadoop@hadoop1 python_doc]$ hadoop fs -put inputFile.txt  /user/hadoop/mr-input 
        也可以查看一下,文件是否復制成功:
[hadoop@hadoop1 python_doc]$ hadoop fs –ls  /user/hadoop/mr-input 
        5.下面重點來了,在命令窗口輸入Hadoop Streaming命令:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-input /user/hadoop/mr-input/* \
-output /user/hadoop/mr-ouput13 \
-file /home/hadoop/Desktop/python_doc/Mapper.py -mapper 'Mapper.py' \
-file /home/hadoop/Desktop/python_doc/Reducer.py -reducer 'Reducer.py' 
        注意:在hadoop2.2.0的版本中,streaming.jar的目錄發生了改變,保存在:$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar
這里再補充一下,Hadoop Streaming的用法:
Hadoop Streaming用法 Usage: $HADOOP_HOME/bin/hadoop jar \ $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar [options] options: (1)-input:輸入文件路徑 (2)-output:輸出文件路徑 (3)-mapper:用戶自己寫的mapper程序 (4)-reducer:用戶自己寫的reducer程序 (5)-file:打包文件到提交的作業中,可以是mapper或者reducer要用的輸入文件。 (6)-partitioner:用戶自定義的partitioner程序 (7)-combiner:用戶自定義的combiner程序(必須用java實現) (8)-D:作業的一些屬性(以前用的是-jonconf),具體有: 1)mapred.map.tasks:map task數目 2)mapred.reduce.tasks:reduce task數目 3)stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數據的分隔符,默認均為\t。 4)stream.num.map.output.key.fields:指定map task輸出記錄中key所占的域數目 5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數據的分隔符,默認均為\t。 6)stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所占的域數目
6. 運行完成后:
查看在輸出文件夾下的內容:
[hadoop@hadoop1 Desktop]$ hadoop fs -ls /user/hadoop/mr-ouput13 Found 2 items -rw-r--r-- 2 hadoop supergroup 0 2016-03-16 16:45 /user/hadoop/mr-ouput13/_SUCCESS -rw-r--r-- 2 hadoop supergroup 22 2016-03-16 16:45 /user/hadoop/mr-ouput13/part-00000
查看結果,結果是輸出文件夾中的part-00000文件(顯示計算結果與本地計算結果是一致的)
[hadoop@hadoop1 Desktop]$ hadoop fs -cat  /user/hadoop/mr-ouput13/part-00000 
        3.問題解決
1. 出現“Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1”的錯誤
解決方法:
- 確保Mapper.py Reducer.py這兩個文件的權限是可執行的,如果不是可執行的,使用:chmod +x 文件名,修改其權限為可執行的。
 - 確保安裝numpy包,Centos下的安裝方法是:
 -  
          
sudo yum -y install gcc gcc-c++ numpy python-devel scipy這個命令會自動把依賴的包都裝好。安裝完成后,測試一下:
 -  
          
[hadoop@hadoop1 python_doc]$ python Python 2.6.6 (r266:84292, Jul 23 2015, 15:22:56) [GCC 4.4.7 20120313 (Red Hat 4.4.7-11)] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> from numpy import mat
沒有提示錯誤,說明numpy包已安裝好。
 
-  
在hadoop上實施MapReduce之前,一定要在本地運行一下你的python程序,看是否能夠跑通。
首先進入包含map和reduce兩個py腳本文件和數據文件inputFile.txt的文件夾中。然后輸入一下命令,看是否執行通過:
 -  
          
[hadoop@hadoop1 python_doc]$ python Mapper.py < inputFile.txt | python Reducer.py
 
2.出現錯誤:“Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2”,或者出現jar文件找不到的情況,或者出現輸出文件夾已經存在的情況。
- Mapper.py和Reduce.py的最前面要加上:#!/usr/bin/env python 這條語句
 - 在Hadoop Streaming命令中,請確保按以下的格式來輸入
 -  
          
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-*streaming*.jar \ -input /user/hadoop/mr-input/* \ -output /user/hadoop/mr-ouput13 \ -file /home/hadoop/Desktop/python_doc/Mapper.py -mapper 'Mapper.py' \ -file /home/hadoop/Desktop/python_doc/Reducer.py -reducer 'Reducer.py'
- 要確保jar文件的路徑正確,hadoop 2.2版本的該文件是保存在:$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar中,不同的hadoop版本可能略有不同;
 - 保存數據文件的HDFS文件夾后要加上”/*”,我這里是“/user/hadoop/mr-input”目錄,加上”/*”之后表示該文件夾下所有的文件作為輸入的數據文件;
 - HDFS中的輸出文件夾(這里是HDFS下的/user/hadoop/mr-ouput13),一定要是一個新的(之前不存在)的文件夾,因為即使上條Hadoop Streaming命令沒有執行成功,仍然會根據你的命令來創建輸出文件夾,而后面再輸入Hadoop Streaming命令如果使用相同的輸出文件夾時,就會出現“輸出文件夾已經存在的錯誤”;
 - 參數 –file后面是map和reduce的腳本,路徑是詳細的絕對路徑(我這里是/home/hadoop/Desktop/python_doc/Mapper.py),但是在參數 -mapper 和-reducer之后,文件名只需要python腳本的名字即可,而且用引號引起來(比如我這里是:-mapper 'Mapper.py')
 
 
Reference:
- Peter Harrington,《機器學習實戰》,人民郵電出版社,2013
 - http://stackoverflow.com/questions/4460522/hadoop-streaming-job-failed-error-in-python (Stackoverflow上關於Hadoop Streaming命令失敗的解答)
 - http://dongxicheng.org/mapreduce/hadoop-streaming-programming/ (Hadoop Streaming 的參數介紹)
 
