Go微服務 grpc/protobuf


 

了解grpc/protobuf

gRPC是一個高性能、通用的開源RPC框架,其由Google主要面向移動應用開發並基於HTTP/2協議標准而設計,基於ProtoBuf(Protocol Buffers)序列化協議開發,且支持眾多開發語言。
gRPC提供了一種簡單的方法來精確地定義服務和為iOS、Android和后台支持服務自動生成可靠性很強的客戶端功能庫。客戶端充分利用高級流和鏈接功能,從而有助於節省帶寬、降低的TCP鏈接次數、節省CPU使用、和電池壽命。

Protobuf(Protocol Buffers),是 Google 開發的一種跨語言、跨平台的可擴展機制,用於序列化結構化數據。
grpc與傳統的 REST 架構相比,REST架構通過 http 傳輸 JSON 或者 XML ,會帶來了一個問題:服務 A 把原始數據編碼成 JSON/XML 格式,發送一長串字符給服務 B,B 通過解碼還原成原始數據,通信的總體數據量很大。
但在兩個微服務的通信間,我們不需要字符串中的所有數據,所以我們采用難理解但更加輕量的二進制數據進行交互。gRPC 采用的是支持二進制數據的 HTTP 2.0 規范,而protobuf負責處理二進制數據, 它更小、更快、更便捷。
protobuf 目前支持 C++、Java、Python、Objective-C,如果使用 proto3,還支持 C#、Ruby、Go、PHP、JavaScript 等語言。

官網地址:https://developers.google.cn/protocol-buffers/
GitHub 地址:https://github.com/protocolbuffers/protobuf

Grpc中文文檔:http://doc.oschina.net/grpc?t=60133

 

優點:

  • 性能好
  • 跨語言

缺點:

  • 二進制格式可讀性差:為了提高性能,protobuf 采用了二進制格式進行編碼,這直接導致了可讀性差。
  • 缺乏自描述:XML 是自描述的,而 protobuf 不是,不配合定義的結構體是看不出來什么作用的。

 

 環境配置

第一步:安裝protobuf:

  1. 先下載protoc:https://github.com/protocolbuffers/protobuf/releases/
  2. 把這么文件里面的bin里面的protoc.exe 復制到GOPATH/bin下,GOPATH/bin加入環境變量。也可以放到別的目錄,把那個目錄 配置到環境變量。反正最后的效果就是 在cmd 輸入 protoc 不會報錯。
  3. 在cmd 中運行 go get -u github.com/golang/protobuf/protoc-gen-go 獲取protobuf的編譯器插件 protoc-gen-go。
  4. 到這里 protobuf 就配的差不多了,protoc --go_out=plugins=grpc:. xxxx.proto 這條命令就是 編譯 協議的,具體怎么使用我們以后說。

第二步:安裝grpc:

  1. 第一種方法:官方的安裝方法是 go get -u google.golang.org/grpc ,但是需要翻牆。
  2. 第二種方法:從git上 克隆 grpc 的各種 依賴庫 ,然后 移到我們 的 GOPATH 目錄下面。(網上找的代碼,親測有效)。
    git clone https://github.com/grpc/grpc-go.git        $GOPATH/src/google.golang.org/grpc
    git clone https://github.com/golang/net.git          $GOPATH/src/golang.org/x/net
    git clone https://github.com/golang/text.git         $GOPATH/src/golang.org/x/text
    git clone https://github.com/google/go-genproto.git  $GOPATH/src/google.golang.org/genproto
    cd $GOPATH/src/

第三步:安裝gRPC運行時接口編解碼支持庫

1
go  get -u github.com/golang/protobuf/proto 

注意:上面這個文件也需要移到src目錄下。

調用過程

1、客戶端(gRPC Stub)調用 A 方法,發起 RPC 調用。

2、對請求信息使用 Protobuf 進行對象序列化壓縮(IDL)。

3、服務端(gRPC Server)接收到請求后,解碼請求體,進行業務邏輯處理並返回。

4、對響應結果使用 Protobuf 進行對象序列化壓縮(IDL)。

5、客戶端接受到服務端響應,解碼請求體。回調被調用的 A 方法,喚醒正在等待響應(阻塞)的客戶端調用並返回響應結果。

調用方式

一、Unary RPC:一元 RPC

Server

type SearchService struct{}

func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
    return &pb.SearchResponse{Response: r.GetRequest() + " Server"}, nil
}

const PORT = "9001"

func main() {
    server := grpc.NewServer()
    pb.RegisterSearchServiceServer(server, &SearchService{})

    lis, err := net.Listen("tcp", ":"+PORT)
    ...

    server.Serve(lis)
}
  • 創建 gRPC Server 對象,你可以理解為它是 Server 端的抽象對象。

  • 將 SearchService(其包含需要被調用的服務端接口)注冊到 gRPC Server。的內部注冊中心。這樣可以在接受到請求時,通過內部的 “服務發現”,發現該服務端接口並轉接進行邏輯處理。

  • 創建 Listen,監聽 TCP 端口。

  • gRPC Server 開始 lis.Accept,直到 Stop 或 GracefulStop。

Client

func main() {
    conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
    ...
    defer conn.Close()

    client := pb.NewSearchServiceClient(conn)
    resp, err := client.Search(context.Background(), &pb.SearchRequest{
        Request: "gRPC",
    })
    ...
}
  • 創建與給定目標(服務端)的連接句柄。

  • 創建 SearchService 的客戶端對象。

  • 發送 RPC 請求,等待同步響應,得到回調后返回響應結果。

二、Server-side streaming RPC:服務端流式 RPC

Server

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
    for n := 0; n <= 6; n++ {
        stream.Send(&pb.StreamResponse{
            Pt: &pb.StreamPoint{
                ...
            },
        })
    }

    return nil
}

Client

func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.List(context.Background(), r)
    ...

    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        ...
    }

    return nil
}

三、Client-side streaming RPC:客戶端流式 RPC

Server

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
    for {
        r, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{...}})
        }
        ...

    }

    return nil
}

Client

func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.Record(context.Background())
    ...

    for n := 0; n < 6; n++ {
        stream.Send(r)
    }

    resp, err := stream.CloseAndRecv()
    ...

    return nil
}

四、Bidirectional streaming RPC:雙向流式 RPC

Server

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
    for {
        stream.Send(&pb.StreamResponse{...})
        r, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        ...
    }

    return nil
}

Client

func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.Route(context.Background())
    ...

    for n := 0; n <= 6; n++ {
        stream.Send(r)
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        ...
    }

    stream.CloseSend()

    return nil
}

 

淺談理解

 

服務端

為什么四行代碼,就能夠起一個 gRPC Server,內部做了什么邏輯。你有想過嗎?接下來我們一步步剖析,看看里面到底是何方神聖。

一、初始化

// grpc.NewServer()
func NewServer(opt ...ServerOption) *Server {
    opts := defaultServerOptions
    for _, o := range opt {
        o(&opts)
    }
    s := &Server{
        lis:    make(map[net.Listener]bool),
        opts:   opts,
        conns:  make(map[io.Closer]bool),
        m:      make(map[string]*service),
        quit:   make(chan struct{}),
        done:   make(chan struct{}),
        czData: new(channelzData),
    }
    s.cv = sync.NewCond(&s.mu)
    ...

    return s
}

這塊比較簡單,主要是實例 grpc.Server 並進行初始化動作。涉及如下:

  • lis:監聽地址列表。

  • opts:服務選項,這塊包含 Credentials、Interceptor 以及一些基礎配置。

  • conns:客戶端連接句柄列表。

  • m:服務信息映射。

  • quit:退出信號。

  • done:完成信號。

  • czData:用於存儲 ClientConn,addrConn 和 Server 的channelz 相關數據。

  • cv:當優雅退出時,會等待這個信號量,直到所有 RPC 請求都處理並斷開才會繼續處理。

二、注冊

pb.RegisterSearchServiceServer(server, &SearchService{})

步驟一:Service API interface

// search.pb.go
type SearchServiceServer interface {
    Search(context.Context, *SearchRequest) (*SearchResponse, error)
}

func RegisterSearchServiceServer(s *grpc.Server, srv SearchServiceServer) {
    s.RegisterService(&_SearchService_serviceDesc, srv)
}

還記得我們平時編寫的 Protobuf 嗎?在生成出來的.pb.go文件中,會定義出 Service APIs interface 的具體實現約束。而我們在 gRPC Server 進行注冊時,會傳入應用 Service 的功能接口實現,此時生成的RegisterServer方法就會保證兩者之間的一致性。

步驟二:Service API IDL

你想亂傳糊弄一下?不可能的,請乖乖定義與 Protobuf 一致的接口方法。但是那個&_SearchService_serviceDesc又有什么作用呢?代碼如下:

// search.pb.go
var _SearchService_serviceDesc = grpc.ServiceDesc{
    ServiceName: "proto.SearchService",
    HandlerType: (*SearchServiceServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "Search",
            Handler:    _SearchService_Search_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "search.proto",
}

這看上去像服務的描述代碼,用來向內部表述 “我” 都有什么。涉及如下:

  • ServiceName:服務名稱

  • HandlerType:服務接口,用於檢查用戶提供的實現是否滿足接口要求

  • Methods:一元方法集,注意結構內的Handler方法,其對應最終的 RPC 處理方法,在執行 RPC 方法的階段會使用。

  • Streams:流式方法集

  • Metadata:元數據,是一個描述數據屬性的東西。在這里主要是描述SearchServiceServer服務

步驟三:Register Service

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    ...
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        ...
    }
    s.m[sd.ServiceName] = srv
}

在最后一步中,我們會將先前的服務接口信息、服務描述信息給注冊到內部service去,以便於后續實際調用的使用。涉及如下:

  • server:服務的接口信息

  • md:一元服務的 RPC 方法集

  • sd:流式服務的 RPC 方法集

  • mdata:metadata,元數據

小結

在這一章節中,主要介紹的是 gRPC Server 在啟動前的整理和注冊行為,看上去很簡單,但其實一切都是為了后續的實際運行的預先准備。因此我們整理一下思路,將其串聯起來看看,如下:

三、監聽

接下來到了整個流程中,最重要也是大家最關注的監聽/處理階段,核心代碼如下:

func (s *Server) Serve(lis net.Listener) error {
    ...
    var tempDelay time.Duration 
    for {
        rawConn, err := lis.Accept()
        if err != nil {
            if ne, ok := err.(interface {
                Temporary() bool
            }); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                ...
                timer := time.NewTimer(tempDelay)
                select {
                case <-timer.C:
                case <-s.quit:
                    timer.Stop()
                    return nil
                }
                continue
            }
            ...
            return err
        }
        tempDelay = 0

        s.serveWG.Add(1)
        go func() {
            s.handleRawConn(rawConn)
            s.serveWG.Done()
        }()
    }
}

Serve 會根據外部傳入的 Listener 不同而調用不同的監聽模式,這也是net.Listener的魅力,靈活性和擴展性會比較高。而在 gRPC Server 中最常用的就是TCPConn,基於 TCP Listener 去做。接下來我們一起看看具體的處理邏輯,如下:

 

  • 循環處理連接,通過lis.Accept取出連接,如果隊列中沒有需處理的連接時,會形成阻塞等待。

  • lis.Accept失敗,則觸發休眠機制,若為第一次失敗那么休眠 5ms,否則翻倍,再次失敗則不斷翻倍直至上限休眠時間 1s,而休眠完畢后就會嘗試去取下一個 “它”。

  • lis.Accept成功,則重置休眠的時間計數和啟動一個新的 goroutine 調用handleRawConn方法去執行/處理新的請求,也就是大家很喜歡說的 “每一個請求都是不同的 goroutine 在處理”。

  • 在循環過程中,包含了 “退出” 服務的場景,主要是硬關閉和優雅重啟服務兩種情況。

客戶端

一、創建撥號連接

// grpc.Dial(":"+PORT, grpc.WithInsecure())
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    cc := &ClientConn{
        target:            target,
        csMgr:             &connectivityStateManager{},
        conns:             make(map[*addrConn]struct{}),
        dopts:             defaultDialOptions(),
        blockingpicker:    newPickerWrapper(),
        czData:            new(channelzData),
        firstResolveEvent: grpcsync.NewEvent(),
    }
    ...
    chainUnaryClientInterceptors(cc)
    chainStreamClientInterceptors(cc)

    ...
}

grpc.Dial方法實際上是對於grpc.DialContext的封裝,區別在於ctx是直接傳入context.Background。其主要功能是創建與給定目標的客戶端連接,其承擔了以下職責:

  • 初始化 ClientConn

  • 初始化(基於進程 LB)負載均衡配置

  • 初始化 channelz

  • 初始化重試規則和客戶端一元/流式攔截器

  • 初始化協議棧上的基礎信息

  • 相關 context 的超時控制

  • 初始化並解析地址信息

  • 創建與服務端之間的連接

連沒連

之前聽到有的人說調用grpc.Dial后客戶端就已經與服務端建立起了連接,但這對不對呢?我們先鳥瞰全貌,看看正在跑的 goroutine。如下:

 

我們可以有幾個核心方法一直在等待/處理信號,通過分析底層源碼可得知。涉及如下:

func (ac *addrConn) connect()
func (ac *addrConn) resetTransport()
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time)
func (ac *addrConn) getReadyTransport()

在這里主要分析 goroutine 提示的resetTransport方法,看看都做了啥。核心代碼如下:

func (ac *addrConn) resetTransport() {
    for i := 0; ; i++ {
        if ac.state == connectivity.Shutdown {
            return
        }
        ...
        connectDeadline := time.Now().Add(dialDuration)
        ac.updateConnectivityState(connectivity.Connecting)
        newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
        if err != nil {
            if ac.state == connectivity.Shutdown {
                return
            }
            ac.updateConnectivityState(connectivity.TransientFailure)
            timer := time.NewTimer(backoffFor)
            select {
            case <-timer.C:
                ...
            }
            continue
        }

        if ac.state == connectivity.Shutdown {
            newTr.Close()
            return
        }
        ...
        if !healthcheckManagingState {
            ac.updateConnectivityState(connectivity.Ready)
        }
        ...

        if ac.state == connectivity.Shutdown {
            return
        }
        ac.updateConnectivityState(connectivity.TransientFailure)
    }
}

在該方法中會不斷地去嘗試創建連接,若成功則結束。否則不斷地根據Backoff算法的重試機制去嘗試創建連接,直到成功為止。從結論上來講,單純調用DialContext是異步建立連接的,也就是並不是馬上生效,處於Connecting狀態,而正式下要到達Ready狀態才可用。

二、實例化 Service API

type SearchServiceClient interface {
    Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error)
}

type searchServiceClient struct {
    cc *grpc.ClientConn
}

func NewSearchServiceClient(cc *grpc.ClientConn) SearchServiceClient {
    return &searchServiceClient{cc}
}

這塊就是實例 Service API interface,比較簡單。

三、調用

// search.pb.go
func (c *searchServiceClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) {
    out := new(SearchResponse)
    err := c.cc.Invoke(ctx, "/proto.SearchService/Search", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

proto 生成的 RPC 方法更像是一個包裝盒,把需要的東西放進去,而實際上調用的還是grpc.invoke方法。如下:

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}

通過概覽,可以關注到三塊調用。如下:

  • newClientStream:獲取傳輸層 Trasport 並組合封裝到 ClientStream 中返回,在這塊會涉及負載均衡、超時控制、 Encoding、 Stream 的動作,與服務端基本一致的行為。

  • cs.SendMsg:發送 RPC 請求出去,但其並不承擔等待響應的功能。

  • cs.RecvMsg:阻塞等待接受到的 RPC 方法響應結果。

連接

// clientconn.go
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
    t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
        FullMethodName: method,
    })
    if err != nil {
        return nil, nil, toRPCErr(err)
    }
    return t, done, nil
}

newClientStream方法中,我們通過getTransport方法獲取了 Transport 層中抽象出來的 ClientTransport 和 ServerTransport,實際上就是獲取一個連接給后續 RPC 調用傳輸使用。

四、關閉連接

// conn.Close()
func (cc *ClientConn) Close() error {
    defer cc.cancel()
    ...
    cc.csMgr.updateState(connectivity.Shutdown)
    ...
    cc.blockingpicker.close()
    if rWrapper != nil {
        rWrapper.close()
    }
    if bWrapper != nil {
        bWrapper.close()
    }

    for ac := range conns {
        ac.tearDown(ErrClientConnClosing)
    }
    if channelz.IsOn() {
        ...
        channelz.AddTraceEvent(cc.channelzID, ted)
        channelz.RemoveEntry(cc.channelzID)
    }
    return nil
}

該方法會取消 ClientConn 上下文,同時關閉所有底層傳輸。涉及如下:

  • Context Cancel

  • 清空並關閉客戶端連接

  • 清空並關閉解析器連接

  • 清空並關閉負載均衡連接

  • 添加跟蹤引用

  • 移除當前通道信息

總結

  • gRPC 基於 HTTP/2 + Protobuf。

  • gRPC 有四種調用方式,分別是一元、服務端/客戶端流式、雙向流式。

  • gRPC 的附加信息都會體現在 HEADERS 幀,數據在 DATA 幀上。

  • Client 請求若使用 grpc.Dial 默認是異步建立連接,當時狀態為 Connecting。

  • Client 請求若需要同步則調用 WithBlock(),完成狀態為 Ready。

  • Server 監聽是循環等待連接,若沒有則休眠,最大休眠時間 1s;若接收到新請求則起一個新的 goroutine 去處理。

  • grpc.ClientConn 不關閉連接,會導致 goroutine 和 Memory 等泄露。

  • 任何內/外調用如果不加超時控制,會出現泄漏和客戶端不斷重試。

  • 特定場景下,如果不對 grpc.ClientConn 加以調控,會影響調用。

  • 攔截器如果不用 go-grpc-middleware 鏈式處理,會覆蓋。

  • 在選擇 gRPC 的負載均衡模式時,需要謹慎。

參考

  • http://doc.oschina.net/grpc

  • https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md

  • https://juejin.im/post/5b88a4f56fb9a01a0b31a67e

  • https://www.ibm.com/developerworks/cn/web/wa-http2-under-the-hood/index.html

  • https://github.com/grpc/grpc-go/issues/1953

  • https://www.zhihu.com/question/52670041

可以拷貝的代碼見:

https://github.com/EDDYCJY/blog/blob/master/golang/gRPC/2019-06-28-talking-grpc.md

原文鏈接:https://mp.weixin.qq.com/s/qet7FX26HGnXgLIG-lOSyw


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM