大數據mapreduce全局排序top-N之python實現


a.txt、b.txt文件如下:

a.txt

1       hadoop
3       hadoop
5       hadoop
7       hadoop
9       hadoop
11      hadoop
13      hadoop
15      hadoop
17      hadoop
19      hadoop
21      hadoop
23      hadoop
25      hadoop
27      hadoop
29      hadoop
31      hadoop
33      hadoop
35      hadoop
37      hadoop
39      hadoop
41      hadoop
43      hadoop
45      hadoop
47      hadoop
49      hadoop
51      hadoop
53      hadoop
55      hadoop
57      hadoop

b.txt如下:

0       java
2       java
4       java
6       java
8       java
10      java
12      java
14      java
16      java
18      java
20      java
22      java
24      java
26      java
28      java
30      java
32      java
34      java
36      java
38      java
40      java
42      java
44      java
46      java
48      java
50      java
52      java
54      java
56      java
58      java

將a.txt、b.txt上傳至hdfs文件 /mapreduce/allsort 內:

hadoop fs -put a.txt b.txt  /mapreduce/allsort

實驗一:第一種全局排序為,將數字列作為key,其余為value,設置一個reduce,利用shffer階段,進行排序:(sgffer排序默認字符串排序,需要注意)

map.py代碼如下:

#!usr/bin/python
import sys
base_count=9000000000
for line in sys.stdin:
        ss=line.strip().split('\t')
        key,val=ss
        new_key=base_count-int(key)
        print "%s\t%s"%(new_key,val)

red.py代碼如下:

#!usr/bin/python
import sys
base_count=9000000000
for line in sys.stdin:
    ss=line.strip().split()
    key=9000000000-int(ss[0])
    print "%s\t%s"%(key,ss[1])

run.sh代碼如下:

HADOOP=/usr/local/src/hadoop-1.2.1/bin/hadoop
HADOOP_STREAMING=/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar
INPUT_PATH=/mapreduce/allsort
OUT_PATH=/mapreduce/allsort/out
$HADOOP jar $HADOOP_STREAMING \
    -input $INPUT_PATH \
    -output $OUT_PATH \
    -mapper "python map.py" \
    -reducer "python red.py" \
    -file "./map.py" \
    -file "./red.py"

不設置reduce的運行個數,默認red.py的個數為1,結果輸出為一個文件,且完成了倒排序;

實驗二:設置3個reduce,每個ruduce內部完成排序,且3個reduce間也可以排序;思路:3個桶,60-40為0號桶、40-20為1號桶,20以下為3號桶,每個桶內部依賴shffer排序

map.py

#!usr/bin/python
import sys
base_count=9000000000
for line in sys.stdin:
        ss=line.strip().split('\t')
        key,val=ss
        new_key=base_count-int(key)
        if int(key)>=40:
                print "%s\t%s\t%s"%("0",new_key,val)
        elif int(key)>=20:
                print "%s\t%s\t%s"%("1",new_key,val)
        else:
                print "%s\t%s\t%s"%("2",new_key,val)

red.py

#!usr/bin/python
import sys
base_count=9000000000
for line in sys.stdin:
        ss=line.strip().split()
        key=base_count-int(ss[1])
        print "%s\t%s"%(key,ss[2])

run.sh

HADOOP="/usr/local/src/hadoop-1.2.1/bin/hadoop"
HADOOP_STREAMING="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_PATH="/mapreduce/allsort"
OUT_PATH="/mapreduce/allsort/out"
$HADOOP fs -rmr $OUT_PATH
$HADOOP jar $HADOOP_STREAMING \
        -input $INPUT_PATH \
        -output $OUT_PATH \
        -mapper "python map.py" \
        -reducer "python red.py" \
        -file "./map.py" \
        -file "./red.py" \
       -jobconf "mapred.reduce.tasks=3" \
       -jobconf "stream.num.map.output.key.fields=2" \#設置前2個為key
       -jobconf "num.key.fields.for.partition=1" \ #設置第一個為partition
       -partitioner "org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner"

實驗三:通過參數調控實現全局排序:

數據如下:

aaa.txt

d.1.5.23
e.9.4.5
e.5.9.22
e.5.1.45
e.5.1.23
a.7.2.6
f.8.3.3

目的:在streaming模式默認hadoop會把map輸出的一行中遇到的第一個設定的字段分隔符前面的部分作為key,后面的作為 value,這里我們將map輸出的前2個字段作為key,后面字段作為value,並且不使用hadoop默認的“\t”字段分隔符,而是根據該 文本特點使用“.”來分割

實現前3個字段為key排序,后面為value;

第2個和第三個字段為partition

run.sh如下:

HADOOP="/usr/local/src/hadoop-1.2.1/bin/hadoop"
HADOOP_STREAMING="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_PATH="/mapreduce/allsort/aaa.txt"
OUT_PATH="/mapreduce/allsort/out"
$HADOOP fs -rmr $OUT_PATH
$HADOOP jar $HADOOP_STREAMING \
        -input $INPUT_PATH \
        -output $OUT_PATH \
        -mapper "cat" \
        -reducer "cat" \
        -jobconf stream.num.map.output.key.fields=3 \
        -jobconf stream.map.output.field.separator=. \
        -jobconf map.output.key.field.separator=. \
        -jobconf mapred.text.key.partitioner.options=-k2,3 \
        -jobconf mapred.reduce.tasks=3 \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM