JobControl管理多job依賴完整示例


處理 復雜的要求的時候,有時一個mapreduce程序是完成不了的,往往需要多個mapreduce程序,這個時候就要牽扯到各個任務之間的依賴關系,所謂 依賴就是一個MR Job 的處理結果是另外的MR 的輸入,以此類推,完成幾個mapreduce程序,得到最后的結果

下面是用Mapreduce寫的tf-idf算法微博關鍵字廣告推送案例,總共三個job,貼出完整代碼。

第一個job代碼如下:

FirstMapper

import java.io.IOException;
import java.io.StringReader;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;
/**
* 第一個MR,計算TF和計算
* @author root
*
*/
public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {

String [] v=value.toString().trim().split("\t");//制表符隔開
if(v.length>=2){

String id=v[0].trim();
String content=v[1];
StringReader sr=new StringReader(content);
IKSegmenter ikSegmenter=new IKSegmenter(sr, true);
Lexeme word=null;
while((word=ikSegmenter.next())!=null){
String w=word.getLexemeText();
context.write(new Text(w+"_"+id),new IntWritable(1) );//某個詞出現一次,輸出1
}
context.write(new Text("count"), new IntWritable(1));//微博條數,每讀一條輸出1
}else {
System.out.println(value.toString()+"-----------------");
}
}


}

FirstPartition

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
* 第一個map自定義分區
* @author root
*
*/
public class FirstPartition extends HashPartitioner<Text, IntWritable> {

@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
if(key.equals(new Text("count"))){
return 3;//代表第四個區,從0開始
}else{

return super.getPartition(key, value, numReduceTasks-1);
}

}

}

 

FirstReduce

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
* C1_001,2 c1在001的微博中出現2次
* C2_002,1
* count ,1000 微博條數
* @author root
*
*/
public class FirstReduce extends Reducer<Text, IntWritable, Text,IntWritable >{

@Override
protected void reduce(Text arg0, Iterable<IntWritable> arg1,
Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException {
int sum=0;
for(IntWritable i:arg1){
//按微博分組,累加每個詞在每條微博中出現的次數
sum=sum+i.get();
}
if(arg0.equals(new Text("count"))){
System.out.println(arg0.toString()+"-----"+sum);//微博總條數

}
arg2.write(arg0,new IntWritable(sum));
}


}

 

第二個job:

SecondMapper

 


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* 統計df:詞在多少個微博中出現過
* @author root
*
*/
public class TwoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {

//獲取當前mapper task 的數據片段
FileSplit fs=(FileSplit) context.getInputSplit();
if(!fs.getPath().getName().contains("part-r-00003")){//一個reduce一個文件,part-r-00003代表第四個分區生成的文件,文件內容為“count 微博條數”
//其余三個分區記錄了詞組在某條微博中出現的次數;
String[] v=value.toString().trim().split("\t");
if(v.length>=2){
String []ss=v[0].split("_");
if(ss.length>=2){
String word=ss[0];
context.write(new Text(word), new IntWritable(1));//詞在微博中出現n次,每一條輸出1
}
}else{
System.out.println(value.toString()+"---------------");

}

}

}

}

ScondReduce

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoReduce extends Reducer<Text, IntWritable,Text,IntWritable> {

@Override
protected void reduce(Text arg0, Iterable<IntWritable> arg1,
Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException {
int sum=0;
for(IntWritable i:arg1){
sum=sum+i.get();
}
arg2.write(arg0, new IntWritable(sum));
}

}

第三個job:

ThirdMapper

import java.io.BufferedReader;

import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/*最后計算
* @author root
*
*/

public class LastMapper extends Mapper<LongWritable,Text,Text,Text> {
// 存放微博總數
public static Map<String,Integer> cmap=null;
//存放df
public static Map<String,Integer> df=null;
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
FileSplit fs=(FileSplit) context.getInputSplit();
if(!fs.getPath().getName().endsWith("part-r-00003")){
String[] v = value.toString().trim().split("\t");
if(v.length>=2){
int tf=Integer.parseInt(v[1].trim());
String []ss=v[0].split("_");
if(ss.length>=2){

String word=ss[0];
String uid=ss[1];
double s=tf*Math.log(cmap.get("count")/df.get(word));
NumberFormat nf=NumberFormat.getInstance();
nf.setMinimumIntegerDigits(5);
context.write(new Text(uid), new Text(word+":"+nf.format(s)));
}else{
System.out.println(value.toString()+"------");

}

}


}


}

// 在map方法執行之前
@Override
protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
System.out.println("----------------");
if(cmap==null || cmap.size()==0 || df==null || df.size()==0){
URI [] ss=context.getCacheFiles();
if(ss!=null){
for(int i=0;i<ss.length;i++){
URI uri=ss[i];
if(uri.getPath().endsWith("part-r-00003")){//該文件存放的微博的總條數
Path path=new Path(uri.getPath());
BufferedReader br=new BufferedReader(new FileReader(path.getName()));
String line=br.readLine();
if(line.startsWith("count")){
String []ls=line.split("\t");//制表符左邊是count 右邊是總微博數
cmap=new HashMap<String,Integer>();
cmap.put(ls[0], Integer.parseInt(ls[1].trim()));
}
br.close();
}else if(uri.getPath().endsWith("part-r-00000")){//每個次在所有微博中出現的總次數
df=new HashMap<String,Integer>();
Path path=new Path(uri.getPath());
BufferedReader br=new BufferedReader(new FileReader(path.getName()));
String line;
while((line=br.readLine())!=null){
String []ls=line.split("\t");
df.put(ls[0],Integer.parseInt(ls[1].trim()));
}
br.close();
}

}

}

}

}

}

 

ThirdReduce

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LastReduce extends Reducer<Text, Text, Text, Text> {

@Override
protected void reduce(Text arg0, Iterable<Text> arg1, Reducer<Text, Text, Text, Text>.Context arg2)
throws IOException, InterruptedException {
StringBuffer sb=new StringBuffer();

for(Text i:arg1){
sb.append(i.toString()+"\t");
}
arg2.write(arg0, new Text(sb.toString()));
}
}

 以下是JobControl代碼:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StarJob {
// 啟動函數
public static void main(String[] args) throws IOException {
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://node4:8020");
FileSystem fs=FileSystem.get(config);
JobConf conf = new JobConf(StarJob.class);
// 第一個job的配置
@SuppressWarnings("deprecation")
Job job1 = new Job(conf, "join1");
job1.setJarByClass(StarJob.class);
job1.setJobName("weibo1");
job1.setMapperClass(FirstMapper.class);
job1.setCombinerClass(FirstReduce.class);
job1.setReducerClass(FirstReduce.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
job1.setNumReduceTasks(4);
job1.setPartitionerClass(FirstPartition.class);
// 加入控制容器
ControlledJob ctrljob1 = new ControlledJob(conf);
ctrljob1.setJob(job1);
// job1的輸入輸出文件路徑
FileInputFormat.addInputPath(job1, new Path("hdfs://node4:8020/usr/input/tf-idf"));
Path path1=new Path("hdfs://node4:8020/usr/output/weibo1");
if(fs.exists(path1)){
fs.delete(path1,true);
}
FileOutputFormat.setOutputPath(job1, path1);
// 第二個作業的配置
@SuppressWarnings("deprecation")
Job job2 = new Job(conf, "Join2");
job2.setJarByClass(StarJob.class);
job2.setJobName("weibo2");
job2.setMapperClass(TwoMapper.class);
job2.setCombinerClass(TwoReduce.class);
job2.setReducerClass(TwoReduce.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
// 作業2加入控制容器
ControlledJob ctrljob2 = new ControlledJob(conf);
ctrljob2.setJob(job2);
// 設置多個作業直接的依賴關系
// 如下所寫:
// 意思為job2的啟動,依賴於job1作業的完成
ctrljob2.addDependingJob(ctrljob1);
// 輸入路徑是上一個作業的輸出路徑,因此這里填path1,要和上面對應好
FileInputFormat.addInputPath(job2, path1);
// 輸出路徑從新傳入一個參數,這里需要注意,因為我們最后的輸出文件一定要是沒有出現過得
// 因此我們在這里new Path(args[2])因為args[2]在上面沒有用過,只要和上面不同就可以了
Path path2=new Path("hdfs://node4:8020/usr/output/weibo2");
if(fs.exists(path2)){
fs.delete(path2,true);
}
FileOutputFormat.setOutputPath(job2, path2);
//第三個作業的配置
@SuppressWarnings("deprecation")
Job job3=new Job(conf,"join3");
job3.setJarByClass(StarJob.class);
job3.setJobName("weibo3");

// DistributedCache.addCacheFile(uri, conf);
//2.5
//把微博總數加載到內存
// job3.addCacheFile(new Path("hdfs:node4:8020/usr/output/weibo1/part-r-00003").toUri());
//把df加載到內存
// job3.addCacheFile(new Path("hdfs:node4:8020/usr/output/weibo2/part-r-00000").toUri());

job3.setOutputKeyClass(Text.class);
job3.setOutputValueClass(Text.class);
job3.setMapperClass(LastMapper.class);
job3.setReducerClass(LastReduce.class);

// 作業3加入控制容器
ControlledJob ctrljob3 = new ControlledJob(conf);
ctrljob3.setJob(job3);
ctrljob3.addDependingJob(ctrljob2);

FileInputFormat.addInputPath(job3, path2);
Path path3=new Path("hdfs://node4:8020/usr/output/weibo3");
if(fs.exists(path3)){
fs.delete(path3,true);
}
FileOutputFormat.setOutputPath(job3, path3);

// 主的控制容器,控制上面的總的3個子作業
JobControl jobCtrl = new JobControl("myctrl");

// 添加到總的JobControl里,進行控制
jobCtrl.addJob(ctrljob1);
jobCtrl.addJob(ctrljob2);
jobCtrl.addJob(ctrljob3);
// 在線程啟動,記住一定要有這個
Thread t = new Thread(jobCtrl);
t.start();

while (true) {

if (jobCtrl.allFinished()) {// 如果作業成功完成,就打印成功作業的信息
System.out.println(jobCtrl.getSuccessfulJobList());
System.out.println("所有job執行完畢");
jobCtrl.stop();
break;
}
}

}
}

執行成功后文件下圖所示

控制台打印:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM