Hadoop常用操作匯總


Hadoop Streaming示例程序(wordcount)

run_hadoop_word_counter.sh

$HADOOP_BIN streaming \
    -input "${INPUT}" \
    -output "${OUT_DIR}" \
    -cacheArchive "${TOOL_DIR}/python2.7.2.tgz""#." \
    -file "mapper_word_counter.py" \                                                                                                                                         
    -file "reducer_word_counter.py" \
    -file "filter_word_counter.py" \
    -mapper "./python2.7.2/bin/python mapper_word_counter.py" \
    -combiner "./python2.7.2/bin/python reducer_word_counter.py" \
    -reducer "./python2.7.2/bin/python reducer_word_counter.py" \
    -jobconf abaci.job.base.environment="centos6u3_hadoop" \
    -jobconf mapred.job.priority="NORMAL" \
    -jobconf mapred.job.name="${TASK_NAME}" \
    -jobconf mapred.map.tasks="${MAP_NUM}" \
    -jobconf mapred.reduce.tasks="${REDUCE_NUM}" \
    -jobconf mapred.map.memory.limit="1000" \
    -jobconf mapred.reduce.memory.limit="1000" \
    -jobconf mapred.job.map.capacity="3000" \
    -jobconf mapred.job.reduce.capacity="2500" \
    -jobconf mapred.job.keep.files.hours=12 \
    -jobconf mapred.max.map.failures.percent=1 \
    -jobconf mapred.reduce.tasks.speculative.execution="false"

mapper_word_counter.py

import sys 

for line in sys.stdin:
    fields = line.strip().split('\t')
    try:
        cnt = 1                                                                                                                                                              
        dateval = fields[1]
        sys.stdout.write('%s\t%d\n' %(dateval, cnt))
    except Exception as exp:
        sys.stderr.write("exp:%s, %s" %(str(exp), line))

reducer_word_counter.py

import sys 

word_pre = None
counter_pre = 0 

for line in sys.stdin:
    try:
        word, cnt  = line.strip().split('\t')                                                                                                                                
        cnt = int(cnt)
    except Exception as exp:
        sys.stderr.write('Exp:%s,line:%s' %(str(exp), line.strip()))
        continue

    if word == word_pre:
        counter_pre += cnt 
    else:
        if word_pre:
            print('%s\t%d' %(word_pre, counter_pre))
        word_pre = word
        counter_pre = cnt 

if word_pre:
    print('%s\t%d' %(word_pre, counter_pre))

純文本輸入格式

  • 每個mapper輸入若干行
    -inputformat "org.apache.hadoop.mapred.TextInputFormat" \
  • 指定每個mapper輸入的行數
    -inputformat "org.apache.hadoop.mapred.lib.NLineInputFormat"
    -jobconf mapred.line.input.format.linespermap="5" \

注意:輸入給mapper的內容會在每行前新增一行偏移的數字

文件分發方式:

-file將客戶端本地文件打成jar包上傳到HDFS然后分發到計算節點;
-cacheFile將HDFS文件分發到計算節點;
-cacheArchive將HDFS壓縮文件分發到計算節點並解壓;

分桶&排序

Hadoop默認會把map輸出行中遇到的第一個分隔符(默認為\t)前面的部分作為key,后面的作為value,如果輸出行中沒有指定的分隔符,則整行作為key,value被設置為空字符串。mapper輸出的key經過partition分發到不同的reduce里。

  • 應用示例
${HADOOP_BIN} streaming \
    -input "${INPUT}" \
    -output "${OUT_DIR}" \
    -mapper cat \
    -reducer cat \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -jobconf stream.num.map.output.key.fields=4 \
    -jobconf stream.map.output.field.separator=. \
    -jobconf map.output.key.field.separator=. \
    -jobconf mapred.text.key.partitioner.options=-k1,2 \
    -jobconf mapred.text.key.comparator.options="-k3,3 -k4nr" \
    -jobconf stream.reduce.output.field.separator=. \
    -jobconf stream.num.reduce.output.key.fields=4 \
    -jobconf mapred.reduce.tasks=5

說明:

  • 設定mapper輸出的key
    stream.map.output.field.separator 設置map輸出的字段分隔符
    stream.num.map.output.key.fields 設置map輸出的前幾個字段作為key
  • 設定根據key進行分桶的規則
    org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner partition類
    map.output.key.field.separator 設置key內的字段分隔符(KeyFieldBasedPartitioner和KeyFieldBasedComparator所特有)
    num.key.fields.for.partition 設置key內前幾個字段用來做partition
    mapred.text.key.partitioner.options 可單獨指定key中哪些字段做partition,和num.key.fields.for.partition一起使用以num.key.fields.for.partition為准
  • 設定根據key進行排序的規則
    KeyFieldBasedComparator 可靈活設置的高級比較器,默認使用Text的基於字典序或者通過-n來基於數字比較
    mapred.text.key.comparator.options 設置key中需要比較的字段或字節范圍
  • 設定reducer輸出的key
    stream.reduce.output.field.separator 設置reduce輸出的字段分隔符
    stream.num.reduce.output.key.fields 設置reduce輸出的前幾個字段作為key

多路輸出

Hadoop支持多路輸出,可以將MapReduce的處理數據輸出到多個part-xxxxx-X文件中(X是A-Z共26個字母中的一個)。程序需要在maper(正對僅有mapper的MR任務)/reducer(針對包含reducer的任務)程序中將輸出形式由<key,value>變為<key, value#X>,以便輸出特定后綴的文件中。其中#X僅僅用做指定輸出文件后綴, 不會出現在輸出內容中。
啟動腳本中需要指定
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat
或者
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat

  • 應用示例
    run_hadoop.sh
${HADOOP_BIN} streaming \
    -input "${INPUT}" \
    -output "${OUT_DIR}" \
    -cacheArchive "${TOOL_DIR}/python2.7.2.tgz""#." \
    -file "mapper_worker.sh" \
    -file "reducer_worker.py" \
    -mapper "sh mapper_worker.sh" \
    -reducer "python2.7.2/bin/python reducer_worker.py" \
    -inputformat "org.apache.hadoop.mapred.TextInputFormat" \
    -outputformat "org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat" \
    -jobconf mapred.job.priority="NORMAL" \
    -jobconf mapred.job.name="${TASK_NAME}" \
    -jobconf mapred.map.tasks="${MAP_NUM}" \
    -jobconf mapred.reduce.tasks="${REDUCE_NUM}" \
    -jobconf mapred.max.split.size=134217728 \
    -jobconf mapred.map.memory.limit="800" \
    -jobconf mapred.reduce.memory.limit="500" \
    -jobconf mapred.job.map.capacity="3500" \
    -jobconf mapred.job.reduce.capacity="2000" \
    -jobconf mapred.job.keep.files.hours=12 \
    -jobconf mapred.max.map.failures.percent=1 \
    -jobconf mapred.reduce.tasks.speculative.execution="false"

reducer_worder.py

for line in sys.stdin:
    record = line.strip()
    fields = record.split('\t')
    if len(fields) != 7:
        continue
    vcpurl, playurl, title, poster, duration, pubtime, accept = fields
    duration = int(duration)
    pubtime = int(pubtime)
    accept = int(accept)
    if duration < 60:
        sys.stdout.write('%s#A\n' %(record))
    elif duration < 300:
        sys.stdout.write('%s#B\n' %(record))
    else:
        sys.stdout.write('%s#C\n' %(record))

本地調試

為避免在啟動MR任務后才發現程序bug,最好提前在本地模擬MR的運行流程,驗證結果是否符合預期

cat inputfile | ./mapper_task.sh | sort -t$'\t' -k1,1 | ./reducer.sh

壓縮輸出

Hadoop默認支持gzip壓縮, streaming作業中指定以下參數即可使輸出以gzip形式壓縮.

-D mapreduce.output.fileoutputformat.compress=true
-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec

Hadoop 是可自行讀取gzip壓縮的數據,無需特殊指明輸入是 Gzip 壓縮。Gzip 的特點是壓縮比較高,Hadoop 原生支持,缺點是壓縮效率並不是很高,壓縮比和效率不可兼得,需要考慮其他壓縮方式。

Hadoop常用配置項

配置名 說明
abaci.job.base.environment centos6u3_hadoop 如果系統環境需要升級,可以指定為 centos6u3_hadoop 支持更高版本的 glibc
stream.memory.limit 單個map/reduce最高使用內存,默認800M
mapred.map.memory.limit 單個map最高使用內存,優先級高於stream.memory.limit
mapred.reduce.memory.limit 單個reduce最高使用內存,優先級高於stream.memory.limit
mapred.map.capacity.per.tasktracker 每台機器最多同時啟動map個數
mapred.reduce.capacity.per.tasktracker 每台機器最多同時啟動reduce個數
mapred.job.map.capacity map並發數目
mapred.job.reduce.capacity reduce並發數目
abaci.job.map.max.capacity map並發限制,默認10000
abaci.job.reduce.max.capacity reduce並發限制,默認3000
mapred.map.tasks map數目
mapred.reduce.tasks reduce數目
mapred.job.reuse.jvm.num.tasks 1表示不reuse,-1表示無限reuse,其他數值表示每個jvm reuse次數。reuse的時候,map結束時不會釋放內存
mapred.compress.map.output 指定map的輸出是否壓縮。有助於減小數據量,減小io壓力,但壓縮和解壓有cpu成本,需要慎重選擇壓縮算法
mapred.map.output.compression.codec map輸出的壓縮算法
mapred.output.compress reduce輸出是否壓縮
mapred.output.compression.codec 控制mapred的輸出的壓縮的方式
io.compression.codecs 壓縮算法
mapred.max.map.failures.percent 容忍map錯誤百分比,默認為0
mapred.max.reduce.failures.percent 容忍reduce錯誤百分比,默認為0
stream.map.output.field.separator map輸出分隔符,默認Tab
stream.reduce.output.field.separator reduce輸出分隔符,默認Tab
mapred.textoutputformat.separator 設置TextOutputFormat的輸出key,value分隔符,默認Tab
mapred.textoutputformat.ignoreseparator 設置為true后,當只有key沒有value會去掉自動補上的Tab
mapred.min.split.size 指定map最小處理數據量,單位B
mapred.max.split.size 指定map最多處理數據量,單位B,同時設置inputformat=org.apache.hadoop.mapred.CombineTextInputFormat
mapred.combine.input.format.local.only 是否只合並本節點,默認true,設置為false可以跨節點合並數據
abaci.job.map.cpu.percent map消耗cpu占比,參數默認值40(表示1個cpu的40%,即0.4個cpu)
abaci.job.reduce.cpu.percent reduce消耗cpu占比,參數默認值40(表示1個cpu的40%,即0.4個cpu)
mapred.map.capacity.per.tasktracker 表示每個節點最多並行跑幾個該job的map任務(請根據內存情況適當增減該參數,默認是8)
mapred.reduce.capacity.per.tasktracker 表示每個節點最多並行跑幾個該job的reduce任務(請根據內存情況適當增減該參數,默認是8)
mapred.map.tasks.speculative.execution 開啟map預測執行,默認true
mapred.reduce.tasks.speculative.execution 開啟reduce預測執行,默認true

Hadoop環境下系統變量

  • 變量名列表
變量名 變量說明
HADOOP_HOME 計算節點上配置的Hadoop路徑
LD_LIBRARY_PATH 計算節點上加載庫文件的路徑列表
PWD 當前工作目錄
dfs_block_size 當前設置的HDFS文件塊大小
map_input_file mapper正在處理的輸入文件路徑
mapred_job_id 作業ID
mapred_job_name 作業名
mapred_tip_id 當前任務的第幾次重試
mapred_task_id 任務ID
mapred_task_is_map 當前任務是否為map
mapred_output_dir 計算輸出路徑
mapred_map_tasks 計算的map任務數
mapred_reduce_tasks 計算的reduce任務數

https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Configured+Parameters

  • 應用示例:
    Shell版
#!/bin/bash

set -o pipefail
HOST="localhost"
PORT=$((1000 + ${mapred_task_partition}))

awk '{print $2}' \
    | ./access_remote_data ${HOST} ${PORT} outdata.gz
                                                                                                                                                     
hdfs_outfile=${mapred_work_output_dir}/${mapred_task_partition}.pack
cat outdata.gz \
    | gzip -d \
    | python ../postprocess.py
    | ${HADOOP_HOME}/bin/hadoop fs -D hadoop.job.ugi="username,pwd" -copyFromLocal - ${hdfs_outfile}

Python版

import os

input_file = os.environ['mapreduce_map_input_file']
#do something else

References

Hadoop Streaming相關官方文檔:https://hadoop.apache.org/docs/r3.1.2/hadoop-streaming/HadoopStreaming.html
Hadoop Streaming入門:http://icejoywoo.github.io/2015/09/28/introduction-to-hadoop-streaming.html
Hadoop排序工具用法小結:http://www.dreamingfish123.info/?p=1102
Hadoop壓縮選項權衡:https://www.slideshare.net/Hadoop_Summit/singh-kamat-june27425pmroom210c


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM