hadoop streaming字段排序介紹


我們在使用hadoop streaming的時候默認streaming的map和reduce的separator不指定的話,map和reduce會根據它們默認的分隔符來進行排序

map、reduce:默認的分隔符是\t(讀入數據

得到的結果都是按第一個分隔符排序去重后的結果

 

假設我們的有這么一列數據:USER IP DIR

我們想得到某一個用戶的某一個ip的一系列dir,那我們應該怎么辦呢?

這里我們就會用到streaming map和reduce的separator來指定key來進行排序和去重

 

1.默認情況

在hadoop streaming的默認情況下,是以”\t”作為分隔符的。對於標准輸入來說,每行的第一個”\t” 以前的部分為key,其他部分為對應的value。如果一個”\t”字符沒有,則整行都被當做key。這個

 

2.map階段的sort與partition

map階段很重要的階段包括sort與partition。排序是按照key來進行的。咱們之前講了默認的key是由”\t”分隔得到的。我們能不能自己控制相關的sort與partition呢?答案是可以的。

 

先看以下幾個參數:

map.output.key.field.separator: map中key內部的分隔符

num.key.fields.for.partition: 分桶時,key按前面指定的分隔符分隔之后,用於分桶的key占的列數。通俗地講,就是partition時候按照key中的前幾列進行划分,相同的key會被打到同一個reduce里。

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 前兩個參數,要配合partitioner選項使用!

 

stream.map.output.field.separator: map中的key與value分隔符

stream.num.map.output.key.fields: map中分隔符的位置

stream.reduce.output.field.separator: reduce中key與value的分隔符

stream.num.reduce.output.key.fields: reduce中分隔符的位置

 

3.分桶測試實例

准備數據:

$ cat tmp

1,2,1,1,1

1,2,2,1,1

1,3,1,1,1

1,3,2,1,1

1,3,3,1,1

1,2,3,1,1

1,3,1,1,1

1,3,2,1,1

1,3,3,1,1

 

上傳到hdfs中。

cat mapper.sh

#!/bin/bash

cat

 

$ cat reducer.sh

#!/bin/bash

sort

 

#!/bin/bash

streaming=/usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.2.0.jar

output=/tmp/wanglei/part_out

if hadoop fs -test -d $output

then

hadoop fs -rm -r $output

fi

hadoop jar $streaming \

-D map.output.key.field.separator=, \

-D num.key.fields.for.partition=2 \

-D stream.reduce.output.field.separator=, \

-D stream.num.reduce.output.key.fields=4 \

-D mapred.reduce.tasks=2 \

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \

-input /tmp/wanglei/partition \

-output $output \

-mapper "sh mapper.sh" \

-reducer "sh reducer.sh" \

-file mapper.sh \

-file reducer.sh

 

代碼最后的運行結果:

 

$ hadoop fs -cat /tmp/wanglei/part_out/part-00000

1,3,1,1 1

1,3,1,1 1

1,3,2,1 1

1,3,2,1 1

1,3,3,1 1

1,3,3,1 1

 

 

$ hadoop fs -cat /tmp/wanglei/part_out/part-00001

1,2,1,1 1

1,2,2,1 1

1,2,3,1 1

 

稍微解釋一下輸出:

1.map階段,key是按逗號分隔的,partition的階段取前兩個字段,所以前兩個字段相同的key都被打到同一個reduce里。這一點從reduce的兩個文件結果中就能看出來。

2.reduce階段通過stream.reduce.output.field.separator指定分隔符為”,”,通過stream.num.reduce.output.key.fields指定前4個字段為key,所以才會有最終的結果。

 

需要注意的幾個小點:

1.之前寫的代碼,當分發的文件有多個的時候,可以用-files指定。但是加了上面的參數以后,再用-files會報錯。具體原因未知。

2.-file 參數必須寫在最后面。如果寫在-input前面,代碼也會報錯。具體原因暫時也未知。

3.-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner參數必須指定,否則代碼沒法輸出預期結果。

 

4.map階段輸出測試實例

stream.map.output.field.separator與stream.num.map.output.key.fields與上面partition一組參數指定map輸出格式是一致的。不一樣的地方在stream這組參數是真正用於map端的輸出,而partition那組參數是用於分桶!

 

看下測試代碼就清楚了:

#!/bin/bash

streaming=/usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.2.0.jar

output=/tmp/wanglei/part_out_map

if hadoop fs -test -d $output

then

hadoop fs -rm -r $output

fi

 

hadoop jar $streaming \

-D stream.map.output.field.separator=, \

-D stream.num.map.output.key.fields=2 \

-input /tmp/wanglei/partition \

-output $output \

-mapper "sh mapper.sh" \

-file mapper.sh

 

$ hadoop fs -cat /tmp/wanglei/part_out_map/*

1,2 3,1,1

1,2 2,1,1

1,2 1,1,1

1,3 3,1,1

1,3 2,1,1

1,3 1,1,1

1,3 3,1,1

1,3 2,1,1

1,3 1,1,1

 

將reducer部分去掉,只輸出mapper的結果。可以看出:

1.mapper階段輸出的k,v以”\t”分隔(框架默認)

2.mapper階段以”,”分隔,key占了兩個字段。

3.mapper階段按key排序,所以1,2開頭的數據在前,1,3開頭的數據在后!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM