Hadoop 2 Source Code Analysis: Serialization


1. Overview

  In the previous post we walked through the MapReduce workflow, including the motivation behind the MapReduce V2 redesign, its new architecture, and how it differs from MapReduce V1. Today we turn to serialization in Hadoop V2. The outline is as follows:

  • The origins of serialization
  • The Hadoop serialization dependency graph in detail
  • Common Writable implementation classes

  Let's get started.

2. The Origins of Serialization

  Java offers very convenient built-in support for serialization: to make a class serializable, it only needs to implement the Serializable interface. For example, to make an AppInfo class serializable, the code looks like this:

/**
 * 
 */
package cn.hdfs.io;

import java.io.Serializable;

/**
 * @author dengjie
 * @date Apr 21, 2015
 * @description A serializable application-info class
 */
public class AppInfo implements Serializable{

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

}

  Nothing more is required; Java automatically handles the object graph. But while Java's Serializable interface is easy to implement and natively supported, its drawbacks are equally apparent: the serialized form is bulky, and the extra overhead slows things down. These shortcomings make it a poor fit for Hadoop, which is why Hadoop does not use Java's native serialization mechanism and instead developed one of its own.

  Both MapReduce and HDFS need to communicate over the network, so the objects being exchanged must be serialized, and Hadoop requires that serialization be fast, compact, and light on bandwidth. Understanding how Hadoop serializes data is therefore well worth the effort, and we study it in detail below.
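
  To make the size overhead concrete, here is a minimal sketch (JDK only, no Hadoop dependency) comparing Java's built-in serialization of a single long against the raw 8-byte DataOutput encoding that Writable types such as LongWritable use. The class name SerializationSizeDemo is ours, and the exact byte counts can vary by JDK version:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SerializationSizeDemo {
    public static void main(String[] args) throws IOException {
        // Java built-in serialization: writes a stream header, a class
        // descriptor, and field metadata in addition to the value itself.
        ByteArrayOutputStream javaForm = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(javaForm)) {
            oos.writeObject(Long.valueOf(42L));
        }

        // Raw DataOutput encoding: what LongWritable.write() emits.
        ByteArrayOutputStream rawForm = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(rawForm)) {
            dos.writeLong(42L);
        }

        System.out.println("ObjectOutputStream: " + javaForm.size() + " bytes"); // typically dozens of bytes
        System.out.println("DataOutput:         " + rawForm.size() + " bytes"); // exactly 8 bytes
    }
}

  On a typical JDK the ObjectOutputStream form is an order of magnitude larger than the raw encoding, which is precisely the overhead described above.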

  Note: this post does not cover Java's Serializable interface in detail; if you want to learn more, see the official documentation: http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html

3. The Hadoop Serialization Dependency Graph in Detail

  In Hadoop's serialization mechanism, the org.apache.hadoop.io package defines a large number of serializable types, all of which implement the Writable interface. Writable declares two methods, as follows:

  • write: serializes the object's fields to a byte stream.

  • readFields: deserializes the object's fields from a byte stream.

  The Writable source is as follows:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * A serializable object which implements a simple, efficient, serialization 
 * protocol, based on {@link DataInput} and {@link DataOutput}.
 *
 * <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
 * framework implements this interface.</p>
 * 
 * <p>Implementations typically implement a static <code>read(DataInput)</code>
 * method which constructs a new instance, calls {@link #readFields(DataInput)} 
 * and returns the instance.</p>
 * 
 * <p>Example:</p>
 * <p><blockquote><pre>
 *     public class MyWritable implements Writable {
 *       // Some data     
 *       private int counter;
 *       private long timestamp;
 *       
 *       public void write(DataOutput out) throws IOException {
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *       
 *       public void readFields(DataInput in) throws IOException {
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *       
 *       public static MyWritable read(DataInput in) throws IOException {
 *         MyWritable w = new MyWritable();
 *         w.readFields(in);
 *         return w;
 *       }
 *     }
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
  /** 
   * Serialize the fields of this object to <code>out</code>.
   * 
   * @param out <code>DataOuput</code> to serialize this object into.
   * @throws IOException
   */
  void write(DataOutput out) throws IOException;

  /** 
   * Deserialize the fields of this object from <code>in</code>.  
   * 
   * <p>For efficiency, implementations should attempt to re-use storage in the 
   * existing object where possible.</p>
   * 
   * @param in <code>DataInput</code> to deseriablize this object from.
   * @throws IOException
   */
  void readFields(DataInput in) throws IOException;
}
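
  To tie this back to the earlier AppInfo example, here is a hedged sketch of what a Writable-based equivalent might look like. The class AppInfoWritable and both of its fields are hypothetical, invented purely for illustration:

package cn.hdfs.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/** A hypothetical Writable counterpart of the AppInfo class above. */
public class AppInfoWritable implements Writable {

    private int appId;       // illustrative field, not in the original AppInfo
    private long createTime; // illustrative field, not in the original AppInfo

    @Override
    public void write(DataOutput out) throws IOException {
        // Write each field in a fixed order; no class metadata is emitted.
        out.writeInt(appId);
        out.writeLong(createTime);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields back in exactly the order they were written.
        appId = in.readInt();
        createTime = in.readLong();
    }
}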

  Next, let's look at the dependency graph among Hadoop's serialization types, shown in the figure below:

  [Figure: dependency diagram of the Writable-related interfaces and implementation classes in org.apache.hadoop.io]

  As the diagram shows, the WritableComparable interface extends both the Writable and Comparable interfaces.

  The WritableComparable source is as follows:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * A {@link Writable} which is also {@link Comparable}. 
 *
 * <p><code>WritableComparable</code>s can be compared to each other, typically 
 * via <code>Comparator</code>s. Any type which is to be used as a 
 * <code>key</code> in the Hadoop Map-Reduce framework should implement this
 * interface.</p>
 *
 * <p>Note that <code>hashCode()</code> is frequently used in Hadoop to partition
 * keys. It's important that your implementation of hashCode() returns the same 
 * result across different instances of the JVM. Note also that the default 
 * <code>hashCode()</code> implementation in <code>Object</code> does <b>not</b>
 * satisfy this property.</p>
 *  
 * <p>Example:</p>
 * <p><blockquote><pre>
 *     public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
 *       // Some data
 *       private int counter;
 *       private long timestamp;
 *       
 *       public void write(DataOutput out) throws IOException {
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *       
 *       public void readFields(DataInput in) throws IOException {
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *       
 *       public int compareTo(MyWritableComparable o) {
 *         int thisValue = this.value;
 *         int thatValue = o.value;
 *         return (thisValue &lt; thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
 *       }
 *
 *       public int hashCode() {
 *         final int prime = 31;
 *         int result = 1;
 *         result = prime * result + counter;
 *         result = prime * result + (int) (timestamp ^ (timestamp &gt;&gt;&gt; 32));
 *         return result
 *       }
 *     }
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

  Next, let's look at the source of Comparable:

package java.lang;
import java.util.*;

public interface Comparable<T> {
    public int compareTo(T o);
}

  Reading the source, we can see that Java's Comparable interface declares a single method, compareTo, which compares two objects.

  The diagram above lists all the types in Hadoop's serialization hierarchy; here we focus on a few commonly used implementation classes such as IntWritable, Text, and LongWritable.

4. Common Writable Implementation Classes

  Let's start with the source of IntWritable and LongWritable:

  • IntWritable
package org.apache.hadoop.io;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** A WritableComparable for ints. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class IntWritable implements WritableComparable<IntWritable> {
  private int value;

  public IntWritable() {}

  public IntWritable(int value) { set(value); }

  /** Set the value of this IntWritable. */
  public void set(int value) { this.value = value; }

  /** Return the value of this IntWritable. */
  public int get() { return value; }

  @Override
  public void readFields(DataInput in) throws IOException {
    value = in.readInt();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(value);
  }

  /** Returns true iff <code>o</code> is a IntWritable with the same value. */
  @Override
  public boolean equals(Object o) {
    if (!(o instanceof IntWritable))
      return false;
    IntWritable other = (IntWritable)o;
    return this.value == other.value;
  }

  @Override
  public int hashCode() {
    return value;
  }

  /** Compares two IntWritables. */
  @Override
  public int compareTo(IntWritable o) {
    int thisValue = this.value;
    int thatValue = o.value;
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  }

  @Override
  public String toString() {
    return Integer.toString(value);
  }

  /** A Comparator optimized for IntWritable. */ 
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(IntWritable.class);
    }
    
    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int thisValue = readInt(b1, s1);
      int thatValue = readInt(b2, s2);
      return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
    }
  }

  static {                                        // register this comparator
    WritableComparator.define(IntWritable.class, new Comparator());
  }
}
  • LongWritable
package org.apache.hadoop.io;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** A WritableComparable for longs. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class LongWritable implements WritableComparable<LongWritable> {
  private long value;

  public LongWritable() {}

  public LongWritable(long value) { set(value); }

  /** Set the value of this LongWritable. */
  public void set(long value) { this.value = value; }

  /** Return the value of this LongWritable. */
  public long get() { return value; }

  @Override
  public void readFields(DataInput in) throws IOException {
    value = in.readLong();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(value);
  }

  /** Returns true iff <code>o</code> is a LongWritable with the same value. */
  @Override
  public boolean equals(Object o) {
    if (!(o instanceof LongWritable))
      return false;
    LongWritable other = (LongWritable)o;
    return this.value == other.value;
  }

  @Override
  public int hashCode() {
    return (int)value;
  }

  /** Compares two LongWritables. */
  @Override
  public int compareTo(LongWritable o) {
    long thisValue = this.value;
    long thatValue = o.value;
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  }

  @Override
  public String toString() {
    return Long.toString(value);
  }

  /** A Comparator optimized for LongWritable. */ 
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(LongWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      long thisValue = readLong(b1, s1);
      long thatValue = readLong(b2, s2);
      return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
    }
  }

  /** A decreasing Comparator optimized for LongWritable. */ 
  public static class DecreasingComparator extends Comparator {
    
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      return -super.compare(a, b);
    }
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return -super.compare(b1, s1, l1, b2, s2, l2);
    }
  }

  static {                                       // register default comparator
    WritableComparator.define(LongWritable.class, new Comparator());
  }

}

  As the IntWritable and LongWritable sources show, both classes contain an inner Comparator class, which exists to support comparing records directly on their serialized bytes, without deserializing them first. The compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) method never needs to construct an IntWritable object, which makes it faster than compareTo(Object o), as the sketch below demonstrates.
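
  The following is a minimal sketch of raw comparison in action: it serializes two IntWritable values by hand and compares the resulting byte arrays through the registered comparator. The class name RawCompareDemo and the helper method serialize() are ours, written for this demo:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

public class RawCompareDemo {

    /** Serialize a Writable to a byte array via its write() method. */
    private static byte[] serialize(IntWritable w) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        w.write(new DataOutputStream(baos));
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] a = serialize(new IntWritable(7));
        byte[] b = serialize(new IntWritable(42));

        // Look up the comparator registered by IntWritable's static block,
        // then compare the serialized bytes directly -- no deserialization.
        WritableComparator cmp = WritableComparator.get(IntWritable.class);
        int result = cmp.compare(a, 0, a.length, b, 0, b.length);
        System.out.println(result); // negative, since 7 < 42
    }
}

  Raw comparators registered this way are what the MapReduce shuffle uses to sort serialized keys without deserializing them, which is a significant performance win.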

  • Text

  The Text source runs to roughly 670 lines, so it is not reproduced here; if you want to read it in full, look up the Text class in Hadoop's org.apache.hadoop.io package. An excerpt is shown below:

@Stringable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Text extends BinaryComparable
    implements WritableComparable<BinaryComparable> {
      
    // remaining code omitted ...
    
}

  As the excerpt shows, Text extends the BinaryComparable base class and implements the WritableComparable<BinaryComparable> interface. WritableComparable was covered above, so let's examine BinaryComparable, starting with an excerpt of its source:

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class BinaryComparable implements Comparable<BinaryComparable> {

   // remaining code omitted ...

}

  We find that BinaryComparable (which implements the Comparable interface) is an abstract class whose concrete subclasses implement Hadoop's binary serialization. The abstract class provides two compareTo methods, shown below:

/**
   * Compare bytes from {#getBytes()}.
   * @see org.apache.hadoop.io.WritableComparator#compareBytes(byte[],int,int,byte[],int,int)
   */
  @Override
  public int compareTo(BinaryComparable other) {
    if (this == other)
      return 0;
    return WritableComparator.compareBytes(getBytes(), 0, getLength(),
             other.getBytes(), 0, other.getLength());
  }

  /**
   * Compare bytes from {#getBytes()} to those provided.
   */
  public int compareTo(byte[] other, int off, int len) {
    return WritableComparator.compareBytes(getBytes(), 0, getLength(),
             other, off, len);
  }

  As the code shows, both compareTo methods delegate to WritableComparator's static compareBytes method to perform the binary comparison (a short sketch follows the quoted Javadoc below). In addition, the Text class's Javadoc makes clear that Text is a UTF-8-encoded Writable; it reads as follows:

/** This class stores text using standard UTF8 encoding.  It provides methods
 * to serialize, deserialize, and compare texts at byte level.  The type of
 * length is integer and is serialized using zero-compressed format.  <p>In
 * addition, it provides methods for string traversal without converting the
 * byte array to a string.  <p>Also includes utilities for
 * serializing/deserialing a string, coding/decoding a string, checking if a
 * byte array contains valid UTF8 code, calculating the length of an encoded
 * string.
 */
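
  To make the byte-level comparison concrete, here is a minimal sketch (the class name CompareBytesDemo is ours) that compares two Text values the same way BinaryComparable.compareTo does internally, by handing their backing byte arrays to WritableComparator.compareBytes:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;

public class CompareBytesDemo {
    public static void main(String[] args) {
        Text a = new Text("apple");
        Text b = new Text("banana");

        // getBytes() returns the backing array, which may be longer than
        // the valid data, so the valid length must come from getLength().
        int r = WritableComparator.compareBytes(
                a.getBytes(), 0, a.getLength(),
                b.getBytes(), 0, b.getLength());

        System.out.println(r < 0); // true: "apple" sorts before "banana"
    }
}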

  In everyday Hadoop development we generally treat Text as the counterpart of Java's String type, i.e. java.lang.String, although its byte-oriented API differs in places, as the sketch below shows.
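
  Here is a minimal sketch of the byte-oriented Text API (the class name TextDemo is ours); note that getLength() counts UTF-8 bytes rather than characters, which is the main place where the String analogy breaks down:

import org.apache.hadoop.io.Text;

public class TextDemo {
    public static void main(String[] args) {
        Text t = new Text("hadoop");
        System.out.println(t.getLength()); // 6: number of UTF-8 bytes
        System.out.println(t.charAt(2));   // 100: code point of 'd' at byte offset 2
        System.out.println(t.find("do"));  // 2: byte offset where "do" starts

        // Non-ASCII text: 2 characters, but 6 UTF-8 bytes.
        t.set("序列");
        System.out.println(t.getLength()); // 6
    }
}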

5. Summary

  Through this post we have gained a deeper understanding of Hadoop serialization and of implementation classes such as IntWritable, LongWritable, and Text. This will help us later on when developing Hadoop projects and writing MR jobs, since we can now choose key and value types with confidence.

6. Closing Remarks

  That's all for this post. If you run into questions while studying this material, feel free to join the discussion group or send me an email, and I will do my best to answer. Onward together!

