Google Protobuf 使用方式分析
對於 RPC 協議來說,最重要的就是對象的發送與接收,這就要用到序列化與反序列化,也稱為編碼和解碼,序列化與反序列化和網絡傳輸一般都在對應的 RPC 框架中完成。
序列化與反序列化的流程如下:
JavaBean-> stub(client) <->skeleton(server)->JavaBean,簡單點說就是編碼和解碼。
相比於 RMI 遠程方法調用,很多 RPC 遠程過程調用的跨語言的,這就需要序列化於反序列化協議也支持跨語言。Google Procobuf 就是這樣一種跨語言的序列化於反序列化協議,效率非常高(怎么做到比其他協議效率高那?比其他協議壓縮生成的對象小)。
Netty 對於 ProtoBuf 提供了很好的支持。
先看如何單獨使用 Google ProtoBuf
-
新建 .proto 結構描述文件
syntax = "proto2"; package com.paul.protobuf; //加快解析速度 option optimize_for = SPEED; option java_package = "com.paul.protobuf"; option java_outer_classname = "DataInfo"; message Student{ reuqired string name = 1; option int32 = 2; option string address = 3; }
-
使用對應的編譯文件生成對應的 Java 類
Proton —java_out src/main/java src/protobuf/Student.proto
-
這時在我們代碼的 src/main/java 文件夾下生成了一個新的 pkg com.paul.protobuf,里面生成了 DataInfo 類。對象會有對應的 builder 方法讓我們來構建。
-
測試序列化方法
// 構建對象->字節->對象 public class ProtoBufTest{ public static void main(String[] args) throws Exception{ DataInfo.Student student = DataInfo.Student.newBuilder().setName("張三").setAge(20).setAddress("abc").build(); byte[] student2ByteArray = student.toByteArray(); DataInfo.Student student2 = DataInfo.Student.parseFrom(student2ByteArray); System.out.println(studdent2); } }
在來看 Netty 對 Google ProtoBuf 的支持
還是只給出不一樣的部分(服務單和客戶端的這部分是一樣的):
@Override
protected void initChannel(SocketChannel ch) throws Exception{
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
//解碼器
pipeline.addLast(new ProtobufDecoder(DataInfo.Student.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
//編碼器
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new MyServerHandler());
}
測試方法就是在客戶端組裝一個 DataInfo.Student 然后發送給服務端,這里就不演示了。
大家可能會發現上面的代碼存在一個問題,就是上面的程序只能對 DataInfo.Student 進行編解碼,如果傳遞消息的類型有多種怎么辦那?
解決方案一:定義義協議,需要自己實現解碼器,通過前兩位來標識具體的 JavaBean 類型。
解決方案二:定義一個最外層的類,通過枚舉的方式來確定傳遞的 JavaBean 類型。
比如我們有兩個 JavaBean
message MyMessage{
enum DataType{
PersonType = 1;
DogType = 2;
CatType = 3;
}
required Datatype data_type = 1;
//oneof 在同一時刻只有一個字段會被設置,字段之間會共享內存,后面設置會自動清空前面的。
oneof dataBody{
Person person = 2;
Dog dog = 3;
Cat cat = 4;
}
}
message Person{
option string name = 1;
option int32 age = 2;
option string address = 3;
}
message Dog{
option string name = 1;
option int32 age = 2;
}
message Cat{
option string name = 1;
option int32 city = 2;
}
Pipeline 的改動(客戶端和服務端):
pipeline.addLast(new ProtobufDecoder(DataInfo.MyMessage.getDefaultInstance()));
我們自己的 handler 的改動:
@Overrode
public void channelActive(ChannelHandlerContext ctx) throws Exception{
MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder().
setDataType(DataType.PersonType.PersonType).
setPerson(MyDataInfo.Person.newBuilder().
setName("張三").setAge(20).
setAddress("111").build()).
build();
ctx.channel().writeAndFlush(myMessage);
}
服務端 handler 根據 enum 的類型分別進行解析。
在實際的應用環境中,我們客戶端和服務端大概率是兩個分開的應用程序,此時我們使用 Google ProtoBuf 時 .proto 文件和對應的 class 文件是不是需要在兩邊都保存一份,如果有修改會非常麻煩。下面我們介紹一種最佳實踐。
最佳實踐是使用 git 作為版本控制系統為前提的:
不那么好的方案:git submodule,就相當於 maven 的子模塊,客戶端和服務端都依賴這個模塊。
比較好的方案:git subtree,將公共的代碼合並到 server 和 client 端,相當於向 server 和 client 提交代碼。
Apache Thrift 使用方式與文件編寫方式分析
Apache Thrift 和 Google ProtoBuf 整體非常相似,適用於可伸縮的跨語言的服務開發。Thrift 相當於 Netty + Google ProtoBuf,是一個高性能 RPC 框架。Thrift 底層是 socket + RPC 的模式。
Thrift 是一個典型的 CS 結構,客戶端和服務端可以使用不同的語言開發,既然客戶端和服務端能使用不同的語言開發,那么一定有一種中間語言來關聯服務端和客戶端,這就是 IDL(Interface Description Language)。
Thrift 如何實現多語言之間的通信?
數據傳輸使用 socket (多種語言均支持),數據再以特定的格式(String 等)發送,接收方語言進行解析。
如何使用?
定義 thrift 的文件,由 thrift 文件(IDL) 生成雙方語言的接口,model,在生成的 model 以及接口中會有解碼編碼的代碼。
Thrift 中的服務
Thrift 定義服務相當於 Java 中創建 Interface 一樣,創建的 service 經過代碼生成命令之后就會生成客戶端和服務端的框架代碼,定義形式如下:
service HelloWorldService{
//service 中定義的函數,相當於 java interface 中定義的方法
string doAction(1:string name, 2:i32 age);
}
.thrift 文件的定義
// java 中的包名
namespace jave thrift.generate
// 定義別名
typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String
struct Person{
1: optional String username,
2: optional int age,
3: optional boolean married
}
exception DataException{
1: optional String message,
2: optional String callStack,
3: optional String date
}
service PersonService{
Person getPersonByName(1: required String username) throws (1: DataException dataException),
void savePerson(1:requried Person person) throws (1:DataException dataException)
}
編譯 thrift 文件
thrift --gen java src/thrift/data.thrift
生成的文件
Person.java 里面包含了編解碼的方法,PersonService 里面包含了 getPersonByName 和 savePerson 的方法。
測試方法:
服務端服務的具體實現方法
public class PersonServiceImpl implements PersonService.Iface{
@Override
public Person getPersonByName(String username) throws DataException,TException{
Person p = new Person();
p.setUserName("paul");
p.setAge(25);
p.setMarried(true);
return p;
}
@Override
public void savePerson(Person person) throws DataException,TException{
System.out.println(person.getUserName());
}
}
Thrift 的服務端:
public class ThriftServer{
public static void main(String[] args){
//非阻塞的 socket server
TNonblockingServerSocket socket = new TNonblockingServerSocket(8899);
// 高可用的 server
THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(2).maxWorkerThreads(4);
PersonService.Processor<PersonServiceImpl> processor = new PersonService.Processor<>(new PersonServiceImpl());
arg.protocolFactory(new TCompactPrococol.Factory());
arg.transportFactory(new TFramedTransport.Facotry());
arg.processorFactory(new TProcessorFactory(processor));
TServer server = new THsHaServer(arg);
System.out.println("Thrift Server Started");
//死循環
server.serve();
}
}
Thrift 的客戶端:
public class ThriftClient{
publiuc static void main(String[] args){
TTransport transport = new TFramedTransport(new TSocket
("localhost",8899),600);
TProcotol procotol = new TComapctProcotol(transport);
PersonService.Client client = new PersonService.Client(procotol);
try{
//打開 socket
transport.open();
//好像調用本地方法一樣
Person person = client.getPersonByName("paul");
System.out.println(person.getAge());
}catch(Exception ex){
throw ex;
}finally{
transport.close();
}
}
}
Thrift 的架構:
Thrift 的傳輸格式,協議:
TBinaryProtocol-二進制格式
TCompactProtocol-壓縮格式
TJSONProtocol-JSON 格式
TSimpleJSONProtocol-提供 JSON 只寫協議,生成的文件很容易通過腳本語言解析。很少使用,缺少元數據信息,接收方不能讀取出來。
TDebugProtocol-使用易懂的可讀的文本格式,以便於 debug。
Thrift 數據傳輸方式,transport:
TSocket-阻塞式 socket。
TFramedTransport-以 frame 為單位進行傳輸,非阻塞式服務中使用。
TFileTransport-以文件形式進行傳輸。
TMemoryTransport-將內存用於 I/O,Java 實現時內部實際使用了簡單的 ByteArrayOutputStream。
支持的服務模型,server:
TSimpleServer-簡單的單線程服務模型,常用於測試。
TThreadPoolServer-多線程服務模型,標准的阻塞式 IO。
TNonboockingServer-多線程服務模型,使用非阻塞式 IO(需要使用 TFramedTransport 數據傳輸方式)。
THsHaServer-THsHa 引入了線程池去處理,其模型把讀寫任務放到線程池處理。Half-sync/Half-async 的處理模式,Half-sync 是在處理 IO 事件上,Half-async 用於 handler 對 rpc 的同步處理。