Avro從入門到入土


avro官網

1、Avro歷史

Avro是Hadoop的一個數據序列化系統,由Hadoop的創始人Doug Cutting(也是Lucene,Nutch等項目的創始人)開發,設計用於支持大批量數據交換的應用。

它的主要特點有:

  • 支持二進制序列化方式,可以便捷,快速地處理大量數據;
  • 動態語言友好,Avro提供的機制使動態語言可以方便地處理Avro數據

     

    Hadoop現存的RPC系統遇到一些問題,

  • 性能瓶頸(當前采用IPC系統,它使用Java自帶的DataOutputStream和DataInputStream);
  • 需要服務器端和客戶端必須運行相同版本的Hadoop;
  • 只能使用Java開發等。

對比其他序列化系統,如Google的Protocol Buffers, Facebook的Thrift可以完全可以滿足普通應用的需求。但現存的這些序列化系統自身也有毛病

以Protocol Buffers為例:

  1. 它需要用戶先定義數據結構,然后根據這個數據結構生成代碼,再組裝數據。如果需要操作多個數據源的數據集,那么需要定義多套數據結構並重復執行多次上面的流程,這樣就不能對任意數據集做統一處理。
  2. 對於Hadoop中Hive和Pig這樣的腳本系統來說,使用代碼生成是不合理的。並且Protocol Buffers在序列化時考慮到數據定義與數據可能不完全匹配,在數據中添加注解,這會讓數據變得龐大並拖慢處理速度。

其它序列化系統有如Protocol Buffers類似的問題。所以為了Hadoop的前途考慮,Doug Cutting主導開發一套全新的序列化系統,這就是Avro於09年加入Hadoop項目族中。 

2、Avro的結構 

     (1)Avro依賴模式(Schema)來實現數據結構定義。可以把模式理解為Java的類,它定義每個實例的結構,可以包含哪些屬性。可以根據類來產生任意多個實例對象(比較抽象不過可以看到)。對實例序列化操作時必須需要知道它的基本結構,也就需要參考類的信息。這里,根據模式產生的Avro對象類似於類的實例對象。每次序列化/反序列化時都需要知道模式的具體結構。所以,在Avro可用的一些場景下,如文件存儲或是網絡通信,都需要模式與數據同時存在。Avro數據以模式來讀和寫(文件或是網絡),並且寫入的數據都不需要加入其它標識,這樣序列化時速度快且結果內容少。由於程序可以直接根據模式來處理數據,所以Avro更適合於腳本語言的發揮。 

     Avro的模式主要由JSON對象來表示,它可能會有一些特定的屬性,用來描述某種類型(Type)的不同形式。Avro支持八種基本類型(Primitive Type)和六種混合類型(Complex Type)。基本類型可以由JSON字符串來表示。每種不同的混合類型有不同的屬性(Attribute)來定義,有些屬性是必須的,有些是可選的,如果需要的話,可以用JSON數組來存放多個JSON對象定義。在這幾種Avro定義的類型的支持下,可以由用戶來創造出豐富的數據結構來,支持用戶紛繁復雜的數據。 

   (2)  Avro支持兩種序列化編碼方式:二進制編碼JSON編碼

  • 使用二進制編碼會高效序列化,並且序列化后得到的結果會比較小。而JSON一般用於調試系統或是基於WEB的應用。
  • 對Avro數據序列化/反序列化時都需要對模式以深度優先(Depth-First),從左到右(Left-to-Right)的遍歷順序來執行。
  • 基本類型的序列化容易解決,混合類型的序列化會有很多不同規則。對於基本類型和混合類型的二進制編碼在文檔中規定,按照模式的解析順序依次排列字節。對於JSON編碼,聯合類型(Union Type)就與其它混合類型表現不一致。 

    (3)Avro為了便於MapReduce的處理定義了一種容器文件格式(Container File Format)。

  1. 文件中只能有一種模式,所有需要存入這個文件的對象都需要按照這種模式以二進制編碼的形式寫入。
  2. 對象在文件中以塊(Block)來組織,並且這些對象都是可以被壓縮的。
  3. 塊和塊之間會存在同步標記符(Synchronization Marker),以便MapReduce方便地切割文件用於處理。

下圖是根據文檔描述畫出的文件結構圖(將Avro對象序列化到文件的操作): 

一個存儲文件由兩部分組成:頭信息(Header)和數據塊(Data Block)。

(1)頭信息又由三部分構成:四個字節的前綴(類似於Magic Number),文件Meta-data信息和隨機生成的16字節同步標記符。這里的Meta-data信息讓人有些疑惑,它除了文件的模式外,還能包含schema和codec。文檔中指出當前Avro認定的就兩個Meta-data:schema和codec。這里的codec表示對后面的文件數據塊(File Data Block)采用何種壓縮方式。Avro的實現都需要支持下面兩種壓縮方式:null(不壓縮)和deflate(使用Deflate算法壓縮數據塊)。除了文檔中認定的兩種Meta-data,用戶還可以自定義適用於自己的Meta-data。這里用long型來表示有多少個Meta-data數據對,也是讓用戶在實際應用中可以定義足夠的Meta-data信息。對於每對Meta-data信息,都有一個string型的key(需要以“avro.”為前綴)和二進制編碼后的value。

(2)每個數據塊,結構如下:一個long值記錄當前塊有多少個對象,一個long值用於記錄當前塊經過壓縮后的字節數,真正的序列化對象和16字節長度的同步標記符。由於對象可以組織成不同的塊,使用時就可以不經過反序列化而對某個數據塊進行操作。還可以由數據塊數,對象數和同步標記符來定位損壞的塊以確保數據完整性。 

 

3、RPC框架

Avro也被作為一種RPC框架來使用。客戶端希望同服務器端交互時,就需要交換雙方通信的協議,它類似於模式,需要雙方來定義,在Avro中被稱為消息(Message)。通信雙方都必須保持這種協議,以便於解析從對方發送過來的數據,這也就是RPC握手階段。 

消息從客戶端發送到服務器端需要經過傳輸層(Transport Layer),它發送消息並接收服務器端的響應。到達傳輸層的數據就是二進制數據。通常以HTTP作為傳輸模型,數據以POST方式發送到對方去。在Avro中,它的消息被封裝成為一組緩沖區(Buffer),類似於下圖的模型:

  如上圖,每個緩沖區以四個字節開頭,中間是多個字節的緩沖數據,最后以一個空緩沖區結尾。這種機制的好處在於,發送端在發送數據時可以很方便地組裝不同數據源的數據,接收方也可以將數據存入不同的存儲區。 當往緩沖區中寫數據時,大對象可以獨占一個緩沖區,而不是與其它小對象混合存放,便於接收方方便地讀取大對象。 

Protocol Buffer在傳輸數據時,往數據中加入注釋(annotation),以應對數據結構與數據不匹配的問題。但直接導致數據量變大,解析困難等缺點。那Avro是如何應對模式與數據的不同呢?

為了保證Avro的高效,假定模式至少大部分是匹配的,然后定義一些驗證規則,如果在規則滿足的前提下,做數據驗證。

如果模式不匹配就會報錯。相同模式,交互數據時,如果數據中缺少某個域(field),用規范中的默認值設置;如果數據中多了些與模式不匹配的數據。則忽視這些值。 

     Avro列出的優點中還有一項是:可排序的。一種語言支持的Avro程序在序列化數據后,可由其它語言的Avro程序對未反序列化的數據排序。

4.Avro數據序列化/反序列化

 1、下載avro-1.8.2.jar and avro-tools-1.8.2.jar兩個jar包,放到指定文件目錄。下載地址 http://www.trieuvan.com/apache/avro/1.8.2./java/

2、

定義模式(Schema)
在avro中,它是用Json格式來定義模式的。模式可以由基礎類型(null, boolean, int, long, float, double, bytes, and string)和復合類型(record, enum, array, map, union, and fixed)的數據組成。這里定義了一個簡單的模式User.avsc:
{
    "namespace": "com.wqb.hdfs.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "id", "type": "int"},
        {"name": "salary", "type": "int"},
        {"name": "age", "type": "int"},
        {"name": "address", "type": "string"}
    ]
}

3、打開cmd,進入到該目錄,執行命令生成User類

在該文件夾下的res 文件下的目錄下就會生成 com/wqb/hdfs/avro/User.java 文件。

4、使用eclipse新建maven項目,在pom.xml加入avro的依賴。

 

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.2</version>
</dependency>

 

5、把生成的User.java類復制到工程中,注意這個User.java里面生成的User類及其內部類的包名默認是user.avsc文件中的namespace的值。

 

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package com.wqb.hdfs.avro;

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = -8252366833275661088L;
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.wqb.hdfs.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"salary\",\"type\":\"int\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"address\",\"type\":\"string\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  private static SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<User> ENCODER =
      new BinaryMessageEncoder<User>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<User> DECODER =
      new BinaryMessageDecoder<User>(MODEL$, SCHEMA$);

  /**
   * Return the BinaryMessageDecoder instance used by this class.
   */
  public static BinaryMessageDecoder<User> getDecoder() {
    return DECODER;
  }

  /**
   * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
   */
  public static BinaryMessageDecoder<User> createDecoder(SchemaStore resolver) {
    return new BinaryMessageDecoder<User>(MODEL$, SCHEMA$, resolver);
  }

  /** Serializes this User to a ByteBuffer. */
  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
    return ENCODER.encode(this);
  }

  /** Deserializes a User from a ByteBuffer. */
  public static User fromByteBuffer(
      java.nio.ByteBuffer b) throws java.io.IOException {
    return DECODER.decode(b);
  }

  @Deprecated public CharSequence name;
  @Deprecated public int id;
  @Deprecated public int salary;
  @Deprecated public int age;
  @Deprecated public CharSequence address;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public User() {}

  /**
   * All-args constructor.
   * @param name The new value for name
   * @param id The new value for id
   * @param salary The new value for salary
   * @param age The new value for age
   * @param address The new value for address
   */
  public User(CharSequence name, Integer id, Integer salary, Integer age, CharSequence address) {
    this.name = name;
    this.id = id;
    this.salary = salary;
    this.age = age;
    this.address = address;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public Object get(int field$) {
    switch (field$) {
    case 0: return name;
    case 1: return id;
    case 2: return salary;
    case 3: return age;
    case 4: return address;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, Object value$) {
    switch (field$) {
    case 0: name = (CharSequence)value$; break;
    case 1: id = (Integer)value$; break;
    case 2: salary = (Integer)value$; break;
    case 3: age = (Integer)value$; break;
    case 4: address = (CharSequence)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'name' field.
   * @return The value of the 'name' field.
   */
  public CharSequence getName() {
    return name;
  }

  /**
   * Sets the value of the 'name' field.
   * @param value the value to set.
   */
  public void setName(CharSequence value) {
    this.name = value;
  }

  /**
   * Gets the value of the 'id' field.
   * @return The value of the 'id' field.
   */
  public Integer getId() {
    return id;
  }

  /**
   * Sets the value of the 'id' field.
   * @param value the value to set.
   */
  public void setId(Integer value) {
    this.id = value;
  }

  /**
   * Gets the value of the 'salary' field.
   * @return The value of the 'salary' field.
   */
  public Integer getSalary() {
    return salary;
  }

  /**
   * Sets the value of the 'salary' field.
   * @param value the value to set.
   */
  public void setSalary(Integer value) {
    this.salary = value;
  }

  /**
   * Gets the value of the 'age' field.
   * @return The value of the 'age' field.
   */
  public Integer getAge() {
    return age;
  }

  /**
   * Sets the value of the 'age' field.
   * @param value the value to set.
   */
  public void setAge(Integer value) {
    this.age = value;
  }

  /**
   * Gets the value of the 'address' field.
   * @return The value of the 'address' field.
   */
  public CharSequence getAddress() {
    return address;
  }

  /**
   * Sets the value of the 'address' field.
   * @param value the value to set.
   */
  public void setAddress(CharSequence value) {
    this.address = value;
  }

  /**
   * Creates a new User RecordBuilder.
   * @return A new User RecordBuilder
   */
  public static Builder newBuilder() {
    return new Builder();
  }

  /**
   * Creates a new User RecordBuilder by copying an existing Builder.
   * @param other The existing builder to copy.
   * @return A new User RecordBuilder
   */
  public static Builder newBuilder(Builder other) {
    return new Builder(other);
  }

  /**
   * Creates a new User RecordBuilder by copying an existing User instance.
   * @param other The existing instance to copy.
   * @return A new User RecordBuilder
   */
  public static Builder newBuilder(User other) {
    return new Builder(other);
  }

  /**
   * RecordBuilder for User instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
    implements org.apache.avro.data.RecordBuilder<User> {

    private CharSequence name;
    private int id;
    private int salary;
    private int age;
    private CharSequence address;

    /** Creates a new Builder */
    private Builder() {
      super(SCHEMA$);
    }

    /**
     * Creates a Builder by copying an existing Builder.
     * @param other The existing Builder to copy.
     */
    private Builder(Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.name)) {
        this.name = data().deepCopy(fields()[0].schema(), other.name);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.id)) {
        this.id = data().deepCopy(fields()[1].schema(), other.id);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.salary)) {
        this.salary = data().deepCopy(fields()[2].schema(), other.salary);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.age)) {
        this.age = data().deepCopy(fields()[3].schema(), other.age);
        fieldSetFlags()[3] = true;
      }
      if (isValidValue(fields()[4], other.address)) {
        this.address = data().deepCopy(fields()[4].schema(), other.address);
        fieldSetFlags()[4] = true;
      }
    }

    /**
     * Creates a Builder by copying an existing User instance
     * @param other The existing instance to copy.
     */
    private Builder(User other) {
            super(SCHEMA$);
      if (isValidValue(fields()[0], other.name)) {
        this.name = data().deepCopy(fields()[0].schema(), other.name);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.id)) {
        this.id = data().deepCopy(fields()[1].schema(), other.id);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.salary)) {
        this.salary = data().deepCopy(fields()[2].schema(), other.salary);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.age)) {
        this.age = data().deepCopy(fields()[3].schema(), other.age);
        fieldSetFlags()[3] = true;
      }
      if (isValidValue(fields()[4], other.address)) {
        this.address = data().deepCopy(fields()[4].schema(), other.address);
        fieldSetFlags()[4] = true;
      }
    }

    /**
      * Gets the value of the 'name' field.
      * @return The value.
      */
    public CharSequence getName() {
      return name;
    }

    /**
      * Sets the value of the 'name' field.
      * @param value The value of 'name'.
      * @return This builder.
      */
    public Builder setName(CharSequence value) {
      validate(fields()[0], value);
      this.name = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /**
      * Checks whether the 'name' field has been set.
      * @return True if the 'name' field has been set, false otherwise.
      */
    public boolean hasName() {
      return fieldSetFlags()[0];
    }


    /**
      * Clears the value of the 'name' field.
      * @return This builder.
      */
    public Builder clearName() {
      name = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    /**
      * Gets the value of the 'id' field.
      * @return The value.
      */
    public Integer getId() {
      return id;
    }

    /**
      * Sets the value of the 'id' field.
      * @param value The value of 'id'.
      * @return This builder.
      */
    public Builder setId(int value) {
      validate(fields()[1], value);
      this.id = value;
      fieldSetFlags()[1] = true;
      return this;
    }

    /**
      * Checks whether the 'id' field has been set.
      * @return True if the 'id' field has been set, false otherwise.
      */
    public boolean hasId() {
      return fieldSetFlags()[1];
    }


    /**
      * Clears the value of the 'id' field.
      * @return This builder.
      */
    public Builder clearId() {
      fieldSetFlags()[1] = false;
      return this;
    }

    /**
      * Gets the value of the 'salary' field.
      * @return The value.
      */
    public Integer getSalary() {
      return salary;
    }

    /**
      * Sets the value of the 'salary' field.
      * @param value The value of 'salary'.
      * @return This builder.
      */
    public Builder setSalary(int value) {
      validate(fields()[2], value);
      this.salary = value;
      fieldSetFlags()[2] = true;
      return this;
    }

    /**
      * Checks whether the 'salary' field has been set.
      * @return True if the 'salary' field has been set, false otherwise.
      */
    public boolean hasSalary() {
      return fieldSetFlags()[2];
    }


    /**
      * Clears the value of the 'salary' field.
      * @return This builder.
      */
    public Builder clearSalary() {
      fieldSetFlags()[2] = false;
      return this;
    }

    /**
      * Gets the value of the 'age' field.
      * @return The value.
      */
    public Integer getAge() {
      return age;
    }

    /**
      * Sets the value of the 'age' field.
      * @param value The value of 'age'.
      * @return This builder.
      */
    public Builder setAge(int value) {
      validate(fields()[3], value);
      this.age = value;
      fieldSetFlags()[3] = true;
      return this;
    }

    /**
      * Checks whether the 'age' field has been set.
      * @return True if the 'age' field has been set, false otherwise.
      */
    public boolean hasAge() {
      return fieldSetFlags()[3];
    }


    /**
      * Clears the value of the 'age' field.
      * @return This builder.
      */
    public Builder clearAge() {
      fieldSetFlags()[3] = false;
      return this;
    }

    /**
      * Gets the value of the 'address' field.
      * @return The value.
      */
    public CharSequence getAddress() {
      return address;
    }

    /**
      * Sets the value of the 'address' field.
      * @param value The value of 'address'.
      * @return This builder.
      */
    public Builder setAddress(CharSequence value) {
      validate(fields()[4], value);
      this.address = value;
      fieldSetFlags()[4] = true;
      return this;
    }

    /**
      * Checks whether the 'address' field has been set.
      * @return True if the 'address' field has been set, false otherwise.
      */
    public boolean hasAddress() {
      return fieldSetFlags()[4];
    }


    /**
      * Clears the value of the 'address' field.
      * @return This builder.
      */
    public Builder clearAddress() {
      address = null;
      fieldSetFlags()[4] = false;
      return this;
    }

    
    @SuppressWarnings("unchecked")
    public User build() {
      try {
        User record = new User();
        record.name = fieldSetFlags()[0] ? this.name : (CharSequence) defaultValue(fields()[0]);
        record.id = fieldSetFlags()[1] ? this.id : (Integer) defaultValue(fields()[1]);
        record.salary = fieldSetFlags()[2] ? this.salary : (Integer) defaultValue(fields()[2]);
        record.age = fieldSetFlags()[3] ? this.age : (Integer) defaultValue(fields()[3]);
        record.address = fieldSetFlags()[4] ? this.address : (CharSequence) defaultValue(fields()[4]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }

  @SuppressWarnings("unchecked")
  private static final org.apache.avro.io.DatumWriter<User>
    WRITER$ = (org.apache.avro.io.DatumWriter<User>)MODEL$.createDatumWriter(SCHEMA$);

  @Override public void writeExternal(java.io.ObjectOutput out)
    throws java.io.IOException {
    WRITER$.write(this, SpecificData.getEncoder(out));
  }

  @SuppressWarnings("unchecked")
  private static final org.apache.avro.io.DatumReader<User>
    READER$ = (org.apache.avro.io.DatumReader<User>)MODEL$.createDatumReader(SCHEMA$);

  @Override public void readExternal(java.io.ObjectInput in)
    throws java.io.IOException {
    READER$.read(this, SpecificData.getDecoder(in));
  }

}
User.java文件

 6、實現序列化avro文件。

package com.wqb.hdfs.avro;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.File;
import java.io.IOException;

public class testAvro {
    public static void main(String[] args) throws IOException {
        // 聲明並初始化User對象
        // 方式一
        User user1 = new User();
        user1.setName("wqbin");
        user1.setId(1);
        user1.setSalary(1000);
        user1.setAge(20);
        user1.setAddress("beijing");

        // 方式二 使用構造函數
// Alternate constructor
        User user2 = new User("wang", 2, 1000, 19, "guangzhou");

// 方式三,使用Build方式
// Construct via builder
        User user3 = User.newBuilder()
                .setName("bin")
                .setId(3)
                .setAge(21)
                .setSalary(2000)
                .setAddress("shenzhen")
                .build();
        String path = "E:\\avro\\fuxi\\User.avro"; // avro文件存放目錄
        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
        DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
        dataFileWriter.create(user1.getSchema(), new File(path));
// 把生成的user對象寫入到avro文件
        dataFileWriter.append(user1);
        dataFileWriter.append(user2);
        dataFileWriter.append(user3);
        dataFileWriter.close();
    }
}

 

 

7、實現avro反序列化

    @Test
    public void testDeSerial() throws IOException {
        DatumReader<User> reader = new SpecificDatumReader<User>(User.class);
        DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("E:\\avro\\fuxi\\User.avro"), reader);
        User user = null;
        while (dataFileReader.hasNext()) {
            user = dataFileReader.next();
            System.out.println(user);
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM