Hadoop Streaming 使用及參數設置


1. MapReduce 與 HDFS 簡介

  什么是 Hadoop ?

  Google 為自己的業務需要提出了編程模型 MapReduce 和分布式文件系統 Google File System,並發布了相關論文(可在 Google Research 的網站上獲得:GFS、MapReduce)。Doug Cutting 和 Mike Cafarella 在開發搜索引擎 Nutch 時對這兩篇論文進行了自己的實現,即同名的 MapReduce 和 HDFS,合起來就是 Hadoop。

  MapReduce 的 Data Flow 如下圖所示,原始數據經過 mapper 處理,再進行 partition 和 sort,到達 reducer,輸出最后結果。

2. Hadoop Streaming 原理

  Hadoop 本身是用 Java 開發的,程序也需要用 Java 編寫,但是通過 Hadoop Streaming,我們可以使用任意語言來編寫程序,讓 Hadoop 運行。

  Hadoop Streaming 就是通過將其他語言編寫的 mapper 和 reducer 通過參數傳給一個事先寫好的 Java 程序(Hadoop 自帶的 *-streaming.jar),這個 Java 程序會負責創建 MR 作業,另開一個進程來運行 mapper,將得到的輸入通過 stdin 傳給它,再將 mapper 處理后輸出到 stdout 的數據交給 Hadoop,經過 partition 和 sort 之后,再另開進程運行 reducer,同樣通過 stdin/stdout 得到最終結果。因此,我們只需要在其他語言編寫的程序中,通過 stdin 接收數據,再將處理過的數據輸出到 stdout,Hadoop Streaming 就能通過這個 Java 的 wrapper 幫我們解決中間繁瑣的步驟,運行分布式程序。

  原理上只要是能夠處理 stdio 的語言都能用來寫 mapper 和 reducer,也可以指定 mapper 或 reducer 為 Linux 下的程序(如 awk、grep、cat)或者按照一定格式寫好的 java class。因此,mapper 和 reducer 也不必是同一類的程序。

  1. Hadoop Streaming 的優缺點

    優點:

      1. 可以使用自己喜歡的語言來編寫 MapReduce 程序(不必非得使用 Java)

      2. 不需要像寫 Java 的 MR 程序那樣 import 一大堆褲,在代碼里做很多配置,很多東西都抽象到了 stdio 上,代碼量顯著減少。

      3. 因為沒有庫的依賴,調試方便,並且可以脫離 Hadoop 先在本地用管道模擬調試。

    缺點:

      1. 只能通過命令行參數來控制 MapReduce 框架,不像 Java 的程序那樣可以在代碼里使用 API,控制力比較弱。

      2. 因為中間隔着一層處理,效率會比較慢。

      3. 所以 Hadoop Streaming 比較適合做一些簡單的任務,比如用 Python 寫只有一兩百行的腳本。如果項目比較復雜,或者需要進行比較細致的優化,使用 Streaming 就容易出現一些束手束腳的地方。

  2. 用 Python 編寫簡單的 Hadoop Streaming 程序

    使用 Python 編寫 Hadoop Streaming 程序有幾點需要注意:

      1. 在能使用 iterator 的情況下,盡量使用 iterator,避免將 stdin 的輸入大量儲存在內存里,否則會嚴重降低性能。

      2. Streaming 不會幫你分割 key 和 value 傳進來,傳進來的只是一個個字符串而已,需要你自己在代碼里手動調用 split()。

      3. 從 stdin 得到的每一行數據末尾似乎會有 '\n' ,保險起見一般都需要用 rstrip() 來去掉。

      4. 在想獲得 key-value list 而不是一個個處理 key-value pair 時,可以使用 groupby 配合 itemgetter 將 key 相同的 key-value pair 組成一個個 group,得到類似 Java 編寫的 reduce 可以直接獲取一個 Text 類型的 key 和一個 iterable 作為 value 的效果。注意 itemgetter 的效率比 lambda 表達式的效率要高,所以用 itemgetter 比較好。

    編寫 Hadoop Streaming 程序的基本模版:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Some description here...
"""
import sys
from operator import itemgetter
from itertools import groupby

def read_input(file):
"""Read input and split."""
    for line in file:
    yield line.rstrip().split('\t')

def main():
    data = read_input(sys.stdin)
    for key, kviter in groupby(data, itemgetter(0)):
        # some code here..

if __name__ == "__main__":
    main()    

  如果對輸入輸出格式有不同於默認的控制,主要會在 read_input() 里調整。

  3. 本地調試

    本地調試用於 Hadoop Streaming 的 Python 程序的基本模式是:

$ cat <input path> | python <path to mapper script> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path>

    這里有幾點需要注意:

      1. Hadoop 默認按照 tab 來分割 key 和 value,以第一個分割出的部分為 key,按 key 進行排序,因此這里使用 sort -t $'\t' -k1,1 來模擬。如果有其他需求,在交給 Hadoop Streaming 執行時可以通過命令行參數設置,本地調試也可以進行相應的調整,主要是調整 sort 的參數。

      2. 如果在 Python 腳本里加上了 shebang,並且為它們添加了執行權限,也可以用類似於 ./mapper.py (會根據 shebang 自動調用指定的解釋器來執行文件)來代替 python mapper.py。

  4. 在集群上運行與監控

    1. 察看文檔

      首先需要知道用於 Streaming 的 Java 程序在哪里。在 1.0.x 的版本中,應該都在 $HADOOP_HOME/contrib/streaming/ 下:

$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar

      通過執行 Hadoop 命令

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar -info

就會看到一系列 Streaming 自帶的幫助,帶有各種參數的說明和使用樣例。

  5. 運行命令

    用 Hadoop Streaming 執行 Python 程序的一般步驟是:

    1. 將輸入文件放到 HDFS 上,建議使用 copyFromLocal 而不是 put 命令。參見Difference between hadoop fs -put and hadoop fs -copyFromLocal

      1. 一般可以新建一個文件夾用於存放輸入文件,假設叫 input

$ hadoop fs -mkdir input

然后用

$ hadoop fs -ls

查看目錄,可以看到出現了一個 /user/hadoop/input 文件夾。/user/hadoop 是默認的用戶文件夾,相當於本地文件系統中的 /home/hadoop。

      2. 再使用

$ hadoop fs -copyFromLocal <PATH TO LOCAL FILE(s)> input/

將本地文件放到 input 文件夾下。

    2. 開始 MapReduce 作業,假設你現在正在放有 mapper 和 reducer 兩個腳本的目錄下,而且它們剛好就叫 mapper.py 和 reducer.py,在不需要做其他配置的情況下,執行

$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \
-mapper mapper.py \
-file mapper.py \
-reducer reducer.py \
-file reducer.py \
-input input/* \
-output output

    第一行是告訴 Hadoop 運行 Streaming 的 Java 程序,接下來的是參數:

    這里的 mapper.py 和 reducer.py 是 mapper 所對應 python 程序的路徑。為了讓 Hadoop 將程序分發給其他機器,需要再加一個 -file 參數用於指明要分發的程序放在哪里。

    注意這樣寫的前提是這個 Python 程序里有 Shebang 而且添加了執行權限。如果沒有的話可以改成

-mapper 'python mapper.py'

    加上解釋器命令,用引號擴住(注意在參數中傳入解釋器命令,不再是用`符擴住,而是'符)。准確來說,mapper 后面跟的騎士應該是一個命令而不是文件名。

    假如你執行的程序不放在當前目錄下,比如說在當前目錄的 src 文件夾下,可以這樣寫

-mapper 'python mapper.py' -file src/mapper.py \
-reducer 'python reducer.py' -file src/reducer.py \

    也就是說,-mapper 和 -reducer 后面跟的文件名不需要帶上路徑,而 -file 后的參數需要。注意如果你在 mapper 后的命令用了引號,加上路徑名反而會報錯說找不到這個程序。(因為 -file 選項會將對應的本地參數文件上傳至 Hadoop Streaming 的工作路徑下,所以再執行 -mapper 對應的參數命令能直接找到對應的文件。

    -input 和 -output 后面跟的是 HDFS 上的路徑名,這里的 input/* 指的是"input 文件夾下的所有文件",注意 -output 后面跟着的需要是一個不存在於 HDFS 上的路徑,在產生輸出的時候 Hadoop 會幫你創建這個文件夾,如果已經存在的話就會產生沖突。(因此每次執行 Hadoop Streaming 前可以通過腳本命令 hadoop fs -rmr 清除輸出路徑)。

    有時候 Shebang 不一定能用,尤其是在執行環境比較復雜的時候,最保險的做法是:

$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \
-mapper 'python mapper.py' -file mapper.py \
-reducer 'python reducer.py' -file reducer.py \
-input input/* -output output

    這樣寫還有一個好處,就是可以在引號里寫上提供給 python 程序的命令行參數,甚至做目錄的變更以及環境變量的初始化等一系列 shell 命令。

    由於 mapper 和 reducer 參數跟的實際上是命令,所以如果每台機器上 python 的環境配置不一樣的話,會用每台機器自己的配置去執行 python 程序。

  6. 得到結果

    成功執行完這個任務之后,使用 output 參數在 HDFS 上指定的輸出文件夾里就會多出幾個文件:一個空白文件 _SUCCESS,表面 job 運行成功,這個文件可以讓其他程序只要查看一下 HDFS 就能判斷這次 job 是否運行成功,從而進行相關處理。

    一個 _logs 文件夾,裝着任務日志。

    part-00000,.....,part-xxxxx 文件,有多少個 reducer 后面的數字就會有多大,對應每個 reducer 的輸出結果。

    假如你的輸出很少,比如是一個只有幾行的計數,你可以用

$ hadoop fs -cat <PATH ON HDFS>

直接將輸出打印到終端查看。

    假如你的輸出很多,則需要拷貝到本地文件系統來查看。可以使用 copyToLocal 來獲取整個文件夾。如果你不需要 _SUCCESS 和 _logs,並且想要將所有 reducer 的輸出合並,可以使用 getmerge 命令。

$ hadoop fs -getmerge output ./

    上述命令將 output 下的 part-xxxxx 合並,放到當前目錄的一個叫 output 的文件里。

  7. 如何串聯多趟 MapReduce

    如果有多次任務要執行,下一步需要用上一步的任務做輸入,解決辦法很簡單。假設上一步在 HDFS 的輸出文件夾是 output1,那么在下一步的運行命令中,指明

-input output1/part-*

    即指定上一次的所有輸出為本次任務的輸入即可。

  8. 使用額外的文件

    假如 MapReduce 的 job 除了輸入以外還需要一些額外的文件,有兩種選擇:

    1. 大文件

      所謂的大文件就是大小大於設置的 local.cache.size 的文件,默認是10GB。這個時候可以用 -file 來分發。除此之外代碼本身也可以用 file 來分發。

      格式:假如我要加多一個 sideData.txt 給 python 腳本使用:

$ hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input inputDir \
-output outputDir \
-mapper mapper.py \
-file mapper.py \
-reducer reducer.py \
-file reducer.py \
-file sideData.txt

      這樣 -file 選項的參數文件都會被上傳至 MapReduce 的工作目錄下,所以 mapper 和 reducer 代碼都可以通過文件名直接訪問到文件。在 python 腳本中,只要把這個文件當成自己同一目錄下的本地文件來打開就可以了。比如:

f = open('sideData.txt')

      注意這個 file 是只讀的,不可以寫。

    2. 小文件

      如果是比較小的文件,想要提高讀寫速度可以將它放在 distributed cache 里(也就是每台機器都有自己的一份 copy,不需要網絡 IO 就可以拿到數據)。這里要用到的參數是 -cachefile,寫法和用法與上一個一樣,就是將 -file 改成 -cachefile 而已。

    3. 如果上傳目錄或者多個目錄時使用 -files 選項

      -files dir1,dir2 #多個目錄用','隔開,且不能有空格

      上傳目錄后,可以直接訪問當前目錄

    4. 上傳 HDFS 上的文件或者目錄

      只能 -files 命令上傳 HDFS 路徑下的文件或目錄,然后就可以像訪問本地文件一樣訪問 HDFS 文件。

      比如:

hdfs_file="hdfs://webboss-10-166-133-95:9100/user/hive/conf/part-00000"

input=/user/hive/input/*
output=/user/hive/output
mapper_script=mapper.py
reducer_script=reducer.py
map_file=./mapper.py
reduce_file=./reducer.py

hadoop fs -rmr $output
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar \
-D mapred.reduce.tasks=0 \
-files $hdfs_file \
-input $input \
-output $output \
-mapper $mapper_script \
-file $map_file \
-reducer $reducer_script \
-file $reduce_file            

    然后 map 腳本中就可以直接讀取名為 part-00000 的文件。詳情參考:http://www.cnblogs.com/zhengrunjian/p/4536572.html

  9. 控制 partitioner

    partitioning 指的是數據經過 mapper 處理后,被分發到 reducer 上的過程。partitioner 控制的,就是“怎樣的 mapper 輸出會被分發到哪一個 reducer 上”。

    Hadoop 有幾個自帶的 partitoner,解釋可以看這里。默認的是 HashPartitioner,也就是把第一個 '\t' 前的 key 做 hash 之后用於分配 partition。寫 Hadoop Streaming 程序是可以選擇其他 partitioner 的,你可以選擇自帶的其他幾種里的一種,也可以自己寫一個繼承 Partitioner 的 java 類然后編譯成 jar,在運行參數里指定為你用的 partitioner。

    官方自帶的 partionner 里最常用的是 KeyFieldBasedPartitioner。它會按照 key 的一部分來做 partition,而不是用整個 key 來做 partition。

    在學會用 KeyFieldBasedPartitioner 之前,必然要先學怎么控制 key-value 的分割。分割 key 的步驟可以分成兩步,用 python 來描述一下大約是

fields = output.split(separator)
key = fields[:numKeyfields]

    1. 選擇用什么符號來分割 key,也就是選擇 separator

      map.output.key.field.separator 可以指定用於分割 key 的符號。比如指定為一點的話,就要加上參數。

-D stream.map.output.field.separator=.

      假設你的 mapper 輸出是

11.22.33.44

      這時會用 '.' 進行分割,看准 [11, 22, 33, 44] 這里的其中一個或幾個作為 key。

    2. 選擇 key 的范圍,也就是選擇 numKeyfields

      控制 key 的范圍的參數是這個,假設要設置被分割出的前 2 個元素為 key:

-D stream.num.map.output.key.fields=2

      那么 key 就是上面的 1122。值得注意的是假如這個數字設置到覆蓋整個輸出,在這個例子里是4的話,那么整一行都會變成 key。

      上面分割出 key 之后,KeyFieldBasedPartitioner 還需要知道你想要用 key 里的哪部分作為 partition 的依據。它進行配置的過程可以看源代碼來理解。

      假設在上一步我們通過使用

-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 

      將 11.22.33.44 的整個字符串都設置成了 key,下一步就是在這個 key 的內部再進行一次分割。map.output.key.field.separator 可以用來設置第二次分割用的分割符,mapred.text.key.partitioner.options 可以接受參數來划分被分割出來的 partition key,比如:

-D map.output.key.field.separator=. \
-D mapred.text.key.partitioner.options=-k1,2    

      指的就是在 key 的內部里,將第1到第2個被點分割的元素作為 partition key,這個例子里也就是 1122。這里的值 -ki,j 表示從 i 到 j 個元素(inclusive)會作為 partition key。如果終點省略不寫,像 -ki 的話,那么 i 和 i 之后的元素都會作為 partition key。

      partition key 相同的輸出會保證分到同一個 reducer 上,也就是所有 11.22.xx.xx 的輸出都會到同一個 partitioner,11.22 換成其他各種組合也是一樣。

      實例說明一下,就是這樣的:

      1. mapper 的輸出是

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

      2. 指定前 4 個元素做 key,key 里的前兩個元素做 partition key,分成 3 個 partition 的話,就會被分成

11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2

      3. 下一步 reducer 會對自己得到的每個 partition 內進行排序,結果就是

11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3

      Streaming 命令格式如下:

$ hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=4 \
-D mapred.text.key.partitioner.options=-k1,2 \
-input inputDir \
-output outputDir \
-mapper mapper.py -file mapper.py \
-reducer reducer.py -file reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

      注意:

        Hadoop 執行命令時的選項是有順序的,順序是 bin/hadoop command [genericOptions] [commandOptions].

        對於 Streaming,-D 屬於 genericOptions,即 hadoop 的通用選項,所以必須寫在前面。

        Streaming 的所有選項可參考:

          hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar -info

    3. 控制 comparator 與自定義排序

      上面說到 mapper 的輸出被 partition 到各個 reducer 之后,會有一步排序。這個排序的標准也是可以通過設置 comparator 控制的。和上面一樣,要先設置分割出 key 用的分割符、key 的范圍,key 內部分隔用的分割符

-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=.

      這里要控制的就是 key 內部的哪些元素用來做排序依據,是排字典序還是數字序,倒敘還是正序。用來控制的參數是 mapred.text.key.comparator.options,接受的值格式類似於 unix sort。比如我要按第二個元素的數字序(默認字典序)+倒序來排元素的話,就用 -D mapred.text.key.comparator.options=-k2,2nr

      n表示數字序,r表示倒序。這樣一來

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

      就會被排成

11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1

    

參考:http://www.uml.org.cn/sjjm/201512111.asp


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM