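Thrift compiler: a walkthrough of the generated classes

      Thrift is the RPC framework that originated at Facebook. Introductions and usage guides are easy to find with any search engine, so this post skips them and goes straight to the structure of the generated code.

      The content of Hello.thrift is: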

namespace java com.tomsun.thrift.generated.demo
service Hello {
   string helloString(1:string para)
   }

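     The definition is simple. It declares one RPC service (Hello) with one method, helloString. Each method parameter is written as "id: type name", and the parameters must be numbered (1: xxx xxx, 2: xxx xxx). The namespace line selects the target language (java) and the Java package name. This post only discusses Java.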

 

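Class file analysis

     The definition above contains a single service with a single method, yet the generated class file is huge (975 lines). It is not reproduced in full here; key excerpts are quoted where they are needed.

     Thrift is a complete RPC framework with a finely layered internal structure, so the volume of generated code is to be expected. (protobuf's so-called RPC is only an empty skeleton, and you have to implement nearly all of it yourself. On the other hand, protobuf's serialization feels richer to me than Thrift's in its integer types (sint, fixed, int, uint), while Thrift supports more container types (list, set, map) than protobuf (list).) The details can wait for another post; back to the topic.

    The Thrift compiler generates Hello.java, named after the service. The service interfaces are defined as follows: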

  public interface Iface { // synchronous RPC service interface

    public String helloString(String para) throws org.apache.thrift.TException;

  }

  public interface AsyncIface { // asynchronous RPC service interface: the String return value is gone, and the method takes an extra AsyncMethodCallback parameter

    public void helloString(String para, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;

  }

    In the async form the return value is delivered through the callback, which is defined as follows:

public interface AsyncMethodCallback<T> {
  /**
   * This method will be called when the remote side has completed invoking
   * your method call and the result is fully read. For oneway method calls,
   * this method will be called as soon as we have completed writing out the
   * request.
   * @param response
   */
  public void onComplete(T response);

  /**
   * This method will be called when there is an unexpected clientside
   * exception. This does not include application-defined exceptions that
   * appear in the IDL, but rather things like IOExceptions.
   * @param exception
   */
  public void onError(Exception exception);
}
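
    As a rough illustration of how calling code might consume such a callback, the sketch below adapts it to a CompletableFuture. To stay self-contained it declares a local stand-in interface, Callback<T>, with the same two methods as AsyncMethodCallback<T>. Callback, FutureCallback, CallbackDemo, and fakeAsyncHello are hypothetical names invented for this example; none of them is part of Thrift.

```java
import java.util.concurrent.CompletableFuture;

// Local stand-in with the same two methods as AsyncMethodCallback<T>
// (assumption: this is NOT the Thrift type, just a mirror of its shape).
interface Callback<T> {
  void onComplete(T response);
  void onError(Exception exception);
}

// Bridges the callback style to a CompletableFuture so callers can chain or block on the result.
final class FutureCallback<T> implements Callback<T> {
  private final CompletableFuture<T> future = new CompletableFuture<>();

  @Override
  public void onComplete(T response) {
    future.complete(response);
  }

  @Override
  public void onError(Exception exception) {
    future.completeExceptionally(exception);
  }

  CompletableFuture<T> future() {
    return future;
  }
}

final class CallbackDemo {
  // Hypothetical method shaped like AsyncIface.helloString: returns void, reports via the callback.
  static void fakeAsyncHello(String para, Callback<String> resultHandler) {
    if (para == null) {
      resultHandler.onError(new IllegalArgumentException("para is null"));
    } else {
      resultHandler.onComplete("hello " + para);
    }
  }

  public static void main(String[] args) {
    FutureCallback<String> cb = new FutureCallback<>();
    fakeAsyncHello("thrift", cb);
    System.out.println(cb.future().join());
  }
}
```

    With a real generated AsyncClient the same adapter idea should carry over by implementing AsyncMethodCallback directly instead of the stand-in.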

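     Code that makes asynchronous RPC calls has to implement AsyncMethodCallback itself. The synchronous client stub and its factory class are declared as follows: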

  public static class Client extends org.apache.thrift.TServiceClient implements Iface {
    public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {

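    Client is the client-side RPC stub. It hides the underlying RPC serialization and network transport. The asynchronous counterpart is declared the same way: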

 public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
    public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {

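    Thrift's factories feel somewhat superfluous to me. They lack the encapsulation that the three classic factory patterns provide, and the code would work about as well without them.

    A closer look inside the synchronous stub: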

    public String helloString(String para) throws org.apache.thrift.TException
    {
      send_helloString(para);
      return recv_helloString();
    }

    public void send_helloString(String para) throws org.apache.thrift.TException
    {
      helloString_args args = new helloString_args();
      args.setPara(para);
      sendBase("helloString", args);
    }

    public String recv_helloString() throws org.apache.thrift.TException
    {
      helloString_result result = new helloString_result();
      receiveBase(result, "helloString");
      if (result.isSetSuccess()) {
        return result.success;
      }
      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "helloString failed: unknown result");
    }

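     The helloString(String para) method consists of two synchronous steps, send and recv. In addition, Thrift generates one Java class for a method's parameters and one for its return value; in this example they are helloString_args and helloString_result: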

public static class helloString_args implements org.apache.thrift.TBase<helloString_args, helloString_args._Fields>, java.io.Serializable, Cloneable, Comparable<helloString_args>

public static class helloString_result implements org.apache.thrift.TBase<helloString_result, helloString_result._Fields>, java.io.Serializable, Cloneable, Comparable<helloString_result> 

    Both are the data exchanged between the RPC client and the server, and both implement the TBase interface:

public interface TBase<T extends TBase<T,F>, F extends TFieldIdEnum> extends Comparable<T>,  Serializable {

  /**
   * Reads the TObject from the given input protocol.
   *
   * @param iprot Input protocol
   */
  public void read(TProtocol iprot) throws TException;

  /**
   * Writes the objects out to the protocol
   *
   * @param oprot Output protocol
   */
  public void write(TProtocol oprot) throws TException;

  /**
   * Get the F instance that corresponds to fieldId.
   */
  public F fieldForId(int fieldId);

  /**
   * Check if a field is currently set or unset.
   *
   * @param field
   */
  public boolean isSet(F field);

  /**
   * Get a field's value by field variable. Primitive types will be wrapped in 
   * the appropriate "boxed" types.
   *
   * @param field
   */
  public Object getFieldValue(F field);

  /**
   * Set a field's value by field variable. Primitive types must be "boxed" in
   * the appropriate object wrapper type.
   *
   * @param field
   */
  public void setFieldValue(F field, Object value);

  public TBase<T, F> deepCopy();

  /**
   * Return to the state of having just been initialized, as though you had just
   * called the default constructor.
   */
  public void clear();
}

    TBase defines read, write, and the methods for checking whether a particular field has been set. Taking helloString_args as the concrete example:

 private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("helloString_args");

 private static final org.apache.thrift.protocol.TField PARA_FIELD_DESC = new org.apache.thrift.protocol.TField("para", org.apache.thrift.protocol.TType.STRING, (short)1);

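   STRUCT_DESC (a TStruct) is Thrift's internal description of the struct. It is the metadata handed to the protocol when the struct is read or written remotely, and it marks the struct's boundaries (readStructBegin(), writeStructEnd()):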

/**
 * Helper class that encapsulates struct metadata.
 *
 */
public final class TStruct {
  public TStruct() {
    this("");
  }

  public TStruct(String n) {
    name = n;
  }

  public final String name; // "helloString_args" in this example
}

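    PARA_FIELD_DESC (a TField) describes one field inside the transferred struct. Reads and writes are bracketed by the corresponding markers (readFieldBegin(), writeFieldEnd()). Fields correspond to method parameters and return values; here it is the parameter of helloString. If the method had several parameters, helloString_args would hold one TField descriptor per parameter: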

/**
 * Helper class that encapsulates field metadata.
 *
 */
public class TField {
  public TField() {
    this("", TType.STOP, (short)0);
  }

  public TField(String n, byte t, short i) {
    name = n; // parameter name
    type = t; // Thrift internal type
    id = i;   // sequence number, i.e. the number given to the parameter in Hello.thrift
  }

  public final String name;
  public final byte   type;
  public final short  id;

  public String toString() {
    return "<TField name:'" + name + "' type:" + type + " field-id:" + id + ">";
  }

  @Override
  public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + id;
    result = prime * result + ((name == null) ? 0 : name.hashCode());
    result = prime * result + type;
    return result;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj)
      return true;
    if (obj == null)
      return false;
    if (getClass() != obj.getClass())
      return false;
    TField otherField = (TField) obj;
    return type == otherField.type && id == otherField.id;
  }
}

     Thrift's internal type constants:

/**
 * Type constants in the Thrift protocol.
 */
public final class TType {
  public static final byte STOP   = 0; // marks the end of a struct's field list; the read loop breaks when it sees this type
  public static final byte VOID   = 1;
  public static final byte BOOL   = 2;
  public static final byte BYTE   = 3;
  public static final byte DOUBLE = 4;
  public static final byte I16    = 6;
  public static final byte I32    = 8;
  public static final byte I64    = 10;
  public static final byte STRING = 11;
  public static final byte STRUCT = 12;
  public static final byte MAP    = 13;
  public static final byte SET    = 14;
  public static final byte LIST   = 15;
  public static final byte ENUM   = 16;
}
 static {
      schemes.put(StandardScheme.class, new helloString_argsStandardSchemeFactory());
      schemes.put(TupleScheme.class, new helloString_argsTupleSchemeFactory());
    }

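 The static block above registers the factories for the two scheme classes that do the actual stream reading and writing. read() and write() delegate to whichever scheme the protocol selects: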

 public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
    }

    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
    }
 private static class helloString_argsStandardScheme extends StandardScheme<helloString_args> {

      public void read(org.apache.thrift.protocol.TProtocol iprot, helloString_args struct) throws org.apache.thrift.TException {
        org.apache.thrift.protocol.TField schemeField;
        iprot.readStructBegin();
        while (true)
        {
          schemeField = iprot.readFieldBegin();
          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
            break;
          }
          switch (schemeField.id) {
            case 1: // PARA
              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
                struct.para = iprot.readString();
                struct.setParaIsSet(true);
              } else { 
                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
              }
              break;
            default:
              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
          }
          iprot.readFieldEnd();
        }
        iprot.readStructEnd();

        // check for required fields of primitive type, which can't be checked in the validate method
        struct.validate();
      }

      public void write(org.apache.thrift.protocol.TProtocol oprot, helloString_args struct) throws org.apache.thrift.TException {
        struct.validate();

        oprot.writeStructBegin(STRUCT_DESC);
        if (struct.para != null) {
          oprot.writeFieldBegin(PARA_FIELD_DESC);
          oprot.writeString(struct.para);
          oprot.writeFieldEnd();
        }
        oprot.writeFieldStop();
        oprot.writeStructEnd();
      }

    }
 private static class helloString_argsTupleScheme extends TupleScheme<helloString_args> {

      @Override
      public void write(org.apache.thrift.protocol.TProtocol prot, helloString_args struct) throws org.apache.thrift.TException {
        TTupleProtocol oprot = (TTupleProtocol) prot;
        BitSet optionals = new BitSet();
        if (struct.isSetPara()) {
          optionals.set(0);
        }
        oprot.writeBitSet(optionals, 1);
        if (struct.isSetPara()) {
          oprot.writeString(struct.para);
        }
      }

      @Override
      public void read(org.apache.thrift.protocol.TProtocol prot, helloString_args struct) throws org.apache.thrift.TException {
        TTupleProtocol iprot = (TTupleProtocol) prot;
        BitSet incoming = iprot.readBitSet(1);
        if (incoming.get(0)) {
          struct.para = iprot.readString();
          struct.setParaIsSet(true);
        }
      }
    }

  }

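      The standard scheme reads and writes in a more structured way (structBegin -> fieldBegin -> value -> fieldEnd -> structEnd). The tuple scheme replaces the structBegin, fieldBegin, fieldEnd, and structEnd markers with a single bitset. As mentioned above, the struct and field descriptors carry the struct name, field name, type, and sequence number, all of which take up space in the stream, so dropping them reduces the size of both the serialized data and the network transfer.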
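
      To make the size difference concrete, here is a minimal sketch that encodes the single para field both ways and lets you compare the byte counts. The byte layout is an assumption modelled on a fixed-width binary encoding (1-byte type, 2-byte field id, 4-byte string length, 1-byte STOP); the real tuple protocol in Thrift may encode lengths more compactly. SchemeSizeSketch and its methods are hypothetical names, not generated code.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class SchemeSizeSketch {
  static final byte TYPE_STOP = 0;    // same value as TType.STOP
  static final byte TYPE_STRING = 11; // same value as TType.STRING

  // Standard-style layout: field header (type + id), value, then a STOP byte.
  static byte[] writeStandard(String para) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      if (para != null) {
        byte[] utf8 = para.getBytes(StandardCharsets.UTF_8);
        out.writeByte(TYPE_STRING); // field type
        out.writeShort(1);          // field id from the IDL
        out.writeInt(utf8.length);  // string length
        out.write(utf8);            // string bytes
      }
      out.writeByte(TYPE_STOP);     // end of field list
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Tuple-style layout: one bitset byte saying which fields follow, then the values only.
  static byte[] writeTuple(String para) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      out.writeByte(para != null ? 0x01 : 0x00); // bit 0 set means "para is present"
      if (para != null) {
        byte[] utf8 = para.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
      }
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("standard: " + writeStandard("hi").length + " bytes");
    System.out.println("tuple:    " + writeTuple("hi").length + " bytes");
  }
}
```

      Under these assumptions the per-field saving is the 3-byte field header, plus the STOP byte per struct; the price is that both sides must agree on the field order, since no field ids travel with the data.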

public String para; // required

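     para is the member of the helloString_args struct that holds the actual parameter value. _Fields is the enum representation of all members of the struct; its ids are the ones the standard scheme switches on when reading (see the code above).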

/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
      PARA((short)1, "para");

      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();

      static {
        for (_Fields field : EnumSet.allOf(_Fields.class)) {
          byName.put(field.getFieldName(), field);
        }
      }

      /**
       * Find the _Fields constant that matches fieldId, or null if its not found.
       */
      public static _Fields findByThriftId(int fieldId) {
        switch(fieldId) {
          case 1: // PARA
            return PARA;
          default:
            return null;
        }
      }

      /**
       * Find the _Fields constant that matches fieldId, throwing an exception
       * if it is not found.
       */
      public static _Fields findByThriftIdOrThrow(int fieldId) {
        _Fields fields = findByThriftId(fieldId);
        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
        return fields;
      }

      /**
       * Find the _Fields constant that matches name, or null if its not found.
       */
      public static _Fields findByName(String name) {
        return byName.get(name);
      }

      private final short _thriftId;
      private final String _fieldName;

      _Fields(short thriftId, String fieldName) {
        _thriftId = thriftId;
        _fieldName = fieldName;
      }

      public short getThriftFieldId() {
        return _thriftId;
      }

      public String getFieldName() {
        return _fieldName;
      }
    }

      The field metadata of the struct: the map is keyed by the _Fields enum, and each FieldMetaData holds the parameter name, the requirement type, and a FieldValueMetaData carrying the TType:

 // isset id assignments
    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
    static {
      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
      tmpMap.put(_Fields.PARA, new org.apache.thrift.meta_data.FieldMetaData("para", org.apache.thrift.TFieldRequirementType.DEFAULT, 
          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
      metaDataMap = Collections.unmodifiableMap(tmpMap);
      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(helloString_args.class, metaDataMap);
    }

    The requirement type constants:

/**
 * Requirement type constants.
 *
 */
public final class TFieldRequirementType {
  public static final byte REQUIRED  = 1;
  public static final byte OPTIONAL = 2;
  public static final byte DEFAULT = 3;
}

    The accessors for setting the value and for checking whether it has been set:

    public String getPara() {
      return this.para;
    }

    public helloString_args setPara(String para) {
      this.para = para;
      return this;
    }

    public void unsetPara() {
      this.para = null;
    }

    /** Returns true if field para is set (has been assigned a value) and false otherwise */
    public boolean isSetPara() {
      return this.para != null;
    }

<**************************************************************************************************************************************************>

     The server-side synchronous processor class:

public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor 

   It registers the actual process-function classes:

 private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      processMap.put("helloString", new helloString());
      return processMap;
    }

   The process-function class:

 public static class helloString<I extends Iface> extends org.apache.thrift.ProcessFunction<I, helloString_args> {
      public helloString() {
        super("helloString");
      }

      public helloString_args getEmptyArgsInstance() {
        return new helloString_args();
      }

      protected boolean isOneway() { // whether this is a one-way RPC: call only, no response expected
        return false;
      }

      public helloString_result getResult(I iface, helloString_args args) throws org.apache.thrift.TException {
        helloString_result result = new helloString_result();
        result.success = iface.helloString(args.para); // invokes the user-written server implementation
        return result; // the return value wrapped in its generated class (helloString_result)
      }
    }

  }
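
   The name-to-function dispatch that the processor performs can be sketched in a few lines. This is a simplified model, not Thrift's implementation: Iface here is a local stand-in without TException, a BiFunction plays the role of ProcessFunction, and the argument is passed as a plain String instead of being read from a protocol. DispatchSketch and process are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

final class DispatchSketch {
  // Stand-in for the generated Iface (assumption: simplified, no TException).
  interface Iface {
    String helloString(String para);
  }

  // Maps method name -> function, in the spirit of getProcessMap.
  // The BiFunction takes the user's handler and the decoded argument and returns the result.
  static Map<String, BiFunction<Iface, String, String>> processMap() {
    Map<String, BiFunction<Iface, String, String>> map = new HashMap<>();
    map.put("helloString", (iface, para) -> iface.helloString(para));
    return map;
  }

  // Looks up the function by the method name carried in the message and invokes it on the handler.
  static String process(Iface handler, String methodName, String para) {
    BiFunction<Iface, String, String> fn = processMap().get(methodName);
    if (fn == null) {
      // Simplification: this sketch just throws for an unknown method name.
      throw new IllegalArgumentException("Invalid method name: '" + methodName + "'");
    }
    return fn.apply(handler, para);
  }

  public static void main(String[] args) {
    Iface handler = para -> "hello " + para; // the user-written server implementation
    System.out.println(process(handler, "helloString", "world"));
  }
}
```

   The point of the map is that the processor never needs a switch over method names; adding a method to the service only adds one more entry.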

<**********************************************************************************************************************************************************>

       The asynchronous client and processor work differently. Inside AsyncClient:

 public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
      super(protocolFactory, clientManager, transport);
    }

    public void helloString(String para, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
      checkReady();
      helloString_call method_call = new helloString_call(para, resultHandler, this, ___protocolFactory, ___transport);
      this.___currentMethod = method_call;
      ___manager.call(method_call);
    }

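    1: TAsyncClientManager is the manager class for asynchronous clients. AsyncClient hands every TAsyncMethodCall to the manager, which uses a ConcurrentLinkedQueue, a timeout set, and a separate thread to store pending calls, send and receive messages, and handle timeouts. On error or on completion it invokes the AsyncMethodCallback.

    2: The underlying transport is a non-blocking channel.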
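
    The queue-plus-worker-thread arrangement described in point 1 can be sketched as follows. This is only a toy model of the idea: it has no selector, no sockets, and no timeout handling, and the "remote call" is just a Supplier run on the worker thread. MiniCallManager and PendingCall are hypothetical names, not Thrift classes.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class MiniCallManager {
  // One pending call: the work to perform plus the handlers to notify afterwards.
  static final class PendingCall<T> {
    final Supplier<T> work;
    final Consumer<T> onComplete;
    final Consumer<Exception> onError;

    PendingCall(Supplier<T> work, Consumer<T> onComplete, Consumer<Exception> onError) {
      this.work = work;
      this.onComplete = onComplete;
      this.onError = onError;
    }
  }

  private final ConcurrentLinkedQueue<PendingCall<?>> pending = new ConcurrentLinkedQueue<>();
  private volatile boolean running = true;
  private final Thread worker;

  MiniCallManager() {
    worker = new Thread(this::loop, "mini-call-manager");
    worker.setDaemon(true); // do not keep the JVM alive for this sketch
    worker.start();
  }

  // Called from client threads: only enqueues, never blocks on the call itself.
  <T> void call(PendingCall<T> c) {
    pending.add(c);
  }

  void stop() {
    running = false;
  }

  // Worker thread: drains the queue and fires the callbacks.
  private void loop() {
    while (running) {
      PendingCall<?> c = pending.poll();
      if (c == null) {
        LockSupport.parkNanos(1_000_000L); // idle briefly instead of spinning
        continue;
      }
      run(c);
    }
  }

  private static <T> void run(PendingCall<T> c) {
    try {
      c.onComplete.accept(c.work.get());
    } catch (Exception e) {
      c.onError.accept(e);
    }
  }
}
```

    A caller would enqueue with something like manager.call(new MiniCallManager.PendingCall<>(() -> "hello", System.out::println, Throwable::printStackTrace)) and return immediately; the callback then runs on the manager's thread, which is also why callbacks should not do long blocking work.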

 


   The helloString_call class encapsulates the message form of the call:

 public static class helloString_call extends org.apache.thrift.async.TAsyncMethodCall {
      private String para;
      public helloString_call(String para, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
        super(client, protocolFactory, transport, resultHandler, false);
        this.para = para;
      }

      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("helloString", org.apache.thrift.protocol.TMessageType.CALL, 0));
        helloString_args args = new helloString_args();
        args.setPara(para);
        args.write(prot);
        prot.writeMessageEnd();
      }

      public String getResult() throws org.apache.thrift.TException {
        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
          throw new IllegalStateException("Method call not finished!");
        }
        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
        return (new Client(prot)).recv_helloString();
      }
    }

  }

     The states of an asynchronous RPC call:

  public static enum State {
    CONNECTING,            // connecting
    WRITING_REQUEST_SIZE,  // writing the request length
    WRITING_REQUEST_BODY,  // writing the request body
    READING_RESPONSE_SIZE, // reading the response length
    READING_RESPONSE_BODY, // reading the response body
    RESPONSE_READ,         // response fully read
    ERROR;                 // an error occurred
  }
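
     The separate SIZE and BODY states suggest that each message travels as a length prefix followed by the payload. A minimal sketch of that framing, assuming a 4-byte big-endian length prefix (the prefix width and the names FrameSketch, frame, and unframe are assumptions made for this example):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FrameSketch {
  // Encodes a payload as [4-byte big-endian length][payload bytes].
  static byte[] frame(byte[] payload) {
    ByteBuffer buf = ByteBuffer.allocate(4 + payload.length); // ByteBuffer is big-endian by default
    buf.putInt(payload.length); // the "size" step
    buf.put(payload);           // the "body" step
    return buf.array();
  }

  // Reads the size first, then exactly that many body bytes.
  static byte[] unframe(byte[] framed) {
    ByteBuffer buf = ByteBuffer.wrap(framed);
    int size = buf.getInt();
    if (size < 0 || size > buf.remaining()) {
      throw new IllegalArgumentException("bad frame size: " + size);
    }
    byte[] body = new byte[size];
    buf.get(body);
    return body;
  }

  public static void main(String[] args) {
    byte[] framed = frame("helloString".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(unframe(framed), StandardCharsets.UTF_8));
  }
}
```

     Knowing the size up front is what lets a non-blocking reader allocate one buffer and keep filling it across several read events until the body is complete.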

     Message types:

public final class TMessageType {
  public static final byte CALL  = 1;     // call
  public static final byte REPLY = 2;     // reply
  public static final byte EXCEPTION = 3; // exception
  public static final byte ONEWAY = 4;    // one-way call
}
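
     To show where the message type ends up on the wire, here is a sketch of one possible encoding of the message header written by writeMessageBegin. It is modelled on a strict binary-style layout (a 32-bit word combining a version marker with the type, then the method name, then the sequence id); other protocols encode the header differently, and the class name MessageHeaderSketch is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class MessageHeaderSketch {
  static final int VERSION_1 = 0x80010000; // version marker assumed for this binary-style layout
  static final byte CALL = 1;              // same value as TMessageType.CALL

  // Writes [version | type][name length][name bytes][seqid], all integers big-endian.
  static byte[] writeMessageBegin(String name, byte type, int seqid) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
      out.writeInt(VERSION_1 | type); // the type lands in the low byte
      out.writeInt(utf8.length);      // method name length
      out.write(utf8);                // method name
      out.writeInt(seqid);            // sequence id, used to match replies to calls
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] header = writeMessageBegin("helloString", CALL, 0);
    System.out.println(header.length + " bytes, type byte = " + header[3]);
  }
}
```

     The method name in the header is what the server-side processor uses as the key into its process map.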

   Asynchronous handling on the server side, in TBaseAsyncProcessor.process():

        // Find processing function
        final TMessage msg = in.readMessageBegin();
        AsyncProcessFunction fn = processMap.get(msg.name);
        // Get args
        TBase args = (TBase)fn.getEmptyArgsInstance();
        args.read(in);
        in.readMessageEnd();
        // Start off processing function
        fn.start(iface, args, fn.getResultHandler(fb, msg.seqid));
        return true;

   The generated asynchronous process-function class:

public static class helloString<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, helloString_args, String>

  public helloString_args getEmptyArgsInstance() {
        return new helloString_args();
      }
 
   public void start(I iface, helloString_args args, org.apache.thrift.async.AsyncMethodCallback<String> resultHandler) throws TException {
        iface.helloString(args.para,resultHandler);
      }
 public AsyncMethodCallback<String> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
        final org.apache.thrift.AsyncProcessFunction fcall = this;
        return new AsyncMethodCallback<String>() { 
          public void onComplete(String o) {
            helloString_result result = new helloString_result();
            result.success = o;
            try {
              fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
              return;
            } catch (Exception e) {
              LOGGER.error("Exception writing to internal frame buffer", e);
            }
            fb.close();
          }
          public void onError(Exception e) {
            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
            org.apache.thrift.TBase msg;
            helloString_result result = new helloString_result();
            {
              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
              msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
            }
            try {
              fcall.sendResponse(fb,msg,msgType,seqid);
              return;
            } catch (Exception ex) {
              LOGGER.error("Exception writing to internal frame buffer", ex);
            }
            fb.close();
          }
        };
      }

      protected boolean isOneway() {
        return false;
      }

 

    Analysis of TProcessor, TTransport, async, scheme, and server is to be continued.

 

