RPC原理以及GRPC詳解


一、RPC原理

1、RPC框架由來

單體應用體量越來越大,代碼不好維護和管理,所以就產生了微服務架構,按照公共或功能模塊拆分為一個個獨立的服務,然后各獨立的服務之間可以相互調用。

微服務之間相互調用,該如何實現?
首先要解決下面5個問題:
1、如何規定遠程調用的語法?
2、如何傳遞參數?
3、如何表示數據?
4、如何知道一個服務端都實現了哪些遠程調用?從哪個端口可以訪問這個遠程調用?
5、發生了錯誤、重傳、丟包、性能等問題怎么辦?

大家可能都寫過socket或則http通信,簡單的client訪問server的模式,認為通過這個就可以解決服務之間的相互調用了,但是考慮下上面5個問題,處理起來就不是那么容易的事情了,非個人可以完成的工作。

於是就誕生了RPC框架,讓我們不用管底層實現,簡單好用:
RPC統一框架圖

2、RPC框架原理

當客戶端的應用想發起一個遠程調用時,它實際是調用客戶端的 Stub。它負責將調用的接口、方法和參數,通過約定的協議規范進行編碼,並通過本地的 RPCRuntime 進行傳輸,將調用網絡包發送到服務器。服務器端的 RPCRuntime 收到請求后,交給服務器端的 Stub 進行解碼,然后調用服務端的方法,服務端執行方法,返回結果,服務器端的 Stub 將返回結果編碼后,發送給客戶端,客戶端的 RPCRuntime 收到結果,發給客戶端的 Stub 解碼得到結果,返回給客戶端。

1、對於客戶端而言,這些過程是透明的,就像本地調用一樣;對於服務端而言,專注於業務邏輯的處理就可以了。
2、對於 Stub 層,處理雙方約定好的語法、語義、封裝、解封裝。
3、對於 RPCRuntime,主要處理高性能的傳輸,以及網絡的錯誤和異常。

來看一下RPC框架是如何解決上面5個問題的:1、2、3的問題可以由Stub層解決,4的問題可以由服務注冊和發布解決,5的問題可以由RPCRuntime解決。
個人理解Stub層的語法、語義約定現在常用的都是協議文件方式,流行的有xml、json、protocol buffers等等,它們各自都帶有自己特有的封裝解封裝方法,封裝解封裝主要解決兩個問題:
1、壓縮數據,減少傳輸數據量;
2、數據結構中字段的映射關系;

二、GRPC原理

gRPC 是一個高性能、開源和通用的 RPC 框架,面向移動和 HTTP/2 設計。目前提供 C、Java 和 Go 語言版本,分別是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持。

本文以GO語言版本講解
1、Golang安裝GRPC
詳細安裝連接
2、protocol buffer原理文章
文章1
文章2
3、grpc-go github地址:
github倉庫
godoc文檔

grpc-go 的Stub層協議約定問題通過.proto文件約定好服務接口、參數等,通過工具protoc-gen-go生成客戶端和服務端共用的對照表,想生成什么語言文件就用相應的插件,這樣就實現了跨語言。
生成GO語言文件使用命令如下:
protoc --go_out=plugins=grpc:. *.proto
gRPC RPCRuntime層基於HTTP/2設計,帶來諸如雙向流、流控、頭部壓縮、單 TCP 連接上的多復用請求等特性。

GRPC Server端啟動

1、整體啟動過程

func main() {
	//解析運行參數
	flag.Parse()
	//配置監聽協議、地址、端口
	lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	//grpc額外的服務配置,這里主要是需不需要加密
	var opts []grpc.ServerOption
	if *tls {
		if *certFile == "" {
			*certFile = testdata.Path("server1.pem")
		}
		if *keyFile == "" {
			*keyFile = testdata.Path("server1.key")
		}
		creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
		if err != nil {
			log.Fatalf("Failed to generate credentials %v", err)
		}
		opts = []grpc.ServerOption{grpc.Creds(creds)}
	}
	//grpc服務初始化,綁定一些配置參數
	grpcServer := grpc.NewServer(opts...)
	//把.proto文件中定義的接口API實現注冊到grpc服務上,方便調用
	pb.RegisterRouteGuideServer(grpcServer, newServer())
	//grpc服務啟動,開始監聽
	grpcServer.Serve(lis)
}

2、Serve函數

關鍵處理就是一個for循環。如果Accept() 返回錯誤,並且錯誤是臨時性的,那么會有重試,重試時間以5ms翻倍增長,直到1s。

for {
		rawConn, err := lis.Accept()
		//錯誤處理
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		//重新啟動一個goroutine處理accept的連接
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}

3、handleRawConn函數

主要作用就是獲取一個服務端的Transport,並開一個goroutine等待處理stream,里面會涉及到調用注冊的方法。

st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		return
	}

	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()

GRPC Client端啟動

1、建立連接和綁定實現的接口

//解析運行參數
	flag.Parse()
	//連接的一些配置,主要是加密,安全、阻塞
	var opts []grpc.DialOption
	if *tls {
		if *caFile == "" {
			*caFile = testdata.Path("ca.pem")
		}
		creds, err := credentials.NewClientTLSFromFile(*caFile, *serverHostOverride)
		if err != nil {
			log.Fatalf("Failed to create TLS credentials %v", err)
		}
		opts = append(opts, grpc.WithTransportCredentials(creds))
	} else {
		opts = append(opts, grpc.WithInsecure())
	}

	opts = append(opts, grpc.WithBlock())
	//建立一個連接
	conn, err := grpc.Dial(*serverAddr, opts...)
	if err != nil {
		log.Fatalf("fail to dial: %v", err)
	}
	defer conn.Close()
	
	//創建一個實現了.proto文件定義的接口API的Client
	client := pb.NewRouteGuideClient(conn)

2、Client調用方式

Unary RPC: 一元RPC

func (c *routeGuideClient) GetFeature(ctx context.Context, in *Point, opts ...grpc.CallOption) (*Feature, error) {
	out := new(Feature)
	err := c.cc.Invoke(ctx, "/routeguide.RouteGuide/GetFeature", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// printFeature gets the feature for the given point.
func printFeature(client pb.RouteGuideClient, point *pb.Point) {
	log.Printf("Getting feature for point (%d, %d)", point.Latitude, point.Longitude)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	feature, err := client.GetFeature(ctx, point)
	if err != nil {
		log.Fatalf("%v.GetFeatures(_) = _, %v: ", client, err)
	}
	log.Println(feature)
}

// GetFeature returns the feature at the given point.
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
	for _, feature := range s.savedFeatures {
		if proto.Equal(feature.Location, point) {
			return feature, nil
		}
	}
	// No feature was found, return an unnamed feature
	return &pb.Feature{Location: point}, nil
}

Server-Side streaming RPC: 服務端流式RPC

func (c *routeGuideClient) ListFeatures(ctx context.Context, in *Rectangle, opts ...grpc.CallOption) (RouteGuide_ListFeaturesClient, error) {
	stream, err := c.cc.NewStream(ctx, &_RouteGuide_serviceDesc.Streams[0], "/routeguide.RouteGuide/ListFeatures", opts...)
	if err != nil {
		return nil, err
	}
	x := &routeGuideListFeaturesClient{stream}
	if err := x.ClientStream.SendMsg(in); err != nil {
		return nil, err
	}
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	return x, nil
}

// printFeatures lists all the features within the given bounding Rectangle.
func printFeatures(client pb.RouteGuideClient, rect *pb.Rectangle) {
	log.Printf("Looking for features within %v", rect)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := client.ListFeatures(ctx, rect)
	if err != nil {
		log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
	}
	for {
		feature, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
		}
		log.Println(feature)
	}
}

// ListFeatures lists all features contained within the given bounding Rectangle.
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
	for _, feature := range s.savedFeatures {
		if inRange(feature.Location, rect) {
			if err := stream.Send(feature); err != nil {
				return err
			}
		}
	}
	return nil
}

Client-Side streaming RPC: 客戶端流式RPC

func (c *routeGuideClient) RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error) {
	stream, err := c.cc.NewStream(ctx, &_RouteGuide_serviceDesc.Streams[1], "/routeguide.RouteGuide/RecordRoute", opts...)
	if err != nil {
		return nil, err
	}
	x := &routeGuideRecordRouteClient{stream}
	return x, nil
}

// runRecordRoute sends a sequence of points to server and expects to get a RouteSummary from server.
func runRecordRoute(client pb.RouteGuideClient) {
	// Create a random number of random points
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
	var points []*pb.Point
	for i := 0; i < pointCount; i++ {
		points = append(points, randomPoint(r))
	}
	log.Printf("Traversing %d points.", len(points))
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := client.RecordRoute(ctx)
	if err != nil {
		log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
	}
	for _, point := range points {
		if err := stream.Send(point); err != nil {
			log.Fatalf("%v.Send(%v) = %v", stream, point, err)
		}
	}
	reply, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
	}
	log.Printf("Route summary: %v", reply)
}


// RecordRoute records a route composited of a sequence of points.
//
// It gets a stream of points, and responds with statistics about the "trip":
// number of points,  number of known features visited, total distance traveled, and
// total time spent.
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
	var pointCount, featureCount, distance int32
	var lastPoint *pb.Point
	startTime := time.Now()
	for {
		point, err := stream.Recv()
		if err == io.EOF {
			endTime := time.Now()
			return stream.SendAndClose(&pb.RouteSummary{
				PointCount:   pointCount,
				FeatureCount: featureCount,
				Distance:     distance,
				ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
			})
		}
		if err != nil {
			return err
		}
		pointCount++
		for _, feature := range s.savedFeatures {
			if proto.Equal(feature.Location, point) {
				featureCount++
			}
		}
		if lastPoint != nil {
			distance += calcDistance(lastPoint, point)
		}
		lastPoint = point
	}
}

Bidirectional streaming RPC : 雙向流式RPC

func (c *routeGuideClient) RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error) {
	stream, err := c.cc.NewStream(ctx, &_RouteGuide_serviceDesc.Streams[2], "/routeguide.RouteGuide/RouteChat", opts...)
	if err != nil {
		return nil, err
	}
	x := &routeGuideRouteChatClient{stream}
	return x, nil
}

// runRouteChat receives a sequence of route notes, while sending notes for various locations.
func runRouteChat(client pb.RouteGuideClient) {
	notes := []*pb.RouteNote{
		{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "First message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Second message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Third message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "Fourth message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Fifth message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Sixth message"},
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := client.RouteChat(ctx)
	if err != nil {
		log.Fatalf("%v.RouteChat(_) = _, %v", client, err)
	}
	waitc := make(chan struct{})
	go func() {
		for {
			in, err := stream.Recv()
			if err == io.EOF {
				// read done.
				close(waitc)
				return
			}
			if err != nil {
				log.Fatalf("Failed to receive a note : %v", err)
			}
			log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
		}
	}()
	for _, note := range notes {
		if err := stream.Send(note); err != nil {
			log.Fatalf("Failed to send a note: %v", err)
		}
	}
	stream.CloseSend()
	<-waitc
}

// RouteChat receives a stream of message/location pairs, and responds with a stream of all
// previous messages at each of those locations.
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		key := serialize(in.Location)

		s.mu.Lock()
		s.routeNotes[key] = append(s.routeNotes[key], in)
		// Note: this copy prevents blocking other clients while serving this one.
		// We don't need to do a deep copy, because elements in the slice are
		// insert-only and never modified.
		rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
		copy(rn, s.routeNotes[key])
		s.mu.Unlock()

		for _, note := range rn {
			if err := stream.Send(note); err != nil {
				return err
			}
		}
	}
}

Client 連接底層兩個主要方法

1、Invoke函數

newClientStream:獲取傳輸層 Trasport 並組合封裝到 ClientStream 中返回,在這塊會涉及負載均衡、超時控制、 Encoding、 Stream 的動作,與服務端基本一致的行為。
cs.SendMsg:發送 RPC 請求出去,但其並不承擔等待響應的功能。
cs.RecvMsg:阻塞等待接受到的 RPC 方法響應結果。

// Invoke sends the RPC request on the wire and returns after response is
// received.  This is typically called by generated code.
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
	// allow interceptor to see all applicable call options, which means those
	// configured as defaults from dial option as well as per-call options
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.unaryInt != nil {
		return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
	}
	return invoke(ctx, method, args, reply, cc, opts...)
}

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
	cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
	if err := cs.SendMsg(req); err != nil {
		return err
	}
	return cs.RecvMsg(reply)
}

2、NewStream函數

// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
//      1. Call Close on the ClientConn.
//      2. Cancel the context provided.
//      3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
//         client-streaming RPC, for instance, might use the helper function
//         CloseAndRecv (note that CloseSend does not Recv, therefore is not
//         guaranteed to release all resources).
//      4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
	// allow interceptor to see all applicable call options, which means those
	// configured as defaults from dial option as well as per-call options
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.streamInt != nil {
		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
	}
	return newClientStream(ctx, desc, cc, method, opts...)
}

總結

gRPC的優點和缺點:

優點:

1、protobuf二進制消息,性能好/效率高(空間和時間效率都很不錯);
2、proto文件生成目標代碼,簡單易用;
3、序列化反序列化直接對應程序中的數據類,不需要解析后在進行映射(XML,JSON都是這種方式);
4、支持向前兼容(新加字段采用默認值)和向后兼容(忽略新加字段),簡化升級;
5、支持多種語言(可以把proto文件看做IDL文件);
6、Netty等一些框架集成;

缺點:

1、GRPC尚未提供連接池,需要自行實現;
2、尚未提供“服務發現”、“負載均衡”機制;
3、因為基於HTTP2,絕大部多數HTTP Server、Nginx都尚不支持,即Nginx不能將GRPC請求作為HTTP請求來負載均衡,而是作為普通的TCP請求。(nginx1.9版本已支持);
4、Protobuf二進制可讀性差(貌似提供了Text_Fromat功能,沒用過);
5、默認不具備動態特性(可以通過動態定義生成消息類型或者動態編譯支持);

參考資料

1.從實踐到原理,帶你參透GRPC.
2.極客時間:趣談網絡協議32-36講.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM