我開始使用如下方式進行傳遞.
示例:
main中:
Configuration conf = new Configuration();
Text maxscore = new Text("12989");
DefaultStringifier.store(conf, maxscore ,"maxscore");
這樣,Text對象maxscore就以“maxscore”作為key存儲在conf對象中了,然后在map和reduce函數中調用load的方法便可以把對象讀出。
mapper獲取:
Configuration conf = context.getConfiguration()
Text out = DefaultStringifier.load(conf, "maxscore", Text.class);
需要說明的是,這個需要傳遞的對象必須要先實現序列化的接口,Hadoop的序列化是通過Writable接口來實現的。
(2) 參考自:http://blog.sina.com.cn/s/blog_6b7cf18f0100x9jg.html
2. 編寫 Streaming 程序時,如何向map、reduce函數傳遞參數
可以通過 streaming 的 cmdenv 選項設置環境變量,然后在 map 和 reduce 腳本中獲取環境變量。
可參考 << hadoop streaming 高級編程 >>
http://dongxicheng.org/mapreduce/hadoop-streaming-advanced-programming/
(0) 作業提交腳本:
#!/usr/bin/env bash
max_read_count=${array[0]}
min_read_count=${array[1]}
max_write_count=${array[2]}
min_write_count=${array[3]}
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar \
-D mapred.reduce.tasks=1 \
-input $input \
-output $output \
-mapper $mapper_script \
-file $map_file \
-reducer $reducer_script \
-file $reduce_file \
-cmdenv "max_read_count=${array[0]}" \ # 設置環境變量 max_read_count .
-cmdenv "min_read_count=${array[1]}" \ # 多個變量時請多次使用 -cmdenv
-cmdenv "max_write_count=${array[2]}" \
-cmdenv "min_write_count=${array[3]}" \
(1) Python mapper.py
#!/usr/bin/env python
import sys
import os
min_r_count = float(os.environ.get('min_read_count')) # get environment variables.
max_r_count = float(os.environ.get('max_read_count'))
min_w_count = float(os.environ.get('min_write_count'))
max_w_count = float(os.environ.get('max_write_count'))
(2)Shell mapper.sh
#!/usr/bin/env bash
while read line # 讀入行
do
a=$line
done
echo $min_read_count $max_read_count # get environment variables.
(3)C/C++ mapper.c
#include <stdio.h>
#include <string.h>
int main(int argc, char *argv[], char *env[])
{
double min_r_count;
int i = 0;
for (i = 0; env[i] != NULL; i++) // env[i] 存儲了環境變量, 每項的值為此種形式: PATH=******, 所以需要截取變量值
{
if( strstr(env[i], "PATH=") ) {
char *p =NULL;
p = strstr(env[i], "=");
if( (p-env[i]) == 4 )
printf("%s\n", ++p); // 獲取 PATH 環境變量
}
if( strstr(env[i], "min_write_count=") ) {
char *p =NULL;
p = strstr(env[i], "=");
if( (p-env[i]) == strlen("min_write_count") )
printf("%s\n", ++p); // 獲取 min_write_count 環境變量
}
}
char eachLine[200]={0};
while(fgets(eachLine, 199, stdin)) // read line from stdin
{
printf("%s", eachLine);
}
}
注意:
(1) streaming 加載本地單個文件
streaming 支持 -file 選項, 可以把 -file 后面的本地文件(注意是本地文件)打包成作業提交的一部分, 即打包到作業的jar文件當中, 這樣在mapreduce腳本中就可以像訪問本地文件一樣訪問打包的文件了.
實例:
作業提交文件 run.sh

mapper.py

注意:在提交作業時使用的是 -file logs/wbscoretest.log 指定需要加載的文件. 在 map 腳本中只需要直接讀取文件 wbscoretest.log 即可, 不需要寫 logs/wbscoretest.log, 因為只加載了文件 wbscoretest.log, 而不會加載 logs 目錄和
wbscoretest.log 文件.
(2) streaming 加載本地多個文件

(3) streaming 加載本地目錄 ( 若加載多個目錄,用逗號隔開,-files dir1, dir2, dir3 )
使用streaming的 -file 選項不能加載本地目錄, 我實驗是如此.
我們可以使用 hadoop 的通用選項 -files 來加載本地目錄, 加載成功后在mapreduce腳本中可以像訪問本地目錄一樣訪問加載的目錄.
實際應用中,我們在編寫 分詞MapReduce作業時需要加載分詞詞典,就使用該方法.
作業提交腳本:

map 腳本: 讀取目錄下的文件.

加載多個目錄:

注意:多個目錄之間用逗號隔開,且不能有空格,否則會出錯,這個限制太蛋疼了。
例如:
(4) streaming編程時在mapreduce腳本中讀 hdfs 文件
使用 -files 選項, 后面跟需要讀的 hdfs 文件路徑. 這樣在 mapreduce 腳本中就可以直接通過文件名來訪問該文件.
作業提交腳本:

map腳本:

如果需要加載大文件, 我們可以將文件先上傳到 hdfs 中, 然后在 mapreduce 腳本中讀取 hdfs 文件.
(5) streaming編程時在mapreduce腳本中讀 hdfs 目錄
使用 -files 選項, 后面跟需要讀的 hdfs 目錄. 這樣在 mapreduce 腳本中就可以像訪問本地目錄一樣訪問該目錄.
作業提交腳本:

map腳本: 直接讀取 tmp_kentzhan 目錄.

