主要介紹用DataJoin類來鏈接多數據源,先看一下例子,假設二個數據源customs和orders
customer ID Name PhomeNumber
1 趙一 025-5455-566
2 錢二 025-4587-565
3 孫三 021-5845-5875
客戶的訂單號:
Customer ID order ID Price Data
2 1 93 2008-01-08
3 2 43 2012-01-21
1 3 43 2012-05-12
2 4 32 2012-5-14
問題:現在要生成訂單
customer ID name PhomeNumber Price Data
2 錢二 025-4587-565 93 2008-01-08
上面是一個例子,下面介紹一下hadoop中DataJoin類具體的做法。
首先,需要為不同數據源下的每個數據定義一個數據標簽,這一點不難理解,就是標記數據的出處。
其次,需要為每個待鏈接的數據記錄確定一個鏈接主鍵,這一點不難理解。DataJoin類庫分別在map階段和Reduce階段提供一個處理框架,盡可能幫助程序員完成一些處理的工作,僅僅留下一些必須工作,由程序完成。
Map階段
DataJoin類庫里有一個抽象基類DataJoinMapperBase,該基類實現了map方法,該方法為對每個數據源下的文本的記錄生成一個帶表見的數據記錄對象。但是程序必須指定它是來自於哪個數據源,即Tag,還要指定它的主鍵是什么即GroupKey。如果指定了Tag和GroupKey,那么map將會生成一下的記錄,customer表為例
customers 1 趙一 025-5455-566; customers 2 錢二 025-4587-565;
Map過程中Tag和GroupKey都是程序員給定,所以要肯定要就有接口供程序員去實現,DataJoinMapperBase實現下面3個接口。
abstract Text gernerateInputTag(String inuptFile), 看方法名就知道是設置Tag。
abstract Text generateGroupKey(TaggedMapOutput lineRecord), 該方法是設置GroupKey,其中,lineRecord是數據源中的一行數據,該方法可以在這一行數據上設置任意的GroupKey為主鍵。
abstract TaggedMapOutput generateMapOutput(object value), 該抽象方法用於把數據源中的原始數據記錄包裝成一個帶標簽的數據源。TaggedMapOutputs是一行記錄的數據類型。代碼如下:
import org.apache.hadoop.contrib.utils.join.*; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.io.Text; public class MapClass extends DataJoinMapperBase{ @Override protected Text generateGroupKey(TaggedMapOutput arg0) { String line = ((Text)arg0.getData()).toString(); String[] tokens = line.split(","); String groupKey = tokens[0]; return new Text(groupKey); } @Override protected Text generateInputTag(String arg0) { String dataSource = arg0.split("-")[0]; return new Text(dataSource); } @Override protected TaggedMapOutput generateTaggedMapOutput(Object arg0) { TaggedWritable tw = new TaggedWritable((Text)arg0); tw.setTag(this.inputTag); return tw; } }
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; public class TaggedWritable extends TaggedMapOutput{ private Writable data; public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } @Override public Writable getData() { return data; } @Override public void readFields(DataInput arg0) throws IOException { this.tag.readFields(arg0); this.data.readFields(arg0); } @Override public void write(DataOutput arg0) throws IOException { this.tag.write(arg0); this.data.write(arg0); } }
每個記錄的數據源標簽可以由generateInputTag()產生,通過setTag()方法設置記錄的Tag。
note:1.該記錄不是關系數據庫,是文本文件,2. TaggedMapOutput在import org.apache.hadoop.contrib.utils.join.*頭文件中,有的時候在eclipse下,每個這個頭文件,這時 只要找到你的hadoop的目錄下contrib/datajoin文件加,把jar文件導入eclipse中即可。
Reduce 階段
DataJoinReduceBase中已經實現reduce()方法,具有同一GroupKey的數據分到同一Reduce中,通過reduce的方法將對來自不同的數據源和據用相同的GroupKey做一次叉積組合。這個比較難懂,舉個例子:
customers 2 錢二 025-4587-565; orders 2 1 93 2008-01-08; orders 2 4 32 2012-5-14 |
按照map()結果的數據,就是下表給出的結果(3個記錄),他們都有一個共同的GroupKey,帶來自於二個數據源,所以叉積的結果為
customers 2 錢二 025-4587-565 orders 2 1 93 2008-01-08 |
customers 2 錢二 025-4587-565 orders 2 4 32 2012-5-14 |
如果Reduce階段看懂了,基本上這個就搞定了,Reduce是系統做的,不需要用戶重載,接下來的工作就是要實現一個combine()函數,它的作用是將每個叉積合並起來,形成訂單的格式。
代碼如下:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class ReduceClass extends DataJoinReducerBase{ @Override protected TaggedMapOutput combine(Object[] tags, Object[] values) { if(tags.length<2)return null; StringBuffer joinData = new StringBuffer(); int count=0; for(Object value: values){ joinData.append(","); TaggedWritable tw = (TaggedWritable)value; String recordLine = ((Text)tw.getData()).toString(); String[] tokens = recordLine.split(",",2); if(count==0) joinData.append(tokens[0]); joinData.append(tokens[1]); } TaggedWritable rtv = new TaggedWritable(new Text(new String(joinData))); rtv.setTag((Text)tags[0]); return rtv; } public static void main(String[] args){ Configuration conf = new Configuration(); JobConf job = new JobConf(conf, ReduceClass.class); Path in = new Path(args[0]); Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("DataJoin"); job.setMapperClass(MapClass.class); job.setReducerClass(ReduceClass.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); job.set("mapred.textoutputformat.separator", ","); JobClient.runJob(job); } }
作者:BIGBIGBOAT/Liqizhou