a.txt、b.txt文件如下:
a.txt
1 hadoop 3 hadoop 5 hadoop 7 hadoop 9 hadoop 11 hadoop 13 hadoop 15 hadoop 17 hadoop 19 hadoop 21 hadoop 23 hadoop 25 hadoop 27 hadoop 29 hadoop 31 hadoop 33 hadoop 35 hadoop 37 hadoop 39 hadoop 41 hadoop 43 hadoop 45 hadoop 47 hadoop 49 hadoop 51 hadoop 53 hadoop 55 hadoop 57 hadoop
b.txt如下:
0 java 2 java 4 java 6 java 8 java 10 java 12 java 14 java 16 java 18 java 20 java 22 java 24 java 26 java 28 java 30 java 32 java 34 java 36 java 38 java 40 java 42 java 44 java 46 java 48 java 50 java 52 java 54 java 56 java 58 java
將a.txt、b.txt上傳至hdfs文件 /mapreduce/allsort 內:
hadoop fs -put a.txt b.txt /mapreduce/allsort
實驗一:第一種全局排序為,將數字列作為key,其余為value,設置一個reduce,利用shffer階段,進行排序:(sgffer排序默認字符串排序,需要注意)
map.py代碼如下:
#!usr/bin/python import sys base_count=9000000000 for line in sys.stdin: ss=line.strip().split('\t') key,val=ss new_key=base_count-int(key) print "%s\t%s"%(new_key,val)
red.py代碼如下:
#!usr/bin/python import sys base_count=9000000000 for line in sys.stdin: ss=line.strip().split() key=9000000000-int(ss[0]) print "%s\t%s"%(key,ss[1])
run.sh代碼如下:
HADOOP=/usr/local/src/hadoop-1.2.1/bin/hadoop HADOOP_STREAMING=/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar INPUT_PATH=/mapreduce/allsort OUT_PATH=/mapreduce/allsort/out $HADOOP jar $HADOOP_STREAMING \ -input $INPUT_PATH \ -output $OUT_PATH \ -mapper "python map.py" \ -reducer "python red.py" \ -file "./map.py" \ -file "./red.py"
不設置reduce的運行個數,默認red.py的個數為1,結果輸出為一個文件,且完成了倒排序;
實驗二:設置3個reduce,每個ruduce內部完成排序,且3個reduce間也可以排序;思路:3個桶,60-40為0號桶、40-20為1號桶,20以下為3號桶,每個桶內部依賴shffer排序
map.py
#!usr/bin/python import sys base_count=9000000000 for line in sys.stdin: ss=line.strip().split('\t') key,val=ss new_key=base_count-int(key) if int(key)>=40: print "%s\t%s\t%s"%("0",new_key,val) elif int(key)>=20: print "%s\t%s\t%s"%("1",new_key,val) else: print "%s\t%s\t%s"%("2",new_key,val)
red.py
#!usr/bin/python import sys base_count=9000000000 for line in sys.stdin: ss=line.strip().split() key=base_count-int(ss[1]) print "%s\t%s"%(key,ss[2])
run.sh
HADOOP="/usr/local/src/hadoop-1.2.1/bin/hadoop" HADOOP_STREAMING="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar" INPUT_PATH="/mapreduce/allsort" OUT_PATH="/mapreduce/allsort/out" $HADOOP fs -rmr $OUT_PATH $HADOOP jar $HADOOP_STREAMING \ -input $INPUT_PATH \ -output $OUT_PATH \ -mapper "python map.py" \ -reducer "python red.py" \ -file "./map.py" \ -file "./red.py" \ -jobconf "mapred.reduce.tasks=3" \ -jobconf "stream.num.map.output.key.fields=2" \#設置前2個為key -jobconf "num.key.fields.for.partition=1" \ #設置第一個為partition -partitioner "org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner"
實驗三:通過參數調控實現全局排序:
數據如下:
aaa.txt
d.1.5.23 e.9.4.5 e.5.9.22 e.5.1.45 e.5.1.23 a.7.2.6 f.8.3.3
目的:在streaming模式默認hadoop會把map輸出的一行中遇到的第一個設定的字段分隔符前面的部分作為key,后面的作為 value,這里我們將map輸出的前2個字段作為key,后面字段作為value,並且不使用hadoop默認的“\t”字段分隔符,而是根據該 文本特點使用“.”來分割
實現前3個字段為key排序,后面為value;
第2個和第三個字段為partition
run.sh如下:
HADOOP="/usr/local/src/hadoop-1.2.1/bin/hadoop" HADOOP_STREAMING="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar" INPUT_PATH="/mapreduce/allsort/aaa.txt" OUT_PATH="/mapreduce/allsort/out" $HADOOP fs -rmr $OUT_PATH $HADOOP jar $HADOOP_STREAMING \ -input $INPUT_PATH \ -output $OUT_PATH \ -mapper "cat" \ -reducer "cat" \ -jobconf stream.num.map.output.key.fields=3 \ -jobconf stream.map.output.field.separator=. \ -jobconf map.output.key.field.separator=. \ -jobconf mapred.text.key.partitioner.options=-k2,3 \ -jobconf mapred.reduce.tasks=3 \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner