MapReduce過程詳解(基於hadoop2.x架構)


本文基於hadoop2.x架構詳細描述了mapreduce的執行過程,包括partition,combiner,shuffle等組件以及yarn平台與mapreduce編程模型的關系。

mapreduce的簡介和優點

  • mapreduce是一個分布式運算程序的編程框架,是hadoop數據分析的核心.
  • mapreduce的核心思想是將用戶編寫的邏輯代碼和架構中的各個組件整合成一個分布式運算程序,實現一定程序的並行處理海量數據,提高效率.
  • 海量數據難以在單機上處理,而一旦將單機版程序擴展到集群上進行分布式運行勢必將大大增加程序的復雜程度.引入mapreduce架構,開發人員可以將精力集中於數據處理的核心業務邏輯上,而將分布式程序中的公共功能封裝成框架,以降低開發的難度.
  • 一個完整的mapreduce程序有三類實例進程
    1. MRAppMaster:負責整個程序的協調過程
    2. MapTask:負責map階段的數據處理
    3. ReduceTask:負責reduce階段的數據處理

案例(統計各個手機號的上傳和下載流量總和)

數據展示

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200

數據解釋:

每行數據的第二列數據是手機號,倒數第三列表示上行流量,倒數第二列表示下行流量

要求:

根據總流量降序排列

輸出格式要求:

手機號	上行流量	下行流量	總流量

創建bean對象用於封裝上行流量,下行流量和總流量:

package com.xiaojie.flowcount;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

//作為key輸出的時候都要排序
//不要排序的話,可實現Writable
//實現WritableComparable是為了實現比較大小,排序的功能
public class FlowBean implements WritableComparable<FlowBean>{
    private Long upFlow;
    private Long downFlow;
    private Long sumFlow;

    //反序列化時需要反射調用空參構造函數,顯示地定義一個
    public FlowBean(){}

    public FlowBean(Long upFlow, Long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow; 
    }
    public void set(Long upFlow, Long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow; 
    }
    public Long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }
    public Long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    //反序列化方法
    public void readFields(DataInput in) throws IOException {
        //反序列化的順序和序列化的順序一致
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    //序列化方法
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Long sumFlow) {
        this.sumFlow = sumFlow;
    }
	//toString方法可控制bean對象被寫出在文件時的格式
    @Override
    public String toString() {
        return  upFlow + "\t" + downFlow + "\t" + sumFlow ;
    }
    //大的話返回-1,表示排在前面,即降序排序
    public int compareTo(FlowBean o) {	
        return this.sumFlow > o.getSumFlow()?-1:1;
    }
}

第一個map方法:

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
//			每一行讀進來的數據轉化為String類型
            String line = value.toString();
            //根據tab分割
            String[] fields = line.split("\t");
            //取出手機號
            String phonenum = fields[1];
            //取出上行流量  將String轉為Long
            Long upFlow = Long.parseLong(fields[fields.length-3]);
            //取出下行流量
            long downFlow = Long.parseLong(fields[fields.length-2]);
//			把數據發送給reduce
            context.write(new Text(phonenum), new FlowBean(upFlow, downFlow));
        }
    }

partition(分區方法):

//根據省份分發給不同的reduce程序,其輸入數據是map的輸出
public class ProvincePartitioner extends Partitioner<Text, FlowBean>{
    public static HashMap<String, Integer> provinceDict = new HashMap<String, Integer>();
    static{
        provinceDict.put("136", 0);
        provinceDict.put("137", 1);
        provinceDict.put("138", 2);
        provinceDict.put("139", 3);
    }
    //返回的是分區號  給哪個reduce
    @Override
    public int getPartition(Text key, FlowBean value, int num_partitioner) {
//		根據手機號前三位分省份,分給不同的reduce
        String phone_num = key.toString().substring(0, 3);
        Integer provinceId = provinceDict.get(phone_num);
        return provinceId==null?4:provinceId;
    }
}

第一個reduce方法:

static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
    //(18989,[bean1,bean2,bean3])
	@Override
	protected void reduce(Text key, Iterable<FlowBean> values, Context context)
			throws IOException, InterruptedException {
		long sum_upflow = 0;
		long sum_downflow = 0;
		
//			將上行流量和下行流量分別累加
		
		for(FlowBean bean:values){
			sum_upflow += bean.getUpFlow();
			sum_downflow += bean.getDownFlow();
		}
		FlowBean resultBean = new FlowBean(sum_upflow,sum_downflow);
		context.write(key, resultBean);
	}
}

第一個驅動類

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
//		將默認配置文件傳給job
        Job job = Job.getInstance(conf);
//		告訴yarn  jar包在哪
        job.setJarByClass(FlowCount.class);
        //指定job要使用的map和reduce
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
//		指定map的輸出類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
//		指定最終輸出的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
//		job的輸入數據所在的目錄
//		第一個參數:給哪個job設置
//		第二個參數:輸入數據的目錄,多個目錄用逗號分隔
        FileInputFormat.setInputPaths(job, new Path("/home/miao/input/flowcount/"));
//		job的數據輸出在哪個目錄
        FileOutputFormat.setOutputPath(job, new Path("/home/miao/output/flowcount/"));
        //將jar包和配置文件提交給yarn
//		submit方法提交作業就退出該程序
//		job.submit();
//		waitForCompletion方法提交作業並等待作業執行
//		true表示將作業信息打印出來,該方法會返回一個boolean值,表示是否成功運行
        boolean result = job.waitForCompletion(true);
//		mr運行成功返回true,輸出0表示運行成功,1表示失敗
        System.exit(result?0:1);
    }

執行結果:

13726230503	4962	49362	54324
13826544101	528	0	528
13926251106	480	0	480
13926435656	264	3024	3288

結果分析:

輸出數據的格式已經符合了要求,但是並沒有按照總流量大小降序排列,需要再寫第二個mapreduce來達到最終結果

第二個map方法:

        static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{

        FlowBean bean = new FlowBean();
        Text phone = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //拿到的是上一個mapreduce程序的輸出結果,各手機號和流量信息
            String line = value.toString();
            String[] fields = line.split("\t");
            //獲取手機號
            String phonenum = fields[0];
            //獲取上行流量
            long upFlow = Long.parseLong(fields[1]);
            //獲取下行流量
            long downFlow = Long.parseLong(fields[2]);
            //多次調用map函數時,只創建一個對象
            bean.set(upFlow, downFlow);
            phone.set(phonenum);

//			write時,就將bean對象序列化出去了  reducer那邊反序列化回對象  根據bean對象的sumFlow排序
            //map結束后會分發給reduce,默認根據key的hash函數進行分發
            //reduce要實現全局有序,必須只有一個reduce,否則分成多個reduce,只有在每個reduce產生的文件里是有序的
            context.write(bean, phone);
        }

第二個reduce方法:

static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
	
	//<bean(),phonenum> 相同key的被分為一組,一起執行一次reduce
	//對於key是對象的情況下,不可能有兩個對象相同(即使上行流量下行流量都相同),所以每組都只有一條數據
	@Override
	protected void reduce(FlowBean bean, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		context.write(values.iterator().next(), bean);
	}
}

第二個驅動方法:

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
//		將默認配置文件傳給job
        Job job = Job.getInstance(conf);
        //指定自定義的map數據分區器
		//job.setPartitionerClass(ProvincePartitioner.class);
        //根據partitioner里的分區數量,設置reduce的數量
		//job.setNumReduceTasks(5);
//		告訴yarn  jar包在哪
        job.setJarByClass(FlowCountSort.class);
        //指定job要使用的map和reduce
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);
//		指定map的輸出類型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
//		指定最終輸出的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
//		job的輸入數據所在的目錄
//		第一個參數:給那個job設置
//		第二個參數:輸入數據的目錄,多個目錄用逗號分隔
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //適用於做測試,不建議這么做 
        Path outpath = new Path(args[1]);
        //根據配置文件獲取hdfs客戶端對象
        FileSystem fs = FileSystem.get(conf);
//		如果輸出目錄存在就將其刪除
        if(fs.exists(outpath)){
           fs.delete(outpath, true);
        }
//		job的數據輸出在哪個目錄
        FileOutputFormat.setOutputPath(job, outpath);
        //將jar包和配置文件提交給yarn
//		submit方法提交作業就退出該程序
//		job.submit();
//		waitForCompletion方法提交作業並等待作業執行
//		true表示將作業信息打印出來,該方法會返回一個boolean值,表示是否成功運行
        boolean result = job.waitForCompletion(true);
//		mr運行成功返回true,輸出0表示運行成功,1表示失敗
        System.exit(result?0:1);
    }

輸出結果:

13726230503	4962	49362	54324
13926435656	264	3024	3288
13826544101	528	0	528
13926251106	480	0	480

結果分析:

已滿足格式要求,並按總流量降序

mapreduce詳細流程圖文詳解

1.切片

  • 在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize))
  • minSize的默認值是1,而maxSize的默認值是long類型的最大值,即可得切片的默認大小是blockSize(128M)
  • maxSize參數如果調得比blocksize小,則會讓切片變小,而且就等於配置的這個參數的值
  • minSize參數調的比blockSize大,則可以讓切片變得比blocksize還大
  • hadoop為每個分片構建一個map任務,可以並行處理多個分片上的數據,整個數據的處理過程將得到很好的負載均衡,因為一台性能較強的計算機能處理更多的數據分片.
  • 分片也不能切得太小,否則多個map和reduce間數據的傳輸時間,管理分片,構建多個map任務的時間將決定整個作業的執行時間.(大部分時間都不在計算上)

如果文件大小小於128M,則該文件不會被切片,不管文件多小都會是一個單獨的切片,交給一個maptask處理.如果有大量的小文件,將導致產生大量的maptask,大大降低集群性能.

大量小文件的優化策略:

(1) 在數據處理的前端就將小文件整合成大文件,再上傳到hdfs上,即避免了hdfs不適合存儲小文件的缺點,又避免了后期使用mapreduce處理大量小文件的問題。(最提倡的做法)

(2)小文件已經存在hdfs上了,可以使用另一種inputformat來做切片(CombineFileInputFormat),它的切片邏輯和FileInputFormat(默認)不同,它可以將多個小文件在邏輯上規划到一個切片上,交給一個maptask處理。

2.環形緩存區

  • 經過map函數的邏輯處理后的數據輸出之后,會通過OutPutCollector收集器將數據收集到環形緩存區保存。
  • 環形緩存區的大小默認為100M,當保存的數據達到80%時,就將緩存區的數據溢出到磁盤上保存。

3.溢出

  • 環形緩存區的數據達到其容量的80%時就會溢出到磁盤上進行保存,在此過程中,程序會對數據進行分區(默認HashPartition)和排序(默認根據key進行快排)
  • 緩存區不斷溢出的數據形成多個小文件

4.合並

  • 溢出的多個小文件各個區合並在一起(0區和0區合並成一個0區),形成大文件
  • 通過歸並排序保證區內的數據有序

5.shuffle

從過程2到過程7之間,即map任務和reduce任務之間的數據流稱為shuffle(混洗),而過程5最能體現出混洗這一概念。一般情況下,一個reduce任務的輸入數據來自與多個map任務,多個reduce任務的情況下就會出現如過程5所示的,每個reduce任務從map的輸出數據中獲取屬於自己的那個分區的數據。

6.合並

運行reducetask的節點通過過程5,將來自多個map任務的屬於自己的分區數據下載到本地磁盤工作目錄。這多個分區文件通過歸並排序合並成大文件,並根據key值分好組(key值相同的,value值會以迭代器的形式組在一起)。

7.reducetask

reducetask從本地工作目錄獲取已經分好組並且排好序的數據,將數據進行reduce函數中的邏輯處理。

8.輸出

每個reducetask輸出一個結果文件。

partition(分區)

數據從環形緩存區溢出到文件的過程中會根據用戶自定義的partition函數進行分區,如果用戶沒有自定義該函數,程序會用默認的partitioner通過哈希函數來分區,hash partition 的好處是比較彈性,跟數據類型無關,實現簡單,只需要設置reducetask的個數。分區的目的是將整個大數據塊分成多個數據塊,通過多個reducetask處理后,輸出多個文件。通常在輸出數據需要有所區分的情況下使用自定義分區,如在上述的流量統計的案例里,如果需要最后的輸出數據再根據手機號碼的省份分成幾個文件來存儲,則需要自定義partition函數,並在驅動程序里設置reduce任務數等於分區數(job.setNumReduceTasks(5);)和指明自己定義的partition(job.setPartitionerClass(ProvincePartitioner.class))。在需要獲取統一的輸出結果的情況下,不需要自定義partition也不用設置reducetask的數量(默認1個)。

自定義的分區函數有時會導致數據傾斜的問題,即有的分區數據量極大,各個分區數據量不均勻,這會導致整個作業時間取決於處理時間最長的那個reduce,應盡量避免這種情況發生。

combiner(map端的reduce)

集群的帶寬限制了mapreduce作業的數量,因此應該盡量避免map和reduce任務之間的數據傳輸。hadoop允許用戶對map的輸出數據進行處理,用戶可自定義combiner函數(如同map函數和reduce函數一般),其邏輯一般和reduce函數一樣,combiner的輸入是map的輸出,combiner的輸出作為reduce的輸入,很多情況下可以直接將reduce函數作為conbiner函數來使用(job.setCombinerClass(FlowCountReducer.class);)。combiner屬於優化方案,所以無法確定combiner函數會調用多少次,可以在環形緩存區溢出文件時調用combiner函數,也可以在溢出的小文件合並成大文件時調用combiner。但要保證不管調用幾次combiner函數都不會影響最終的結果,所以不是所有處理邏輯都可以使用combiner組件,有些邏輯如果在使用了combiner函數后會改變最后rerduce的輸出結果(如求幾個數的平均值,就不能先用combiner求一次各個map輸出結果的平均值,再求這些平均值的平均值,這將導致結果錯誤)。

combiner的意義就是對每一個maptask的輸出進行局部匯總,以減小網絡傳輸量。(原先傳給reduce的數據是(a,(1,1,1,1,1,1...)),使用combiner后傳給reduce的數據變為(a,(4,2,3,5...)))

分組

分組和上面提到的partition(分區)不同,分組發生在reduce端,reduce的輸入數據,會根據key是否相等而分為一組,如果key相等的,則這些key所對應的value值會作為一個迭代器對象傳給reduce函數。以單詞統計為例,reduce輸入的數據就如:第一組:(a,(1,3,5,3,1))第二組:(b,(6,2,3,1,5))。上述例子也可以看出在map端是執行過combiner函數的,否則reduce獲得的輸入數據是:第一組:(a,(1,1,1,1,1,...))第二組:(b,(1,1,1,1,1...))。對每一組數據調用一次reduce函數。

值得一提的是如果key是用戶自定義的bean對象,那么就算兩個對象的內容都相同,這兩個bean對象也不相等,也會被分為兩組。如上述流量統計案例里自定義的flowbean對象,就算是上行流量下行流量相等的兩個flowbean對象也不會被分為一組。這種bean作為key的情況下,如果處理邏輯需要將兩個bean歸為一個組,則需要另外的方法(我會在之后的文章中給出)。

排序

在整個mapreduce過程中涉及到多處對數據的排序,環形緩存區溢出的文件,溢出的小文件合並成大文件,reduce端多個分區數據合並成一個大的分區數據等都需要排序,而這排序規則是根據key的compareTo方法來的。

map端輸出的數據的順序不一定是reduce端輸入數據的順序,因為在這兩者之間數據經過了排序,但reduce端輸出到文件上顯示的順序就是reduce函數的寫出順序。在沒有reduce函數的情況下,顯示地在驅動函數里將reduce的數量設置為0(設置為0后表示沒有reduce階段,也就沒有shuffle階段,也就不會對數據進行各種排序分組),否則雖然沒有reduce邏輯,但是還是會有shuffle階段,map端處理完數據后將數據保存在文件上的順序也不是map函數的寫出順序,而是經過shuffle分組排序過后的順序

MapTask和ReduceTask的並行度

有幾個maptask是由程序決定的,默認情況下使用FileInputFormat讀入數據,maptask數量的依據有一下幾點:

1.文件大小小於128M(默認)的情況下,有幾個文件就有幾個maptask

2.大於128M的文件,根據切片規則,有幾個分片就有幾個maptask

3.並不是maptask數量越多越好,太多maptask可能會占用大量數據傳輸等時間,降低集群計算時間,降低性能。大文件可適當增加blocksize的大小,如將128M的塊大小改為256M或512M,這樣切片的大小也會增大,切片數量也就減少了,相應地減少maptask的數量。如果小文件太多,可用上述提到過的小文件優化策略減少maptask的數量。

有幾個reducetask是用戶決定的,用戶可以根據需求,自定義相應的partition函數,將數據分成幾個區,相應地將reducetask的數量設置成分區數量。(設置5個reducetask,job.setNumReduceTasks(5))

YARN

1、用戶提交的程序的運行邏輯對yarn是透明的,yarn並不需要知道。

2、yarn只提供運算資源的調度(用戶程序向yarn申請資源,yarn就負責分配資源)。

3、yarn中的老大叫ResourceManager(知道所有小弟的資源情況,以做出資源分配),yarn中具體提供運算資源的角色叫NodeManager(小弟)。

4、yarn與運行的用戶程序完全解耦,就意味着yarn上可以運行各種類型的分布式運算程序(mapreduce只是其中的一種),比如mapreduce、storm程序,spark程序...只要他們各自的框架中有符合yarn規范的資源請求機制即可。

6、Yarn是一個通用的資源調度平台,企業中存在的各種運算集群都可以整合在一個物理集群上,提高資源利用率,方便數據共享。

7、Yarn是一個資源調度平台,負責為運算程序提供服務器運算資源,相當於一個分布式的操作系統平台,而mapreduce等運算程序則相當於運行於操作系統之上的應用程序。

MapReduce與Yarn

客戶端的配置信息mapreduce.framework.name為yarn時,客戶端會啟動YarnRunner(yarn的客戶端程序),並將mapreduce作業提交給yarn平台處理。

1.向ResourceManager請求運行一個mapreduce程序。

2.ResourceManager返回hdfs地址,告訴客戶端將作業運行相關的資源文件上傳到hdfs。

3.客戶端提交mr程序運行所需的文件(包括作業的jar包,作業的配置文件,分片信息等)到hdfs上。

4.作業相關信息提交完成后,客戶端用過調用ResourcrManager的submitApplication()方法提交作業。

5.ResourceManager將作業傳遞給調度器,調度器的默認調度策略是先進先出。

6.調度器尋找一台空閑的節點,並在該節點隔離出一個容器(container),容器中分配了cpu,內存等資源,並啟動MRAppmaster進程。

7.MRAppmaster根據需要運行多少個map任務,多少個reduce任務向ResourceManager請求資源。

8.ResourceManager分配相應數量的容器,並告知MRAppmaster容器在哪。

9.MRAppmaster啟動maptask。

10.maptask從HDFS獲取分片數據執行map邏輯。

11.map邏輯執行結束后,MRAppmaster啟動reducetask。

12.reducetask從maptask獲取屬於自己的分區數據執行reduce邏輯。

13.reduce邏輯結束后將結果數據保存到HDFS上。

14.mapreduce作業結束后,MRAppmaster通知ResourceManager結束自己,讓ResourceManager回收所有資源。

數據本地化優化

在第7步,MRAppmaster向ResourceManager請求容器用於運行maptask時,在請求信息中有map所需要處理的分片數據所在的主機和相應的機架信息(即告訴MRAppmaster需要處理的數據在哪里),調度器根據這些信息做出調度決策。

1、最理想的情況是將任務分配到數據本地化的節點上,這樣一來map的輸入數據不需要從其他節點通過網絡傳輸過來,大大提高了性能。

2、如果存儲所需處理的三個HDFS數據塊備份的三個節點都在運行其他map任務,處於忙碌狀態,資源不足以再開辟一個容器來運行maptask。此時調度器會選擇一個與數據所在節點同機架的節點來開辟容器,運行maptask。

3、如果在同一機架上的節點都處於忙碌狀態,調度器才會選擇跨機架的節點,這會導致機架與機架之間的數據傳輸,是三種方式中性能最低的。

map和reduce的輸出結果存放位置

map任務將其輸出寫到本地硬盤而不是HDFS,因為map任務的輸出結果是中間結果,並不是最終結果,在mr程序結束后,map的輸出結果就可以被刪除,將其存在可靠的HDFS上一來是沒必要浪費HDFS集群的空間,二來是沒有存在本地硬盤的速度快。

reduce任務的輸出是最終的輸出結果,將其存在HDFS上可保證數據的安全。

map,reduce任務分配

默認情況下小於10個mapper且只有1個reducer且所需處理的數據總大小小於1個HDFS塊的作業是小作業(可通過mapreduce.job.ubertask.maxmaps,mapreduce.job.ubertask.maxreduces,mapreduce.job.ubertask.maxbytes改變一個作業的默認配置),對於小作業,MRAppmaster會選擇在與它同一個容器里運行任務(順序運行),而不會去向ResourceManager請求資源。(mapreduce.job.ubertask.enable設為false將關閉小作業這一性質)。

作業不是小作業的情況下,MRAppmaster會向ResourceManager請求資源,ResourceManager根據數據本地化優化原則分配相應的容器。在默認情況下map任務和reduce任務都分配到1024MB的內存(可通過mapreduce.map.memory.mb和mapreduce.map.memory.mb來設置map任務和reduce任務的使用內存空間)。

調度器在分配容器時有兩個參數,yarn.schedule.capacity.minimum-allocation-mb和yarn.schedule.capacity.minimum-allocation-mb,分別表示容器的最小可分配內存和最大可分配內存,默認值分別是1024MB和10240MB,手動給map,reduce任務分配內存空間時,應設置為容器最小可分配內存的整數倍且不大於最大可分配內存。在不設置map和reduce任務的使用內存情況下,調度器會自己根據任務的需要分配最接近的倍數的內存給容器。

map,reduce任務的並行

小作業的情況下,所有的map任務會在一個容器里順序執行,所有map任務處理完后再執行1個reduce任務。是大作業的話,所有map任務會分別發送到不同容器里並行運行。而在一個節點上可以並行運行幾個map,reduce任務,取決於節點的資源和每個任務所需的資源(如節點資源為8核8G可用內存,每個任務需要1個核1G內存,則該節點理論上可以開辟8個容器,並行執行8個任務)。在多個節點上的任務並行更是理所當然的,值得一提的是屬於同一個作業的map任務和reduce任務不能並行,reduce任務一定是在接收到來自所有map任務的分區數據后再執行。

mapreduce輸出數據壓縮

map或reduce函數的輸出可以壓縮,減少網絡io時間和存儲空間,但相應地增加了cpu負擔。

  • 計算密集型的任務,少用壓縮,將更多的cpu性能用在計算上
  • io密集型的任務,使用壓縮提高mr速度

相應配置:

#map輸出壓縮
mapreduce.map.output.compress=true
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
#也可在驅動類中寫相應的代碼
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);

#reduce輸出壓縮
mapreduce.output.fileoutputformat.compress=true
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type=RECORD
#也可在驅動類中寫相應代碼
Job job = Job.getInstance(conf);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM