接上篇,安裝好之后,就開始編寫IDL生成然后測試。
一、生成運行
參考 http://www.aboutyun.com/thread-8916-1-1.html 來個添加,查詢。
namespace go my.test.demo namespace py my.test.demo struct Student{ 1: i32 sid, 2: string sname, 3: bool ssex=0, 4: i16 sage, } const map<string,string> MAPCONSTANT = {'hello':'world', 'goodnight':'moon'} service ClassMember { list<Student> List(1:i64 callTime), void Add(1: Student s), bool IsNameExist(1:i64 callTime, 2:string name), }
通過以下命令來生成代碼:
thrift -r --gen go my.test.thrift
thrift -r --gen py my.test.thrift
├── client ├── client.go ├── gen-py │ ├── __init__.py │ ├── client.py │ └── my │ ├── __init__.py │ ├── __init__.pyc │ └── test │ ├── __init__.py │ ├── __init__.pyc │ └── demo │ ├── ClassMember-remote │ ├── ClassMember.py │ ├── ClassMember.pyc │ ├── __init__.py │ ├── __init__.pyc │ ├── constants.py │ ├── ttypes.py │ └── ttypes.pyc ├── my │ └── test │ └── demo │ ├── class_member-remote │ │ └── class_member-remote.go │ ├── classmember.go │ ├── constants.go │ └── ttypes.go ├── my.test.thrift ├── server └── server.go
thrift幫忙生成代碼部分:網絡連接、數據序列化、RPC調用函數映射、數據發送等。
采用什么傳輸類型,采用什么服務類型,具體函數還要是自己寫的。
參考:
類似Thrift的工具,還有Avro、protocol buffer,但相對於Thrift來講,都沒有Thrift支持全面和使用廣泛。 1) thrift內部框架一瞥 按照官方文檔給出的整體框架,Thrift自下到上可以分為4層: +-------------------------------------------+ | Server | -- 服務器進程調度 | (single-threaded, event-driven etc) | +-------------------------------------------+ | Processor | -- RPC接口處理函數分發,IDL定義接口的實現將掛接到這里面 | (compiler generated) | +-------------------------------------------+ | Protocol | -- 協議 | (JSON, compact etc) | +-------------------------------------------+ | Transport | -- 網絡傳輸 | (raw TCP, HTTP etc) | +-------------------------------------------+ Thrift實際上是實現了C/S模式,通過代碼生成工具將接口定義文件生成服務器端和客戶端代碼(可以為不同語言),從而實現服務端和客戶端跨語言的支持。用戶在Thirft描述文件中聲明自己的服務,這些服務經過編譯后會生成相應語言的代碼文件,然后用戶實現服務(客戶端調用服務,服務器端提服務)便可以了。其中protocol(協議層, 定義數據傳輸格式,可以為二進制或者XML等)和transport(傳輸層,定義數據傳輸方式,可以為TCP/IP傳輸,內存共享或者文件共享等)被用作運行時庫。 2)支持的數據傳輸格式、數據傳輸方式和服務模型 (a)支持的傳輸格式 TBinaryProtocol – 二進制格式. TCompactProtocol – 壓縮格式 TJSONProtocol – JSON格式 TSimpleJSONProtocol –提供JSON只寫協議, 生成的文件很容易通過腳本語言解析。 TDebugProtocol – 使用易懂的可讀的文本格式,以便於debug (b) 支持的數據傳輸方式 TSocket -阻塞式socker TFramedTransport – 以frame為單位進行傳輸,非阻塞式服務中使用。 TFileTransport – 以文件形式進行傳輸。 TMemoryTransport – 將內存用於I/O. java實現時內部實際使用了簡單的ByteArrayOutputStream。 TZlibTransport – 使用zlib進行壓縮, 與其他傳輸方式聯合使用。當前無java實現。 (c)支持的服務模型 TSimpleServer – 簡單的單線程服務模型,常用於測試 TThreadPoolServer – 多線程服務模型,使用標准的阻塞式IO。 TNonblockingServer – 多線程服務模型,使用非阻塞式IO(需使用TFramedTransport數據傳輸方式) 3) Thrift IDL Thrift定義一套IDL(Interface Definition Language)用於描述接口,通常后綴名為.thrift,通過thrift程序把.thrift文件導出成各種不一樣的代碼的協議定義。IDL支持的類型可以參考這里:http://thrift.apache.org/docs/types

//client.go package main import ( "./my/test/demo" "fmt" "git.apache.org/thrift.git/lib/go/thrift" "net" "os" "time" ) const ( HOST = "127.0.0.1" PORT = "10086" ) func main() { startTime := currentTimeMillis() transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory()) protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() transport, err := thrift.NewTSocket(net.JoinHostPort(HOST, PORT)) if err != nil { fmt.Fprintln(os.Stderr, "error resolving address:", err) os.Exit(1) } useTransport := transportFactory.GetTransport(transport) client := demo.NewClassMemberClientFactory(useTransport, protocolFactory) if err := transport.Open(); err != nil { fmt.Fprintln(os.Stderr, "Error opening socket to "+HOST+":"+PORT, " ", err) os.Exit(1) } defer transport.Close() var i int32 for i = 0; i < 5; i++ { var s demo.Student s.Sid = i s.Sname = fmt.Sprintf("name_%d", i) err := client.Add(&s) time.Sleep(time.Second * 3) fmt.Println("add", i, "student", err) } sList, err := client.List(currentTimeMillis()) fmt.Println(err) for _, s := range sList { fmt.Println(s) } endTime := currentTimeMillis() fmt.Printf("calltime:%d-%d=%dms\n", endTime, startTime, (endTime - startTime)) } func currentTimeMillis() int64 { return time.Now().UnixNano() / 1000000 }

//server.go package main import ( "./my/test/demo" "fmt" "git.apache.org/thrift.git/lib/go/thrift" "os" ) const ( NetworkAddr = "0.0.0.0:10086" ) type ClassMemberImpl struct { } func (c *ClassMemberImpl) Add(s *demo.Student) (err error) { fmt.Println(s) students[s.Sid] = s return nil } func (c *ClassMemberImpl) List(callTime int64) (r []*demo.Student, err error) { for _, s := range students { r = append(r, s) } return r, nil } func (c *ClassMemberImpl) IsNameExist(callTime int64, name string) (r bool, err error) { for _, s := range students { if s.Sname == name { return true, nil } } return false, nil } var students map[int32]*demo.Student func main() { students = make(map[int32]*demo.Student, 5) transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory()) protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() serverTransport, err := thrift.NewTServerSocket(NetworkAddr) if err != nil { fmt.Println("Error!", err) os.Exit(1) } handler := &ClassMemberImpl{} processor := demo.NewClassMemberProcessor(handler) server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory) fmt.Println("thrift server in", NetworkAddr) server.Serve() }
#client.py #!/usr/bin/env python # -*- coding: utf-8 -*- import sys, glob, time,datetime sys.path.append('gen-py') #print glob.glob('libpy/lib.*') #sys.path.insert(0, glob.glob('libpy/lib.*')[0]) from my.test.demo import ClassMember from my.test.demo.ttypes import * from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol try: startTime = time.time()*1000 # Make socket transport = TSocket.TSocket('127.0.0.1', 10086) # Framed is critical. Raw sockets are very slow transport = TTransport.TFramedTransport(transport) # Wrap in a protocol protocol = TBinaryProtocol.TBinaryProtocol(transport) # Create a client to use the protocol encoder client = ClassMember.Client(protocol) # Connect! transport.open() for i in range(1,6): i+=1 s = Student() s.sid = i s.sname = "name_{}".format(i) r = client.Add(s) print(s) endTime = time.time()*1000 print client.List(time.time()*1000) print "use:%d-%d=%dms" %(endTime,startTime, (endTime - startTime)) # Close! transport.close() except Thrift.TException, tx: print 'ERROR:%s' % (tx.message)
編寫代碼之后,編譯和運行可能需要安裝對應的語言包:
#python的thrift package安裝 sudo easy_install thrift==0.9.0 #golang的package安裝 go get git.apache.org/thrift.git/lib/go/thrift
編譯之后運行:
qpzhang@qpzhangdeMac-mini:~/project/thrift-0.9.2/mytest $./client add 0 student <nil> add 1 student <nil> add 2 student <nil> add 3 student <nil> add 4 student <nil> <nil> Student({Sid:0 Sname:name_0 Ssex:false Sage:0}) Student({Sid:1 Sname:name_1 Ssex:false Sage:0}) Student({Sid:2 Sname:name_2 Ssex:false Sage:0}) Student({Sid:3 Sname:name_3 Ssex:false Sage:0}) Student({Sid:4 Sname:name_4 Ssex:false Sage:0}) calltime:1429856944847-1429856929841=15006ms
qpzhang@qpzhangdeMac-mini:~/project/thrift-0.9.2/mytest/gen-py $python client.py Student(sname='name_2', sage=None, ssex=False, sid=2) Student(sname='name_3', sage=None, ssex=False, sid=3) Student(sname='name_4', sage=None, ssex=False, sid=4) Student(sname='name_5', sage=None, ssex=False, sid=5) Student(sname='name_6', sage=None, ssex=False, sid=6) [Student(sname='name_2', sage=0, ssex=False, sid=2), Student(sname='name_3', sage=0, ssex=False, sid=3), Student(sname='name_4', sage=0, ssex=False, sid=4), Student(sname='name_5', sage=0, ssex=False, sid=5), Student(sname='name_6', sage=0, ssex=False, sid=6)] use:1429858724602-1429858724600=1ms
跨語言OK。
二、thrift go package
qpzhang@qpzhangdeMac-mini:~/project/thrift-0.9.2/mytest $./client add 0 student <nil> add 1 student <nil> add 2 student <nil> add 3 student EOF add 4 student write tcp 127.0.0.1:10086: broken pipe write tcp 127.0.0.1:10086: broken pipe calltime:1429856896694-1429856881690=15004ms
在client運行過程中把server干掉,可以看到返回錯誤。
這里超時、心跳,重連都要自己來搞,不像ZeroMQ,全部都幫你搞掂。
另外,Golang里面沒有下面兩種實現:
TThreadPoolServer – 多線程服務模型,使用標准的阻塞式IO。
TNonblockingServer – 多線程服務模型,使用非阻塞式IO(需使用TFramedTransport數據傳輸方式)
查看源碼:
func (p *TSimpleServer) AcceptLoop() error { for { client, err := p.serverTransport.Accept() if err != nil { select { case <-p.quit: return nil default: } return err } if client != nil { go func() { if err := p.processRequests(client); err != nil { log.Println("error processing request:", err) } }() } } } func (p *TSimpleServer) Serve() error { err := p.Listen() if err != nil { return err } p.AcceptLoop() return nil }
可以看到simple server本身就是異步非阻塞的。
func (p *TSimpleServer) processRequests(client TTransport) error { processor := p.processorFactory.GetProcessor(client) inputTransport := p.inputTransportFactory.GetTransport(client) outputTransport := p.outputTransportFactory.GetTransport(client) inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport) outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport) defer func() { if e := recover(); e != nil { log.Printf("panic in processor: %s: %s", e, debug.Stack()) } }() if inputTransport != nil { defer inputTransport.Close() } if outputTransport != nil { defer outputTransport.Close() } for { ok, err := processor.Process(inputProtocol, outputProtocol) if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE { return nil } else if err != nil { log.Printf("error processing request: %s", err) return err } if !ok { break } } return nil }
可以看到實現是在生成的代碼里面的:
func NewClassMemberProcessor(handler ClassMember) *ClassMemberProcessor { self6 := &ClassMemberProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} self6.processorMap["List"] = &classMemberProcessorList{handler: handler} self6.processorMap["Add"] = &classMemberProcessorAdd{handler: handler} self6.processorMap["IsNameExist"] = &classMemberProcessorIsNameExist{handler: handler} return self6 } func (p *ClassMemberProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { name, _, seqId, err := iprot.ReadMessageBegin() if err != nil { return false, err } if processor, ok := p.GetProcessorFunction(name); ok { return processor.Process(seqId, iprot, oprot) } iprot.Skip(thrift.STRUCT) iprot.ReadMessageEnd() x7 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId) x7.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() return false, x7 } type classMemberProcessorList struct { handler ClassMember } func (p *classMemberProcessorList) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { args := ListArgs{} if err = args.Read(iprot); err != nil { iprot.ReadMessageEnd() x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error()) oprot.WriteMessageBegin("List", thrift.EXCEPTION, seqId) x.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() return false, err } iprot.ReadMessageEnd() result := ListResult{} var retval []*Student var err2 error if retval, err2 = p.handler.List(args.CallTime); err2 != nil { x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing List: "+err2.Error()) oprot.WriteMessageBegin("List", thrift.EXCEPTION, seqId) x.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() return true, err2 } else { result.Success = retval } if err2 = oprot.WriteMessageBegin("List", thrift.REPLY, seqId); err2 != nil { err = err2 } if err2 = result.Write(oprot); err == nil && err2 != nil { err = err2 } if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil { err = err2 } if err2 = oprot.Flush(); err == nil && err2 != nil { err = err2 } if err != nil { return } return true, err }
如果是多語言協作,另外一種方式json+http,實現成本感覺沒有thrift這么大。
另外,性能可以參考, http://www.tuicool.com/articles/rEj6fie