Go gRPC 學習系列:
第一篇內容我們已經基本了解到 gRPC 如何使用 、對應的三種流模式。現在已經可以讓服務端和客戶端互相發送消息。本篇仍然講解功能性的使用說明:如何使用攔截器。使用過 Java 的同學知道 Spring 或者 Dubbo,這兩個框架都提供了攔截器的支持,攔截器的作用無需多言,鑒權,Tracing,數據統計等等。
在 gRPC 中攔截器的實現會稍微有所不同,原因在於 gRPC 多了一種流式數據傳輸模式。所以這種攔截器的處理也變得相對復雜。
攔截器類型
- UnaryServerInterceptor 服務端攔截,在服務端接收請求的時候進行攔截。
- UnaryClientInterceptor 這是一個客戶端上的攔截器,在客戶端真正發起調用之前,進行攔截。
- StreamClientInterceptor 在流式客戶端調用時,通過攔截 clientstream 的創建,返回一個自定義的 clientstream, 可以做一些額外的操作。
- StreamServerInterceptor 在服務端接收到流式請求的時候進行攔截。
攔截器使用
普通攔截器
在 gRPC 中攔截器被定義成一個變量:
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
參數含義如下:
- ctx context.Context:請求上下文
- req interface{}:RPC 方法的請求參數
- info *UnaryServerInfo:RPC 方法的所有信息
- handler UnaryHandler:RPC 方法真正執行的邏輯
它本質是一個方法,攔截器的應用是在服務端啟動的時候要注冊上去,從 grpc.NewServer(opts...)
這里開始,這里需要一個 ServerOption 對象:
//注冊攔截器 創建gRPC服務器
s := grpc.NewServer(grpc.UnaryInterceptor(LoggingInterceptor))
gRPC 在 v1.28.0 版本增加了多 interceptor 支持,可以在不借助第三方庫(go-grpc-middleware)的情況下添加多個 interceptor。看一下 grpc.UnaryInterceptor() 方法的定義:
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.unaryInt != nil {
panic("The unary server interceptor was already set and may not be reset.")
}
o.unaryInt = i
})
}
參數為一個 UnaryServerInterceptor ,即傳入 UnaryServerInterceptor 類型的方法即可,所以自定義一個攔截器就變得很簡單,只需要定義一個 UnaryServerInterceptor 類型的方法。比如我們實現一個打印日志的攔截器:
//攔截器 - 打印日志
func LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
fmt.Printf("gRPC method: %s, %v", info.FullMethod, req)
resp, err := handler(ctx, req)
fmt.Printf("gRPC method: %s, %v", info.FullMethod, resp)
return resp, err
}
可以看到只需要按照 UnaryServerInterceptor 方法的參數去構造即可。然后就是應用,在 server 注冊的時候:
grpc.NewServer(grpc.UnaryInterceptor(LoggingInterceptor)) // 創建gRPC服務器
......
......
......
將攔截器注冊上就行,是不是很簡單。
流攔截器
流攔截器過程和一元攔截器有所不同,同樣可以分為3個階段:
- 預處理(pre-processing)
- 調用RPC方法(invoking RPC method)
- 后處理(post-processing)
預處理階段的攔截只是在流式請求第一次 發起的時候進行攔截,后面的流式請求不會再進入到處理邏輯。
后面兩種情況對應着 Streamer api 提供的兩個擴展方法來進行,分別是 SendMsg 和 RecvMsg 方法。
正常情況下實現一個流式攔截器與普通攔截器一樣,實現這個已經定義好的攔截器方法即可:
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
如果是想在發消息之前和之進行處理, 則實現 SendMsg 和 RecvMsg:
type wrappedStream struct {
grpc.ServerStream
}
func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s}
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
fmt.Printf("Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))
return w.ServerStream.RecvMsg(m)
}
func (w *wrappedStream) SendMsg(m interface{}) error {
fmt.Printf("Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
return w.ServerStream.SendMsg(m)
}
首先我們自己包裝一個 grpc.ServerStream ,然后去實現它的 SendMsg 和 RecvMsg 方法。然后就是將這個 ServerStream 應用到攔截器中去:
//發消息前后流式調用攔截器
func SendMsgCheckStreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
fmt.Printf("gRPC method: %s,", info.FullMethod)
err := handler(srv, newWrappedStream(ss))
fmt.Printf("gRPC method: %s", info.FullMethod)
return err
}
這里注意到參數 ss 使用我們自己定義的 ServerStream 包裝一下。
相關測試代碼都在這里,點擊查看。