mapreduce學習工程之五---map端join連接


實驗環境 win7 hadoop2.7.3本地模式

實驗數據:訂單數據orders.txt,商品數據pdts.txt

order.txt

1001    pd001    300
1002    pd002    20
1003    pd003    40
1004    pd002    50

pdts.txt

pd001    apple
pd002    xiaomi
pd003    cuizi

實驗解決的問題:解決mapreduce連接過程中的數據傾斜的問題,典型應用場景如下:在電商平台中,買小米手機和買蘋果手機的訂單數量很多,買錘子手機的訂單數量很少,如

果根據傳統的mapreduce方法,3個reduce的數據將不均衡。比如接受小米的reduce接收到的數據會很多,接受錘子數據的reduce接收到的數據就會很少

實驗解決的思路:采用map端連接,直接將排序過程在map中執行,將商品信息加載在map信息中,引入mapreduce的輸入緩存機制

代碼如圖所示:

package com.tianjie.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

    static class MapSideJoinMappe extends Mapper<LongWritable, Text, Text, NullWritable>{
        
        
        //map 商品的訂單信息k v key為商品編號,v為商品名稱
        Map<String,String>     pdInfoMap = new HashMap<String, String>();
        Text ktext = new Text();
        
        
        /*setup 函數用來加載文件到hadoop緩存中
         * */
        protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            
            //打開輸入文本文件的路徑,獲得一個輸入流
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("C:/Users/admin/Desktop/join/cache/pdts.txt")));
            String line;
            while(StringUtils.isNotEmpty(line = br.readLine())){
                //獲得商品信息表 k為商品編號,value為商品名稱
                String[] split = line.split("\t");
                pdInfoMap.put(split[0], split[1]);
                
            }
            
        }
        /*
         * hadoop 的緩沖機制*/
        
        
        /*
         * map 函數的輸入key value ,其中默認輸入為TextInputFormat,
         *     key 為輸入文本的偏移量,value為輸入文本的值
         *     Text,NullWriable為map文件輸入的值
         *     */
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            
            //獲得文本文件的一行
            String orderline  = value.toString();
            //將文本文件按照制表符切分
            String[] fields = orderline.split("\t");
            //更具商品編號,獲得商品名稱
            String pdName = pdInfoMap.get(fields[1]);
            //獲得商品的名字,將商品名稱追加在文本文件中
            ktext.set(orderline+"\t"+pdName);
            //將新的文本文件寫出
            context.write(ktext, NullWritable.get());
        }
        
    }
        
    
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        
        //得到hadoop的一個配置參數
        Configuration conf = new Configuration();
        //獲取一個job實例
        Job job = Job.getInstance(conf);
        //加載job的運行類
        job.setJarByClass(MapSideJoin.class);
        
        //加載mapper的類
        job.setMapperClass(MapSideJoinMappe.class);
        //設置mapper類的輸出類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        
        //設置文件輸入的路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        
        //設置文件的輸出路徑
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(args[1]);
        if(fs.isDirectory(path)){
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //指定需要緩沖一個文件到所有maptask運行節點工作目錄
        //job.addArchiveToClassPath(""); 緩存jar包到task運行節點的classpath中
        //job.addFileToClassPath(file); 緩存普通文件到task運行節點的classpath中
        //job.addCacheArchive(uri);      緩存壓縮包文件到task運行節點的工作目錄中
        
        //1:緩存普通文件到task運行節點的工作目錄
        job.addCacheFile(new URI("file:///C:/Users/admin/Desktop/join/cache/pdts.txt")); 
        
        //2:指定map端的加入邏輯不需要reduce階段,設置reducetask數量為0
        job.setNumReduceTasks(0);
        
        //提交job任務,等待job任務的結束
        boolean res =job.waitForCompletion(true);
        System.exit(res?1:0);
        
        }
}

 

需要注意的點有:

1:采用map端連接時,可以不適用reduce,這個時候可以設置reducetask 的數量為0:

2:程序運行的結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM