go微服務框架go-micro深度學習(五) stream 調用過程詳解


github 例子地址
上一篇寫了一下rpc調用過程的實現方式,簡單來說就是服務端把實現了接口的結構體對象進行反射,抽取方法,簽名,保存,客戶端調用的時候go-micro封請求數據,服務端接收到請求時,找到需要調用調用的對象和對應的方法,利用反射進行調用,返回數據。 但是沒有說stream的實現方式,感覺單獨寫一篇帖子來說這個更好一些。上一篇帖子是基礎,理解了上一篇,stream實現原理一點即破。先說一下使用方式,再說原理。
當前go-micro對 rpc 調用的方式大概如下:
普通的rpc調用 是這樣:

1.連接服務器或者從緩存池得到連接
2.客戶端 ->發送數據 -> 服務端接收
3.服務端 ->返回數據 -> 客戶端處理數據
4.關閉連接或者把連接返回到緩存池

當前 rps stream的實現方式 是這樣子:

1. 連接服務器
2. 客戶端多次發送請求-> 服務端接收
3. 服務端多次返回數據-> 客戶端處理數據
4. 關閉連接

    當數據量比較大的時候我們可以用stream方式分批次傳輸數據。對於客戶端還是服務端沒有限制,我們可以根據自己的需要使用stream方式,使用方式也非常的簡單,在定義接口的時候在參數或者返回值前面加上stream然后就可以多次進行傳輸了,使用的代碼還是之前寫的例子,代碼都在github上:
    比如我的例子中定義了兩個使用stream的接口,一個只在返回值使用stream,另一個是在參數和返回值前都加上了stream,最終的使用方式沒有區別

    rpc Stream(model.SRequest) returns (stream model.SResponse) {}
    rpc BidirectionalStream(stream model.SRequest) returns (stream model.SResponse) {}

看一下go-micro為我們生成的代碼rpcapi.micro.go里,不要被嚇到,生成了很多代碼,但是沒啥理解不了的
Server端

// Server API for Say service
type SayHandler interface {
	// .... others	
	Stream(context.Context, *model.SRequest, Say_StreamStream) error
	BidirectionalStream(context.Context, Say_BidirectionalStreamStream) error
}
type Say_StreamStream interface {
	SendMsg(interface{}) error
	RecvMsg(interface{}) error
	Close() error
	Send(*model.SResponse) error
}
type Say_BidirectionalStreamStream interface {
	SendMsg(interface{}) error
	RecvMsg(interface{}) error
	Close() error
	Send(*model.SResponse) error
	Recv() (*model.SRequest, error)
}
// .... others 

Client端

// Client API for Say service
type SayService interface {	
    //... others
	Stream(ctx context.Context, in *model.SRequest, opts ...client.CallOption) (Say_StreamService, error)
	BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error)
}

type Say_StreamService interface {
	SendMsg(interface{}) error
	RecvMsg(interface{}) error
	Close() error
	Recv() (*model.SResponse, error)
}

type Say_BidirectionalStreamService interface {
	SendMsg(interface{}) error
	RecvMsg(interface{}) error
	Close() error
	Send(*model.SRequest) error
	Recv() (*model.SResponse, error)
}

    你會發現參數前面加了 Stream后,生成的代碼會把你的參數變成一個接口,這個接口主要要的方法是

	SendMsg(interface{}) error
	RecvMsg(interface{}) error
	Close() error

剩下的兩個接口方法是根據你是發送還是接收生成的,如果有發送就會有Send(你的參數),如果有接收會生成Rev() (你的參數, error),但這兩個方法只是為了讓你使用時方便,里面調用的還是SendMsg(interface)和RecvMsg(interface)方法,但是他們是怎么工作的,如何多次發送和接收傳輸的數據,是不是感覺很神奇。

我就以TsBidirectionalStream 方法為例開始分析,上一篇和再早之前的帖子已經說了服務端啟動的時候都做了哪些操作,這里就不再贅述,
服務端的實現,很簡單,不斷的獲取客戶端發過來的數據,再給客戶端一次一次的返回一些數據。

/*
 模擬數據
 */
func (s *Say) BidirectionalStream(ctx context.Context, stream rpcapi.Say_BidirectionalStreamStream) error {
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		for i := int64(0); i < req.Count; i++ {
			if err := stream.Send(&model.SResponse{Value: []string {lib.RandomStr(lib.Random(3, 6))}}); err != nil {
				return err
			}
		}
	}
	return nil
}

啟動服務,服務開始監聽客戶端傳過來的數據.....
客戶端調用服務端方法:

// 調用 
func TsBidirectionalStream(client rpcapi.SayService) {
	rspStream, err := client.BidirectionalStream(context.Background())
	if err != nil {
		panic(err)
	}
	// send
	go func() {
		rspStream.Send(&model.SRequest{Count: 2})
		rspStream.Send(&model.SRequest{Count: 5})
        // close the stream
		if err := rspStream.Close(); err != nil {
			fmt.Println("stream close err:", err)
		}
	}()
     // recv
	idx := 1
	for  {
		rsp, err := rspStream.Recv()

		if err == io.EOF {
			break
		} else if err != nil {
			panic(err)
		}

		fmt.Printf("test stream get idx %d  data  %v\n", idx, rsp)
		idx++
	}
	fmt.Println("Read Value End")
}

當客戶端在調用rpc的stream方法是要很得到stream

rspStream, err := client.BidirectionalStream(context.Background())
// 
func (c *sayService) BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error) {
	req := c.c.NewRequest(c.name, "Say.BidirectionalStream", &model.SRequest{})
	stream, err := c.c.Stream(ctx, req, opts...)
	if err != nil {
		return nil, err
	}
	return &sayServiceBidirectionalStream{stream}, nil
}

這個調用c.c.Stream(ctx, req, opts...)是關鍵,他的內部實現就是和服務器進行連接,然后返回一個stream,進行操作。

客戶端:和服務端建立連接,返回Stream,進行接收和發送數據
服務端:接收客戶端連接請求,利用反射找到相應的方法,組織Strem,傳給方法,進行數據的發送和接收

建立連接的時候就是一次rpc調用,服務端接受連接,然后客戶端發送一次調用,但是傳輸的是空數據,服務端利用反射找到具體的方法,組織stream,調用具體方法,利用這個連接,客戶端和服務端進行多次通信。

stream


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM