Implementing Join Operations with MapReduce


   In a relational database, a join is trivial: the join primitives defined in SQL handle it directly. For massive data stored in HDFS, a join can also be expressed very conveniently in HiveQL, but HiveQL is itself translated into MapReduce to do the work. This article therefore starts by looking at how to implement a join by writing a MapReduce program by hand.

1. Reduce-Join: performing the join on the Reduce side

   Suppose we have a user data file users.txt and a user login log file login_logs.txt, with the following contents:

   User data file users.txt, columns: userid, name:

1    LiXiaolong
2    JetLi
3    Zhangsan
4    Lisi
5    Wangwu

   Login log file login_logs.txt, columns: userid, login_time, login_ip:

1    2015-06-07 15:10:18    192.168.137.101
3    2015-06-07 15:12:18    192.168.137.102
3    2015-06-07 15:18:36    192.168.137.102
1    2015-06-07 15:22:38    192.168.137.101
1    2015-06-07 15:26:11    192.168.137.103

   Expected result:

1    LiXiaolong    2015-06-07 15:10:18    192.168.137.101
1    LiXiaolong    2015-06-07 15:22:38    192.168.137.101
1    LiXiaolong    2015-06-07 15:26:11    192.168.137.103
3    Zhangsan    2015-06-07 15:12:18    192.168.137.102
3    Zhangsan    2015-06-07 15:18:36    192.168.137.102

   Approach:

   1) In the map phase, the input file path tells us whether a record comes from users.txt or login_logs.txt. Records from users.txt are emitted as <userid, 'u#'+name>, and records from login_logs.txt are emitted as <userid, 'l#'+login_time+'\t'+login_ip>;

   2) In the reduce phase, separate the values for each userid back into the two source tables and write out their Cartesian product, as illustrated below.
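
   To make the shuffle concrete: with the sample data above, the reduce call for key 1 receives the tagged values shown below (the order within the group is not guaranteed):

key = 1
values = [ "u#LiXiaolong",
           "l#2015-06-07 15:10:18\t192.168.137.101",
           "l#2015-06-07 15:22:38\t192.168.137.101",
           "l#2015-06-07 15:26:11\t192.168.137.103" ]

   The Cartesian product of the single u# value with the three l# values yields exactly the three rows for userid 1 shown in the expected result.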

   Implementation:

package com.hicoor.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceJoinDemo {
    public static final String DELIMITER = "\t"; // field delimiter

    static class MyMappper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            FileSplit split = (FileSplit) context.getInputSplit();
            String filePath = split.getPath().toString();
            // get the record as a string
            String line = value.toString();
            // skip empty records
            if (line == null || line.trim().equals("")) return;

            String[] values = line.split(DELIMITER);
            // handle records from users.txt
            if (filePath.contains("users.txt")) {
                if (values.length < 2) return;
                context.write(new Text(values[0]), new Text("u#" + values[1]));
            }
            // handle records from login_logs.txt
            else if (filePath.contains("login_logs.txt")) {
                if (values.length < 3) return;
                context.write(new Text(values[0]), new Text("l#" + values[1] + DELIMITER + values[2]));
            }
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            LinkedList<String> linkU = new LinkedList<String>();  // values from users.txt
            LinkedList<String> linkL = new LinkedList<String>();  // values from login_logs.txt
              
            for (Text tval : values) {
                String val = tval.toString();  
                if(val.startsWith("u#")) {
                    linkU.add(val.substring(2));
                } else if(val.startsWith("l#")) {
                    linkL.add(val.substring(2));
                }
            }
              
            for (String u : linkU) {
                for (String l : linkL) {
                    context.write(key, new Text(u + DELIMITER + l));
                }
            }
        }
    }

    private final static String FILE_IN_PATH = "hdfs://cluster1/join/in";
    private final static String FILE_OUT_PATH = "hdfs://cluster1/join/out";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        System.setProperty("hadoop.home.dir", "D:\\desktop\\hadoop-2.6.0");
        Configuration conf = getHAConfiguration();

        // delete the output directory if it already exists
        FileSystem fileSystem = FileSystem.get(new URI(FILE_OUT_PATH), conf);
        if (fileSystem.exists(new Path(FILE_OUT_PATH))) {
            fileSystem.delete(new Path(FILE_OUT_PATH), true);
        }

        Job job = Job.getInstance(conf, "Reduce Join Demo");
        job.setMapperClass(MyMappper.class);
        job.setJarByClass(ReduceJoinDemo.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));
        FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
        job.waitForCompletion(true);
    }

    private static Configuration getHAConfiguration() {
        Configuration conf = new Configuration();
        conf.setStrings("dfs.nameservices", "cluster1");
        conf.setStrings("dfs.ha.namenodes.cluster1", "hadoop1,hadoop2");
        conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop1", "172.19.7.31:9000");
        conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop2", "172.19.7.32:9000");
        // required: lets the HDFS client locate the currently active namenode
        conf.setStrings("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        return conf;
    }

}
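
   As a variation on the driver above, MultipleInputs can bind a dedicated mapper to each input path, which removes the filePath.contains(...) check from map(). The sketch below is my own, not part of the original post (the class name MultipleInputsJoinDemo, the mapper names UserMapper/LogMapper, and the exact file paths are assumptions); the reducer from ReduceJoinDemo is reused unchanged.

package com.hicoor.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultipleInputsJoinDemo {
    public static final String DELIMITER = "\t";

    // only ever sees users.txt lines, so it just tags them with "u#"
    static class UserMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] v = value.toString().split(DELIMITER);
            if (v.length < 2) return;
            context.write(new Text(v[0]), new Text("u#" + v[1]));
        }
    }

    // only ever sees login_logs.txt lines, so it just tags them with "l#"
    static class LogMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] v = value.toString().split(DELIMITER);
            if (v.length < 3) return;
            context.write(new Text(v[0]), new Text("l#" + v[1] + DELIMITER + v[2]));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // plus the HA settings from getHAConfiguration() above
        Job job = Job.getInstance(conf, "MultipleInputs Join Demo");
        job.setJarByClass(MultipleInputsJoinDemo.class);
        // one mapper per input path; no job.setMapperClass(...) call is needed
        MultipleInputs.addInputPath(job, new Path("hdfs://cluster1/join/in/users.txt"),
                TextInputFormat.class, UserMapper.class);
        MultipleInputs.addInputPath(job, new Path("hdfs://cluster1/join/in/login_logs.txt"),
                TextInputFormat.class, LogMapper.class);
        job.setReducerClass(ReduceJoinDemo.MyReducer.class); // reuse the reducer shown above
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://cluster1/join/out")); // must not exist yet
        job.waitForCompletion(true);
    }
}
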
2. Map-Join: performing the join on the Map side

    When one of the two tables being joined is small enough to fit comfortably into memory on every node, the small table can be loaded into the DistributedCache. The MapReduce framework then ships the cached file to every node that runs a map task, and the map tasks read the local copy directly during computation. Completing the join on the Map side reduces the amount of data shuffled over the network to the Reduce side, which improves overall job efficiency.

   Approach:

   Assume the users.txt table is the small one. Add users.txt to the DistributedCache; in the map phase, read the locally cached users.txt and use it to translate the userid of each login_logs.txt record into a user name. No Reduce phase is needed in this example.

   Implementation:

package com.hicoor.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistributedCacheDemo {
    public static final String DELIMITER = "\t"; // field delimiter

    static class MyMappper extends Mapper<LongWritable, Text, Text, Text> {
        private Map<String, String> userMaps = new HashMap<String, String>();
        
        @Override
        protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context) throws IOException ,InterruptedException {
            // the local cache file paths could also be obtained via the configuration:
            // Configuration conf = context.getConfiguration();
            // Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(conf);

            // here we simply read through the "users.txt" symlink created for the cache file
            FileReader fr = new FileReader("users.txt");
            BufferedReader br = new BufferedReader(fr);
            String line;
            while ((line = br.readLine()) != null) {
                // load the cached small-table data on the map side
                String[] splits = line.split(DELIMITER);
                if (splits.length < 2) continue;
                userMaps.put(splits[0], splits[1]);
            }
            br.close();
        }
        
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            // get the record as a string
            String line = value.toString();
            // skip empty records
            if (line == null || line.trim().equals("")) return;
            
            String[] values = line.split(DELIMITER);
            if(values.length < 3) return;
            
            String name = userMaps.get(values[0]);
            // skip log records whose userid has no matching user (inner-join semantics, matching the expected result)
            if (name == null) return;
            Text t_key = new Text(values[0]);
            Text t_value = new Text(name + DELIMITER + values[1] + DELIMITER + values[2]);
            context.write(t_key, t_value);
        }
    }

    private final static String FILE_IN_PATH = "hdfs://cluster1/join/in/login_logs.txt";
    private final static String FILE_OUT_PATH = "hdfs://cluster1/join/out";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        System.setProperty("hadoop.home.dir", "D:\\desktop\\hadoop-2.6.0");
        Configuration conf = getHAConfiguration();

        // delete the output directory if it already exists
        FileSystem fileSystem = FileSystem.get(new URI(FILE_OUT_PATH), conf);
        if (fileSystem.exists(new Path(FILE_OUT_PATH))) {
            fileSystem.delete(new Path(FILE_OUT_PATH), true);
        }

        // add the file to the distributed cache; map (or reduce) tasks can then open it locally via the users.txt symlink
        DistributedCache.addCacheFile(new URI("hdfs://cluster1/join/in/users.txt#users.txt"), conf);
        
        Job job = Job.getInstance(conf, "Map Distributed Cache Demo");
        job.setMapperClass(MyMappper.class);
        job.setJarByClass(DistributedCacheDemo.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));
        FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
        job.waitForCompletion(true);
    }

    private static Configuration getHAConfiguration() {
        Configuration conf = new Configuration();
        conf.setStrings("dfs.nameservices", "cluster1");
        conf.setStrings("dfs.ha.namenodes.cluster1", "hadoop1,hadoop2");
        conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop1", "172.19.7.31:9000");
        conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop2", "172.19.7.32:9000");
        // required: lets the HDFS client locate the currently active namenode
        conf.setStrings("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        return conf;
    }

}
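
   Note that org.apache.hadoop.mapreduce.filecache.DistributedCache is deprecated in Hadoop 2.x; the non-deprecated way is to call Job.addCacheFile(URI) after creating the Job. Below is a minimal driver sketch of my own showing that change (the class name MapJoinNewApiDriver is an assumption); the "#users.txt" fragment still creates a users.txt symlink in each task's working directory, so the setup() code above keeps working unchanged.

package com.hicoor.hadoop.mapreduce;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapJoinNewApiDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // plus the HA settings from getHAConfiguration() above
        Job job = Job.getInstance(conf, "Map Join (new cache API)");
        job.setJarByClass(MapJoinNewApiDriver.class);
        job.setMapperClass(DistributedCacheDemo.MyMappper.class); // reuse the mapper shown above
        job.setNumReduceTasks(0);                                 // map-only job, no reducer needed
        // non-deprecated replacement for DistributedCache.addCacheFile(uri, conf)
        job.addCacheFile(new URI("hdfs://cluster1/join/in/users.txt#users.txt"));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://cluster1/join/in/login_logs.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://cluster1/join/out")); // must not exist yet
        job.waitForCompletion(true);
    }
}
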
3. Completing the join with HiveQL

   With HiveQL the task is easy: a single join query does it, and Hive automatically generates and optimizes the MapReduce job that runs the query.

   Steps:

   1) Under /join/in/, create a users directory and a login_logs directory, and move users.txt and login_logs.txt into the corresponding directories;

   2) Create the external users table: create external table users(userid int, name string) row format delimited fields terminated by '\t' location '/join/in/users';

   3) Create the external login_logs table: create external table login_logs(userid string, login_time string, login_ip string) row format delimited fields terminated by '\t' location '/join/in/login_logs';

   4) Run the join query and save the result: create table user_login_logs as select A.*, B.login_time, B.login_ip from users A, login_logs B where A.userid = B.userid;

4. Summary

    In everyday work we usually let Hive do the join; hand-written map-joins and reduce-joins are reserved for complex or special requirements. There is also a third approach, the semi-join, which sits between the two: some records are filtered out on the map side so that only data that can actually participate in the join is sent over the network, while non-joining records are never shuffled. This cuts the shuffle traffic and improves overall efficiency; a sketch is given below.
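
   A minimal sketch of the semi-join idea (my own illustration, not code from the referenced article): reuse the reduce-side join from section 1, but cache users.txt as in section 2 and drop, on the map side, any login_logs.txt record whose userid has no matching user, so it is never shuffled. Only the mapper changes; MyReducer and the driver stay as above, with the addCacheFile call from section 2 added.

package com.hicoor.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SemiJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    public static final String DELIMITER = "\t";
    private final Set<String> userIds = new HashSet<String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // load only the join keys of the small table from the cached users.txt symlink
        BufferedReader br = new BufferedReader(new FileReader("users.txt"));
        String line;
        while ((line = br.readLine()) != null) {
            String[] splits = line.split(DELIMITER);
            if (splits.length >= 1) userIds.add(splits[0]);
        }
        br.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.trim().isEmpty()) return;
        String[] values = line.split(DELIMITER);

        String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
        if (filePath.contains("users.txt")) {
            if (values.length < 2) return;
            context.write(new Text(values[0]), new Text("u#" + values[1]));
        } else if (filePath.contains("login_logs.txt")) {
            if (values.length < 3) return;
            // the semi-join filter: log records that cannot match any user are never shuffled
            if (!userIds.contains(values[0])) return;
            context.write(new Text(values[0]), new Text("l#" + values[1] + DELIMITER + values[2]));
        }
    }
}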

   Typical execution efficiency: map-join > semi-join > reduce-join.

   Reference: http://database.51cto.com/art/201410/454277.htm

