MapReduce簡介


MapReduce簡介##

參考自![http://www.cnblogs.com/swanspouse/p/5130136.html]

MapReduce定義:

MapReduce是一種可用於數據處理的編程框架。MapReduce采用"分而治之"的思想,把對大規模數據集的操作,分發給一個主節點管理下的各個分節點共同完成,然后通過整合各個節點的中間結果,得到最終結果。簡單地說,MapReduce就是"任務的分解與結果的匯總"。

在分布式計算中,MapReduce框架負責處理了並行編程中分布式存儲、工作調度、負載均衡、容錯均衡、容錯處理以及網絡通信等復雜問題,把處理過程高度抽象為兩個函數:map和reduce,map負責把任務分解成多個任務,reduce負責把分解后多任務處理的結果匯總起來。

MapReduce適用的問題:

用MapReduce來處理的數據集(或任務)必須具備這樣的特點:待處理的數據集可以分解成許多小的數據集,而且每一個小數據集都可以完全並行地進行處理。

MapReduce框架中的名詞解釋:

split:


分片是指MapReduce框架將數據源根據一定的規則將源數據分成若干個小數據的過程;其中,一個小數據集,也被稱為一個分片。

Map:


Map有兩層含義:

  • 其一、是指MapReduce框架中的Map過程,即將一個分片根據用戶定義的Map邏輯處理后,經由MapReduce框架處理,形成輸出結果,供后續Reduce過程使用;
  • 其二,是指用戶定義Java程序實現Mapper類的map接口的用戶自定義邏輯,此時通常被稱為mapper。

Reduce:


Reduce也有兩層含義:

  • 其一,是指MapReduce框架中的Reduce過程,即將Map的結果作為輸入,根據用戶定義的Reduce邏輯,將結果處理並匯總,輸出最后的結果;
  • 其二,是指用戶定義Java程序實現Reducer類的reduce接口的用戶自定義邏輯,此時通常被稱為reducer。

Combine:


Combine是一個可由用戶自定的過程,類似於Map和Reduce,MapReduce框架會在Map和Reduce過程中間調用Combine邏輯(會在下面章節中仔細講解),通常Combine和reduce的用戶代碼是一樣的(也可被稱為本地的reduce過程),但是請注意並不是所有用MapReduce框架實現的算法都適合增加Combine過程(比如求平均值)。

Partition:


在MapReduce框架中一個split對應一個map,一個partiton對應一個reduce(無partition指定時,由用戶配置項指定,默認為1個)。 reduce的個數決定了輸出文件的個數。比如,在需求中,數據是從對每個省匯總而成,要求計算結果按照省來存放,則需要根據源數據中的表明省的字段分區,用戶自定義partition類,進行分區。

MapReduce的原理:

wordcount:


wordcount是最簡單也是最能體現MapReduce思想的程序之一,可以稱為MapReduce版"Hello World",該程序的完整代碼可以在Hadoop安裝包的"src/examples"目錄下找到。單詞計數主要完成功能是:統計一系列文本文件中每個單詞出現的次數,即簡單如下圖所示:

由於是示例,這里不涉及分區,更復雜的情況會在shuffle過程中仔細討論。故這里n個split分別對應n個map,reduce數為1。

java代碼:
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
public static class TokenizerMapper extends
        Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

public static class IntSumReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException{
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapred.job.tracker", "10.1.69.179:9001");
    String[] ars = new String[] { "input", "newout" };
    String[] otherArgs = new GenericOptionsParser(conf, ars)
            .getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount  ");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

上述就是WordCount的用戶代碼和流程圖。由於MapReduce作為一個分布式的框架,肯定有其用戶定義的約束和規范,那么對於一個簡單的MapReduce作業,用戶需要定義Map和Reduce過程,在代碼層級,就是繼承Maper類實現map函數和繼承Reducer類實現Reduce函數,用戶分別在這兩個方法中實現具體的計算邏輯。

wordcount流程概述:

下面結合代碼和流程圖,描述一下wordcount項目在MapReduce計算框架下的處理流程:

  1. 首先,通過job.waitForCompletion(true)開啟了WordCount這個MapReduce作業,后續通過InputFormat的實現類FileInputFormat將輸入數據,即輸入文件,分片從而得到Map方法,即Map用戶定義的方法的輸入,即圖中所示,FileInputFormat將文件按照行分割,並組織成為的形式,成為用戶Map方法的輸入,其中Key是字符的偏移量,value即一行的內容。
  2. 而后,數據被輸入到用戶定義的map方法中,map方法以文件中的每行數據作為輸入,將每行按照空格分詞,並將每個詞組織為K-V對,輸出;
  3. Map的輸出交予了MapReduce框架來進行處理,簡單來說MapReduce框架將這些K-V對依照key的字典順序由小到大排列,並對相同的key的value進行合並為數組list,輸出給combine過程【其中的詳細過程會在講解shuffle過程時仔細討論】;
  4. 將map方法的輸出結果根據Key排序完成之后,如果有combine過程被定義這時候MapReduce框架就會調用Combine過程。Combine過程是由用戶指定的,必須的過程,一般Combine過程在邏輯上就是Reduce過程,map的輸出結果需要通過網絡傳遞給reduce,其作用是減少Map的輸出的結果集的大小,從而降低網絡的開銷。用戶通過job.setCombinerClass(IntSumReducer.class)指定Combine的實現類;Combine其實就是在Map端先執行一次用戶的reduce方法,先在中間進行一次計算,從而將結果集減少;但是需要注意的是,並不是所有的算法都適用進行多次reduce計算,請謹慎選擇;
  5. 然后,多個map的結果,匯集到reduce,由於WordCount就開啟了一個reduce,故只有一個reduce接收所有map端的輸出;在輸入到用戶定義的reduce方法之前,MapReduce框架還會進行一步排序操作,這步操作類似於在map端進行的排序,將相同key的value合並為list,不同的是排序的輸入,是來自於多個Map的輸出,是根據key排序的K-V對數據;
  6. 經過排序后的K-ValueList對,被輸入到的Reduce方法,在WordCount的reduce方法中,它對每個key對應的value的list進行求和,從而獲得每個單詞的總的出現次數。

以上就是WordCount的概要的計算流程,后面的小節中將對一個MapReduce作業的工作機制,進行詳細的描述,並針對shuffle過程對MapReduce的計算過程進行更加深入的討論,設計到多分區的數據的計算的情況,展現真正的大數據量情況下Map和Reduce之間的計算流程和協同工作。

MapReduce的工作機制

相關名詞解釋:


jvm

JVM即Java Virtual Machine(Java虛擬機)的縮寫,是一個Java用戶程序進程的運行環境。

Client

Client(客戶端):編寫MapReduce代碼,配置作業,提交作業

JobTracker

JobTracker用於協調作業的運行。是一個Java應用程序,其主類是JobTracker。JobTracker是運行在Master節點的Java進程。Hadoop集群是一個采用主從方式的分布式處理系統,其中我們稱主節點,即負責監控、管理從節點的節點為Master節點。而Jobtracker正是,管理、監控MapReduce作業運行的Java進程,故部署於Master節點之上。

TaskTracker

TaskTracker即為運行於從節點之上,真正執行划分后的任務的Java進程,主類是TaskTracker。

HDFS

Hdfs是的Hadoop系統中除了MapReduce外,另一個重要的組成部分,Hdfs是一套分布式的邏輯概念上的文件系統,也是主從模式,Master節點稱為NameNode,Slave節點稱為DataNode,內部將數據分散、冗余的存儲在各個DataNode,由NameNode統一管理,對外呈現一個完整的屏蔽物理存儲的邏輯文件系統。

MapReduce 1工作原理詳述:


MapReduce工作機制詳述:

  1. 用戶代碼通過job.waitForCompletion(true)開啟了該MapReduce作業的執行,創建JobClient類對象,調用submitJob()函數;submitJob()函數完成了如下操作:
  2. 通過New JobId() 函數向JobTracker獲取作業Id,
    JobClient對MapReduce作業的相關信息進行檢查和計算:檢查輸出路徑是否存在,若存在則拋出異常,跳出終止作業;計算作業的輸入分片,如果分片無法計算,則報錯退出程序,例如輸入路徑不存在等;
  3. 將運行作業所需資源(包括JAR包,作業配置,計算所得的分片信息),發送到Hdfs上的公共目錄中;
  4. 告知JobTracker作業准備就緒,向JobTracker提交作業;至此完成了submitJob()函數所有的工作。
    提交作業后,waitForCompletion()每秒輪詢作業的進度,如果發現自上次報告后有改變,便把進度報告到控制台。作業完成后,如果成功就顯示作業計數器;如果失敗則導致作業失敗的錯誤被記錄到控制台。
  5. JobTracker接收到submitJob()的請求,初始化作業,創建JobInProgress對象來跟蹤和調度這個作業,並將JobInProgress對象加入作業調度隊列;
  6. JobTracker從HDFS中獲取作業的分片信息,根據分片的個數創建對應的TaskInProgress對象監控和調度Map任務,並根據分區的個數或者是用戶指定的數目(缺省為1),創建TaskInProgress對象監控和調度Reduce任務;
  7. Map/Reduce任務的分配:
    Tasktracker作為MapReduce框架中的Slave節點,會通過一個簡單的循環定時(可配置,缺省為10秒)通過RPC向JobTracker發送心跳,以便使JobTracker知曉TaskTracker是否存活,同時充當JobTracker與TaskTracker之間的通信通道;TaskTracker在發送的心跳同時,會告知JobTracker自己是否准備好運行新的任務,JobTracker將其組建為可運行任務的列表,供Map/Reduce任務使用。在向TaskTracker分配任務之前,JobTracker會根據作業調度(默認方法是維護一個作業優先級列表)的算法,首先選定一個要執行的作業,而后對作業的任務進行分配。對於Map和Reduce任務,每一個TaskTracker都有一定數量的限制,被稱為任務槽位,例如,一個TaskTracker只能同時運行兩個Map作業和兩個Reduce作業,這個個數受到TaskTracker所在的機器的Cpu和內存的限制;其次,對於任務槽的分配,會先對Map任務進行分配,然后再對Reduce任務進行分配,原因很明顯,由於Map任務是首先執行的,一個作業中,只有執行完了Map任務,才能執行Reduce任務。對於Map任務的分配,為了減少網絡的延遲和消耗,JobTracker對於Map任務的分配采取數據本地化和機架本地化原則,盡量使Map任務所在的機器與該Map任務輸入的分片數據所在的物理存儲處於最近的網絡位置;而對於Reduce任務則沒有必要做這種考慮,由於Reduce的輸入數據來源於Map任務的輸出,而Map會將結果數據保存在本地的磁盤上,由Reduce任務從相應的Map任務所在的機器上拉去,故無法避免網絡延遲,故對Reduce任務分配 ,即是取可運行列表第一個即可。
  8. TaskTracker通過心跳通信,獲得了一個任務,將作業的JAR包和配置,從Hdfs共享目錄中復制到本地文件系統,在本地創建臨時工作目錄,將JAR包解壓到臨時工作目錄中;
  9. TaskTracker創建TaskInProgress對象監控和調度Map/Reduce任務。TaskInproress會創建TaskRunner,從而啟動真正運行JAR包程序的子進程Child。
  10. Child子進程會加載JAR包執行Map/Reduce任務,開始任務的執行。至此一個MapReduce作業完成了從提交、分配、執行的過程,但是在執行過程中,還涉及到一個重要的問題即作業的進度和狀態是如何更新的呢?下面我們將詳細講解。
    除了map任務和reduce任務,還會創建兩個任務:作業創建和作業清理。這兩個任務在TaskTracker中執行,在map任務運行之前來創建任務,在所有reduce任務完成之后完成清理工作,作業創建任務為作業創建輸出路徑和臨時工作空間,作業清理清除作業運行過程中的臨時目錄。

MapReduce作業狀態和進度的更新

狀態和進度的定義

MapReduce作業是一個時間運行的批量作業,運行時間范圍從數秒到數小時。這是一個很長的時間段,對於用戶而言,能夠得知作業的進展是很重要的。一個作業和它的每個任務都有一個狀態,包括:作業或任務的狀態(比如,運行狀態、成功完成,失敗狀態等)、作業計數器的值、狀態消息等。這些狀態是如何在作業執行期間不斷改變,他們是如何與客戶端通信的呢?

一個Mapreduce作業在運行時,對其進度(即任務完成的百分比)要要保持追蹤,在MapReduce框架中是如何定義進度的呢?對於Map任務而言,任務進度就是已處理的輸入所占的比例;而對於Reduce任務就稍微復雜一些。Reduce的整個過程一般分為三個部分,與shuffle(后面詳細介紹)的三個階段相對應,即復制、排序和Reducer計算。MapReduce定義這三個階段各占1/3,故例若已經執行的reducer計算一半的輸入了,那么整個Reduce任務就執行了5/6了(1/3 + 1/3 + 1/6)。

作業狀態和進度更新

Map/Reduce任務中,也有一組計數器,負責對任務運行過程中的各個事件進行計數,這些計數器要么負責內置框架中,例如已寫入的map輸入記錄數,要么由用戶自己定義。

如果任務報告了進度,便會設置一個標志以表明狀態變化將被發送到tasktracker。有一個獨立的線程每隔3秒檢查一次此標志,如果已設置,則告知tasktracker當前任務狀態。同時,tasktracker每隔5秒發送“心跳”到JobTracker,並且由tasktracker運行的所有任務的狀態都會在調用中被發送至JobTracker。

JobTracker將這些更新合並起來,產生一個表明所有運行作業及其所含任務狀態的全視圖。JobClient通過getJobStatus()方法每秒查詢JobTracker來接收最新狀態。

客戶端通過使用JobClient的getJob()方法來得到一個RunningJob的實例,包含了作業的所有狀態信息。

MapReduce子任務失敗
  • 用戶代碼拋出異常
    一般最常見的是用戶代碼拋出運行時異常導致失敗。如果發生這種情況,子任務JVM進程,即Child進程,會在退出前向其父進程TaskTracker發送錯誤報告,錯誤報告將會被記錄在用戶日志當中。TaskTracker會將此次task attempt(MapReduce中將一次Map/Reduce任務的執行成為task attempt)標記為failed,並釋放該任務的槽位運行其他任務。
  • JVM突然退出
    子進程JVM突然退出,可能由於JVM bug而導致MapReduce用戶代碼造成的某些特殊原因造成JVM退出。在這種情況下,TaskTracker會注意到進程已退出,並將此次嘗試標志位failed。
  • 子進程掛起
    一旦TaskTracker注意到已經有一段時間沒有收到進度更新,便會將任務標記為failed。在此之后,JVM子進程將會被自動殺死,任務超時時間是可配置的(mapred.task.timeout)。
  • 任務被終止
    由於推測執行機制(后續介紹),任務的推測副本中有一個成功執行時,其他的副本將會被中止執行,並kill掉。JobTracker得知一個task attempt失敗后(通過與TaskTracker心跳通信得知),將重新調度該任務的執行。JobTracker會嘗試避免重新調度失敗過的TaskTracker上的任務。此外,失敗重試是有次數的限制,這個值是可以配置的,如果有任務的失敗次數大於重試的限制,則會被標志為failed。
    在默認的情況下,一個MapReduce作業的任何一個子任務失敗,都會導致整個作業被標記為失敗,並退出。但是,對於一些應用程序,我們不希望一旦由上述的幾個任務失敗就中止運行整個作業,應為即是有任務失敗,作業的一些結果可能還是可用的。在這種情況下,可以為作業設置在不觸發作業失敗的情況下允許失敗的最大百分比。
TaskTracker失敗

如果一個TaskTracker由於崩潰或者是運行過緩而失敗,他將停止或者是很少發送“心跳”。JobTracker會注意到已經停止發送“心跳”的TaskTracker,並將它從等待任務調度的TaskTracker的列表中移除,並且重新調度任何在其上執行中的任務。

JobTracker失敗

由於本文檔中所討論的hadoop是Master單點架構的,所以一旦JobTracker宕掉了,只能手動重啟了。

推測執行

MapReduce模型將作業分解成任務,然后並行執行各個任務以使整體作業的完成時間少於每個任務順序執行的時間。這使得整個作業對於個別執行緩慢的任務很敏感,因為一個任務執行緩慢會使整個任務的實行時間增長,即”拖后腿“。

推測執行的策略就是在一個作業的所有任務啟動之后,針對那些已經運行了一段時間的任務且該任務比其他任務的平均進度都要慢的任務,則會啟動一個相同的任務作為備份,這就是”推測執行“。當其中任何副本任務成功執行時,其他的備份任務都會被中止,殺掉。

推測執行只是一種優化措施,並不能使作業運行的更加可靠。更應該查找並修復軟件缺陷,是任務不會運行速度過慢。

Shuffle過程剖析

Shuffle描述着將mapper的輸出作為輸入傳遞給reducer的過程(注意不是reduce,是用戶代碼reducer),如圖所示:

在Hadoop這樣的集群環境中,大部分map task與reduce task的執行是在不同的節點上。當然很多情況下Reduce執行時需要跨節點去拉取其它節點上的map task結果。如果集群正在運行的job有很多,那么task的正常執行對集群內部的網絡資源消耗會很嚴重。這種網絡消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。還有在節點內,相比於內存,磁盤IO對job完成時間的影響也是可觀的。從最基本的要求來說,我們對Shuffle過程的期望可以有:

  • 完整地從map task端拉取數據到reduce端。
  • 在跨節點拉取數據時,盡可能地減少對帶寬的不必要消耗。
  • 減少磁盤IO對task執行的影響。
map任務端

整個流程可以分為四步。簡單些可以這樣說,每個map task都有一個內存緩沖區,存儲着map的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束后再對磁盤中這個map task產生的所有臨時文件做合並,生成最終的正式輸出文件,然后等待reduce task來拉數據。

  1. 每個map任務都有一個環形內存緩沖區,用於緩存mapper函數的輸出,其中緩沖區的大小可通過配置io.sort.mb來進行調整,一旦緩沖區中數據到達某一個閾值(io.sort.spill.prcent,默認80%),一個后台線程便被觸發,鎖定這80%的內存,將它們溢寫(spill)到磁盤中。在寫磁盤的過程中,mapper函數的輸出會繼續寫入余下20%的內存緩沖區中,溢寫和mapper輸出互不影響,如果在此期間內存緩沖區被填滿,map操作會被阻塞直到寫磁盤過程完成。
  2. 守護線程被觸發,將從內存中溢出的數據刷寫到磁盤時,經過如下幾個過程:
    • 排序
      如圖中所示的情況,輸入分片,含有分區,在刷寫到磁盤時,進行二元排序:首先按照分區值由小到大排序、而后按照key值進行字典序排序。

    • Combine
      如果用戶代碼中指定了combiner,則combiner就會在輸出文件寫到磁盤之前、二元排序之后運行。運行combiner的意義在於使map的輸出更緊湊,減少溢寫到磁盤的數據量,使得寫到本地磁盤和傳送給reducer的數據更少。

    • 合並
      一旦內存緩沖區達到溢寫的閾值,就會新建一個溢出寫文件(spill files),一次spill操作就會產生一個溢寫文件,因此在map任務完成最后一條輸出記錄之后,可能會有幾個溢出寫文件。合並是將多個溢寫文件合並到一個文件,所以可能也有相同的key存在,在這個過程中如果client設置過Combiner,也會使用Combiner來合並相同的key。 在任務完成之前,溢出寫文件就被合並成一個按照分區、key排序並在排序后經過combiner過程的輸出文件。

即combnier過程在內存溢寫到磁盤和合並磁盤溢寫文件的最后都會發生。

reduce任務端
  • 復制階段
    Reduce端,有一個線程會定期向JobTracker詢問自己所需要的分片數據的map任務是否完成,直到獲取到map的網絡位置后,由多個復制線程從map任務所在機器的磁盤拉去相應的數據
  • 排序階段
    這里的merge如map端的merge動作,只是數組中存放的是不同map端copy來的數值。Copy過來的數據會先放入內存緩沖區中,但是當緩沖區中的數據超過閾值,則會觸發排序。這里需要強調的是,merge有三種形式:1)內存到內存 2)內存到磁盤 3)磁盤到磁盤。默認情況下第一種形式不啟用,當內存中的數據量到達一定閾值,就啟動內存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的,然后在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束,然后啟動第三種磁盤到磁盤的merge方式生成最終的那個文件,將輸入的key有序數據按照key排序排序,並將相同key的value合並為list。
  • Reducer階段
    排序階段的最后一次合並會作為reducer階段的輸入直接輸出到reducer階段,而后已排序輸入中的每一個key都要調用reducer函數,此階段的輸出直接寫到輸出的文件系統,一般為HDFS。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM