Hadoop_MapReduce流程


Hadoop學習筆記總結

01. MapReduce

1. Combiner(規約)

Combiner號稱本地的Reduce。

問:為什么使用Combiner?
答:Combiner發生在Map端,對數據進行規約處理,數據量變小了,傳送到reduce端的數據量變小了,傳輸時間變短,作業的整體時間變短。減少了reduce的輸入。

問:為什么Combiner不作為MR運行的標配,而是可選步驟哪?
答:因為不是所有的算法都適合使用Combiner處理,例如求平均數。使用了規約,造成了最終結果的不同。

問:Combiner本身已經執行了reduce操作,為什么在Reducer階段還要執行reduce操作哪?
答:combiner操作發生在map端的,處理一個任務所接收的文件中的數據,不能跨map任務執行;並且,處理的是局部有序數據,不是全局有序數據。

job.setCombinerClass(MyCombiner.class);//其實就是一個另一個Reducer函數

2. Shuffle機制

Shuffle描述着數據從map task輸出到reduce task輸入的這段過程。(包括了分組、排序、規約和緩存機制)

(1) split切片概念
  1. map的數量不是由Block塊數量決定,而是由split切片數量決定。
  2. 切片是一個邏輯概念,指的是文件中數據的偏移量范圍
  3. 切片的具體大小應該根據所處理的文件的大小來調整。
  4. 最佳的分片大小應該與塊相同。因為如果跨越了兩個數據塊,節點一般不會同時存儲這兩個塊,因而會造成網絡傳輸,較低效率。
  5. 數據本地化優化。(1)、避免了調用同一個機架中空閑機器運行該map任務,(2)、其它機架來處理(小概率)。浪費集群寬帶資源。
(2) shuffle流程

Map端:分區排序規約

  1. 每個map有一個環形內存緩沖區,用於存儲任務的輸出。默認大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spill.percent),一個后台線程把內容寫到(spill)磁盤的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件。
  2. 寫磁盤前,后台線程會根據將要傳入的reducer將數據分區partition,排序sort。一個個小spill的寫,寫滿了就再新建一個溢出文件(spill file)。在每個分區中,后台線程會按鍵進行內排序,再寫入磁盤spill。如果有combiner,運行combine使得map的輸出數據更加緊湊,減少傳遞到磁盤和傳遞給reduce的數據。
  3. 等最后記錄寫完,合並全部溢出寫文件為一個已分區且已排序的文件。combiner也會在寫到磁盤之前再次執行。
    map總結:map輸出-->環形緩沖區-->分區,排序,規約(如果有)-->小spill文件-->合並到磁盤。

Reduce端:復制,合並排序

  1. reduce任務的復制階段。Reducer通過Http方式得到輸出文件的分區。(通過appMaster告知)。少量的復制線程並行獲取map的輸出,每個map完成時間不同,只要一個map完成,reduce就開始復制輸出。
  2. NodeManager為運行Reduce任務。復制階段把Map輸出(整體溢出文件)復制到Reducer的內存或磁盤(由map輸出大小決定)。
  3. 排序階段。合並map輸出在Reducer的內存或磁盤中(關於為什么內存和磁盤都有,日后總結)。然后也是基於key值得排序,將局部有序的數據合並成全局有序的數據。然后執行reduce方法。
    reduce總結:復制map輸出,合並排序

總結,shuffle流程的過程其實就是,分組+排序(map和reduce都有排序)的過程,還有在map和reduce端的緩存機制。
在map端的排序和reduce端的排序是分別有序和整體排序的關系。
想要優化整個過程,可以增大內存,減少溢出文件的IO和提高溢出閥值等

(3) MRAppMaster的任務監控調度機制,帶shuffle機制的job流程圖


和之前Yarn框架管理,組成了整個MapReduce資源的分配和任務調度的過程。

3. Partitioner分區和ReduceTasks數量

Map的結果,會通過partition函數分發到Reducer上。它的作用就是根據key或value及reduce的數量來決定當前的這對輸出數據最終應該交由哪個reduce task處理。

默認的HashPartitioner

public class HashPartitioner<K, V> extends Partitioner<K, V> {
	public int getPartition(K key, V value,
                      int numReduceTasks) {
 	return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

即:key運算結果相同的被分到同一組,哪個key到哪個Reducer的分配過程,是由Partitioner規定的。
輸入是Map的結果對<key, value>和Reducer的數目,輸出則是分配的Reducer(整數編號)。就是指定Mappr輸出的鍵值對到哪一個reducer上去。

總結:分區Partitioner主要作用在於以下兩點
(1)根據業務需要,產生多個輸出文件;
(2)多個reduce任務並發運行,提高整體job的運行效率

job.setPartitionerClass(AreaPartitioner.class);
job.setNumReduceTasks(3);

Ps:當分組有6個。當ReduceTask任務有10個時,只有6個輸出文件有數據,其它的為空文件;當Task任務時5個時,報錯,因為有的分組找不到reduce函數。

4. 自定義分區、排序、分組的總結

在第3節已經講述了分區(partition)的概念,在分區過后,每個分區里面的數據按照key進行排序、分組。排序會在map和reduce過程中多次調用。在reduce端排序之后,具有相同Key值的記錄是屬於同一個分組的傳入reduce()方法。
下面以二次排序SecondSort為例講解:
job.setSortComparatorClass()
job.setGroupingComparatorClass()

1.自定義key

創建的自定義key:NewPairKey需要實現的接口:WritableComparable,可序列的並且可比較的

static class NewPairKey implements WritableComparable<NewPairKey> {
	int first = 0;
	int second = 0;

	//空構造器和含參構造器

	//序列化,將IntPair轉化成使用流傳送的二進制
	public void write(DataOutput out) throws IOException {}

	//反序列化,從流中的二進制轉換成IntPai
	public void readFields(DataInput in) throws IOException {}

	// NewPairKey的比較邏輯
	public int compareTo(NewPairKey o) {}

	// 新建的類應該重寫的方法
	public String toString() {}
	public boolean equals(Object obj) {}
	public int hashCode() {}
}
2.排序的比較邏輯

Key排序的規則:

  1. 如果設置了job的setSortComparatorClass(KeyComparator.class),作為key比較函數類

     public static class KeyComparator extends WritableComparator{}
    
  2. 如果沒有設置,則系統會自動使用自定義類NewPairKey中實現的compareTo()方法作為key比較的。上面的方法不是必要的。但是都是構造了新的比較邏輯。

     // 當NewPairKey進行排序時,會調用該方法,第一列升序排列,第一列不同時,第二列升序排列。
     @Override
     public int compareTo(NewPairKey o) {
     int minus = this.first - o.first;
     if (minus != 0) {
     return minus;
     }
     return this.second - o.second;
     }
    

Ps:上面的compareTo(NewPairKey o)參數類型是NewPairKey,需要將字節流反序列化成NewPairKey對象再比較,造成了新建對象的額外開銷。
RawComparator接口允許其實現直接比較數據流中的記錄,無需先把數據流反序列化為對象,加速比較!。
如下面IntWritable默認是實現源碼。

public class IntWritable implements WritableComparable<IntWritable> {
	private int value;

	public IntWritable() {}

	public IntWritable(int value) {
		set(value);
	}

	/** Set the value of this IntWritable. */
	public void set(int value) {
		this.value = value;
	}

	/** Return the value of this IntWritable. */
	public int get() {
		return value;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		value = in.readInt();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(value);
	}

	/** Returns true iff <code>o</code> is a IntWritable with the same value. */
	@Override
	public boolean equals(Object o) {
		if (!(o instanceof IntWritable))
			return false;
		IntWritable other = (IntWritable) o;
		return this.value == other.value;
	}

	@Override
	public int hashCode() {
		return value;
	}

	/** Compares two IntWritables. */
	@Override
	public int compareTo(IntWritable o) {
		int thisValue = this.value;
		int thatValue = o.value;
		return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
	}

	@Override
	public String toString() {
		return Integer.toString(value);
	}

	/** A Comparator optimized for IntWritable. */
	public static class Comparator extends WritableComparator {
		public Comparator() {
			super(IntWritable.class);
		}

		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			int thisValue = readInt(b1, s1);
			int thatValue = readInt(b2, s2);
			return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0
					: 1));
		}
	}

	static { // register this comparator
		WritableComparator.define(IntWritable.class, new Comparator());
	}
}

從IntWritable 的例子中,可以看到:
1、把WritableComparator當IntWritable的內部類,寫到里面
2、如何注冊一個自定義的WritableComparator。
此處注意一下,關注static { WritableComparator.define(IntWritable.class,new Comparator())} 的位置。
參看《http://blog.itpub.net/30066956/viewspace-2112283/》

3.分組函數類

WritableComparator是RawComparator接口的實現。功能:1.提供了原始compare()方法的一個默認實現;2.和充當RawComparator實例的工廠(《權威指南P105》)。和WritableComparable區別詳情見《http://www.cnblogs.com/robert-blue/p/4159434.html》
job.setGroupingComparatorClass(MyGroupingComparator.class).

static class MyGroupingComparator implements RawComparator<NewPairKey> {
	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
				b2, s2, Integer.SIZE / 8);
	}

	@Override
	public int compare(NewPairKey arg0, NewPairKey arg1) {
		return arg0.first - arg1.first;
	}
}

總結

分組:

  1. 實際上,沒有使用自定義分組也能實現二次排序邏輯。
  2. 使用了自定義分組后,分組的邏輯變成不再以整個NewPairKey作為比較,而是以fisrt為標的,把一些相同鍵的鍵值進行合並組,測試可知:Reduce input groups=16變成了4。
  3. compare方法需要知道比較的是Int型數據,就是4個字節的比較,Long型數據就是8個字節的比較。

分區和分組的區別:

  1. 分區是對輸出結果文件進行分類拆分文件以便更好查看,比如一個輸出文件包含所有狀態的http請求,那么為了方便查看通過分區把請求狀態分成幾個結果文件。
  2. 分組就是把一些相同鍵的鍵值對進行進行合並組;分區之后數據全部還是照樣輸出到reduce端,而分組的話就有所減少了。當然這2個步驟也是不同的階段執行。分區是在map端執行,環形內存緩沖區輸出到溢出文件的時候就是按照分區輸出;分組是在reduce端。

Group Comparator的執行過程:

  1. 在reduce階段,reducer接收到所有映射到這個reducer的map輸出后,調用key比較函數類對所有數據排序得到全局有序數據。然后開始構造一個key對應的value迭代器。這時就要用到分組,使用job.setGroupingComparatorClass設置的分組函數類。只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器。

需要Group Comparator嗎?

  1. 問:mapper產生的中間結果經過shuffle和sort后,每個key整合成一個記錄,每次reduce方法調用處理一個記錄,但是group的目的是讓一次reduce調用處理多條記錄,為什么?
  2. 答:如果不用分組,那么同一組的記錄就要在多次reduce方法中獨立處理,那么有些狀態數據就要傳遞了,就會增加復雜度,在一次調用中處理的話,這些狀態只要用方法內的變量就可以的。

關於WritableComparable、RawComparator和WritableComparator聯系與區別下回再總結了。

參考《Hadoop權威指南》
http://www.cnblogs.com/robert-blue/p/4159434.html》
http://blog.itpub.net/30066956/viewspace-2112283/》
初接觸,記下學習筆記,還有很多問題,望指導,謝謝。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM