MapReduce (III): Typical Scenarios (Part 1)


1. Chaining multiple MapReduce jobs

   1.1 Requirement

A slightly more complex processing flow often requires several MapReduce programs to run in sequence. This kind of multi-job chaining can be implemented with the JobControl facility provided by the MapReduce framework.

   1.2 Example

The example below contains two MapReduce jobs, a flow-sum job (SumMR) and a flow-sort job (SortMR), with a dependency between them: the output of SumMR is the input of SortMR, so SortMR may only start after SumMR has finished.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunManyJobMR {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		// First job: sum the flow data
		Job jobsum = Job.getInstance(conf);
		jobsum.setJarByClass(RunManyJobMR.class);
		jobsum.setMapperClass(FlowSumMapper.class);
		jobsum.setReducerClass(FlowSumReducer.class);
		jobsum.setMapOutputKeyClass(Text.class);
		jobsum.setMapOutputValueClass(Flow.class);
		jobsum.setCombinerClass(FlowSumReducer.class);
		jobsum.setOutputKeyClass(Text.class);
		jobsum.setOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(jobsum, "d:/flow/input");
		FileOutputFormat.setOutputPath(jobsum, new Path("d:/flow/output12"));

		// Second job: sort the summed flow data
		Job jobsort = Job.getInstance(conf);
		jobsort.setJarByClass(RunManyJobMR.class);
		jobsort.setMapperClass(FlowSortMapper.class);
		jobsort.setReducerClass(FlowSortReducer.class);
		jobsort.setMapOutputKeyClass(Flow.class);
		jobsort.setMapOutputValueClass(Text.class);
		jobsort.setOutputKeyClass(NullWritable.class);
		jobsort.setOutputValueClass(Flow.class);
		FileInputFormat.setInputPaths(jobsort, "d:/flow/output12");
		FileOutputFormat.setOutputPath(jobsort, new Path("d:/flow/sortoutput12"));

		// Wrap both jobs as ControlledJobs and declare the dependency:
		// sortcj may only run after sumcj has finished
		ControlledJob sumcj = new ControlledJob(jobsum.getConfiguration());
		ControlledJob sortcj = new ControlledJob(jobsort.getConfiguration());
		sumcj.setJob(jobsum);
		sortcj.setJob(jobsort);
		sortcj.addDependingJob(sumcj);

		// Hand both jobs to a JobControl instance, run it in a separate thread,
		// and poll until all jobs have reached a final state
		JobControl jc = new JobControl("flow sum and sort");
		jc.addJob(sumcj);
		jc.addJob(sortcj);
		Thread jobThread = new Thread(jc);
		jobThread.start();
		while (!jc.allFinished()) {
			Thread.sleep(500);
		}
		jc.stop();
	}
}
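JobControl itself does not report success or failure to the caller; the loop above only waits until every job has reached a final state. A small optional addition (not part of the original example) reports failed jobs and sets the exit code accordingly; it would replace the plain jc.stop() call at the end of main:

	// requires: import java.util.List;
	List<ControlledJob> failedJobs = jc.getFailedJobList();
	for (ControlledJob cj : failedJobs) {
		System.err.println("job failed: " + cj.getJobName() + " - " + cj.getMessage());
	}
	jc.stop();
	System.exit(failedJobs.isEmpty() ? 0 : 1);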

2. Implementing Top N: a custom GroupingComparator

    2.1 Requirement

   In a small project that processes student scores, the requirement is:
   for every class, find the student with the highest score and output that student's class, name, and average score.

   2.2 Analysis

(1) Use "class + average score" as the key, so that all student records read in the map phase are sorted by class and, within a class, by score in descending order before being sent to reduce.
(2) On the reduce side, use a GroupingComparator to group the key-value pairs with the same class together; the first record of each group is then the maximum.
   2.3 Implementation

 The input data looks like this:

computer	huangxiaoming	85	86	41	75	93	42	85
computer	xuzheng	54	52	86	91	42
computer	huangbo	85	42	96	38	
english	zhaobenshan	54	52	86	91	42	85	75
english	liuyifei	85	41	75	21	85	96	14
algorithm	liuyifei	75	85	62	48	54	96	15
computer	huangjiaju	85	75	86	85	85
english	liuyifei	76	95	86	74	68	74	48

  Step 1: combine the grouping field and the sort field into one custom key object.

package com.ghgj.mr.topn;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class ClazzScore implements WritableComparable<ClazzScore> {

	private String clazz;
	private Double score;

	public String getClazz() {
		return clazz;
	}
	public void setClazz(String clazz) {
		this.clazz = clazz;
	}
	public Double getScore() {
		return score;
	}
	public void setScore(Double score) {
		this.score = score;
	}

	public ClazzScore(String clazz, Double score) {
		super();
		this.clazz = clazz;
		this.score = score;
	}

	public ClazzScore() {
		super();
	}

	@Override
	public String toString() {
		return clazz + "\t" + score;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(clazz);
		out.writeDouble(score);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.clazz = in.readUTF();
		this.score = in.readDouble();
	}

	/**
	 * Sort order of the key: by class, and within a class by score in descending order,
	 * so that the highest score of each class comes first.
	 */
	@Override
	public int compareTo(ClazzScore cs) {
		int it = cs.getClazz().compareTo(this.clazz);
		if (it == 0) {
			// Double.compare avoids the precision loss of casting the difference to int
			return Double.compare(cs.getScore(), this.score);
		} else {
			return it;
		}
	}
}
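As a quick sanity check of the sort order (a hypothetical snippet that could be run in any main method; the scores are made-up values):

	ClazzScore a = new ClazzScore("computer", 83.2);
	ClazzScore b = new ClazzScore("computer", 65.0);
	// the higher score compares as smaller, so it sorts to the front of its class
	System.out.println(a.compareTo(b) < 0);   // prints true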

  Step 2: write the grouping rule that decides how the sorted ClazzScore records are grouped when they are passed to the ReduceTask.

package com.ghgj.mr.topn;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ClazzScoreGroupComparator extends WritableComparator {

	public ClazzScoreGroupComparator() {
		// true: create key instances so the object-based compare() below is used
		super(ClazzScore.class, true);
	}

	/**
	 * Decides how the records fed to reduce are grouped:
	 * keys are treated as equal, and therefore grouped together, when the class is the same.
	 */
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		ClazzScore cs1 = (ClazzScore) a;
		ClazzScore cs2 = (ClazzScore) b;
		return cs1.getClazz().compareTo(cs2.getClazz());
	}
}
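One caveat: a grouping comparator only groups records that arrive at the same reduce task. The driver in step 3 uses the default single reducer, so this is not an issue there; with several reduce tasks, records of the same class would also have to be sent to the same partition. A minimal sketch of such a partitioner, under the assumption that one is needed (the class name ClazzPartitioner is illustrative and not part of the original code):

package com.ghgj.mr.topn;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes all records of one class to the same reducer, regardless of score,
// so the grouping comparator sees the whole class together.
public class ClazzPartitioner extends Partitioner<ClazzScore, DoubleWritable> {
	@Override
	public int getPartition(ClazzScore key, DoubleWritable value, int numPartitions) {
		return (key.getClazz().hashCode() & Integer.MAX_VALUE) % numPartitions;
	}
}

It would be registered in the driver with job.setPartitionerClass(ClazzPartitioner.class).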

  Step 3: write the MapReduce driver, mapper and reducer.

package com.ghgj.mr.topn;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * TopN problem: Top 1 per class
 */
public class ScoreTop1MR {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(ScoreTop1MR.class);
		job.setMapperClass(ScoreTop1MRMapper.class);
		job.setReducerClass(ScoreTop1MRReducer.class);
		job.setOutputKeyClass(ClazzScore.class);
		job.setOutputValueClass(DoubleWritable.class);

		// set the grouping rule for the data delivered to the reducer
		job.setGroupingComparatorClass(ClazzScoreGroupComparator.class);

		FileInputFormat.setInputPaths(job, "d:/score_all/input");
		Path p = new Path("d:/score_all/output1");
		FileSystem fs = FileSystem.newInstance(conf);
		if (fs.exists(p)) {
			fs.delete(p, true);
		}
		FileOutputFormat.setOutputPath(job, p);

		boolean status = job.waitForCompletion(true);
		System.exit(status ? 0 : 1);
	}
	static class ScoreTop1MRMapper extends Mapper<LongWritable, Text, ClazzScore, DoubleWritable> {
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] splits = value.toString().split("\t");
			// column 0 is the class, column 1 the name, columns 2..n the individual scores;
			// compute the student's average so that "class + average" forms the key
			double sum = 0;
			for (int i = 2; i < splits.length; i++) {
				sum += Double.parseDouble(splits[i]);
			}
			double avg = sum / (splits.length - 2);
			ClazzScore cs = new ClazzScore(splits[0], avg);
			context.write(cs, new DoubleWritable(avg));
		}
	}
	static class ScoreTop1MRReducer extends Reducer<ClazzScore, DoubleWritable, ClazzScore, DoubleWritable> {
		@Override
		protected void reduce(ClazzScore cs, Iterable<DoubleWritable> scores, Context context)
				throws IOException, InterruptedException {
			// the records of each class arrive sorted by score in descending order,
			// so the first value of every group is the Top 1
			context.write(cs, scores.iterator().next());
		}
	}
}
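Although the example only takes the Top 1, the same key, sort order and grouping comparator also yield a Top N per class: the values of every group arrive sorted by score in descending order, so the reducer just has to emit the first N of them. A minimal sketch (the class name ScoreTopNReducer and the cutoff N are illustrative; it would replace ScoreTop1MRReducer in the driver above):

	static class ScoreTopNReducer extends Reducer<ClazzScore, DoubleWritable, ClazzScore, DoubleWritable> {
		private static final int N = 3; // hypothetical cutoff

		@Override
		protected void reduce(ClazzScore cs, Iterable<DoubleWritable> scores, Context context)
				throws IOException, InterruptedException {
			int emitted = 0;
			for (DoubleWritable score : scores) {
				// the key instance is updated as the value iterator advances,
				// so cs always corresponds to the current score
				context.write(cs, score);
				if (++emitted >= N) {
					break;
				}
			}
		}
	}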

 3. MapReduce global counters

     3.1 Requirement

In production code it is common to keep a global count of, for example, malformed input lines encountered during processing. This kind of requirement can be met with the global counters provided by the MapReduce framework.
     3.2 Example

The following example uses global counters to count the total number of words and the total number of lines in all files under a directory.

   

package com.ghgj.mr.counter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
	
	enum MyWordCounter{COUNT_LINES,COUNT_WORD}
//	enum Weekday{MONDAY, TUESDAY, WENSDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY}

	public static void main(String[] args) throws Exception {
		// HDFS-related configuration parameters
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		// set the jar that contains this job
		job.setJarByClass(WordCount.class);
		job.setMapperClass(WCMapper.class);
		job.setReducerClass(WCReducer.class);
		
		// output types of the reduce task
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		// local input/output paths
		Path inputPath = new Path("d:/wordcount/input");
		Path outputPath = new Path("d:/wordcount/output");
		
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outputPath)){
			fs.delete(outputPath, true);
		}
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		// finally, submit the job
		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion?0:1);
	}
	
	private static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			
//			COUNT_LINES++;
			context.getCounter(MyWordCounter.COUNT_LINES).increment(1L);
			
			// business logic of the map task goes here
			String[] words = value.toString().split(" ");
			for(String word: words){
				context.write(new Text(word), new LongWritable(1));
				context.getCounter(MyWordCounter.COUNT_WORD).increment(1L);
			}
		}
	}
	
	private static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
		@Override
		protected void reduce(Text key, Iterable<LongWritable> values, Context context)
				throws IOException, InterruptedException {
			// business logic of the reduce task goes here
			long sum = 0;
			for(LongWritable v: values){
				sum += v.get();
			}
			context.write(key, new LongWritable(sum));
		}
	}
}
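The counter totals show up in the job's console output and web UI under a group named after the enum class. They can also be read back programmatically in the driver once the job has finished; a minimal sketch (it would go right after job.waitForCompletion(true) in the main method above):

	// requires: import org.apache.hadoop.mapreduce.Counters;
	Counters counters = job.getCounters();
	long lines = counters.findCounter(MyWordCounter.COUNT_LINES).getValue();
	long words = counters.findCounter(MyWordCounter.COUNT_WORD).getValue();
	System.out.println("total lines: " + lines + ", total words: " + words);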

  Alternatively, the same counters in a map-only job:

package com.ghgj.mr.counter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CounterWordCount {

	enum CouterWordCountC{COUNT_WORDS, COUNT_LINES}

	public static void main(String[] args) throws Exception {
		// HDFS-related configuration parameters
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		// set the jar that contains this job
		job.setJarByClass(CounterWordCount.class);
		job.setMapperClass(WCCounterMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		// local input path
		Path inputPath = new Path("d:/wordcount/input");
		FileInputFormat.setInputPaths(job, inputPath);

		// map-only job: no reduce tasks, the counters are the result of interest
		job.setNumReduceTasks(0);

		Path outputPath = new Path("d:/wordcount/output");
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outputPath)){
			fs.delete(outputPath, true);
		}
		FileOutputFormat.setOutputPath(job, outputPath);

		// finally, submit the job
		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion ? 0 : 1);
	}

	private static class WCCounterMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// count lines: the input is read line by line, so each map() call is one line
			context.getCounter(CouterWordCountC.COUNT_LINES).increment(1L);
			String[] words = value.toString().split(" ");
			for(String word: words){
				// count words: +1 for every word seen
				context.getCounter(CouterWordCountC.COUNT_WORDS).increment(1L);
			}
		}
	}
}
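Besides enum counters, counters can also be addressed by group name and counter name strings, which is handy when the counters are not known at compile time, for example when tagging different kinds of malformed records. A minimal sketch of how the map method above could additionally count empty lines this way (the group and counter names are illustrative):

			// inside map(): a dynamically named counter, no enum required
			if (value.toString().trim().isEmpty()) {
				context.getCounter("DataQuality", "EMPTY_LINES").increment(1L);
			}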

  

