Hadoop數據類型


Writable類

Hadoop將許多Writable類歸入org.apache.hadoop.io包。形成如下圖所示的類層次結構。

Writable的Java基本類封裝


char類型以外,所有的原生類型都有對應的Writable類,並且通過getset方法(或者new的方式)可以獲取和設置它們

的值。


自定義Writable:

Hadoop自帶一系列有用的Writable實現,可以滿足絕大多數用途。但有時,我們需要編寫自己的自定義實現。通過自定

義Writable,我們能夠完全控制二進制表示和排序順序。Writable是MapReduce數據路徑的核心,所以調整二進制表示對其性

能有顯著影響。現有的Hadoop Writable應用已得到很好的優化,但為了對付更復雜的結構,最好創建一個新的Writable類型,

而不是使用已有的類型。


編寫一個表示一對字符串的實現,名為TextPair:

import java.io.*;
import org.apache.hadoop.io.*;
public class TextPair implements WritableComparable<textpair> {
	private Text first;
	private Text second;
	public TextPair() {
		set(newText(),newText());
	}
	public TextPair(String first, String second) {
		set(newText(first),newText(second));
	}
	public TextPair(Text first, Text second) {
		set(first, second);
	}
	public void set(Text first, Text second) {
		this.first = first;
		this.second = second;
	}
	public Text getFirst() {
		return first;
	}
	public Text getSecond() {
		return second;
	}
	@Override
	public void write(DataOutput out)throws IOException {
		first.write(out);
		second.write(out);
	}
	@Override
	public void readFields(DataInput in)throwsIOException {
		first.readFields(in);
		second.readFields(in);
	}
	@Override
	public int hashCode() {
		return first.hashCode() *163+ second.hashCode();
	}
	@Override
	public boolean equals(Object o) {
		if(o instanceof TextPair) {
			TextPair tp = (TextPair) o;
			return first.equals(tp.first) && second.equals(tp.second);
		}
		return false;
	}
	@Override
	public String toString() {
		return first +"\t"+ second;
	}
	@Override
	public int compareTo(TextPair tp) {
		int cmp = first.compareTo(tp.first);
		if(cmp !=0) {
			return cmp;
		}
		return second.compareTo(tp.second);
	}
}

此實現的第一部分直觀易懂:有兩個Text實例變量(first和second)和相關的構造函數、get方法和set方法。所有的Writable

實現都必須有一個默認的構造函數,以便MapReduce框架能夠對它們進行實例化,進而調用readFields()方法來填充它們的字

段。 Writable實例是易變的、經常重用的,所以我們應該盡量避免在write()或readFields()方法中分配對象。


通過委托給每個Text對象本身,TextPair的write()方法依次序列化輸出流中的每一個Text對象。同樣,也通過委托給Text對

象本身,readFields()反序列化輸人流中的字節。DataOutput和DataInput接口有豐富的整套方法用於序列化和反序列化Java基

本類型,所以在一般情況下,我們能夠完全控制Writable對象的數據傳輸格式。


正如為Java寫的任意值對象一樣,我們會重寫java.lang.Object的hashCode()方法,equals()方法和toString()方法。

HashPartitioner使用hashCode()方法來選擇reduce分區,所以應該確保寫一個好的哈希函數來確保reduce函數的分區在大小上

是相當的。


TextPair是WritableComparable的實現,所以它提供了compareTo()方法的實現,加入我們希望的順序:它通過一個一個

String逐個排序。請注意,TextPair不同於前面的TextArrayWritable類(除了它可以存儲Text對象數之外),因為

TextArrayWritable只是一個Writable,而不是WritableComparable。


實現一個快速的RawComparator


上例中所示代碼能夠有效工作,但還可以進一步優化。正如前面所述,在MapReduce中,TextPair被用作鍵時,它必須被

反序列化為要調用的compareTo()方法的對象。是否可以通過查看其序列化表示的方式來比較兩個TextPair對象。


事實證明,我們可以這樣做,因為TextPair由兩個Text對象連接而成,二進制Text對象表示是一個可變長度的整型,包含

UTF-8表示的字符串中的字節數,后跟UTF-8字節本身。關鍵在於讀取開始的長度。從而得知第一個Text對象的字節表示有多

長,然后可以委托Text對象的RawComparator,然后利用第一或者第二個字符串的偏移量來調用它。下面例子給出了具體方法

(注意,該代碼嵌套在TextPair類中)。


用於比較TextPair字節表示的RawComparator:

public static class Comparator extends WritableComparator {
	private static final Text.Comparator TEXT_COMPARATOR =new Text.Comparator();
	public Comparator() {
		super(TextPair.class);
	}
	@Override
	public int  compare(byte[] b1,int s1,int l1,
	                    byte[] b2,int s2,int l2) {
		try {
			int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
			int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
			int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
			if(cmp != 0) {
				return cmp;
			}
			return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
			                               b2, s2 + firstL2, l2 - firstL2);
		} catch(IOException e) {
			throw new IllegalArgumentException(e);
		}
	}
}
static {
	WritableComparator.define(TextPair.class,newComparator());
}

事實上,我們一般都是繼承WritableComparator,而不是直接實現RawComparator,因為它提供了一些便利的方法和默

認實現。這段代碼的精妙之處在於計算firstL1和firstL2,每個字節流中第一個Text字段的長度。每個都由可變長度的整型(由

WritableUtils的decodeVIntSize()返回)和它的編碼值(由readVInt()返問)組成。


靜態代碼塊注冊原始的comparator以便MapReduce每次看到TextPair類,就知道使用原始comparator作為其默認

comparator。


自定義comparator


從TextPair可知,編寫原始的cornparator比較費力,因為必須處理字節級別的細節。如果需要編寫自己的實現,

org.apache.hadoop.io包中Writable的某些前瞻性實現值得研究研究。WritableUtils的有效方法也比較非常方便。


如果可能,還應把自定義comparator寫為RawComparators。這些comparator實現的排序順序不同於默認comparator定義

的自然排序順序。下面的例子顯示了TextPair的comparator,稱為First Comparator。只考慮了一對Text對象中的第一個字符

串。請注意,我們重寫了compare()方法使其使用對象進行比較,所以兩個compare()方法的語義是相同的。


自定義的RawComparator,用於比較TextPair字節表示中的第一字段:

public static class FirstComparator extends WritableComparator {
	private static final Text.Comparator TEXT_COMPARATOR =newText.Comparator();
	public FirstComparator() {
		super(TextPair.class);
	}
	@Override
	public int compare(byte[] b1,ints1,intl1,
	                   byte[] b2,ints2,intl2) {
		try {
			int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
			int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
			return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
		} catch(IOException e) {
			throw new IllegalArgumentException(e);
		}
	}
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		if(a instanceof TextPair && b instanceof TextPair) {
			return((TextPair) a).first.compareTo(((TextPair) b).first);
		}
		return super.compare(a, b);
	}
}

更多Writable參見 http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/Writable.html



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM