MapReduce框架結構及代碼示例


一個完整的 mapreduce 程序在分布式運行時有三類實例進程:

1、MRAppMaster:負責整個程序的過程調度及狀態協調

2、MapTask:負責 map 階段的整個數據處理流程

3、ReduceTask:負責 reduce 階段的整個數據處理流程

 

 

 

設計構思

    MapReduce 是一個分布式運算程序的編程框架,核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,並發運行在Hadoop 集群上。

    既然是做計算的框架,那么表現形式就是有個輸入(input),MapReduce 操作這個輸入(input),通過本身定義好的計算模型,得到一個輸出(output)。

    對許多開發者來說,自己完完全全實現一個並行計算程序難度太大,而MapReduce 就是一種簡化並行計算的編程模型,降低了開發並行應用的入門門檻。

  Hadoop MapReduce 構思體現在如下的三個方面:

  • 如何對付大數據處理:分而治之

    對相互間不具有計算依賴關系的大數據,實現並行最自然的辦法就是采取分而治之的策略。並行計算的第一個重要問題是如何划分計算任務或者計算數據以便對划分的子任務或數據塊同時進行計算。不可分拆的計算任務或相互間有依賴關系的數據無法進行並行計算!

  • 構建抽象模型:Map 和 Reduce

    MapReduce 借鑒了函數式語言中的思想,用 Map 和 Reduce 兩個函數提供了高層的並行編程抽象模型。

    Map: 對一組數據元素進行某種重復式的處理;

    Reduce: 對 Map 的中間結果進行某種進一步的結果整理。

    MapReduce 中定義了如下的 Map 和 Reduce 兩個抽象的編程接口,由用戶去編程實現:

      map: (k1; v1) → [(k2; v2)]

      reduce: (k2; [v2]) → [(k3; v3)]

    Map 和 Reduce 為程序員提供了一個清晰的操作接口抽象描述。通過以上兩個編程接口,大家可以看出 MapReduce 處理的數據類型是<key,value>鍵值對

  • 統一構架,隱藏系統層細節

    如何提供統一的計算框架,如果沒有統一封裝底層細節,那么程序員則需要考慮諸如數據存儲、划分、分發、結果收集、錯誤恢復等諸多細節;為此,MapReduce 設計並提供了統一的計算框架,為程序員隱藏了絕大多數系統層面的處理細節。

    MapReduce 最大的亮點在於通過抽象模型和計算框架把需要做什么(whatneed to do)與具體怎么做(how to do)分開了,為程序員提供一個抽象和高層的編程接口和框架。程序員僅需要關心其應用層的具體計算問題,僅需編寫少量的處理應用本身計算問題的程序代碼。如何具體完成這個並行計算任務所相關的諸多系統層細節被隱藏起來,交給計算框架去處理:從分布代碼的執行,到大到數千小到單個節點集群的自動調度使用。

 

代碼示例

 

  • pom依賴
<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>hadoop.mr.wc.WordCountDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

 

  • Mapper類
/**
 * mr程序執行的時候mapper階段運行的類,也就是maptask
 */
public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    //該方法為map階段具體的業務邏輯的實現
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //獲取傳入的一行內容
        String line = value.toString();
        //按照分隔符切割數據返回數組
        String[] words = line.split(" ");
        //遍歷數組
        for (String word : words) {
            //每出現一個單詞都標記1
            context.write(new Text(word),new IntWritable(1));
        }
    }
}

 

  • Reducer類
/**
 * mr程序執行的時候reducer階段運行的類,也就是reducertask
 */
public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    //該方法為reducer階段具體的業務邏輯的實現
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //定義一個變量統計
        int count = 0;
        //遍歷所有value所在的迭代器  累加構成該單詞最終的總個數
        for (IntWritable value : values) {
            count += value.get();
        }
        //相同key的一組調用reduce完畢,直接輸出
        context.write(key,new IntWritable(count));
    }
}

 

  • 入口函數
    /**
     * mr程序運行時的主類,除了入口函數之外,還要對mr程序做具體描述
     */
    public class WordCountDriver {
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            //指定mr程序使用本地模式模擬一套環境執行mr程序,一般用於本地代碼測試
    //        conf.set("mapreduce.framework.name","local");
    
            //通過job方法獲得mr程序運行的實例
            Job job = Job.getInstance(conf);
    
            //指定本次mr程序的運行主類
            job.setJarByClass(WordCountDriver.class);
            //指定本次mr程序使用的mapper reduce
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            //指定本次mr程序map輸出的數據類型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            //指定本次mr程序reduce輸出的數據類型,也就是說最終的輸出類型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //指定本次mr程序待處理數據目錄   輸出結果存放目錄
            FileInputFormat.addInputPath(job,new Path("/wordcount/input"));
            FileOutputFormat.setOutputPath(job,new Path("/wordcount/output"));
    //        FileInputFormat.addInputPath(job,new Path("D:\\wordcount\\input"));
    //        FileOutputFormat.setOutputPath(job,new Path("D:\\wordcount\\output"));
    
            //提交本次mr程序
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);//程序執行成功,退出狀態碼為0,退出程序,否則為1
        }
    }

     

  • 測試

    入口類中被注釋的部分為本地測試方法,也就是在windows指定路徑中准備測試數據,直接run執行,

    而另一種方法是將代碼打成jar包上傳到集群中,在hdfs上指定路徑准備數據,使用hadoop命令啟動

hadoop jar wordcount.jar

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM