本文為原創,未經許可禁止轉載。
關於Tprotocol層都是一些通信協議,個人感覺內容較大,很難分類描述清楚。故打算以TBinaryProtocol為例,分析客戶端發請求以及接收服務端返回數據的整個過程。
先將客戶端的測試用例貼上。
1 public class DemoClient { 2 public static void main(String[] args) throws Exception{ 3 String param1 = "haha"; 4 Map<String, String> param3 = new HashMap<String, String>(); 5 param3.put("1", "2"); 6 Parameter param2 = new Parameter(10, "kaka"); 7 8 TSocket socket = new TSocket("127.0.0.1", 7911); 9 socket.setTimeout(3000); 10 TTransport transport = socket; 11 transport.open(); 12 TProtocol protocol = new TBinaryProtocol(transport); 13 DemoService.Client client = new DemoService.Client.Factory().getClient(protocol); 14 int result = client.demoMethod(param1, param2, param3); 15 System.out.println("result: " + result); 16 transport.close(); 17 }
首先就是構造transport,這里由於TSocket extens TIOStreamTransport,因此可構造一個TSocket即可,而TSocket包含:host(主機IP),port(端口號),time_out(超時時間)與一個Socket。
1 public TSocket(String host, int port, int timeout) { 2 host_ = host; 3 port_ = port; 4 timeout_ = timeout; 5 initSocket(); 6 }
對於socket.setTimeout(3000);實際操作就是為TSocket中的socket設置timeout
1 public void setTimeout(int timeout) { 2 timeout_ = timeout; 3 try { 4 socket_.setSoTimeout(timeout); 5 } catch (SocketException sx) { 6 LOGGER.warn("Could not set socket timeout.", sx); 7 } 8 }
下圖是構造的transport直觀構造:包含了host,inputStream,outputStream,port,socket,timeout.
transport.open所做的事情就是初始化一些輸入輸出流並且connect the socket to the InetSocketAddress
1 /** 2 * Connects the socket, creating a new socket object if necessary. 3 */ 4 public void open() throws TTransportException { 5 if (isOpen()) { 6 throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected."); 7 } 8 9 if (host_.length() == 0) { 10 throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host."); 11 } 12 if (port_ <= 0) { 13 throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port."); 14 } 15 16 if (socket_ == null) { 17 initSocket(); 18 } 19 20 try { 21 socket_.connect(new InetSocketAddress(host_, port_), timeout_); 22 inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);//均采用緩沖模式輸入輸出流 23 outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024); 24 } catch (IOException iox) { 25 close(); 26 throw new TTransportException(TTransportException.NOT_OPEN, iox); 27 } 28 }
再看一下open之后的transport:
接下來就是在已有transport也就是TSocket的基礎之上,完成Tprotocol的構建,這里選擇了TBinaryProtocol。這個工作實際上就是將上一步建好的Ttransport關聯到Tprotocol上來。相當於進一步封裝。
1 public abstract class TProtocol { 2 3 /** 4 * Prevent direct instantiation 5 */ 6 @SuppressWarnings("unused") 7 private TProtocol() {} 8 9 /** 10 * Transport 11 */ 12 protected TTransport trans_; 13 14 /** 15 * Constructor 16 */ 17 protected TProtocol(TTransport trans) { 18 trans_ = trans; 19 } 20 21 /** 22 * Transport accessor 23 */ 24 public TTransport getTransport() { 25 return trans_; 26 } 27 /**各種讀寫方法略去 28 */ 29 }
從TProtocol的構造方法中可以看出,實際上就是將上一步生成的Transport賦與TProtocol中的trans_變量並將strictRead_與strictWrite_賦值。
1 /** 2 * Constructor 3 */ 4 public TBinaryProtocol(TTransport trans) { 5 this(trans, false, true); 6 } 7 8 public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite) { 9 super(trans); 10 strictRead_ = strictRead; 11 strictWrite_ = strictWrite; 12 }
其中還有一些字節數組的初始化工作。
1 private byte [] bout = new byte[1]; 2 3 4 private byte[] i16out = new byte[2]; 5 6 7 private byte[] i32out = new byte[4]; 8 9 10 private byte[] i64out = new byte[8]; 11
這時候一切准備就緒。Tprotocol目前狀態如下圖:
Tprotocol已經准備就緒,接下來的工作就是new 一個client,然后才可以去與服務端進行請求與響應。下面我把一個client的代碼全部粘貼出來。
1 public static class Client extends org.apache.thrift.TServiceClient implements Iface { 2 public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> { 3 public Factory() {} 4 public Client getClient(org.apache.thrift.protocol.TProtocol prot) {//通過Tprotocol去構造client 5 return new Client(prot); 6 } 7 public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { 8 return new Client(iprot, oprot); 9 } 10 } 11 12 public Client(org.apache.thrift.protocol.TProtocol prot) 13 { 14 super(prot, prot);//使用了相同的Tprotocol進行構造 15 } 16 17 public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { 18 super(iprot, oprot); 19 } 20 21 public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException 22 { 23 send_demoMethod(param1, param2, param3); 24 return recv_demoMethod(); 25 } 26 27 public void send_demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException 28 { 29 demoMethod_args args = new demoMethod_args(); 30 args.setParam1(param1); 31 args.setParam2(param2); 32 args.setParam3(param3); 33 sendBase("demoMethod", args); 34 } 35 36 public int recv_demoMethod() throws org.apache.thrift.TException 37 { 38 demoMethod_result result = new demoMethod_result(); 39 receiveBase(result, "demoMethod"); 40 if (result.isSetSuccess()) { 41 return result.success; 42 } 43 throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "demoMethod failed: unknown result"); 44 } 45 46 }
為了理解客戶端構造的具體過程,我把TserviceClient.class的部分源碼貼出來:
1 public TServiceClient(TProtocol iprot, TProtocol oprot) { 2 iprot_ = iprot; 3 oprot_ = oprot; 4 } 5 6 protected TProtocol iprot_; 7 protected TProtocol oprot_; 8 9 protected int seqid_; 10 11 /** 12 * Get the TProtocol being used as the input (read) protocol. 13 * @return the TProtocol being used as the input (read) protocol. 14 */ 15 public TProtocol getInputProtocol() { 16 return this.iprot_; 17 } 18 19 /** 20 * Get the TProtocol being used as the output (write) protocol. 21 * @return the TProtocol being used as the output (write) protocol. 22 */ 23 public TProtocol getOutputProtocol() { 24 return this.oprot_; 25 }
明顯的可以看到,client有三個變量,TProtocol類型的iprot_和oprot_,還有一個順序號seqid_.由於在構造client的過程中使用了相同的Tprotocol,在這里也就是使用了相同的TBinaryProtocol,因此iprot_與oprot_是相同的,都指向上一步生成的TProtocol,也就是TBinaryProtocol.當DemoService.Client client = new DemoService.Client.Factory().getClient(protocol);執行完畢后,client的狀態如下圖:
client已經准備完畢,我們調用client的方法就可以向服務端發送請求了。而這個過程的總體代碼也就那么一點點,先直接貼出來:
1 public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException 2 { 3 send_demoMethod(param1, param2, param3);//發送請求 4 return recv_demoMethod();//接收響應 5 } 6 7 public void send_demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException 8 { 9 demoMethod_args args = new demoMethod_args();//封裝請求參數demoMethod_args 10 args.setParam1(param1); 11 args.setParam2(param2); 12 args.setParam3(param3); 13 sendBase("demoMethod", args);//發請求 14 } 15 16 public int recv_demoMethod() throws org.apache.thrift.TException 17 { 18 demoMethod_result result = new demoMethod_result();//封裝接收響應數據demoMethod_result,貌似與demoMethod_args還不一樣 19 receiveBase(result, "demoMethod");//接收返回數據 20 if (result.isSetSuccess()) { 21 return result.success; 22 } 23 throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "demoMethod failed: unknown result"); 24 }
當執行完demoMethod_args args = new demoMethod_args();之后,其實就是對demoMethod_args中的靜態變量進行了初始化,STRUCT_DESC,PARAM1_FIELD_DESC,PARAM2_FIELD_DESC,schemes,PARAM3_FIELD_DESC,metaDataMap等都有了初始值。args.setParam之后,demoMethod_args的狀態:
接下來就是:
1 protected void sendBase(String methodName, TBase args) throws TException { 2 oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));//注意這里的++seqid,就是發送請求的序號,遞增 3 args.write(oprot_); 4 oprot_.writeMessageEnd(); 5 oprot_.getTransport().flush();//這里最終其實就是outputStream進行flush 6 }
將methodName: demoMethod, args: demoMethod_args(param1:haha, param2:Parameter(id:10, name:kaka), param3:{1=2})寫入Tprotocol,在這里是oprot_。
1 public void writeMessageBegin(TMessage message) throws TException { 2 if (strictWrite_) { 3 int version = VERSION_1 | message.type;//異或形成版本號 4 writeI32(version);//寫入版本號 5 writeString(message.name);//寫方法名 6 writeI32(message.seqid);//方法序號 7 } else { 8 writeString(message.name); 9 writeByte(message.type); 10 writeI32(message.seqid); 11 } 12 }
1 public void writeString(String str) throws TException { 2 try { 3 byte[] dat = str.getBytes("UTF-8"); 4 writeI32(dat.length); 5 trans_.write(dat, 0, dat.length); 6 } catch (UnsupportedEncodingException uex) { 7 throw new TException("JVM DOES NOT SUPPORT UTF-8"); 8 } 9 }
1 public void writeI32(int i32) throws TException { 2 i32out[0] = (byte)(0xff & (i32 >> 24)); 3 i32out[1] = (byte)(0xff & (i32 >> 16)); 4 i32out[2] = (byte)(0xff & (i32 >> 8)); 5 i32out[3] = (byte)(0xff & (i32)); 6 trans_.write(i32out, 0, 4); 7 }
1 /** 2 * Writes to the underlying output stream if not null. 3 */ 4 public void write(byte[] buf, int off, int len) throws TTransportException { 5 if (outputStream_ == null) { 6 throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream"); 7 } 8 try { 9 outputStream_.write(buf, off, len); 10 } catch (IOException iox) { 11 throw new TTransportException(TTransportException.UNKNOWN, iox); 12 } 13 }
從以上代碼可以看出來,無論怎么寫,都是一層層深入的,TProtocol oprot_ ----->Ttransport trans_ ----->OutputStream outputStream(TODO:這里的outputStream其實也是bufferedOutputStream,也就是剛剛初始化transport的時候那個outputstream.其中比較奇葩的是args_.write,其代碼如下,最后還是繞到了oprot.write,只不過這里有Struct,Field.目測這里用 schemes.get(oprot.getScheme()).getScheme().write(oprot, this);就是因為args的一些參數在靜態初始化的時候已經放入了schemes
1 public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { 2 schemes.get(oprot.getScheme()).getScheme().write(oprot, this); 3 }
1 public void write(org.apache.thrift.protocol.TProtocol oprot, demoMethod_args struct) throws org.apache.thrift.TException { 2 struct.validate(); 3 4 oprot.writeStructBegin(STRUCT_DESC); 5 if (struct.param1 != null) { 6 oprot.writeFieldBegin(PARAM1_FIELD_DESC); 7 oprot.writeString(struct.param1); 8 oprot.writeFieldEnd(); 9 } 10 if (struct.param2 != null) { 11 oprot.writeFieldBegin(PARAM2_FIELD_DESC); 12 struct.param2.write(oprot); 13 oprot.writeFieldEnd(); 14 } 15 if (struct.param3 != null) { 16 oprot.writeFieldBegin(PARAM3_FIELD_DESC); 17 { 18 oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.param3.size())); 19 for (Map.Entry<String, String> _iter4 : struct.param3.entrySet()) 20 { 21 oprot.writeString(_iter4.getKey()); 22 oprot.writeString(_iter4.getValue()); 23 } 24 oprot.writeMapEnd(); 25 } 26 oprot.writeFieldEnd(); 27 } 28 oprot.writeFieldStop(); 29 oprot.writeStructEnd(); 30 } 31 32 }
到此為止,send_domoMethod完畢,接下來就是recv_demoMethod()也就是接受服務端返回的數據。
1 public int recv_demoMethod() throws org.apache.thrift.TException 2 { 3 demoMethod_result result = new demoMethod_result();//與封裝請求參數類似,加入一些內容到schema中 4 receiveBase(result, "demoMethod");//讀取數據進行一些組裝工作 5 if (result.isSetSuccess()) { 6 return result.success;//返回result中的success值 7 } 8 throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "demoMethod failed: unknown result"); 9 }
1 protected void receiveBase(TBase result, String methodName) throws TException {//讀取返回結果,並將返回結果組裝好放到result中 2 TMessage msg = iprot_.readMessageBegin(); 3 if (msg.type == TMessageType.EXCEPTION) { 4 TApplicationException x = TApplicationException.read(iprot_); 5 iprot_.readMessageEnd(); 6 throw x; 7 } 8 if (msg.seqid != seqid_) { 9 throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response"); 10 } 11 result.read(iprot_);//將所讀取的數據封裝成需要類型返回 12 iprot_.readMessageEnd();//這一步其實什么也沒做,到此為止result其實已經形成 13 }
由於寫入的時候有寫入信息的類型,序號之類的東西,故這里讀取和寫入保持一致,也要readMessageBegin,只不過這里使用的是iprot_,其實還是Tprotocol。Tprotocol iprot_ ----->Ttransport trans_ ----->InputStream inputstream
1 public TMessage readMessageBegin() throws TException { 2 int size = readI32(); 3 if (size < 0) { 4 int version = size & VERSION_MASK; 5 if (version != VERSION_1) { 6 throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin"); 7 } 8 return new TMessage(readString(), (byte)(size & 0x000000ff), readI32()); 9 } else { 10 if (strictRead_) { 11 throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?"); 12 } 13 return new TMessage(readStringBody(size), readByte(), readI32()); 14 } 15 }
其中result.read(iprot_)還是對應着寫入時候的args.write,代碼貼出來:
1 private static class demoMethod_resultStandardScheme extends StandardScheme<demoMethod_result> { 2 3 public void read(org.apache.thrift.protocol.TProtocol iprot, demoMethod_result struct) throws org.apache.thrift.TException { 4 org.apache.thrift.protocol.TField schemeField; 5 iprot.readStructBegin(); 6 while (true) 7 { 8 schemeField = iprot.readFieldBegin(); 9 if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 10 break; 11 } 12 switch (schemeField.id) { 13 case 0: // SUCCESS 14 if (schemeField.type == org.apache.thrift.protocol.TType.I32) { 15 struct.success = iprot.readI32();//在這里讀取返回結果,這些結果的結構都是早已經定義好的,因為我們這里的例子是int類型,故這里只需要讀取readI32即可 16 struct.setSuccessIsSet(true); 17 } else { 18 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); 19 } 20 break; 21 default: 22 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); 23 } 24 iprot.readFieldEnd(); 25 } 26 iprot.readStructEnd(); 27 28 // check for required fields of primitive type, which can't be checked in the validate method 29 struct.validate(); 30 } 31 32 public void write(org.apache.thrift.protocol.TProtocol oprot, demoMethod_result struct) throws org.apache.thrift.TException { 33 struct.validate(); 34 35 oprot.writeStructBegin(STRUCT_DESC); 36 oprot.writeFieldBegin(SUCCESS_FIELD_DESC); 37 oprot.writeI32(struct.success); 38 oprot.writeFieldEnd(); 39 oprot.writeFieldStop(); 40 oprot.writeStructEnd(); 41 } 42 43 }
綜上,整個客戶端發請求以及接受返回數據也就是先寫后讀的一個完整過程也就完畢。整體流程圖我就用從網上找到的一個例子來看就好了,除了方法不一樣,其他都是一樣的道理。
本文為博主原創,未經許可禁止轉載。謝謝。