grpc client連接池及負載均衡實現


參考資料

關鍵概念

  • Resolver

    • passthrough
    • dns
    • manual
  • Balancer

    • pickerfirst
    • roundrobin
    • grpclb
  • Picker

    • pickerfirst
    • roundrobin
    • grpclb

實現

目的

定制resolver實現:

  1. etcd服務發現/注冊(TBD)
  2. addr多連接支持(N個),替代連接池

思路

支持2種scheme:

  1. etcd:///endpoint#N, 其中N表示創建N個連接(默認1個)
  2. pass:///ip1:port1[#N1],ip2:port2[#N2]..., 其中N1,N2表示創建連接數量

對於1的前綴必然是etcd
對於2的前綴可選是extd, pass, addr, 暫定pass, 相對於passthrough而言

問題是如何解析target...

  1. scheme
  2. authority
  3. endpoint
    針對endpoint再做解析最后生成相應的結果Address

問題

  1. waitForResolvedAddrs阻塞

    解決: 參考passthrough的源碼並進行修改

  2. 測試server端的連接是否有2條? 並且client是否真正roundrobin?

    解決: 在server添加creds連接攔截器, 打印每個連接的handshake信息

源碼

  • server
package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"net"
	"os"
	...
)

func main() {

	grpcAddr := ":9080"
	if len(os.Args) > 1 {
		grpcAddr = os.Args[1]
	}

	// 1. 創建server
	//通過code設置
	svr := http.NewServerWith(&http.Config{
		//HttpAddr:        ":8080",   // 開啟http訪問
		GrpcAddr:        grpcAddr,  // 開啟grpc訪問
		WbskCheckOrigin: http.DOWN, // websocket不啟用origin檢測
	})

	svr.GrpcServerOption(grpc.Creds(new(TransportCredentialsTest)))
	//通過conf設置
	/*svr := http.NewServer()
	 */

	// 2. 注冊service. 綁定實現
	svr.RegisterService(api.TagServiceRegistry, new(biz.TagServiceService))

	// 3. 啟動server. 提供服務
	if err := svr.ListenAndServe(); err != nil {
		base.DefaultLogger.Errorf("server error: %+v", err)
	}
}

type TransportCredentialsTest struct {
}

func (tc *TransportCredentialsTest) ClientHandshake(ctx context.Context, name string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	fmt.Println("ClientHandshake#########################")
	return nil, nil, nil
}
func (tc *TransportCredentialsTest) ServerHandshake(conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	fmt.Println("ServerHandshake#########################")
	fmt.Printf("Remote Addr %v, %v\n", conn.RemoteAddr().Network(), conn.RemoteAddr().String())
	ai := AuthInfoTest("test")
	return conn, &ai, nil
}
func (tc *TransportCredentialsTest) Info() credentials.ProtocolInfo {
	fmt.Println("Info#########################")
	return credentials.ProtocolInfo{}
}

func (tc *TransportCredentialsTest) Clone() credentials.TransportCredentials {
	return tc
}

func (tc *TransportCredentialsTest) OverrideServerName(string) error {
	fmt.Println("OverrideServerName#########################")
	return nil
}

type AuthInfoTest string

func (ai *AuthInfoTest) AuthType() string {
	return string(*ai)
}

  • client
package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"time"
	...
)

func main() {
	// 默認是pickerfirst
	cc, err := grpc.Dial("pass:///:9080#2,:9090#1", grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{ "round_robin":{}}]}`))

	if err != nil {
		panic(err)
	}
	defer cc.Close()

	cl := api.NewTagServiceClient(cc)
	for i := 0; ; i++ {
		rsp, err := cl.All(context.Background(), &api.AllReq{
			Search: "all",
			From:   int32(i),
			Size:   10,
		})
		if err != nil {
			panic(err)
		}

		fmt.Printf("%v: %v\n", i, kits.ToJson(rsp.Data))
		time.Sleep(500 * time.Millisecond)
	}

}

  • resolver
package main

import (
	"google.golang.org/grpc/attributes"
	"google.golang.org/grpc/resolver"
	"strconv"
	"strings"
)

/*
支持2種scheme:
1. etcd:///endpoint#N
2. pass:///endpoint1#N1,endpoint2#N2....
*/

func init() {
	resolver.Register(new(passAnchorBuilder))
}

type AnchorAddress struct {
	Addr string // 服務地址
	Anch int    // 錨記數量
}

/*
格式: addr1#anch1,addr2#anch2...
*/
func ParseAnchorAddress(endpoint string) (rt []*AnchorAddress) {
	var (
		addr string
		anch int
	)
	for _, val := range strings.Split(endpoint, ",") {
		idx := strings.IndexByte(val, '#')
		if idx > 0 {
			addr = val[:idx]
			anch, _ = strconv.Atoi(val[idx+1:])
		} else {
			addr = val
		}
		if anch < 1 {
			anch = 1
		}
		rt = append(rt, &AnchorAddress{
			Addr: addr,
			Anch: anch,
		})
	}
	return
}

type passAnchorResolver struct {
	target resolver.Target
	cc     resolver.ClientConn
}

func (r *passAnchorResolver) ResolveNow(resolver.ResolveNowOptions) {

}

func (r *passAnchorResolver) Close() {

}

func (r *passAnchorResolver) start() {
	var state resolver.State
	for _, item := range ParseAnchorAddress(r.target.Endpoint) {
		for i := 0; i < item.Anch; i++ {
			state.Addresses = append(state.Addresses, resolver.Address{
				Addr:       item.Addr,
				Attributes: attributes.New("idx", i),
			})
		}
	}
	r.cc.UpdateState(state)

	/*下述代碼會在ClientConn.conns生成多個連接對象,但無法配合roundrobin做相關負載均衡*/
	//for _, item := range ParseAnchorAddress(r.target.Endpoint) {
	//	for i := 0; i < item.Anch; i++ {
	//		r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{
	//			{
	//				Addr:       item.Addr,
	//				Attributes: attributes.New("idx", i),
	//			},
	//		}})
	//	}
	//}
}

type passAnchorBuilder struct {
}

func (b *passAnchorBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {

	r := &passAnchorResolver{
		target: target,
		cc:     cc,
	}
	r.start()

	return r, nil
}

func (b *passAnchorBuilder) Scheme() string {
	return "pass"
}

總結

1. balancer默認是pickerfirst,不是roundrobin
2. resolver.start()邏輯不能放在ResolveNow(),具體參考passthrough
3. ClientConn.UpdateState()多次調用會在ClientConn.conns生成多個連接對象,但無法與roundrobin共用
4. ClientConn.UpdateState()的State的Address必須指定不同的attribute對象,否則會覆蓋去重!
5. client-server端效果達到預期,自動容錯,負載均衡(根據#比例)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM