一、前言
grpc不像go-micro那樣內置可插拔的etcd組件,無法直接使用服務注冊發現及命名解析的功能。為了讓grpc也具備這些能力,本文基於etcd實現了自定義的Name Resolver。
二、所需的grpc版本及高版本grpc、protobuf與etcd兼容問題
grpc相關庫:
google.golang.org/grpc v1.26.0
google.golang.org/grpc/balancer/roundrobin
google.golang.org/grpc/resolver
etcd相關庫:
go.etcd.io/etcd/clientv3
github.com/coreos/etcd/mvcc/mvccpb
此處需要注意的是,新版本grpc與etcd相關庫不兼容。如果grpc版本高於1.26.0,或者protobuf(protoc-gen-go)版本過高,會出現以下問題:
1. grpc版本過高:etcd clientv3仍依賴balancer.PickOptions、resolver.BuildOption、resolver.ResolveNowOption等在新版本grpc中已不存在的API,因此需要將grpc降級
# github.com/coreos/etcd/clientv3/balancer/picker ../../vendor/github.com/coreos/etcd/clientv3/balancer/picker/err.go:37:44: undefined: balancer.PickOptions ../../vendor/github.com/coreos/etcd/clientv3/balancer/picker/roundrobin_balanced.go:55:54: undefined: balancer.PickOptions # github.com/coreos/etcd/clientv3/balancer/resolver/endpoint ../../vendor/github.com/coreos/etcd/clientv3/balancer/resolver/endpoint/endpoint.go:114:78: undefined: resolver.BuildOption ../../vendor/github.com/coreos/etcd/clientv3/balancer/resolver/endpoint/endpoint.go:182:31: undefined: resolver.ResolveNowOption
解決辦法:在go.mod 加入:replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
2. grpc版本過低 protobuf版本過高
# ut-blogger/api/protos/user ../../api/protos/user/user.pb.go:327:11: undefined: grpc.SupportPackageIsVersion6 ../../api/protos/user/user.pb.go:338:5: undefined: grpc.ClientConnInterface
解決辦法:降級protoc-gen-go的版本,執行 go get github.com/golang/protobuf/protoc-gen-go@v1.3.2 ,然後重新生成proto對應的Go代碼
三、自定義實現naming
package etcdservice

import (
	"context"
	"fmt"
	"log"
	"strings"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// Schema is both the gRPC resolver scheme name and the root etcd key prefix
// under which service addresses are stored.
const Schema = "grpcEtcd"

// serviceKey returns the etcd key under which addr is registered for the
// service called name: "/<Schema>/<name>/<addr>".
func serviceKey(name, addr string) string {
	return "/" + Schema + "/" + name + "/" + addr
}

// Register publishes addr under the service name in etcd and keeps it there.
//
// etcdAddr is a list of etcd endpoints separated by ';'. ttl is the lease
// time-to-live in seconds and is also used as the period of the background
// check. The shared package-level client is created on first use.
//
// After the client is available, Register starts a goroutine that checks
// immediately, and then once every ttl seconds, whether the key still exists;
// when it is missing the key is (re)created with a fresh lease. That goroutine
// has no stop condition and runs for the lifetime of the process.
//
// It returns an error when ttl is not positive or when the etcd client cannot
// be created.
func Register(etcdAddr, name string, addr string, ttl int64) error {
	// time.NewTicker panics on a non-positive duration; reject it up front.
	if ttl <= 0 {
		return fmt.Errorf("register %s: ttl must be positive, got %d", name, ttl)
	}

	if cli == nil {
		c, err := clientv3.New(clientv3.Config{
			Endpoints:   strings.Split(etcdAddr, ";"),
			DialTimeout: 15 * time.Second,
		})
		if err != nil {
			log.Printf("connect to etcd err:%s", err)
			return err
		}
		cli = c
	}

	key := serviceKey(name, addr)
	ticker := time.NewTicker(time.Second * time.Duration(ttl))
	go func() {
		for {
			getResp, err := cli.Get(context.Background(), key)
			switch {
			case err != nil:
				// getResp carries no usable data on error, so only the error is logged.
				log.Printf("Register:%s", err)
			case getResp.Count == 0:
				if err := withAlive(name, addr, ttl); err != nil {
					log.Printf("keep alive:%s", err)
				}
			}
			<-ticker.C
		}
	}()

	return nil
}

// withAlive grants a lease of ttl seconds, stores addr under the service key
// bound to that lease, and starts keep-alive for the lease.
//
// It returns the first error from Grant, Put or KeepAlive.
func withAlive(name string, addr string, ttl int64) error {
	leaseResp, err := cli.Grant(context.Background(), ttl)
	if err != nil {
		return fmt.Errorf("grant lease: %w", err)
	}

	key := serviceKey(name, addr)
	log.Printf("key:%v\n", key)
	if _, err := cli.Put(context.Background(), key, addr, clientv3.WithLease(leaseResp.ID)); err != nil {
		return fmt.Errorf("put etcd: %w", err)
	}

	ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
	if err != nil {
		return fmt.Errorf("keep alive: %w", err)
	}

	// Drain the keep-alive responses so the channel never fills up. Ranging
	// ends the goroutine when the channel is closed; a bare receive loop would
	// spin forever on a closed channel.
	go func() {
		for range ch {
		}
	}()

	return nil
}

// UnRegister removes the address of the named service from etcd. It does
// nothing when the shared client has not been created yet. A failed delete is
// logged, since the function has no error result.
//
// Note that the goroutine started by Register recreates a missing key on its
// next tick, so the removal only lasts if the process exits before then.
func UnRegister(name string, addr string) {
	if cli == nil {
		return
	}
	key := serviceKey(name, addr)
	if _, err := cli.Delete(context.Background(), key); err != nil {
		log.Printf("unregister %s: %v", key, err)
	}
}
四、自定義實現resolver
package etcdservice

import (
	"context"
	"log"
	"strings"
	"time"

	"github.com/coreos/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/clientv3"
	"google.golang.org/grpc/resolver"
)

// cli is the package-level etcd client. It is created lazily by the first
// caller that needs it and then reused.
var cli *clientv3.Client

// etcdResolver is both the resolver.Builder registered with grpc and the
// resolver.Resolver handed out per connection. The value returned by
// NewResolver acts as the builder; every Build call creates a separate
// instance that owns its own ClientConn and watch goroutine.
type etcdResolver struct {
	rawAddr string              // etcd endpoints separated by ';'
	cc      resolver.ClientConn // connection to push address updates to; set by Build
	cancel  context.CancelFunc  // stops the watch goroutine; set by Build
}

// NewResolver returns a resolver.Builder that resolves targets through the
// etcd endpoints in etcdAddr (separated by ';'). No connection is made until
// Build is called.
func NewResolver(etcdAddr string) resolver.Builder {
	return &etcdResolver{rawAddr: etcdAddr}
}

// Build creates the shared etcd client if necessary and returns a new resolver
// that watches the key prefix "/<scheme>/<endpoint>/" for cc.
//
// A fresh resolver is returned for every call, so several connections built
// from the same builder do not overwrite each other's ClientConn.
func (r *etcdResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	if cli == nil {
		c, err := clientv3.New(clientv3.Config{
			Endpoints:   strings.Split(r.rawAddr, ";"),
			DialTimeout: 15 * time.Second,
		})
		if err != nil {
			return nil, err
		}
		cli = c
	}

	ctx, cancel := context.WithCancel(context.Background())
	res := &etcdResolver{rawAddr: r.rawAddr, cc: cc, cancel: cancel}
	go res.watch(ctx, "/"+target.Scheme+"/"+target.Endpoint+"/")
	return res, nil
}

// Scheme returns the scheme this builder is registered under.
func (r *etcdResolver) Scheme() string {
	return Schema
}

// ResolveNow only logs the request; addresses are pushed by the watch
// goroutine as etcd reports changes.
func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
	log.Println("ResolveNow")
}

// Close stops the watch goroutine started by Build, so no further updates are
// sent to the ClientConn.
func (r *etcdResolver) Close() {
	log.Println("Close")
	if r.cancel != nil {
		r.cancel()
	}
}

// watch loads the current addresses under keyPrefix, reports them to the
// ClientConn, and then applies PUT/DELETE events until ctx is cancelled or the
// watch channel is closed.
func (r *etcdResolver) watch(ctx context.Context, keyPrefix string) {
	var addrList []resolver.Address
	watchOpts := []clientv3.OpOption{clientv3.WithPrefix()}

	getResp, err := cli.Get(ctx, keyPrefix, clientv3.WithPrefix())
	if err != nil {
		log.Println(err)
	} else {
		for _, kv := range getResp.Kvs {
			addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(kv.Key), keyPrefix)})
		}
		// Start watching right after the revision of the snapshot so that
		// changes made between the Get and the Watch are not missed.
		watchOpts = append(watchOpts, clientv3.WithRev(getResp.Header.Revision+1))
	}

	// UpdateState replaces the deprecated NewAddress method of ClientConn.
	r.pushState(addrList)

	for resp := range cli.Watch(ctx, keyPrefix, watchOpts...) {
		if err := resp.Err(); err != nil {
			log.Printf("watch %s: %v", keyPrefix, err)
			continue
		}
		for _, ev := range resp.Events {
			addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
			switch ev.Type {
			case mvccpb.PUT:
				if !exist(addrList, addr) {
					addrList = append(addrList, resolver.Address{Addr: addr})
					r.pushState(addrList)
				}
			case mvccpb.DELETE:
				if s, ok := remove(addrList, addr); ok {
					addrList = s
					r.pushState(addrList)
				}
			}
			log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

// pushState sends a copy of addrs to the ClientConn. The copy matters because
// remove rewrites the working slice in place on later events.
func (r *etcdResolver) pushState(addrs []resolver.Address) {
	snapshot := append([]resolver.Address(nil), addrs...)
	r.cc.UpdateState(resolver.State{Addresses: snapshot})
}

// exist reports whether addr is already present in l.
func exist(l []resolver.Address, addr string) bool {
	for i := range l {
		if l[i].Addr == addr {
			return true
		}
	}
	return false
}

// remove deletes the first entry equal to addr from s by moving the last
// element into its slot, so the order of s is not preserved and s itself is
// modified. It returns the shortened slice and true, or nil and false when
// addr is not present.
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
	for i := range s {
		if s[i].Addr == addr {
			s[i] = s[len(s)-1]
			return s[:len(s)-1], true
		}
	}
	return nil, false
}
五、服務端調用ETCD服務注冊
EtcdAddr、ServiceName和Ttl可以從配置文件讀取,讀取配置文件的方式很多,本文不在此闡述
//將服務地址注冊到etcd中 go etcdservice.Register(s.o.EtcdAddr, s.o.ServiceName, addr, s.o.Ttl) ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT) go func() { sig := <-ch etcdservice.UnRegister(s.o.ServiceName, addr) if i, ok := sig.(syscall.Signal); ok { os.Exit(int(i)) } else { os.Exit(0) } }()
六、客戶端grpc服務 etcd服務發現
客戶端調用grpc服務以serviceName的形式從etcd中獲取服務節點,此處采用roundrobin的形式作為負載均衡
//注冊etcd解析器 r := etcdservice.NewResolver(o.EtcdAddr) resolver.Register(r) // 客戶端連接服務器 conn, err := grpc2.Dial(r.Scheme()+"://"+o.Caller+"/"+o.Callee, grpc2.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)), grpc2.WithInsecure()) if err != nil { log.Println("連接服務器失敗", err) return nil, errors.Wrap(err, "notify client dial error") }