Golang&Python測試thrift


接上篇,安裝好之后,就開始編寫IDL生成然后測試。

 

一、生成運行

參考 http://www.aboutyun.com/thread-8916-1-1.html 來個添加,查詢。

namespace go  my.test.demo
namespace py  my.test.demo
 
struct Student{
 1: i32 sid, 
 2: string sname,
 3: bool ssex=0,
 4: i16 sage,
}
 
const map<string,string> MAPCONSTANT = {'hello':'world', 'goodnight':'moon'}
     
service ClassMember {        
    list<Student> List(1:i64 callTime),
    void Add(1: Student s),
    bool IsNameExist(1:i64 callTime, 2:string name),
}

通過以下命令來生成代碼:

 thrift -r --gen go my.test.thrift 
 thrift -r --gen py my.test.thrift
├── client
├── client.go
├── gen-py
│   ├── __init__.py
│   ├── client.py
│   └── my
│       ├── __init__.py
│       ├── __init__.pyc
│       └── test
│           ├── __init__.py
│           ├── __init__.pyc
│           └── demo
│               ├── ClassMember-remote
│               ├── ClassMember.py
│               ├── ClassMember.pyc
│               ├── __init__.py
│               ├── __init__.pyc
│               ├── constants.py
│               ├── ttypes.py
│               └── ttypes.pyc
├── my
│   └── test
│       └── demo
│           ├── class_member-remote
│           │   └── class_member-remote.go
│           ├── classmember.go
│           ├── constants.go
│           └── ttypes.go
├── my.test.thrift
├── server
└── server.go

thrift幫忙生成代碼部分:網絡連接、數據序列化、RPC調用函數映射、數據發送等。

采用什么傳輸類型,采用什么服務類型,具體函數還要是自己寫的。

參考:

類似Thrift的工具,還有Avro、protocol buffer,但相對於Thrift來講,都沒有Thrift支持全面和使用廣泛。

1) thrift內部框架一瞥
  按照官方文檔給出的整體框架,Thrift自下到上可以分為4層:
+-------------------------------------------+
| Server                                    |  -- 服務器進程調度
| (single-threaded, event-driven etc) |
+-------------------------------------------+
| Processor                                 |  -- RPC接口處理函數分發,IDL定義接口的實現將掛接到這里面
| (compiler generated)                      |
+-------------------------------------------+
| Protocol                                  |  -- 協議
| (JSON, compact etc)                       |
+-------------------------------------------+
| Transport                                 |  -- 網絡傳輸
| (raw TCP, HTTP etc)                       |
+-------------------------------------------+

  Thrift實際上是實現了C/S模式,通過代碼生成工具將接口定義文件生成服務器端和客戶端代碼(可以為不同語言),從而實現服務端和客戶端跨語言的支持。用戶在Thirft描述文件中聲明自己的服務,這些服務經過編譯后會生成相應語言的代碼文件,然后用戶實現服務(客戶端調用服務,服務器端提服務)便可以了。其中protocol(協議層, 定義數據傳輸格式,可以為二進制或者XML等)和transport(傳輸層,定義數據傳輸方式,可以為TCP/IP傳輸,內存共享或者文件共享等)被用作運行時庫。

2)支持的數據傳輸格式、數據傳輸方式和服務模型
    (a)支持的傳輸格式
      TBinaryProtocol – 二進制格式.
      TCompactProtocol – 壓縮格式
      TJSONProtocol – JSON格式
      TSimpleJSONProtocol –提供JSON只寫協議, 生成的文件很容易通過腳本語言解析。
      TDebugProtocol – 使用易懂的可讀的文本格式,以便於debug
    (b) 支持的數據傳輸方式
      TSocket -阻塞式socker
      TFramedTransport – 以frame為單位進行傳輸,非阻塞式服務中使用。
      TFileTransport – 以文件形式進行傳輸。
      TMemoryTransport – 將內存用於I/O. java實現時內部實際使用了簡單的ByteArrayOutputStream。
      TZlibTransport – 使用zlib進行壓縮, 與其他傳輸方式聯合使用。當前無java實現。
    (c)支持的服務模型
      TSimpleServer – 簡單的單線程服務模型,常用於測試
      TThreadPoolServer – 多線程服務模型,使用標准的阻塞式IO。
      TNonblockingServer – 多線程服務模型,使用非阻塞式IO(需使用TFramedTransport數據傳輸方式)

    3) Thrift IDL
  Thrift定義一套IDL(Interface Definition Language)用於描述接口,通常后綴名為.thrift,通過thrift程序把.thrift文件導出成各種不一樣的代碼的協議定義。IDL支持的類型可以參考這里:http://thrift.apache.org/docs/types

 

//client.go

package main

import (
    "./my/test/demo"
    "fmt"
    "git.apache.org/thrift.git/lib/go/thrift"
    "net"
    "os"
    "time"
)

const (
    HOST = "127.0.0.1"
    PORT = "10086"
)

func main() {
    startTime := currentTimeMillis()
    transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
    transport, err := thrift.NewTSocket(net.JoinHostPort(HOST, PORT))
    if err != nil {
        fmt.Fprintln(os.Stderr, "error resolving address:", err)
        os.Exit(1)
    }

    useTransport := transportFactory.GetTransport(transport)
    client := demo.NewClassMemberClientFactory(useTransport, protocolFactory)
    if err := transport.Open(); err != nil {
        fmt.Fprintln(os.Stderr, "Error opening socket to "+HOST+":"+PORT, " ", err)
        os.Exit(1)
    }
    defer transport.Close()
    var i int32
    for i = 0; i < 5; i++ {
        var s demo.Student
        s.Sid = i
        s.Sname = fmt.Sprintf("name_%d", i)
        err := client.Add(&s)
        time.Sleep(time.Second * 3)
        fmt.Println("add", i, "student", err)
    }

    sList, err := client.List(currentTimeMillis())
    fmt.Println(err)
    for _, s := range sList {
        fmt.Println(s)
    }

    endTime := currentTimeMillis()
    fmt.Printf("calltime:%d-%d=%dms\n", endTime, startTime, (endTime - startTime))

}

func currentTimeMillis() int64 {
    return time.Now().UnixNano() / 1000000
}
View Code
//server.go

package main

import (
    "./my/test/demo"
    "fmt"
    "git.apache.org/thrift.git/lib/go/thrift"
    "os"
)

const (
    NetworkAddr = "0.0.0.0:10086"
)

type ClassMemberImpl struct {
}

func (c *ClassMemberImpl) Add(s *demo.Student) (err error) {
    fmt.Println(s)
    students[s.Sid] = s
    return nil
}

func (c *ClassMemberImpl) List(callTime int64) (r []*demo.Student, err error) {
    for _, s := range students {
        r = append(r, s)
    }
    return r, nil
}

func (c *ClassMemberImpl) IsNameExist(callTime int64, name string) (r bool, err error) {
    for _, s := range students {
        if s.Sname == name {
            return true, nil
        }
    }
    return false, nil
}

var students map[int32]*demo.Student

func main() {

    students = make(map[int32]*demo.Student, 5)

    transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
    serverTransport, err := thrift.NewTServerSocket(NetworkAddr)
    if err != nil {
        fmt.Println("Error!", err)
        os.Exit(1)
    }

    handler := &ClassMemberImpl{}
    processor := demo.NewClassMemberProcessor(handler)
    server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory)
    fmt.Println("thrift server in", NetworkAddr)
    server.Serve()
}
View Code
#client.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys, glob, time,datetime
sys.path.append('gen-py')
#print glob.glob('libpy/lib.*')
#sys.path.insert(0, glob.glob('libpy/lib.*')[0])

from my.test.demo import ClassMember
from my.test.demo.ttypes import *

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

try:
  startTime = time.time()*1000
  # Make socket
  transport = TSocket.TSocket('127.0.0.1', 10086)

  # Framed is critical. Raw sockets are very slow
  transport = TTransport.TFramedTransport(transport)

  # Wrap in a protocol
  protocol = TBinaryProtocol.TBinaryProtocol(transport)

  # Create a client to use the protocol encoder
  client = ClassMember.Client(protocol)

  # Connect!
  transport.open()

  for i in range(1,6):
    i+=1
    s = Student()  
    s.sid = i
    s.sname = "name_{}".format(i)
    r = client.Add(s)
    print(s)
  
  endTime = time.time()*1000

  print client.List(time.time()*1000)

  print "use:%d-%d=%dms" %(endTime,startTime, (endTime - startTime))
  # Close!
  transport.close()

except Thrift.TException, tx:
  print 'ERROR:%s' % (tx.message)

 

編寫代碼之后,編譯和運行可能需要安裝對應的語言包:

#python的thrift package安裝
sudo easy_install thrift==0.9.0

#golang的package安裝
go get git.apache.org/thrift.git/lib/go/thrift

編譯之后運行:

qpzhang@qpzhangdeMac-mini:~/project/thrift-0.9.2/mytest $./client 
add 0 student <nil>
add 1 student <nil>
add 2 student <nil>
add 3 student <nil>
add 4 student <nil>
<nil>
Student({Sid:0 Sname:name_0 Ssex:false Sage:0})
Student({Sid:1 Sname:name_1 Ssex:false Sage:0})
Student({Sid:2 Sname:name_2 Ssex:false Sage:0})
Student({Sid:3 Sname:name_3 Ssex:false Sage:0})
Student({Sid:4 Sname:name_4 Ssex:false Sage:0})
calltime:1429856944847-1429856929841=15006ms
qpzhang@qpzhangdeMac-mini:~/project/thrift-0.9.2/mytest/gen-py $python client.py 
Student(sname='name_2', sage=None, ssex=False, sid=2)
Student(sname='name_3', sage=None, ssex=False, sid=3)
Student(sname='name_4', sage=None, ssex=False, sid=4)
Student(sname='name_5', sage=None, ssex=False, sid=5)
Student(sname='name_6', sage=None, ssex=False, sid=6)
[Student(sname='name_2', sage=0, ssex=False, sid=2), Student(sname='name_3', sage=0, ssex=False, sid=3), Student(sname='name_4', sage=0, ssex=False, sid=4), Student(sname='name_5', sage=0, ssex=False, sid=5), Student(sname='name_6', sage=0, ssex=False, sid=6)]
use:1429858724602-1429858724600=1ms

跨語言OK。

 

二、thrift go package 

qpzhang@qpzhangdeMac-mini:~/project/thrift-0.9.2/mytest $./client 
add 0 student <nil>
add 1 student <nil>
add 2 student <nil>
add 3 student EOF
add 4 student write tcp 127.0.0.1:10086: broken pipe
write tcp 127.0.0.1:10086: broken pipe
calltime:1429856896694-1429856881690=15004ms

在client運行過程中把server干掉,可以看到返回錯誤。

這里超時、心跳,重連都要自己來搞,不像ZeroMQ,全部都幫你搞掂。

另外,Golang里面沒有下面兩種實現:

TThreadPoolServer – 多線程服務模型,使用標准的阻塞式IO。
TNonblockingServer – 多線程服務模型,使用非阻塞式IO(需使用TFramedTransport數據傳輸方式)

查看源碼:

func (p *TSimpleServer) AcceptLoop() error {
    for {
        client, err := p.serverTransport.Accept()
        if err != nil {
            select {
            case <-p.quit:
                return nil
            default:
            }
            return err
        }
        if client != nil {
            go func() { if err := p.processRequests(client); err != nil { log.Println("error processing request:", err) } }()
        }
    }
}

func (p *TSimpleServer) Serve() error {
    err := p.Listen()
    if err != nil {
        return err
    }
    p.AcceptLoop()
    return nil
}

可以看到simple server本身就是異步非阻塞的。

func (p *TSimpleServer) processRequests(client TTransport) error {
    processor := p.processorFactory.GetProcessor(client)
    inputTransport := p.inputTransportFactory.GetTransport(client)
    outputTransport := p.outputTransportFactory.GetTransport(client)
    inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
    outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
    defer func() {
        if e := recover(); e != nil {
            log.Printf("panic in processor: %s: %s", e, debug.Stack())
        }
    }()
    if inputTransport != nil {
        defer inputTransport.Close()
    }
    if outputTransport != nil {
        defer outputTransport.Close()
    }
    for {
        ok, err := processor.Process(inputProtocol, outputProtocol) if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
            return nil
        } else if err != nil {
            log.Printf("error processing request: %s", err)
            return err
        }
        if !ok {
            break
        }
    }
    return nil
}
 
可以看到實現是在生成的代碼里面的:
 
func NewClassMemberProcessor(handler ClassMember) *ClassMemberProcessor {

    self6 := &ClassMemberProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)}
    self6.processorMap["List"] = &classMemberProcessorList{handler: handler}
    self6.processorMap["Add"] = &classMemberProcessorAdd{handler: handler}
    self6.processorMap["IsNameExist"] = &classMemberProcessorIsNameExist{handler: handler}
    return self6
}

func (p *ClassMemberProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
    name, _, seqId, err := iprot.ReadMessageBegin()
    if err != nil {
        return false, err
    }
    if processor, ok := p.GetProcessorFunction(name); ok { return processor.Process(seqId, iprot, oprot) }
    iprot.Skip(thrift.STRUCT)
    iprot.ReadMessageEnd()
    x7 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
    oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
    x7.Write(oprot)
    oprot.WriteMessageEnd()
    oprot.Flush()
    return false, x7

}

type classMemberProcessorList struct {
    handler ClassMember
}

func (p *classMemberProcessorList) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
    args := ListArgs{}
    if err = args.Read(iprot); err != nil {
        iprot.ReadMessageEnd()
        x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
        oprot.WriteMessageBegin("List", thrift.EXCEPTION, seqId)
        x.Write(oprot)
        oprot.WriteMessageEnd()
        oprot.Flush()
        return false, err
    }

    iprot.ReadMessageEnd()
    result := ListResult{}
    var retval []*Student
    var err2 error
    if retval, err2 = p.handler.List(args.CallTime); err2 != nil {
        x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing List: "+err2.Error())
        oprot.WriteMessageBegin("List", thrift.EXCEPTION, seqId)
        x.Write(oprot)
        oprot.WriteMessageEnd()
        oprot.Flush()
        return true, err2
    } else {
        result.Success = retval
    }
    if err2 = oprot.WriteMessageBegin("List", thrift.REPLY, seqId); err2 != nil {
        err = err2
    }
    if err2 = result.Write(oprot); err == nil && err2 != nil {
        err = err2
    }
    if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
        err = err2
    }
    if err2 = oprot.Flush(); err == nil && err2 != nil {
        err = err2
    }
    if err != nil {
        return
    }
    return true, err
}

 

如果是多語言協作,另外一種方式json+http,實現成本感覺沒有thrift這么大。

 

另外,性能可以參考, http://www.tuicool.com/articles/rEj6fie 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM