一、背景
MapReduce提供了表連接操作其中包括Map端join、Reduce端join還有半連接,現在我們要討論的是Map端join,Map端join是指數據到達map處理函數之前進行合並的,效率要遠遠高於Reduce端join,因為Reduce端join是把所有的數據都經過Shuffle,非常消耗資源。
二、具體join
1、join的例子
比如我們有兩個文件,分別存儲 訂單信息:products.txt,和 商品信息:orders.txt ,詳細數據如下:
- products.txt:
//商品ID,商品名稱,商品類型(數字表示,我們假設有一個數字和具體類型的映射) p0001,xiaomi,001 p0002,chuizi,001
- orders.txt:
//訂單號,時間,商品id,購買數量 1001,20170710,p0001,1 1002,20170710,p0001,3 1003,20170710,p0001,3 1004,20170710,p0002,1
我們想象有多個商品,並有海量的訂單信息,並且存儲在多個 HDFS 塊中。
xiaomi,7 chuizi,1該怎么處理? 我們分析上面我們想要的結果,商品名稱和銷量,這兩個屬性分別存放到不同的文件中,那我們就要考慮 在一個地方(mapper)讀取這兩個文件的數據,並把數據在一個地方(reducer)進行結合。這就是 MapReduce 中的 Join 了。
-
- 代碼如下:
- Mapper:
public class joinMapper extends Mapper<LongWritable,Text,Text,Text> { private Text outKey=new Text(); private Text outValue=new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split(","); FileSplit inputSplit = (FileSplit) context.getInputSplit(); String name = inputSplit.getPath().getName(); //兩個文件 在一個 mapper 中處理 //通過文件名判斷是那種數據 if(name.startsWith("a")){ //取商品ID 作為 輸出key 和 商品名稱 作為 輸出value,即 第0、1 的數據 outKey.set(split[0]); outValue.set("product#" + split[1]); context.write(outKey, outValue); }else{ //取商品ID 作為 輸出key 和 購買數量 作為 輸出value,即 第2、3 的數據 outKey.set(split[2]); outValue.set("order#" + split[3]); context.write(outKey, outValue); } } }
-
Reducer
public class joinReducer extends Reducer<Text,Text,Text,Text> { private Text outValue = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //用來存放:商品ID、商品名稱 List<String> productsList = new ArrayList<String>(); //用來存放:商品ID、購買數量 List<Integer> ordersList = new ArrayList<Integer>(); for (Text text:values){ String value = text.toString(); if(value.startsWith("product#")) { productsList.add(value.split("#")[1]); //取出 商品名稱 } else if(value.startsWith("order#")){ ordersList.add(Integer.parseInt(text.toString().split("#")[1].trim())); //取出商品的銷量 } } int totalOrders = 0; for (int i=0; i < productsList.size(); i++) { System.out.println(productsList.size()); for (int j=0; j < ordersList.size(); j++) { System.out.println(ordersList.size()); totalOrders += ordersList.get(j); } outValue.set(productsList.get(i) + "\t" + totalOrders ); //最后的輸出是:商品ID、商品名稱、購買數量 context.write(key, outValue); } } }
-
App:
public class App { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); Path path = new Path("F:\\mr\\join\\out"); FileSystem fileSystem = path.getFileSystem(conf); if(fileSystem.isDirectory(path)){ fileSystem.delete(path,true); } Job job = Job.getInstance(conf); //設置job的各種屬性 job.setJobName("App"); //作業名稱 job.setJarByClass(App.class); //搜索類 job.setInputFormatClass(TextInputFormat.class); //設置輸入格式 job.setMapperClass(joinMapper.class); job.setReducerClass(joinReducer.class); //添加輸入路徑 FileInputFormat.addInputPath(job,new Path("F:\\mr\\join\\map")); //設置輸出路徑 FileOutputFormat.setOutputPath(job,new Path("F:\\mr\\join\\out")); //map輸出類型 job.setOutputKeyClass(Text.class); // job.setOutputValueClass(Text.class); // job.waitForCompletion(true); } }
-
輸出結果
p0001 xiaomi 7 p0002 chuizi 1
- Mapper:
- 代碼如下:
2、 Map Join
一個數據集很大,另一個數據集很小(能夠被完全放進內存中),MAPJION會把小表全部讀入內存中,把小表拷貝多份分發到大表數據所在實例上的內存里,在map階段直接 拿另 外一個表的數據和內存中表數據做匹配,由於在map是進行了join操作,省去了reduce運行的效率會高很多;
適用於關聯表中有小表的情形;可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表數據進行join並輸出最終結果,可以大大提高join操作的並發度,加快處理速度。並用distributedcache機制將小表的數據分發到每一個maptask執行節點,從而每一個maptask節點可以從本地加載到小表的數據,進而在本地即可實現join
-
- left outer join的左表必須是大表
- right outer join的右表必須是大表
- inner join左表或右表均可以作為大表
- full outer join不能使用mapjoin;
- mapjoin支持小表為子查詢,使用mapjoin時需要引用小表或是子查詢時,需要引用別名;在mapjoin中,可以使用不等值連接或者使用or連接多個條件;
1.2、 Map Join事例
- product表
p0001,xiaomi,001
p0002,chuizi,001 - orders表
1001,20170710,p0001,1
1002,20170710,p0001,3
1003,20170710,p0001,3
1004,20170710,p0002,1 - 期望輸出
xiaomi 1001,20170710,p0001,1
xiaomi 1002,20170710,p0001,3
xiaomi 1003,20170710,p0001,3
chuizi 1004,20170710,p0002,1 - 代碼實現
- Mapper
/** * 鏈接操作 map端鏈接 */ public class MapJoinMapper extends Mapper<LongWritable,Text,Text,NullWritable> { private Map<String,String> pdInfoMap =new HashMap<String,String>(); private Text keyOut=new Text(); /** * 通過閱讀父類Mapper的源碼,發現 setup方法是在maptask處理數據之前調用一次 可以用來做一些初始化工作 */ @Override protected void setup(Context context) { try { Configuration conf = context.getConfiguration(); FileSystem fs= null; fs = FileSystem.get(conf); FSDataInputStream fis = fs.open(new Path("file:/F:/mr/join/map/input/a.txt")); //得到緩沖區閱讀器 BufferedReader br = new BufferedReader(new InputStreamReader(fis)); String line=null; while((line=br.readLine())!=null){ String[] fields = line.split(","); pdInfoMap.put(fields[0],fields[1]); } fis.close(); } catch (IOException e) { e.printStackTrace(); } } // 由於已經持有完整的產品信息表,所以在map方法中就能實現join邏輯了 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //訂單信息 String orderline = value.toString(); String[] fields = orderline.split(","); String pName = pdInfoMap.get(fields[2]); keyOut.set(pName+"\t"+orderline); context.write(keyOut,NullWritable.get()); } }
- App
public class MapJoinApp { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); Job job = Job.getInstance(conf); //設置job的各種屬性 job.setJobName("MapJoinApp"); //作業名稱 job.setJarByClass(MapJoinApp.class); //搜索類 //添加輸入路徑 FileInputFormat.addInputPath(job,new Path("F:/mr/join/map/input/b.txt")); //設置輸出路徑 FileOutputFormat.setOutputPath(job,new Path("F:/mr/join/map/output")); job.setMapperClass(MapJoinMapper.class); //mapper類 //沒有reduce job.setNumReduceTasks(0); job.setMapOutputKeyClass(Text.class); // job.setMapOutputValueClass(NullWritable.class); // job.waitForCompletion(true); } }
- 輸出和期望輸出一致
3、Reduce端Join
- Reduce端連接比Map端連接更為普遍,因為輸入的數據不需要特定的結構,但是效率比較低,因為所有數據都必須經過Shuffle過程。
- 基本思路:
- Map端讀取所有的文件,並在輸出的內容里加上標示,代表數據是從哪個文件里來的。
- 在reduce處理函數中,按照標識對數據進行處理。
- 然后根據Key去join來求出結果直接輸出。
- 例子
- 數據如上
- 計算過程:
- 在Map階段,把所有數據標記成<key,value>的形式,其中key是id,value則根據來源不同取不同的形式:來源於products表的記錄,value的值為"products#"+name;來源於orders的記錄,value的值為"orders#"+score。
- 在reduce階段,先把每個key下的value列表拆分為分別來自表A和表B的兩部分,分別放入兩個向量中。然后遍歷兩個向量做笛卡爾積,形成一條條最終的結果。
- 代碼如下:
- Mapper
/** * map階段打標記 */ public class reduceMapper extends Mapper<LongWritable,Text,Text,Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split(",");
FileSplit fileSplit = (FileSplit)context.getInputSplit(); String pathName = fileSplit.getPath().toString(); pathName=pathName.substring(27); //通過文件名判斷是那種數據 if (pathName.startsWith("a")){//product數據 //System.out.println(keyOut+"\t"+valueOut); context.write(new Text(fields[0]),new Text("product#"+fields[1])); }else if (pathName.startsWith("b")){ context.write(new Text(fields[2]),new Text("order#"+fields[0]+"\t"+fields[1]+"\t"+fields[3])); } } } - Reducer
public class reduceReducer extends Reducer<Text,Text,Text,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //存放產品信息 List<String> proInfo = new ArrayList<String>(); //存放訂單信息 List<String> ordInfo = new ArrayList<String>(); for (Text text:values){ System.out.println("key="+key+" value="+text); //將數組中的數據添加到對應的數組中去 if (text.toString().startsWith("product")){ proInfo.add(text.toString().split("#")[1]); }else if(text.toString().startsWith("order")){ ordInfo.add(text.toString().split("#")[1]); } } //獲取兩個數組的大小 int sizePro = proInfo.size(); int sizeOrd = ordInfo.size(); //遍歷兩個數組將結果寫出去 for (int i=0;i<sizePro;i++){ for (int j=0;j<sizeOrd;j++){ context.write(key,new Text(proInfo.get(i)+" "+ordInfo.get(j))); } } } }
- App
public class ReduceApp { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); Job job = Job.getInstance(conf); Path path = new Path("F:\\mr\\join\\map/output1"); FileSystem fileSystem = path.getFileSystem(conf); if(fileSystem.isDirectory(path)){ fileSystem.delete(path,true); } //設置job的各種屬性 job.setJobName("ReduceApp"); //作業名稱 job.setJarByClass(ReduceApp.class); //搜索類 //添加輸入路徑 FileInputFormat.addInputPath(job,new Path("F:\\mr\\join\\map\\input")); //設置輸出路徑 FileOutputFormat.setOutputPath(job,new Path("F:\\mr\\join\\map/output1")); job.setMapperClass(reduceMapper.class); //mapper類 job.setReducerClass(reduceReducer.class); //reducer類 job.setMapOutputKeyClass(Text.class); // job.setMapOutputValueClass(Text.class); // job.waitForCompletion(true); } }
- 輸出結果
p0001 xiaomi 1003 20170710 3 p0001 xiaomi 1002 20170710 3 p0001 xiaomi 1001 20170710 1 p0002 chuizi 1004 20170710 1
- Mapper
細節:
-
-
- 當map讀取源文件時,如何區分出是file1還是file2
FileSplit fileSplit = (FileSplit)context.getInputSplit(); String path = fileSplit.getPath().toString();
- 當map讀取源文件時,如何區分出是file1還是file2
-
根據path就可以知道文件的來源咯。
