參考資料
-
grpc name resolver原理及實踐:
-
grpc客戶端負載均衡/重試/健康檢查:
- 185.199.111.153 github.io
- http://yangxikun.github.io/golang/2019/10/19/golang-grpc-client-side-lb.html
-
使用dns做resolver以及MAX_CONNECTION_AGE處理dns ttl的問題:
-
介紹grpc.WithDefaultServiceConfig()的參數ServiceConfig的message形式
-
介紹client與server連接機制
-
解析grpc.ClientConn源碼
-
grpc name resolution
-
基於WithDefaultServiceConfig的一個示例
關鍵概念
-
Resolver
- passthrough
- dns
- manual
-
Balancer
- pick_first
- round_robin
- grpclb
-
Picker
- pick_first
- round_robin
- grpclb
實現
目的
定制resolver實現:
- etcd服務發現/注冊(TBD)
- addr多連接支持(N個),替代連接池
思路
支持2種scheme:
- etcd:///endpoint#N, 其中N表示創建N個連接(默認1個)
- pass:///ip1:port1[#N1],ip2:port2[#N2]..., 其中N1,N2表示創建連接數量
對於1的前綴必然是etcd
對於2的前綴可選是extd, pass, addr, 暫定pass, 相對於passthrough而言
問題是如何解析target...
- scheme
- authority
- endpoint
針對endpoint再做解析,最後生成相應的結果Address
問題
-
waitForResolvedAddrs阻塞
解決: 參考passthrough的源碼並進行修改
-
測試server端的連接是否有2條? 並且client是否真正roundrobin?
解決: 在server添加creds連接攔截器, 打印每個連接的handshake信息
源碼
- server
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"net"
"os"
...
)
// main starts the demo server. The gRPC listen address defaults to ":9080"
// and can be overridden by the first command-line argument. The server is
// given TransportCredentialsTest as its transport credentials so that every
// incoming connection handshake gets printed.
//
// NOTE(review): `http` here is a project package (it exposes NewServerWith and
// Config), not net/http; its import is elided in the listing.
func main() {
	listenAddr := ":9080"
	if args := os.Args; len(args) > 1 {
		listenAddr = args[1]
	}

	// 1. Create the server.
	// Configured in code:
	cfg := &http.Config{
		//HttpAddr: ":8080", // enable HTTP access
		GrpcAddr:        listenAddr, // enable gRPC access
		WbskCheckOrigin: http.DOWN,  // websocket origin check not enabled
	}
	server := http.NewServerWith(cfg)
	server.GrpcServerOption(grpc.Creds(new(TransportCredentialsTest)))
	// Configured from conf instead:
	/*svr := http.NewServer()
	 */

	// 2. Register the service and bind its implementation.
	server.RegisterService(api.TagServiceRegistry, new(biz.TagServiceService))

	// 3. Start the server and serve requests.
	err := server.ListenAndServe()
	if err != nil {
		base.DefaultLogger.Errorf("server error: %+v", err)
	}
}
// TransportCredentialsTest is a stateless, diagnostic-only credentials type:
// its methods print a marker line when they are invoked so that connection
// handshakes can be observed on the console.
type TransportCredentialsTest struct{}
// ClientHandshake prints a marker line and hands the connection back
// unchanged; no real handshake is performed and no AuthInfo is produced.
//
// The connection is returned as-is (rather than nil) because a nil conn
// paired with a nil error leaves the caller with nothing usable even though
// the handshake reported success.
func (tc *TransportCredentialsTest) ClientHandshake(ctx context.Context, name string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	fmt.Println("ClientHandshake#########################")
	return conn, nil, nil
}
// ServerHandshake prints a marker line plus the remote peer's network and
// address, then returns the connection unchanged together with an
// AuthInfoTest whose value is "test". No real handshake is performed.
func (tc *TransportCredentialsTest) ServerHandshake(conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	fmt.Println("ServerHandshake#########################")

	remote := conn.RemoteAddr()
	fmt.Printf("Remote Addr %v, %v\n", remote.Network(), remote.String())

	info := new(AuthInfoTest)
	*info = "test"
	return conn, info, nil
}
// Info prints a marker line and returns a zero-valued ProtocolInfo.
func (tc *TransportCredentialsTest) Info() credentials.ProtocolInfo {
	var info credentials.ProtocolInfo
	fmt.Println("Info#########################")
	return info
}
// Clone returns the receiver itself rather than a copy; the type has no
// fields, so there is no state that could be shared by mistake.
func (tc *TransportCredentialsTest) Clone() credentials.TransportCredentials {
	var same credentials.TransportCredentials = tc
	return same
}
// OverrideServerName prints a marker line, ignores its argument and always
// reports success.
func (tc *TransportCredentialsTest) OverrideServerName(string) error {
	var err error // stays nil: the override is accepted but not applied
	fmt.Println("OverrideServerName#########################")
	return err
}
// AuthInfoTest is a string-backed value whose content doubles as the
// authentication type name it reports.
type AuthInfoTest string

// AuthType returns the underlying string of the receiver.
func (info *AuthInfoTest) AuthType() string {
	kind := *info
	return string(kind)
}
- client
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
...
)
// main dials the demo servers through the custom "pass" resolver and calls
// TagService.All in an endless loop, printing each response and pausing
// 500ms between calls. Any dial or RPC error panics.
func main() {
	const (
		// Two connections to :9080 and one to :9090.
		target = "pass:///:9080#2,:9090#1"
		// The default balancer is pick_first, so round_robin is requested
		// explicitly through the default service config.
		serviceConfig = `{"loadBalancingConfig":[{ "round_robin":{}}]}`
	)

	conn, err := grpc.Dial(target, grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(serviceConfig))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := api.NewTagServiceClient(conn)
	i := 0
	for {
		req := &api.AllReq{
			Search: "all",
			From:   int32(i),
			Size:   10,
		}
		resp, err := client.All(context.Background(), req)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%v: %v\n", i, kits.ToJson(resp.Data))
		time.Sleep(500 * time.Millisecond)
		i++
	}
}
- resolver
package main
import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
"strconv"
"strings"
)
/*
支持2種scheme:
1. etcd:///endpoint#N
2. pass:///endpoint1#N1,endpoint2#N2....
*/
// init registers the "pass" resolver builder with the gRPC resolver registry
// so that targets using that scheme are handled by passAnchorBuilder.
func init() {
	resolver.Register(&passAnchorBuilder{})
}
// AnchorAddress is one parsed endpoint entry: a service address together
// with the number of anchors requested for it.
type AnchorAddress struct {
	Addr string // service address
	Anch int    // anchor count; always >= 1 for values produced by ParseAnchorAddress
}

// ParseAnchorAddress parses an endpoint of the form
//
//	addr1#anch1,addr2#anch2...
//
// into one AnchorAddress per comma-separated entry, in input order.
//
// The "#anch" suffix is optional. A missing, non-numeric or < 1 count yields
// an anchor count of 1. Entries whose address part is empty (an empty
// endpoint, a stray comma, or an entry that starts with '#') are skipped,
// because they do not name anything that could be dialed.
//
// The result is nil when no valid entry is found.
func ParseAnchorAddress(endpoint string) (rt []*AnchorAddress) {
	for _, val := range strings.Split(endpoint, ",") {
		// Start from the defaults for every entry so that a count parsed for
		// one entry can never leak into a later entry without a "#N" suffix.
		addr, anch := val, 1
		if idx := strings.IndexByte(val, '#'); idx >= 0 {
			addr = val[:idx]
			// An unparsable count deliberately falls back to the default.
			if n, err := strconv.Atoi(val[idx+1:]); err == nil && n > 1 {
				anch = n
			}
		}
		if addr == "" {
			continue
		}
		rt = append(rt, &AnchorAddress{
			Addr: addr,
			Anch: anch,
		})
	}
	return rt
}
// passAnchorResolver resolves a "pass" target by expanding its endpoint into
// a fixed address list and pushing that list to the client connection once.
type passAnchorResolver struct {
	// target is the dial target whose Endpoint is parsed for addresses.
	target resolver.Target
	// cc receives the resolved state.
	cc resolver.ClientConn
}
// ResolveNow does nothing: the address list is published once by start and
// is never recomputed.
func (r *passAnchorResolver) ResolveNow(resolver.ResolveNowOptions) {
	// Intentionally empty.
}
// Close does nothing: the resolver holds no resources of its own.
func (r *passAnchorResolver) Close() {
	// Intentionally empty.
}
// start parses the target endpoint and publishes the full address list to
// the client connection in a single UpdateState call. Each parsed entry is
// repeated Anch times; every copy carries a distinct "idx" attribute
// (per the author's notes, identical addresses would otherwise be
// de-duplicated).
func (r *passAnchorResolver) start() {
	var addrs []resolver.Address
	for _, anchor := range ParseAnchorAddress(r.target.Endpoint) {
		for n := 0; n < anchor.Anch; n++ {
			addr := resolver.Address{
				Addr:       anchor.Addr,
				Attributes: attributes.New("idx", n),
			}
			addrs = append(addrs, addr)
		}
	}
	r.cc.UpdateState(resolver.State{Addresses: addrs})

	/* The code below creates multiple connection objects in ClientConn.conns,
	   but it cannot be combined with round_robin for load balancing. */
	//for _, item := range ParseAnchorAddress(r.target.Endpoint) {
	//	for i := 0; i < item.Anch; i++ {
	//		r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{
	//			{
	//				Addr:       item.Addr,
	//				Attributes: attributes.New("idx", i),
	//			},
	//		}})
	//	}
	//}
}
// passAnchorBuilder is the stateless builder that creates passAnchorResolver
// instances for the "pass" scheme.
type passAnchorBuilder struct{}
// Build creates a passAnchorResolver for target and immediately publishes
// its addresses to cc before returning. opts is not used. The returned error
// is always nil.
func (b *passAnchorBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	res := new(passAnchorResolver)
	res.target = target
	res.cc = cc

	// Resolution happens here rather than in ResolveNow (see the author's
	// notes, which point to the passthrough resolver as the model).
	res.start()
	return res, nil
}
// Scheme returns the target scheme this builder is registered for.
func (b *passAnchorBuilder) Scheme() string {
	const scheme = "pass"
	return scheme
}
總結
1. balancer默認是pick_first,不是round_robin
2. resolver.start()邏輯不能放在ResolveNow(),具體參考passthrough
3. ClientConn.UpdateState()多次調用會在ClientConn.conns生成多個連接對象,但無法與roundrobin共用
4. ClientConn.UpdateState()的State的Address必須指定不同的attribute對象,否則會覆蓋去重!
5. client-server端效果達到預期,自動容錯,負載均衡(根據#比例)