[Hadoop]-從數據去重認識MapReduce


  這學期剛好開了一門大數據的課,就是完完全全簡簡單單的介紹的那種,然后就接觸到這里面最被人熟知的Hadoop了。看了官網的教程【吐槽一下,果然英語還是很重要!】,嗯啊,一知半解地搭建了本地和偽分布式的,然后是在沒弄懂,求助了Google,搞來了一台機子,嗯,搭了個分布式的。其實是作業要求啦,覺得自己平時用單機的完全夠了啦~

  然后被要求去做個WordCount和數據去重的小例子,嗯啊,我就抱着半桶水的Java知識就出發走向“大數據“【其實很小】了。

  立馬求助官網【官網就是好,雖然看的慢,英語技術兩不誤!】,看了給出來的WordCount的例子,自己就寫了一下數據去重的小例子,還好成功了,順便對MapReduce了解了更多。下面說一下自己的認識和實現的思路。

 

  首先整個的流程大致是這樣的

  1.輸入數據InputData在被Map()處理之前會先由InputFormat調用getRecordReader()生成RecordReader,RecordReader再調用creatKey()和creatValue()生成可供Map使用的<key,value>對。其中有很多格式可繼承於InputFormat,如我們最常用TextInputFor就是繼承於FileInputFormat,將每一行數據都生成一個記錄。

  2.到Map呢就是發揮數據價值的時候了。想想這些數據都能拿來干嘛,想干嘛,再coding你想要做的一切一切吧。

  3.Map輸出的<key,value>對在被送到Reduce之前呢,會被先送到Shuffle處理一下成為<key,value-list>的樣子,Reduce最喜歡這樣的了。怎么變成這樣呢?嗯,就是將相同的Key數據合並在一起了,還可以指定Job.setCombinerClass(class)來指定組合的方式。還有亂糟糟的Reduce也是不喜歡的,所以在它們組合之后還要再sort一下才行,如果遇到等值的數據呢,你又想自己來定怎么排序,那就指定Job.setGroupingComparaterClass(class)來‘二次排序’吧。

  4.又是一個發揮想象力的過程了。也是要好好想想Reduce能幫你干什么呢,如果覺得沒必要的話,我們也可以不指定Reduce的,讓Map處理后的數據就直接輸出好了。在Map和Reduce階段都可以借助Counter來獲得一些統計信息哦。

  5.快到尾聲了,記得每一種InputFormat都有一種OutputFormat和它對應的,最常用的還是Text類型的。

  6.上面我們提到了Job這個東西,其實一個Job可以理解分為Map和Reduce兩個過程。所以我們既可以定義Map和Reduce,也是給Job設定各種各樣的配置。最簡單的設置會在后面程序的注釋里給出。

 

  數據去重實現的思路:

  根據上面的流程分析,既然數據是一行一行split之后再傳進去Map的,而Map的輸出結果是會經過Shuffle合並相同key之后再給Reduce的,那我們將Map輸入的value變成Map輸出的key就ok了,這里就不用管輸出的value-list是什么東西了,重復的就combine了,達到需要。

 

  這是代碼的具體實現:

  

/*
    運行環境
    CentOS7
    OpenJDK-1.7.0.91
    Hadoop2.7.1
    !單機節點測試!
*/

//導入所需的包
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Single
{

    //map將輸入中的value復制到輸出數據的key上,並直接輸出
    /*
        四個參數的意思分別是:
        Object:輸入到Map中的key的類型
        Text:輸入到Map中的value的類型
        Text:輸出到Reduce中的key的類型
        Text:輸出到Reduce中的value的類型
    */
    public static class Map extends Mapper<Object, Text, Text, Text>
    {
        //從輸入中得到的每行的數據的類型
        private static Text line = new Text();

        //實現map函數
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            //獲取並輸出每一次的處理過程
            line = value;
            System.out.println("The process of the Map:" + key);
            context.write(line, new Text(""));
        }
    }

    //reduce將輸入中的key復制到輸出數據的key上,並直接輸出
    public static class Reduce extends Reducer<Text, Text, Text, Text>
    {

        //實現reduce函數
        public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException
        {
            //獲取並輸出每一次的處理過程
            System.out.println("The process of the Reduce:" + key);
            context.write(key, new Text(""));

        }

    }

    public static void main(String[] args) throws Exception
    {

        //設置配置類
        Configuration conf = new Configuration();

        //是從命令行里獲取輸入數據和輸出數據的路徑,所以這里要獲取和判斷一下
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if(pathArgs.length != 2)
        {
            System.err.println("Please set the path of <InputData> & <OutputData> in the command!");
            System.exit(2);
        }

        //Job job = new Job(conf, "Date-Single");
        //照着葫蘆畫不出來,就Google一下解決方法
        Job job = Job.getInstance();
        job.setJobName("single");
        job.setJarByClass(Single.class);
        //設置Map、Combine和Reduce處理類
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        //設置輸出類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //設置輸入和輸出目錄
        FileInputFormat.addInputPath(job, new Path(pathArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(pathArgs[1]));
        //這里是根據是否等待job完成之后再返回結果並退出程序
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}

 

  后記:

  在這個學習過程中,很多東西都是”不求甚解“,對於它們只有一些很淺顯的理解,如有錯誤之處,勞煩告知,謝謝。

  本文由AnnsShadoW發表於:http://www.cnblogs.com/annsshadow/p/5006317.html 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM