MapReduce 編程模型 & WordCount 示例


 

學習大數據接觸到的第一個編程思想 MapReduce。

 

前言

之前在學習大數據的時候,很多東西很零散的做了一些筆記,但是都沒有好好去整理它們,這篇文章也是對之前的筆記的整理,或者叫輸出吧。一來是加深自己的理解,二來是希望這些東西能幫助想要學習大數據或者說正在學習大數據的朋友。如果你看到里面的東西,讓你知道了它,這也是一種進步嘛。說不定就開啟了你的另一扇大門呢?

 

先來看一個問題

在講 MapReduce 之前,我們先來看一個問題。我們都知道,在大數據場景中,最先讓人了解到的就是數據量大。當數據量大了以后,我們在處理一些問題的時候,可能就沒辦法按照以前我們傳統的方式去解決問題。

我們以一個簡單的單詞計數來看這個問題。

比如現在我們有一個文件,就10M,里面存放的是一篇英文文檔,我們現在的需求就是計算單詞出現的次數。

按照我們以前寫 Java 代碼的套路來做,大概就是讀取文件,把數據加載到內存,然后new 一個map來存最后的結果。key 就是單詞,value 就是單詞出現的次數。

 然后從文件中讀取一行數據,然后對這行數據按空格進行切割,然后對切割后的一個一個的單詞進行處理,看map 中是否存在,存在就 value + 1,不存在就設置 value 為 1 。

 然后再讀取一行數據重復上面的操作,直到結束。很簡單吧。

是的,沒問題,剛才文件是 10M,處理完成秒秒鍾的事情,但是現在我的文件是 2T 的大小,看清楚呃,是兩個 T 的文件需要處理,那你現在要怎么做?還去加載到內存么?

想想你公司的機器配置,內存多大,8G,16G,32G ...,頂起天 128G 吧。先不說多大,再想想現在內存價格是多少,128G 的內存得花多少錢。很顯然,現在這么玩兒,玩不了吧。

但是,現在一般你公司的機器都還是有不少台吧。那么如果說我們現在把這些機器組成一個 N 節點的集群,然后把這 2T 的文件切分成很多個小文件,然后丟到這些機器上面去計算執行統計,最后再進行一個匯總,是不是就解決了上面的內存不足的問題。

 

MapReduce 思想

MapReduce 是一種編程模型,用於大規模數據集(大於1TB)的並行運算,源於 Google 一篇論文,它充分借鑒了 “分而治之” 的思想,將一個數據處理過程拆分為主要的Map(映射)與Reduce(化簡)兩步。

對比上面的例子來說,Map 階段就是每個機器處理切好的數據片的階段,Reduce 階段則是最后統計匯總的階段。

那么,針對前面說的例子大概可以用下面這個圖來描述它:

 

簡單說一下上面的思路:

第一步:把兩個T 的文件分成若干個文件塊(block)分散存在整個集群上,比如128M 一個。

第二步:在每台機器上運行一個map task 任務,分別對自己機器上的文件進行統計:

1.先把數據加載進內存,然后一行一行的對數據進行讀取,按照空格來進行切割。

2.用一個 HashMap 來存儲數據,內容為 <單詞,數量>

3.當自己本地的數據處理完成以后,將數據進行輸出准備

4.輸出數據之前,先把HashMap 按照首字母范圍分成 3 個HashMap5.將3個 HashMap 分別發送給 3個 Reduce task 進行處理,分發的時候,同一段單詞的數據,就會進入同一個 Reduce task 進行處理,保證數據統計的完整性。

第三步: Reduce task 把收到的數據進行匯總,然后輸出到 hdfs 文件系統進程存儲。

上面的過程可能遇到的問題

上面我們只是關心了我們業務邏輯的實現,其實系統一旦做成分布式以后,會面臨非常多的復雜問題,比如:

•你的 Map task 如何進行任務分配?

•你的 Reduce task 如何分配要處理的數據任務?

•Map task 和 Reduce task 之間如何進行銜接,什么時候去啟動Reduce Task 呀?

•如果 Map task 運行失敗了,怎么處理?

•Map task 還要去維護自己要發送的數據分區,是不是也太麻煩了。

•等等等等等

 

為什么要用 MapReduce

可見在程序由單機版擴成分布式時,會引入大量的復雜工作。為了提高開發效率,可以將分布式程序中的公共功能封裝成框架,讓開發人員可以將精力集中於業務邏輯。

而 MapReduce 就是這樣一個分布式程序的通用框架。

 

WordCount 示例

用一個代碼示例來演示,它需要3個東西,一個是map task ,一個是 reduce task ,還有就是啟動類,不然怎么關聯他們的關系呢。

首先是 map task :

package com.zhouq.mr;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * KEYIN 默認情況下,是MR 框架中讀取到的一行文本的起始偏移量,long 類型 * 在hadoop 中有自己更精簡的序列化接口,我們不直接用Long ,而是用 LongWritable * VALUEIN : 默認情況下,是MR 中讀取到的一行文本內容,String ,也有自己的類型 Text 類型 * <p> * KEYOUT : 是用戶自定義的邏輯處理完成后的自定義輸出數據的key ,我們這里是單詞,類型為string 同上,Text * <p> * VALUEOUT: 是用戶自定義的邏輯處理完成后的自定義輸出value 類型,我們這里是單詞數量Integer,同上,Integer 也有自己的類型 IntWritable * <p> */public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * map 階段的業務邏輯就寫在map 方法內 * maptask 會對每一行輸入數據 就調用一次我們自定義的map 方法。 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //拿到輸入的這行數據 String line = value.toString(); //根據空格進行分割得到這行的單詞 String[] words = line.split(" "); //將單詞輸出為 <word,1> for (String word : words) { //將單詞作為key ,將次數 做為value輸出, // 這樣也利於后面的數據分發,可以根據單詞進行分發, // 以便於相同的單詞落到相同的reduce task 上,方便統計 context.write(new Text(word), new IntWritable(1)); } }}

接下來是 reduce task 邏輯:

/** * KEYIN VALUEIN 對於map 階段輸出的KEYOUT VALUEOUT * <p> * KEYOUT :是自定義 reduce 邏輯處理結果的key * VALUEOUT : 是自定義reduce 邏輯處理結果的 value */public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { /** * <zhouq,1>,<zhouq,1>,<zhouq,2> ...... * 入參key 是一組單詞的kv對 的 key */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //拿到當前傳送進來的 單詞// String word = key.toString(); // int count = 0; for (IntWritable value : values) { count += value.get(); } //這里的key 就是單詞 context.write(key, new IntWritable(count)); }}

 

最后是啟動類:

 

/** * wc 啟動類 */public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // mapreduce.framework.name 配置成 local 就是本地運行模式,默認就是local // 所謂的集群運行模式 yarn ,就是提交程序到yarn 上. 要想集群運行必須指定下面三個配置.// conf.set("mapreduce.framework.name", "yarn");// conf.set("yarn.resoucemanager.hostname", "mini1"); //conf.set("fs.defaultFS","com.zhouq.hdfs://mini1:9000/"); Job job = Job.getInstance(conf); //指定本程序的jar 包 所在的本地路徑 job.setJarByClass(WordCountDriver.class); //指定本次業務的mepper 和 reduce 業務類 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordcountReduce.class); //指定mapper 輸出的 key value 類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定 最終輸出的 kv 類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job,new Path(args[0])); //指定job 輸出的文件目錄 FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean waitForCompletion = job.waitForCompletion(true); System.exit(waitForCompletion ? 0 : 1); }}

 

配置啟動類參數:填寫輸入目錄和輸出目錄,注意輸出目錄不能存在,不然會執行失敗的。

 

執行我們就用編輯器執行,用本地模式,不提交到hadoop 集群上,執行完成后,去到輸出目錄下可以看到這些文件: 

 

然后輸出一下 part-r-00000 這個文件:

代碼地址:https://github.com/heyxyw/bigdata/blob/master/bigdatastudy/mapreduce/src/main/java/com/zhouq/mr/WordCountDriver.java

 

最后

希望對你有幫助。后面將會去講 MapReduce 是如何去運行的。

 

作者·往期內容:

記一次阿里巴巴一面的經歷

 


 

作者介紹:喬二爺,在成都喬二爺這個名字是之前身邊的同事給取的,也不知道為啥。也習慣了他們這樣叫我。

一直待在相對傳統一點的企業,有四年半的 Java 開發經驗,會點大數據的內容,也跟客戶打過一年的交道,還帶過 10個月 10人+的技術團隊,有一定的協調組織能力,能夠理解 boss 的工作內容,也能很好的配合別人做事。

 


 

Java 極客技術公眾號,是由一群熱愛 Java 開發的技術人組建成立,專注分享原創、高質量的 Java 文章。如果您覺得我們的文章還不錯,請幫忙贊賞、在看、轉發支持,鼓勵我們分享出更好的文章。

關注公眾號,大家可以在公眾號后台回復“博客園”,免費獲得作者 Java 知識體系/面試必看資料。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM