Hadoop Streaming示例程序(wordcount)
run_hadoop_word_counter.sh
$HADOOP_BIN streaming \
-input "${INPUT}" \
-output "${OUT_DIR}" \
-cacheArchive "${TOOL_DIR}/python2.7.2.tgz""#." \
-file "mapper_word_counter.py" \
-file "reducer_word_counter.py" \
-file "filter_word_counter.py" \
-mapper "./python2.7.2/bin/python mapper_word_counter.py" \
-combiner "./python2.7.2/bin/python reducer_word_counter.py" \
-reducer "./python2.7.2/bin/python reducer_word_counter.py" \
-jobconf abaci.job.base.environment="centos6u3_hadoop" \
-jobconf mapred.job.priority="NORMAL" \
-jobconf mapred.job.name="${TASK_NAME}" \
-jobconf mapred.map.tasks="${MAP_NUM}" \
-jobconf mapred.reduce.tasks="${REDUCE_NUM}" \
-jobconf mapred.map.memory.limit="1000" \
-jobconf mapred.reduce.memory.limit="1000" \
-jobconf mapred.job.map.capacity="3000" \
-jobconf mapred.job.reduce.capacity="2500" \
-jobconf mapred.job.keep.files.hours=12 \
-jobconf mapred.max.map.failures.percent=1 \
-jobconf mapred.reduce.tasks.speculative.execution="false"
mapper_word_counter.py
import sys
for line in sys.stdin:
fields = line.strip().split('\t')
try:
cnt = 1
dateval = fields[1]
sys.stdout.write('%s\t%d\n' %(dateval, cnt))
except Exception as exp:
sys.stderr.write("exp:%s, %s" %(str(exp), line))
reducer_word_counter.py
import sys
word_pre = None
counter_pre = 0
for line in sys.stdin:
try:
word, cnt = line.strip().split('\t')
cnt = int(cnt)
except Exception as exp:
sys.stderr.write('Exp:%s,line:%s' %(str(exp), line.strip()))
continue
if word == word_pre:
counter_pre += cnt
else:
if word_pre:
print('%s\t%d' %(word_pre, counter_pre))
word_pre = word
counter_pre = cnt
if word_pre:
print('%s\t%d' %(word_pre, counter_pre))
純文本輸入格式
- 每個mapper輸入若干行
-inputformat "org.apache.hadoop.mapred.TextInputFormat" \ - 指定每個mapper輸入的行數
-inputformat "org.apache.hadoop.mapred.lib.NLineInputFormat"
-jobconf mapred.line.input.format.linespermap="5" \
注意:輸入給mapper的內容會在每行前新增一行偏移的數字
文件分發方式:
-file將客戶端本地文件打成jar包上傳到HDFS然后分發到計算節點;
-cacheFile將HDFS文件分發到計算節點;
-cacheArchive將HDFS壓縮文件分發到計算節點並解壓;
分桶&排序
Hadoop默認會把map輸出行中遇到的第一個分隔符(默認為\t)前面的部分作為key,后面的作為value,如果輸出行中沒有指定的分隔符,則整行作為key,value被設置為空字符串。mapper輸出的key經過partition分發到不同的reduce里。
- 應用示例
${HADOOP_BIN} streaming \
-input "${INPUT}" \
-output "${OUT_DIR}" \
-mapper cat \
-reducer cat \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-jobconf stream.num.map.output.key.fields=4 \
-jobconf stream.map.output.field.separator=. \
-jobconf map.output.key.field.separator=. \
-jobconf mapred.text.key.partitioner.options=-k1,2 \
-jobconf mapred.text.key.comparator.options="-k3,3 -k4nr" \
-jobconf stream.reduce.output.field.separator=. \
-jobconf stream.num.reduce.output.key.fields=4 \
-jobconf mapred.reduce.tasks=5
說明:
- 設定mapper輸出的key
stream.map.output.field.separator 設置map輸出的字段分隔符
stream.num.map.output.key.fields 設置map輸出的前幾個字段作為key - 設定根據key進行分桶的規則
org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner partition類
map.output.key.field.separator 設置key內的字段分隔符(KeyFieldBasedPartitioner和KeyFieldBasedComparator所特有)
num.key.fields.for.partition 設置key內前幾個字段用來做partition
mapred.text.key.partitioner.options 可單獨指定key中哪些字段做partition,和num.key.fields.for.partition一起使用以num.key.fields.for.partition為准 - 設定根據key進行排序的規則
KeyFieldBasedComparator 可靈活設置的高級比較器,默認使用Text的基於字典序或者通過-n來基於數字比較
mapred.text.key.comparator.options 設置key中需要比較的字段或字節范圍 - 設定reducer輸出的key
stream.reduce.output.field.separator 設置reduce輸出的字段分隔符
stream.num.reduce.output.key.fields 設置reduce輸出的前幾個字段作為key
多路輸出
Hadoop支持多路輸出,可以將MapReduce的處理數據輸出到多個part-xxxxx-X文件中(X是A-Z共26個字母中的一個)。程序需要在maper(正對僅有mapper的MR任務)/reducer(針對包含reducer的任務)程序中將輸出形式由<key,value>變為<key, value#X>,以便輸出特定后綴的文件中。其中#X僅僅用做指定輸出文件后綴, 不會出現在輸出內容中。
啟動腳本中需要指定
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat
或者
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat
- 應用示例
run_hadoop.sh
${HADOOP_BIN} streaming \
-input "${INPUT}" \
-output "${OUT_DIR}" \
-cacheArchive "${TOOL_DIR}/python2.7.2.tgz""#." \
-file "mapper_worker.sh" \
-file "reducer_worker.py" \
-mapper "sh mapper_worker.sh" \
-reducer "python2.7.2/bin/python reducer_worker.py" \
-inputformat "org.apache.hadoop.mapred.TextInputFormat" \
-outputformat "org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat" \
-jobconf mapred.job.priority="NORMAL" \
-jobconf mapred.job.name="${TASK_NAME}" \
-jobconf mapred.map.tasks="${MAP_NUM}" \
-jobconf mapred.reduce.tasks="${REDUCE_NUM}" \
-jobconf mapred.max.split.size=134217728 \
-jobconf mapred.map.memory.limit="800" \
-jobconf mapred.reduce.memory.limit="500" \
-jobconf mapred.job.map.capacity="3500" \
-jobconf mapred.job.reduce.capacity="2000" \
-jobconf mapred.job.keep.files.hours=12 \
-jobconf mapred.max.map.failures.percent=1 \
-jobconf mapred.reduce.tasks.speculative.execution="false"
reducer_worder.py
for line in sys.stdin:
record = line.strip()
fields = record.split('\t')
if len(fields) != 7:
continue
vcpurl, playurl, title, poster, duration, pubtime, accept = fields
duration = int(duration)
pubtime = int(pubtime)
accept = int(accept)
if duration < 60:
sys.stdout.write('%s#A\n' %(record))
elif duration < 300:
sys.stdout.write('%s#B\n' %(record))
else:
sys.stdout.write('%s#C\n' %(record))
本地調試
為避免在啟動MR任務后才發現程序bug,最好提前在本地模擬MR的運行流程,驗證結果是否符合預期
cat inputfile | ./mapper_task.sh | sort -t$'\t' -k1,1 | ./reducer.sh
壓縮輸出
Hadoop默認支持gzip壓縮, streaming作業中指定以下參數即可使輸出以gzip形式壓縮.
-D mapreduce.output.fileoutputformat.compress=true
-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec
Hadoop 是可自行讀取gzip壓縮的數據,無需特殊指明輸入是 Gzip 壓縮。Gzip 的特點是壓縮比較高,Hadoop 原生支持,缺點是壓縮效率並不是很高,壓縮比和效率不可兼得,需要考慮其他壓縮方式。
Hadoop常用配置項
配置名 | 說明 |
---|---|
abaci.job.base.environment | centos6u3_hadoop 如果系統環境需要升級,可以指定為 centos6u3_hadoop 支持更高版本的 glibc |
stream.memory.limit | 單個map/reduce最高使用內存,默認800M |
mapred.map.memory.limit | 單個map最高使用內存,優先級高於stream.memory.limit |
mapred.reduce.memory.limit | 單個reduce最高使用內存,優先級高於stream.memory.limit |
mapred.map.capacity.per.tasktracker | 每台機器最多同時啟動map個數 |
mapred.reduce.capacity.per.tasktracker | 每台機器最多同時啟動reduce個數 |
mapred.job.map.capacity | map並發數目 |
mapred.job.reduce.capacity | reduce並發數目 |
abaci.job.map.max.capacity | map並發限制,默認10000 |
abaci.job.reduce.max.capacity | reduce並發限制,默認3000 |
mapred.map.tasks | map數目 |
mapred.reduce.tasks | reduce數目 |
mapred.job.reuse.jvm.num.tasks | 1表示不reuse,-1表示無限reuse,其他數值表示每個jvm reuse次數。reuse的時候,map結束時不會釋放內存 |
mapred.compress.map.output | 指定map的輸出是否壓縮。有助於減小數據量,減小io壓力,但壓縮和解壓有cpu成本,需要慎重選擇壓縮算法 |
mapred.map.output.compression.codec | map輸出的壓縮算法 |
mapred.output.compress | reduce輸出是否壓縮 |
mapred.output.compression.codec | 控制mapred的輸出的壓縮的方式 |
io.compression.codecs | 壓縮算法 |
mapred.max.map.failures.percent | 容忍map錯誤百分比,默認為0 |
mapred.max.reduce.failures.percent | 容忍reduce錯誤百分比,默認為0 |
stream.map.output.field.separator | map輸出分隔符,默認Tab |
stream.reduce.output.field.separator | reduce輸出分隔符,默認Tab |
mapred.textoutputformat.separator | 設置TextOutputFormat的輸出key,value分隔符,默認Tab |
mapred.textoutputformat.ignoreseparator | 設置為true后,當只有key沒有value會去掉自動補上的Tab |
mapred.min.split.size | 指定map最小處理數據量,單位B |
mapred.max.split.size | 指定map最多處理數據量,單位B,同時設置inputformat=org.apache.hadoop.mapred.CombineTextInputFormat |
mapred.combine.input.format.local.only | 是否只合並本節點,默認true,設置為false可以跨節點合並數據 |
abaci.job.map.cpu.percent | map消耗cpu占比,參數默認值40(表示1個cpu的40%,即0.4個cpu) |
abaci.job.reduce.cpu.percent | reduce消耗cpu占比,參數默認值40(表示1個cpu的40%,即0.4個cpu) |
mapred.map.capacity.per.tasktracker | 表示每個節點最多並行跑幾個該job的map任務(請根據內存情況適當增減該參數,默認是8) |
mapred.reduce.capacity.per.tasktracker | 表示每個節點最多並行跑幾個該job的reduce任務(請根據內存情況適當增減該參數,默認是8) |
mapred.map.tasks.speculative.execution | 開啟map預測執行,默認true |
mapred.reduce.tasks.speculative.execution | 開啟reduce預測執行,默認true |
Hadoop環境下系統變量
- 變量名列表
變量名 | 變量說明 |
---|---|
HADOOP_HOME | 計算節點上配置的Hadoop路徑 |
LD_LIBRARY_PATH | 計算節點上加載庫文件的路徑列表 |
PWD | 當前工作目錄 |
dfs_block_size | 當前設置的HDFS文件塊大小 |
map_input_file | mapper正在處理的輸入文件路徑 |
mapred_job_id | 作業ID |
mapred_job_name | 作業名 |
mapred_tip_id | 當前任務的第幾次重試 |
mapred_task_id | 任務ID |
mapred_task_is_map | 當前任務是否為map |
mapred_output_dir | 計算輸出路徑 |
mapred_map_tasks | 計算的map任務數 |
mapred_reduce_tasks | 計算的reduce任務數 |
https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Configured+Parameters
- 應用示例:
Shell版
#!/bin/bash
set -o pipefail
HOST="localhost"
PORT=$((1000 + ${mapred_task_partition}))
awk '{print $2}' \
| ./access_remote_data ${HOST} ${PORT} outdata.gz
hdfs_outfile=${mapred_work_output_dir}/${mapred_task_partition}.pack
cat outdata.gz \
| gzip -d \
| python ../postprocess.py
| ${HADOOP_HOME}/bin/hadoop fs -D hadoop.job.ugi="username,pwd" -copyFromLocal - ${hdfs_outfile}
Python版
import os
input_file = os.environ['mapreduce_map_input_file']
#do something else
References
Hadoop Streaming相關官方文檔:https://hadoop.apache.org/docs/r3.1.2/hadoop-streaming/HadoopStreaming.html
Hadoop Streaming入門:http://icejoywoo.github.io/2015/09/28/introduction-to-hadoop-streaming.html
Hadoop排序工具用法小結:http://www.dreamingfish123.info/?p=1102
Hadoop壓縮選項權衡:https://www.slideshare.net/Hadoop_Summit/singh-kamat-june27425pmroom210c