Hadoop 學習自定義數據類型


 

序列化在分布式環境的兩大作用:進程間通信,永久存儲。
 
Writable接口, 是根據 DataInput 和 DataOutput 實現的簡單、有效的序列化對象.
MR的任意Value必須實現Writable接口:
MR的key必須實現WritableComparable接口, WritableComparable繼承自Writable和Comparable接口
(本節先講自定義value值,下一節再講自定義key值,根據key值進行自定義排序)
以一個例子說明,自定義數據類型(例子來源於學習的課程):
原始數據是由若干條下面數據組成:
數據格式及字段順序如下:
日志格式.jpg
現在要做的工作是以“手機號碼”為關鍵字,計算同一個號碼的 upPackNumdownPackNum,upPayLoad,downPayLoad四個累加值
 
運用MapReduce解決問題思路:
1、框架將數據分成<k1,v1>,k1是位置標記,v1表示一行數據;
2、map函數輸入 <k1,v1>,輸入 <k2,v2>,k2是選定數據的第1列(從0開始),v2是自定義的數據類型,包含第六、七、八、九列封裝后的數據;
3、框架將<k2,v2>依據k2關鍵字進行map排序,然后進行combine過程,再進行Reduce段排序,得到<k2,list(v2...)>;
4、reduce函數處理 <k2,list(v2...)>,以k2為關鍵字,計算list的內容。
 
要自定義的數據類型是Value值,因此要繼承 Writable接口, 自定義數據類型如下

  

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;


public class TrafficWritable implements Writable {

	long upPackNum, downPackNum,upPayLoad,downPayLoad;
	
	
	public TrafficWritable() { //這個構造函數不能省,否則報錯
		super();
		// TODO Auto-generated constructor stub
	}

	public TrafficWritable(String upPackNum, String downPackNum, String upPayLoad,
			String downPayLoad) {
		super();
		this.upPackNum = Long.parseLong(upPackNum);
		this.downPackNum = Long.parseLong(downPackNum);
		this.upPayLoad = Long.parseLong(upPayLoad);
		this.downPayLoad = Long.parseLong(downPayLoad);
	}

	@Override
	public void write(DataOutput out) throws IOException { //序列化
		// TODO Auto-generated method stub
		out.writeLong(upPackNum);
		out.writeLong(downPackNum);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException { //反序列化
		// TODO Auto-generated method stub
		this.upPackNum=in.readLong();
		this.downPackNum=in.readLong();
		this.upPayLoad=in.readLong();
		this.downPayLoad=in.readLong();
	}

	@Override
	public String toString() { //不加toStirng函數,最后輸出內存的地址
		return upPackNum + "\t"+ downPackNum + "\t" + upPayLoad + "\t"
				+ downPayLoad;
	}

	
}

  

最后實現map函數和Reduce函數如下,基本框架和wordCount相同:
  
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class TrafficCount {
	/**
	 * @author nwpulisz
	 * @date 2016.3.31
	 */
	static final String INPUT_PATH="hdfs://192.168.255.132:9000/input";
	static final String OUTPUT_PATH="hdfs://192.168.255.132:9000/output";
	
	public static void main(String[] args) throws Throwable {
		// TODO Auto-generated method stub
		Configuration conf = new Configuration();
		Path outPut_path= new Path(OUTPUT_PATH);
		Job job = new Job(conf, "TrafficCount");
		
		//如果輸出路徑是存在的,則提前刪除輸出路徑
		FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
		if(fileSystem.exists(outPut_path))
		{
			fileSystem.delete(outPut_path,true);
		}
		
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		FileOutputFormat.setOutputPath(job, outPut_path);
		
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TrafficWritable.class);
		job.waitForCompletion(true);
	}
	
	static class MyMapper extends Mapper<LongWritable, Text, Text, TrafficWritable>{

		protected void map(LongWritable k1, Text v1, 
                Context context) throws IOException, InterruptedException {
				String[] splits = v1.toString().split("\t");
				Text k2 = new Text(splits[1]);
				TrafficWritable v2 = new TrafficWritable(splits[6], splits[7], 
						splits[8], splits[9]);
				context.write(k2, v2);
			
		}
		
	}
	
	static class MyReducer extends Reducer<Text, TrafficWritable, Text, TrafficWritable>{

		protected void reduce(Text k2, Iterable<TrafficWritable> v2s, Context context
                ) throws IOException, InterruptedException {
			
			long upPackNum=0L, downPackNum=0L,upPayLoad=0L,downPayLoad=0L;
			for(TrafficWritable traffic: v2s) {
					upPackNum += traffic.upPackNum;
					downPackNum += traffic.downPackNum;
					upPayLoad += traffic.upPayLoad;
					downPayLoad += traffic.downPayLoad;
			}
			context.write(k2,new TrafficWritable(upPackNum+"",downPackNum+"",upPayLoad+"",
					downPayLoad+""));
			}		
	}
}

  

 
最終輸出結果如下:
結果.png
 
附實驗數據下載地址: https://yunpan.cn/cqcEy6QSzUEs7  訪問密碼 2fb1。數據來源:網易雲課堂hadoop大數據實戰






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM