hadoop streaming多路輸出方法和注意點(附超大數據diff對比源碼)


簡介

hadoop 支持reduce多路輸出的功能,一個reduce可以輸出到多個part-xxxxx-X文件中,其中X是A-Z的字母之一,程序在輸出<key,value>對的時候,在value的后面追加"#X"后綴,比如#A,輸出的文件就是part-00000-A,不同的后綴可以把key,value輸出到不同的文件中,方便做輸出類型分類, #X僅僅用做指定輸出文件后綴, 不會體現到輸出的內容中

使用方法

啟動腳本中需要指定-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat或者-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat, 輸出就會按照多路輸出的方式進行分文件輸出

所有標准輸出的value中都要加上 #X后綴,X代表A-Z, 不然會報invalid suffix錯誤 

簡單示例如下:

$HADOOP_HOME_PATH/bin/hadoop streaming \
      -Dhadoop.job.ugi="$HADOOP_JOB_UGI" \
      -file ./map.sh \
      -file ./red.sh \
      -file ./config.sh \
      -mapper "sh -x map.sh" \
      -reducer "sh -x red.sh" \
      -input $NEW_INPUT_PATH \
      -input $OLD_INPUT_PATH \
      -output  $OUTPUT_PATH \
      -jobconf stream.num.map.output.key.fields=1 \
      -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
      -outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat \
      -jobconf mapred.job.name="test-shapherd-dist-diff" \
      -jobconf mapred.job.priority=HIGH \
      -jobconf mapred.job.map.capacity=100 \
      -jobconf mapred.job.reduce.capacity=100 \
      -jobconf mapred.reduce.tasks=3

在red腳本中可以所以的輸出都加上后綴, 這樣輸出就是分part的了,比如大數據diff對比的腳本

map.sh如下:

source ./config.sh

awk 'BEGIN{
}
{
    if(match("'${map_input_file}'","'$OLD_INPUT_PATH'"))
    {
        print $0"\t"0
    next
    }
    if(match("'${map_input_file}'","'$NEW_INPUT_PATH'"))
    print $0"\t"1
}'

exit 0

  

red.sh如下:

awk  -F"\t" 'BEGIN{
    key=""
        flag=0
        num=0
        old_num=0
        new_num=0
        diff_num=0
}
{
    if($NF == "0")
        old_num++
    else
        new_num++
    if($1 != key)
    {
        if(key != "")
        {
            if(num <= 1)
            {
                diff_num++
                if(flag == "0")
                    print $0"#A"
                else
                    print $0"#B"
            }
        }
        key=$1
        flag=$NF
        num=1
        next
    }

    if(key == $1)
    {
        num++
        next    
    }

}
END{
        if(num  == 1)
        {
            if(flag == "0")
                print $0"#A"
            else
                print $0"#B"
        }

        print old_num"\tshapherd#C"
        print new_num"\tshapherd#D"
        print diff_num"\tshapherd#E"
}'


exit 0

  

  

我的兩個大數據沒有diff, 所以輸出就是:

part-00000-C
part-00000-D
part-00000-E
part-00001-C
part-00001-D
part-00001-E
part-00002-C
part-00002-D
part-00002-E

沒有A和B結尾的

注意事項

  • 多路輸出最多支持26路, 也就是字母只能是A-Z范圍。
  • reduce的輸入key和value的分隔符默認是\t, 如果輸出中沒有\t,reduce腳本會把整行當作key, value就是空的,這時如果加了#X,會報invalid suffix錯誤,因為#X作為了key的一部分,這種問題一種是保證你的key和value是按照\t分隔的, 一種是指定自己想要的分隔符。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM