首先明確:
1.Hadoop不支持全局變量,也不建議使用全局變量。
我的理解是,這是因為hadoop具有map類和reducer類,並且不同的task一般執行的是不同的map或reduce。所以全局變量是無法傳遞的。(但是一般情況下,我們也許會需要一個對於所有map和reduce都能訪問的全局變量),暫時我知道的解決方法如下:
2.如果Mapper類和Reducer類都是主類的內部類,可以在主類中使用 private static string global = "global variable";
但是這種方法不通用,因為Mapper類和Reducer類在概念上是與主類無關的。很多情況下,他們不是主類的內部類。所以該task所擁有的信息,只能從context上下文中獲得,這也正體現了常說的:充足的上下文信息才是關鍵!
修正:Mapper類(或Reducer類) 使用 主類名.global 也可以獲取全局變量。。
3.設置xml文件,然后在map函數中讀取即可;
這種方法有一個缺點,值只能從客戶端傳遞到Mapper中,reduce不能讀出來。實際上reducer所知道的只有context信息。
4.直接在Configuration中設置屬性值,然后讀取這個屬性值,在Mapper或Reducer中使用變量來存儲這個值即可。
可以看出這種方式最為麻煩。可以將各個屬性值放入文件中,然后在文件讀取即可。
關鍵代碼如下:
//初始化configuration后,使用
conf.set("propertyName“,properyValue);
//在mapper或reducer中,
Configuration conf = context.getConfiguration();
String g = conf.get("propertyName");
//g作為可以使用的全局變量即可;
這里的文章講述了hadoop下數據的傳遞:http://blog.csdn.net/jackydai987/article/details/6441241
代碼如下
public class xml_test {
public static int getFileInt(String filename) //從文件中讀取預設值
{
int temp = 0;
Configuration config = new Configuration();
FSDataInputStream dis = null;
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FileSystem hdfs = FileSystem.get(config);
dis = hdfs.open(new Path(filename));
IOUtils.copyBytes(dis, baos, 4096, false); //寫入ByteArrayOutputStream
String str = baos.toString(); //這里我假定只有一行,多行可以使用循環。
str = str.substring(0, str.length() - 1); //最后一個是回車,需要過濾掉,不然在整形轉換時會出錯
temp = Integer.parseInt(str);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
finally{
IOUtils.closeStream(dis);
}
return temp;
}
public static class wordcountMapper extends
Mapper<LongWritable, Text, Text, IntWritable>{
int temp2 = getFileInt("/user/jackydai/int"); //從文件中讀取預設的值
//這里也可以通過args參數傳一個文件進來,這樣更靈活
private Text word = new Text();
public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{
int temp1 = 0;
Configuration mapconf = context.getConfiguration();
temp1 = mapconf.getInt("count", 0); //map讀取值
IntWritable one = new IntWritable(temp1 + temp2); //求和后輸出,檢測是否正確
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while(itr.hasMoreElements()){
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class wordcountReduce extends
Reducer<Text, IntWritable, Text, IntWritable>{
public void reduce(Text key, Iterable<IntWritable>values, Context context)throws IOException, InterruptedException{
int sum = 0;
for (IntWritable str : values){
sum += str.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String args[])throws Exception{
Configuration conf = new Configuration();
conf.setInt("count", 2); //設置值為2,需要注意的是設置值需要在new job之前
Job job = new Job(conf, "xml_test");
job.setJarByClass(xml_test.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(wordcountMapper.class);
job.setReducerClass(wordcountReduce.class);
job.setCombinerClass(wordcountReduce.class);
FileInputFormat.setInputPaths(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.waitForCompletion(true);
}
}
