又是期末又是實訓TA的事耽擱了好久……先把寫好的放上博客吧
相關隨筆:
- Hadoop-1.0.4集群搭建筆記
- 用python + hadoop streaming 編寫分布式程序(一) -- 原理介紹,樣例程序與本地調試
- 用python + hadoop streaming 編寫分布式程序(二) -- 在集群上運行與監控
使用額外的文件
假如你跑的job除了輸入以外還需要一些額外的文件(side data),有兩種選擇:
-
大文件
所謂的大文件就是大小大於設置的local.cache.size的文件,默認是10GB。這個時候可以用-file來分發。除此之外代碼本身也可以用file來分發。
格式:假如我要加多一個sideData.txt給python腳本用:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input iputDir \ -output outputDir \ -mapper mapper.py \ -file mapper.py \ -reducer reduer.py \ -file reducer.py \ -file sideDate.txt
在python腳本里,只要把這個文件當成自己同一目錄下的本地文件來打開就可以了。比如:
f = open("sideData.txt")
注意這個file是只讀的,不可以寫。
-
小文件
如果是比較小的文件,想要提高讀寫速度可以將它放在distributed cache里(也就是每台機器都有自己的一份copy,不需要網絡IO就可以拿到數據)。這里要用到的參數是-cachefile,寫法和用法上一個一樣,就是-file改成-cachefile而已。
控制partitioner
partitioning指的是數據經過mapper處理后,被分發到reducer上的過程。partitioner控制的,就是“怎樣的mapper輸出會被分發到哪一個reducer上”。
Hadoop有幾個自帶的partitioner,解釋可以看這里。默認的是HashPartitioner,也就是把第一個tab前的key做hash之后用於分配partition。寫Hadoop Streaming程序是可以選擇其他partitioner的,你可以選擇自帶的其他幾種里的一種,也可以自己寫一個繼承Partitioner的java類然后編譯成jar,在運行參數里指定為你用的partitioner。
官方自帶的partitioner里最常用的是KeyFieldBasedPartitioner(源代碼可以看這里)。它會按照key的一部分來做partition,而不是用整個key來做partition。
在學會用KeyFieldBasedPartitioner之前,必然要先學怎么控制key-value的分割。分割key的步驟可以分為兩步,用python來描述一下大約是
fields = output.split(seperator)
key = fields[:numKeyfields]
-
選擇用什么符號來分割key,也就是選擇seperator
map.output.key.field.separator可以指定用於分隔key的符號。比如指定為一點的話,就要加上參數
-D stream.map.output.field.separator=.
假設你的mapper輸出是
11.22.33.44
這時會先看准[11, 22, 33, 44]這里的其中一個或幾個作為key
-
選擇key的范圍,也就是選擇numKeyfields
控制key的范圍的參數是這個,假設我要設置被分割出的前2個元素為key:
-D stream.num.map.output.key.fields=2
那么key就是上面的 1122。值得注意的是假如這個數字設置到覆蓋整個輸出,在這個例子里是4的話,那么整一行都會變成key。
上面分割出key之后, KeyFieldBasedPartitioner還需要知道你想要用key里的哪部分作為partition的依據。它進行配置的過程可以看源代碼來理解。
假設在上一步我們通過使用
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
將11.22.33.44的整個字符串都設置成了key,下一步就是在這個key的內部再進行一次分割。map.output.key.field.separator可以用來設置第二次分割用的分割符,mapred.text.key.partitioner.options可以接受參數來划分被分割出來的partition key,比如:
-D map.output.key.field.separator=. \
-D mapred.text.key.partitioner.options=-k1,2 \
指的就是在key的內部里,將第1到第2個被點分割的元素作為partition key,這個例子里也就是1122。這里的值-ki,j表示從i到j個元素(inclusive)會作為partition key。如果終點省略不寫,像-ki的話,那么i和i之后的元素都會作為partition key。
partition key相同的輸出會保證分到同一個reducer上,也就是所有11.22.xx.xx的輸出都會到同一個partitioner,11.22換成其他各種組合也是一樣。
實例說明一下,就是這樣的:
-
mapper的輸出是
11.12.1.2 11.14.2.3 11.11.4.1 11.12.1.1 11.14.2.2
-
指定前4個元素做key,key里的前兩個元素做partition key,分成3個partition的話,就會被分成
11.11.4.1 ----------- 11.12.1.2 11.12.1.1 ----------- 11.14.2.3
11.14.2.2 -
下一步reducer會對自己得到的每個partition內進行排序,結果就是
11.11.4.1 ----------- 11.12.1.1 11.12.1.2 ----------- 11.14.2.2 11.14.2.3
命令格式大約就是長這樣
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=. \
-D mapred.text.key.partitioner.options=-k1,2 \
-input inputDir \
-output outputDir \
-mapper mapper.py -file mapper.py \
-reducer reducer.py -file reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
注意-D參數放在前面,指定用KeyFieldBasedPartitioner的-partitioner要放在下面。
控制comparator與自定義排序
上面說到mapper的輸出被partition到各個reducer之后,會有一步排序。這個排序的標准也是可以通過設置comparator控制的。和上面一樣,要先設置分割出key用的分隔符、key的范圍,key內部分割用的分隔符
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=. \
這里要控制的就是key內部的哪些元素用來做排序依據,是排字典序還是數字序,倒序還是正序。用來控制的參數是mapred.text.key.comparator.options,接受的值格式類似於unix sort。比如我要按第二個元素的數字序(默認字典序)+倒序來排元素的話,就用
-D mapred.text.key.comparator.options=-k2,2nr
n表示數字序,r表示倒序。這樣一來
11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2
就會被排成
11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1