[python]使用python實現Hadoop MapReduce程序:計算一組數據的均值和方差


這是參照《機器學習實戰》中第15章“大數據與MapReduce”的內容,因為作者寫作時hadoop版本和現在的版本相差很大,所以在Hadoop上運行python寫的MapReduce程序時出現了很多問題,因此希望能夠分享一些過程中的經驗,但願大家能夠避開同樣的坑。文章內容分為以下幾個部分:(本文的代碼和用到的數據集可以在這里下載)

1.代碼分析

2.運行步驟

3.問題解決

1.代碼分析

問題描述:在一個海量數據上分布式計算均值和方差的MapReduce作業。

設有一組數字,這組數字的均值和方差如下:

每個部分的{count(元素個數)、sum1/count、sum2/count},然后在reduce端將所有map端傳入的sum1加起來在除以總個數n得到均值mean;將所有的sum2加起來除以n再減去均值mean的平方,就得到了方差var.

數據格式如下:一行包含一個數字,保存在inputFile.txt

Map端的代碼如下:(保存在Mapper.py文件中)

#!/usr/bin/env python
#coding=utf-8
import sys
from numpy import mat, mean, power

def read_input(file):
    for line in file:
        yield line.rstrip()#rstrip()去除字符串右邊的空格
        
input = read_input(sys.stdin)#依次讀取每行的數據
input = [float(line) for line in input] #將每行轉換成float型
numInputs = len(input)
input = mat(input)
sqInput = power(input,2)

#輸出數據個數,均值,以及平方和的均值,以'\t'隔開
print "%d\t%f\t%f" % (numInputs, mean(input), mean(sqInput))

這里補充說明一下,在read_input()函數中,為何要使用yield?這里使用的是海量數據集,如果直接對文件對象調用 read() 方法,會導致不可預測的內存占用。好的方法是利用固定長度的緩沖區來不斷讀取文件內容。通過 yield,我們不再需要編寫讀文件的迭代類,就可以輕松實現文件讀取。

下面是Reduce端的代碼(保存在Reducer.py文件中),它接收map端的輸出,並將數據合並成全局的均值,並計算得到方差。

#!/usr/bin/env python
#coding=utf-8

import sys
from numpy import mat, mean, power

def read_input(file):
    for line in file:
        yield line.rstrip()
       
input = read_input(sys.stdin)

#讀取map端的輸出,共有三個字段,按照'\t'分隔開來
mapperOut = [line.split('\t') for line in input]

cumVal=0.0
cumSumSq=0.0
cumN=0.0
for instance in mapperOut:
    nj = float(instance[0])#第一個字段是數據個數
    cumN += nj
    cumVal += nj*float(instance[1])#第二個字段是一個map輸出的均值,均值乘以數據個數就是數據總和
    cumSumSq += nj*float(instance[2])#第三個字段是一個map輸出的平方和的均值,乘以元素個數就是所有元素的平方和
    

mean = cumVal/cumN#得到所有元素的均值
var = (cumSumSq/cumN-mean*mean)#得到所有元素的方差

print "%d\t%f\t%f" % (cumN, mean, var)

2.運行步驟

我使用的環境是:

Centos 64

Python 2.6

Hadoop 2.2.0

 

2.1 本地運行

在運行之前,首先在本地運行一下,看是否能通過。

首先將以上Mapper.py和Reducer.py文件,以及數據文件inputFile.txt放在同一個文件夾中(我這里是桌面的文件夾:/home/hadoop/Desktop/python_doc中),然后輸入命令:chmod +x文件名,修改其權限變成可執行文件。

然后輸入以下命令: 

[hadoop@hadoop1 python_doc]$ python Mapper.py < inputFile.txt | python Reducer.py

出現上述結果表示運行通過。

 

2.2 hadoop上運行

1.啟動HDFS,進入HADOOP_HOME目錄(也就是hadoop的安裝目錄,我的是/app/hadoop/hadoop):

[hadoop@hadoop1 python_doc]$cd $HADOOP_HOME/sbin
[hadoop@hadoop1 sbin]$./start-dfs.sh

2.驗證HDFS是否啟動,在MasterNode上輸入以下命令,將會出現:NameNode、SecondaryNameNode和DataNode

[hadoop@hadoop1 sbin]$./jps

3.在HDFS下創建一個存放“輸入數據“的文件夾

[hadoop@hadoop1 Desktop]$ hadoop fs -mkdir /user/hadoop/mr-input

注意:這里不需要創建“輸出數據“的文件夾,否則會出現以下錯誤:

ERROR security.UserGroupInformation: PriviledgedActionException as:hadoop (auth:SIMPLE) cause:org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hadoop1:9000/user/hadoop/mr-ouput15 already exists

4.將數據文件inputFile.txt復制到HDFS:

[hadoop@hadoop1 python_doc]$ hadoop fs -put inputFile.txt  /user/hadoop/mr-input

也可以查看一下,文件是否復制成功:

[hadoop@hadoop1 python_doc]$ hadoop fs –ls  /user/hadoop/mr-input

5.下面重點來了,在命令窗口輸入Hadoop Streaming命令:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-input /user/hadoop/mr-input/* \
-output /user/hadoop/mr-ouput13 \
-file /home/hadoop/Desktop/python_doc/Mapper.py -mapper 'Mapper.py' \
-file /home/hadoop/Desktop/python_doc/Reducer.py -reducer 'Reducer.py'

注意:在hadoop2.2.0的版本中,streaming.jar的目錄發生了改變,保存在:$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar

 

這里再補充一下,Hadoop Streaming的用法:

Hadoop Streaming用法
Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar [options]
options:
(1)-input:輸入文件路徑
(2)-output:輸出文件路徑
(3)-mapper:用戶自己寫的mapper程序
(4)-reducer:用戶自己寫的reducer程序
(5)-file:打包文件到提交的作業中,可以是mapper或者reducer要用的輸入文件。
(6)-partitioner:用戶自定義的partitioner程序
(7)-combiner:用戶自定義的combiner程序(必須用java實現)
(8)-D:作業的一些屬性(以前用的是-jonconf),具體有:
1)mapred.map.tasks:map task數目
2)mapred.reduce.tasks:reduce task數目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數據的分隔符,默認均為\t。
4)stream.num.map.output.key.fields:指定map task輸出記錄中key所占的域數目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數據的分隔符,默認均為\t。
6)stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所占的域數目

 

6. 運行完成后:

查看在輸出文件夾下的內容:

[hadoop@hadoop1 Desktop]$ hadoop fs -ls /user/hadoop/mr-ouput13
Found 2 items
-rw-r--r--   2 hadoop supergroup          0 2016-03-16 16:45 /user/hadoop/mr-ouput13/_SUCCESS
-rw-r--r--   2 hadoop supergroup         22 2016-03-16 16:45 /user/hadoop/mr-ouput13/part-00000

查看結果,結果是輸出文件夾中的part-00000文件(顯示計算結果與本地計算結果是一致的)

[hadoop@hadoop1 Desktop]$ hadoop fs -cat  /user/hadoop/mr-ouput13/part-00000

3.問題解決

1. 出現“Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1”的錯誤

解決方法:

  • 確保Mapper.py Reducer.py這兩個文件的權限是可執行的,如果不是可執行的,使用:chmod +x 文件名,修改其權限為可執行的。
  • 確保安裝numpy包,Centos下的安裝方法是:
  • sudo yum -y install gcc gcc-c++ numpy python-devel scipy

    這個命令會自動把依賴的包都裝好。安裝完成后,測試一下:

  • [hadoop@hadoop1 python_doc]$ python
    Python 2.6.6 (r266:84292, Jul 23 2015, 15:22:56) 
    [GCC 4.4.7 20120313 (Red Hat 4.4.7-11)] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    >>> from numpy import mat

    沒有提示錯誤,說明numpy包已安裝好。

  • 在hadoop上實施MapReduce之前,一定要在本地運行一下你的python程序,看是否能夠跑通。

    首先進入包含map和reduce兩個py腳本文件和數據文件inputFile.txt的文件夾中。然后輸入一下命令,看是否執行通過:

  • [hadoop@hadoop1 python_doc]$ python Mapper.py < inputFile.txt | python Reducer.py

2.出現錯誤:“Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2”,或者出現jar文件找不到的情況,或者出現輸出文件夾已經存在的情況。

  • Mapper.py和Reduce.py的最前面要加上:#!/usr/bin/env python 這條語句
  • 在Hadoop Streaming命令中,請確保按以下的格式來輸入
  • $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-*streaming*.jar \
    -input /user/hadoop/mr-input/* \
    -output /user/hadoop/mr-ouput13 \
    -file /home/hadoop/Desktop/python_doc/Mapper.py -mapper 'Mapper.py' \
    -file /home/hadoop/Desktop/python_doc/Reducer.py -reducer 'Reducer.py'
    1. 要確保jar文件的路徑正確,hadoop 2.2版本的該文件是保存在:$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar,不同的hadoop版本可能略有不同;
    2. 保存數據文件的HDFS文件夾后要加上”/*”,我這里是“/user/hadoop/mr-input”目錄,加上”/*”之后表示該文件夾下所有的文件作為輸入的數據文件;
    3. HDFS中的輸出文件夾(這里是HDFS下的/user/hadoop/mr-ouput13),一定要是一個新的(之前不存在)的文件夾,因為即使上條Hadoop Streaming命令沒有執行成功,仍然會根據你的命令來創建輸出文件夾,而后面再輸入Hadoop Streaming命令如果使用相同的輸出文件夾時,就會出現“輸出文件夾已經存在的錯誤”;
    4. 參數 –file后面是map和reduce的腳本,路徑是詳細的絕對路徑(我這里是/home/hadoop/Desktop/python_doc/Mapper.py),但是在參數 -mapper 和-reducer之后,文件名只需要python腳本的名字即可,而且用引號引起來(比如我這里是:-mapper 'Mapper.py'

Reference:

  1. Peter Harrington,《機器學習實戰》,人民郵電出版社,2013
  2. http://stackoverflow.com/questions/4460522/hadoop-streaming-job-failed-error-in-python (Stackoverflow上關於Hadoop Streaming命令失敗的解答)
  3. http://dongxicheng.org/mapreduce/hadoop-streaming-programming/ (Hadoop Streaming 的參數介紹)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM