MapReduce中的Join算法


  在關系型數據庫中Join是非常常見的操作,各種優化手段已經到了極致。在海量數據的環境下,不可避免的也會碰到這種類型的需求,例如在數據分析時需要從不同的數據源中獲取數據。不同於傳統的單機模式,在分布式存儲下采用MapReduce編程模型,也有相應的處理措施和優化方法。

  我們先簡要地描述待解決的問題。假設有兩個數據集:氣象站數據庫和天氣記錄數據庫

  氣象站的示例數據,如下

Station ID

Station Name

011990-99999

SIHCCAJAVRI

012650-99999

TRNSET-HANSMOEN

  天氣記錄的示例數據,如下

Station ID

Timestamp

Temperature

012650-99999

194903241200

111

012650-99999

194903241800

78

011990-99999

195005150700

0

011990-99999

195005151200

22

011990-99999

195005151800

-11

  假設我們想要如下結果

Station ID

Station Name

Timestamp

Temperature

011990-99999

SIHCCAJAVRI

195005150700

0

011990-99999

SIHCCAJAVRI

195005151200

22

011990-99999

SIHCCAJAVRI

195005151800

-11

012650-99999

TYNSET-HANSMOEN

194903241200

111

012650-99999

TYNSET-HANSMOEN

194903241800

78

  想想看,我們該怎么通過MapReduce實現上面的需求?

 

   MapReduce連接操作的實現技術取決於數據集的規模及分區方式。如果一個數據集很大而另外一個數據集很小,以至於小的數據集可以分發到集群中的每一個節點之中,然后在mapper階段讀取大數據集中的數據;到reducer時,reduce獲取本節點上的數據(也就是小數據集中的數據)並完成連接操作;我們以上面的天氣數據連接來做具體闡述,假設氣象站數據集很少,那將氣象站數據集分發到集群中的每個節點中,在mapper階段讀取天氣記錄數據,在reduce階段讀取本節點上的氣象站數據,然后通過氣象站數據中的氣象站ID和天氣數據中的氣象ID做連接,從而完成氣象站數據和天氣記錄數據的連接。在這種情況下,我們就用到了Hadoop的分布式緩存機制,它能夠在任務運行過程中及時地將文件和存檔復制到任務節點以供使用。為了節約網絡寬帶,在每一個作業中,各個文件通常只需要復制到一個節點一次

  如果兩個數據集的規模均很大,以至於沒有哪個數據集可以被完全復制到集群的每個節點中,我們仍然可以使用 MapReduce來進行連接,至於到底采用map端連接(連接操作如果由mapper執行,則稱為 “map 端連接”)還是reduce端連接(連接操作如果由reducer執行,則稱為“reduce端連接”),則取決於數據的組織方式。下面我們分別介紹map端連接和reduce端連接。

    map 端連接

      在兩個大規模輸入數據集到達map函數之前就應該執行連接操作。為達到該目的,各map的輸入數據必須先分區並且以特定方式排序。各個輸入數據集被划分成相同數量的分區,並且均按相同的鍵(連接鍵)排序。同一鍵的所有記錄均會放在同一分區之中。聽起來似乎要求非常嚴格,但這的確合乎MapReduce作業的輸出。

     map端連接操作可以連接多個作業的輸出,只要這些作業的reducer數量相同、鍵相同並且輸出文件是不可切分的(例如,小於一個 HDFS 塊)。在上面講的天氣例子中,如果氣象站文件以氣象站ID部分排序,天氣記錄也以氣象站ID部分排序,而且reducer的數量相同,則就滿足了執行map端連接的前提條件。

     利用 org.apache.hadoop.mapreduce.join 包中的CompositeInputFormat類來運行一個 map 端連接。CompositeInputFormat類的輸入源和連接類型(內連接或外連接)可以通過一個連接表達式進行配置,連接表達式的語法簡單。此種方法不常用,這里不再贅述。

    reduce 端連接

      由於reduce端連接並不要求輸入數據集符合特定結構,因而reduce端連接比 map 端連接更為常用。但是,由於兩個數據集均需經過MapReduce的shuffle過程, 所以reduce 端連接的效率往往要低一些。基本思路是mapper為各個記錄標記源,並且使用連接鍵作為 map 輸出鍵,使鍵相同的記錄放在同一reducer中。 我們通過下面兩種技術實現reduce端連接。

     1、多輸入

       數據集的輸入源往往有多種格式,因此可以使用 MultipleInputs 類來方便地解析各個數據源。MultipleInputs的用法,在“MapReduce輸入格式”已經介紹過,這里就不再贅述。

     2、二次排序

       如前所述,reducer在兩個數據源中選出鍵相同的記錄並不介意這些記錄是否已排好序。此外,為了更好地執行連接操作,先將某一個數據源傳輸到reducer會非常重要。還以上面的天氣數據連接為例,當天氣記錄發送到reducer的時候,與這些記錄有相同鍵的氣象站信息最好也已經放在reducer,使得reducer能夠將氣象站名稱填到天氣記錄之中就馬上輸出。雖然也可以不指定數據傳輸次序,並將待處理的記錄緩存在內存之中,但應該盡量避免這種情況,因為其中任何一組的記錄數量可能非常龐大,遠遠超出reducer的可用內存容量。 因此我們用到二次排序技術,對map階段輸出的每個鍵的值進行排序,實現這一效果。

 

  下面我們分別介紹兩種實現方式分布式緩存機制、reduce端連接

  1、分布式緩存機制

    1、用法

      Hadoop 命令行選項中,有三個命令可以實現文件分發到任務的各個節點。

        1)可以使用-files選項指定待分發的文件,文件內包含以逗號隔開的URL列表。文件可以存放在本地文件系統、HDFS、或其它Hadoop可讀文件系統之中。如果尚未指定文件系統,則這些文件被默認是本地的。即使默認文件系統並非本地文件系統,這也是成立的。

        2)可以使用-archives選項向自己的任務中復制存檔文件,比如JAR文件、ZIP 文件、tar文件和gzipped tar文件,這些文件會被解檔到任務節點。

        3)可以使用-libjars選項將JAR文件添加到mapper和reducer任務的類路徑中。如果作業JAR文件中並非包含很多庫JAR文件,使用-libjars選項是很方便的。

    2、工作機制

      當啟動一個作業,Hadoop會把由-files、-archives、和-libjars等選項所指定的文件復制到分布式文件系統之中。接着,在任務運行之前,tasktracker將文件從分布式文件系統復制到本地磁盤(緩存)使任務能夠訪問文件。此時,這些文件就被視為“本地化” 了。從任務的角度來看, 這些文件就已經在那兒了,它並不關心這些文件是否來自 HDFS 。此外,有-libjars指定的文件會在任務啟動前添加到任務的類路徑(classpath)中。

   3、分布式緩存API

     由於可以通過Hadoop命令行間接使用分布式緩存,所以大多數應用不需要使用分布式緩存API。然而,一些應用程序需要用到分布式緩存的更高級的特性,這就需要直接使用API了。 API包括兩部分:將數據放到緩存中的方法,以及從緩存中讀取數據的方法。

      1)首先掌握數據放到緩存中的方法,以下列舉 Job 中可將數據放入到緩存中的相關方法:

public void addCacheFile(URI uri); 
public void addCacheArchive(URI uri);// 以上兩組方法將文件或存檔添加到分布式緩存 
public void setCacheFiles(URI[] files); 
public void setCacheArchives(URI[] archives);// 以上兩組方法將一次性向分布式緩存中添加一組文件或存檔 
public void addFileToClassPath(Path file); 
public void addArchiveToClassPath(Path archive);// 以上兩組方法將文件或存檔添加到 MapReduce 任務的類路徑

           在緩存中可以存放兩類對象:文件(files)和存檔(achives)。文件被直接放置在任務節點上,而存檔則會被解檔之后再將具體文件放置在任務節點上。
     2)其次掌握在map或者reduce任務中,使用API從緩存中讀取數據。          

public Path[] getLocalCacheFiles() throws IOException; 
public Path[] getLocalCacheArchives() throws IOException; 
public Path[] getFileClassPaths(); 
public Path[] getArchiveClassPaths();

     我們可以使用 getLocalCacheFiles()和getLocalCacheArchives()方法獲取緩存中的文件或者存檔的引用。當處理存檔時,將會返回一個包含解檔文件的目錄。相應的,用戶可以通過 getFileClassPaths()和getArchivesClassPaths()方法獲取被添加到任務的類路徑下的文件和文檔。

 

  下面我們仍然以前面的氣象站數據和天氣記錄數據為例,使用分布式緩存API,完成兩個數據集的連接操作。完整的 MapReduce 程序如下所示。

package com.buaa.distributedgache;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa.distributedgache
* @ClassName JoinRecordWithStationName
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-25 19:34:57
*/
public class JoinRecordWithStationName extends Configured implements Tool {
    
    public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] arr = value.toString().split("\t", 2);
            if (arr.length == 2) {
                context.write(new Text(arr[0]), value);
            }
        }
    }
    
    public static class TemperatureReducer extends Reducer<Text, Text, Text, Text> {
        // 定義Hashtable存放緩存數據
        private Hashtable<String, String> table = new Hashtable<String, String>();
        
        /**
         * 獲取分布式緩存文件
         */
        @SuppressWarnings("deprecation")
        protected void setup(Context context) throws IOException, InterruptedException {
            // 返回本地文件路徑
            Path[] localPaths = (Path[]) context.getLocalCacheFiles();
            if (localPaths.length == 0) {
                throw new FileNotFoundException("Distributed cache file not found.");
            }
            
            // 獲取本地 FileSystem實例
            FileSystem fs = FileSystem.getLocal(context.getConfiguration());
            // 打開輸入流
            FSDataInputStream in = fs.open(new Path(localPaths[0].toString()));
            // 創建BufferedReader讀取器
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            // 按行讀取並解析氣象站數據
            String infoAddr = null;
            while ((infoAddr = br.readLine()) != null) {
                String[] records = infoAddr.split("\t");
                // key為stationID,value為stationName
                table.put(records[0], records[1]);
            }
        }

        public void reduce(Text key, Iterable< Text> values, Context context) throws IOException, InterruptedException {
            // 天氣記錄根據stationId獲取stationName
            String stationName = table.get(key.toString());
            for (Text value : values) {
                context.write(new Text(stationName), value);
            }
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        // 讀取配置文件
        Configuration conf = new Configuration();
        
        // 判斷路徑是否存在,如果存在,則刪除
        Path mypath = new Path(args[2]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }
        
        // 獲取一個job實例
        Job job = Job.getInstance(conf,"join");
        // 主類
        job.setJarByClass(JoinRecordWithStationName.class);
        
        // 設置record.txt文件作為輸入
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 添加station.txt到分布式緩存
        job.addCacheFile(new URI(args[1]));
        // 輸出目錄
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        
        // mapper
        job.setMapperClass(TemperatureMapper.class);
        // reduce
        job.setReducerClass(TemperatureReducer.class);
        
        // 輸出key類型
        job.setOutputKeyClass(Text.class);
        // 輸出value類型
        job.setOutputValueClass(Text.class);
        
        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = { 
                "hdfs://hadoop1:9000/buaa/join/record.txt",
                "hdfs://hadoop1:9000/buaa/join/station.txt",
                "hdfs://hadoop1:9000/buaa/join/out/" 
            };
        int ec = ToolRunner.run(new Configuration(), new JoinRecordWithStationName(), args0);
        System.exit(ec);
    }
}

  添加分布式緩存文件相對簡單,只需使用job.addCacheFile(new URI(cacheFilePath))方法添加緩存文件即可。需要注意的是,在獲取獲取緩存文件時,文件將以“本地的”Path 對象的形式返回。為了讀取文件,用戶需要首先使用getLocal()方法獲得一個Hadoop本地FileSystem實例。本程序中,我們在Reduce的setup()方法中獲取緩存文件。

  以下是輸出結果,達到我們預期的效果。

  clip_image002

  2、Reduce端連接

  我們使用 TextPair 類構建組合鍵,包括氣象站ID 和“標記”。在這里,“標記” 是一個虛擬的字段,其唯一目的是對記錄排序,使氣象站記錄比天氣記錄先到達。一種簡單的做法就是:對於氣象站記錄,設置“標記”的值設為 0;對於天氣記錄,設置“標記”的值設為1,代碼如下所示

package com.buaa.secondarysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName TextPair
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-24 22:54:05
*/
public class TextPair implements WritableComparable<TextPair>{
    // Text類型的實例變量first
    private Text first;
    // Text類型的實例變量second
    private Text second;
    
    public TextPair(){
        set(new Text(),new Text());
    }
    
    public TextPair(String first,String second){
        set(new Text(first),new Text(second));
    }
    
    public void set(Text first,Text second){
        this.first = first;
        this.second = second;
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }
    
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }
    
    public boolean equals(TextPair tp) {
        return first.equals(tp.first) && second.equals(tp.second);
    }
    
    public String toStirng() {
        return first + "\t" + second;
    }
    
    @Override
    public int compareTo(TextPair o) {
        if(!first.equals(o.first)){
            return first.compareTo(o.first);
        }else if(!second.equals(o.second)){
            return second.compareTo(o.second);
        }
        
        return 0;
    }

    public Text getFirst() {
        return first;
    }

    public void setFirst(Text first) {
        this.first = first;
    }

    public Text getSecond() {
        return second;
    }

    public void setSecond(Text second) {
        this.second = second;
    }
}

  JoinStationMapper處理來自氣象站數據,代碼如下所示。

package com.buaa.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinStationMapper
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-24 22:55:42
*/
public class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // 解析氣象站數據
        String[] arr = line.split("\\s+");
        
        if (arr.length == 2) {// 滿足這種數據格式
            // key=氣象站id value=氣象站名稱
            context.write(new TextPair(arr[0], "0"), new Text(arr[1]));
        }
    }
}

  JoinRecordMapper處理來自天氣記錄數據,代碼如下所示

package com.buaa.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinRecordMapper
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-24 22:56:55
*/
public class JoinRecordMapper extends Mapper<LongWritable,Text,TextPair,Text>{ 
    protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
        String line = value.toString();
        // 解析天氣記錄數據
        String[] arr = line.split("\\s+",2);
        
        if(arr.length == 2){
            //key=氣象站id  value=天氣記錄數據
            context.write(new TextPair(arr[0],"1"),new Text(arr[1]));
        }  
    }
}

  由於 TextPair 經過了二次排序,所以 reducer 會先接收到氣象站數據。因此從中抽取氣象站名稱,並將其作為后續每條輸出記錄的一部分寫到輸出文件。JoinReducer 的代碼如下所示。

package com.buaa.secondarysort;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinReducer
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-24 22:54:24
*/
public class JoinReducer extends Reducer< TextPair,Text,Text,Text>{
    protected void reduce(TextPair key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
        Iterator<Text> iter = values.iterator();
        // 氣象站名稱
        Text stationName = new Text(iter.next());
        
        while(iter.hasNext()){
            // 天氣記錄的每條數據
            Text record = iter.next();
            
            Text outValue = new Text(stationName.toString() + "\t" + record.toString());
            
            context.write(key.getFirst(),outValue);
        }
    }        
}

  下面我們定義作業的驅動類 JoinRecordWithStationName,在該類中,關鍵在於根據組合鍵的第一個字段(即氣象站 ID)進行分區和分組,JoinRecordWithStationName 類的代碼如下所示。

package com.buaa.secondarysort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinRecordWithStationName
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-24 22:57:24
*/
public class JoinRecordWithStationName extends Configured implements Tool {
    public static class KeyPartitioner extends Partitioner<TextPair, Text> {
        public int getPartition(TextPair key, Text value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
    
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(TextPair.class, true);
        }
        
        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable wc1, WritableComparable wc2) {
            TextPair tp1 = (TextPair) wc1;
            TextPair tp2 = (TextPair) wc2;
            
            return tp1.getFirst().compareTo(tp2.getFirst());
        }
    }

    public int run(String[] args) throws Exception {
        // 讀取配置文件
        Configuration conf = new Configuration();
        
        // 判斷路徑是否存在,如果存在,則刪除
        Path mypath = new Path(args[2]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        // 新建一個任務
        Job job = Job.getInstance(conf, "join");
        // 主類
        job.setJarByClass(JoinRecordWithStationName.class);
        
        // 天氣記錄數據源
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, JoinRecordMapper.class);
        // 氣象站數據源
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, JoinStationMapper.class);
        // 輸出路徑
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        
        // 自定義分區
        job.setPartitionerClass(KeyPartitioner.class);
        // 自定義分組
        job.setGroupingComparatorClass(GroupComparator.class);
        
        // 指定Reducer
        job.setReducerClass(JoinReducer.class);
        
        // map key輸出類型
        job.setMapOutputKeyClass(TextPair.class);
        // reduce key輸出類型
        job.setOutputKeyClass(Text.class);
        
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = { 
                "hdfs://hadoop1:9000/buaa/join/record.txt",
                "hdfs://hadoop1:9000/buaa/join/station.txt",
                "hdfs://hadoop1:9000/buaa/join/out/" 
        };
        int exitCode = ToolRunner.run(new JoinRecordWithStationName(), args0);
        System.exit(exitCode);
    }
}

  以下是輸出結果,也達到我們預期的效果。

  clip_image004

 

如果,您認為閱讀這篇博客讓您有些收獲,不妨點擊一下右下角的【推薦】。
如果,您希望更容易地發現我的新博客,不妨點擊一下左下角的【關注我】。
如果,您對我的博客所講述的內容有興趣,請繼續關注我的后續博客,我是【劉超★ljc】。

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM