一個mapreduce程序大致分成三個部分,第一部分是mapper文件,第二個就是reducer文件,第三部分就是使用hadoop command 執行程序。
在這個過程中,困惑我最久的一個問題就是在hadoop command中hadoop-streaming 也就是streaming jar包的路徑。
路徑大概是這樣的:
cd ~
cd /usr/local/hadoop-2.7.3/share/hadoop/tools/lib
#在這個文件下,我們可以找到你 hadoop-streaming-2.7.3.jar
這個路徑是參考的這里
這個最基本的mapreduce程序我主要參考了三個博客:
第一個-主要是參考這個博客的mapper和reducer的寫法-在這個博客中它在練習中給出了只寫mapper執行文件的一個例子
第三個博客-主要是參考這個博客的將本地文件上傳到hdfs文件系統中
首先對於mapper文件
mapper.py
#!/usr/bin/env python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word, 1)
#上面這個文件我們得到的結果大概是每個單詞對應一個數字1
對於reducer文件:reducer.py
#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
對上面兩個代碼先進行一個本地的檢測
vim test.txt
foo foo quux labs foo bar quux
cat test.txt|python mapper.py
cat test.txt|python mapper.py|sort|python reducer.py
##注意在這里我們執行萬mapper之后我們進行了一個排序,所以對於相同單詞是處於相鄰位置的,這樣在執行reducer文件的時候代碼可以寫的比較簡單一點
然后在hadoop集群中跑這個代碼
首先講這個test.txt 上傳到相應的hdfs文件系統中,使用的命令模式如下:
hadoop fs -put ./test.txt /dw_ext/weibo_bigdata_ugrowth/mds/
然后寫一個run.sh
HADOOP_CMD="/usr/local/hadoop-2.7.3/bin/hadoop" # hadoop的bin的路徑
STREAM_JAR_PATH="/usr/local/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar" ## streaming jar包的路徑
INPUT_FILE_PATH="/dw_ext/weibo_bigdata_ugrowth/mds/src.txt" #hadoop集群上的資源輸入路徑
#需要注意的是intput文件必須是在hadooop集群上的hdfs文件中的,所以必須將本地文件上傳到集群上
OUTPUT_PATH="/dw_ext/weibo_bigdata_ugrowth/mds/output"
#需要注意的是這output文件必須是不存在的目錄,因為我已經執行過一次了,所以這里我把這個目錄通過下面的代碼刪掉
$HADOOP_CMD fs -rmr $OUTPUT_PATH
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH \
-output $OUTPUT_PATH \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-file ./mapper.py \
-file ./reducer.py
# -mapper:用戶自己寫的mapper程序,可以是可執行文件或者腳本
# -reducer:用戶自己寫的reducer程序,可以是可執行文件或者腳本
# -file:打包文件到提交的作業中,可以是mapper或者reducer要用的輸入文件,如配置文件,字典等。
明天看這個
https://www.cnblogs.com/shay-zhangjin/p/7714868.html
https://www.cnblogs.com/kaituorensheng/p/3826114.html