go-zero服務注冊和發現


go-zero 服務注冊和發現

在沒有服務注冊和發現的時候, 沒新上一個服務, 或者沒部署一個新的節點, 都要改所有調用方的配置文件, 簡直就是配置噩夢, 還容易配置錯誤
分析一個go-zero的服務注冊和發現,

接着上面的代碼, go-zero實戰, 看看rpc客戶端怎么尋址到rpc服務端的

//logic調用的代碼
regRsp, err := l.svcCtx.UserServiceRpc.Register(l.ctx, in)

//rpc/userserviceclient/userservice.go


func NewUserService(cli zrpc.Client) UserService {
   return &defaultUserService{
   	cli: cli,
   }
}

// 注冊
func (m *defaultUserService) Register(ctx context.Context, in *RegisterRequest) (*RegisterResponse, error) {
   //發起調用, 使用的是上面NewUserService里的zrpc.Client
   client := userService.NewUserServiceClient(m.cli.Conn())
   return client.Register(ctx, in)
}

//api/internal/svc/servicecontext.go 中調用的NewUserService
func NewServiceContext(c config.Config) *ServiceContext {
   return &ServiceContext{
   	Config:         c,
   	Model:          model.NewUserinfoModel(sqlx.NewMysql(c.DataSource), c.Cache),
   	UserServiceRpc: userserviceclient.NewUserService(zrpc.MustNewClient(c.Rpc)), //初始化rpcClient
   }
}

//先看一下zrpc.MustNewClient 這個方法, 傳入的配置文件中的etcd 的hosts和服務key, 跟進去看下這個方法
//github.com/tal-tech/go-zero/zrpc/client.go
//這個方法沒啥, 繼續往下面走
func MustNewClient(c RpcClientConf, options ...ClientOption) Client {
   cli, err := NewClient(c, options...)
   if err != nil {
   	log.Fatal(err)
   }

   return cli
}


//方法中主要方法是 internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
   var opts []ClientOption
   if c.HasCredential() {
   	opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
   		App:   c.App,
   		Token: c.Token,
   	})))
   }
   if c.Timeout > 0 {
   	opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
   }
   opts = append(opts, options...)

   var client Client
   var err error
   if len(c.Endpoints) > 0 {
   	client, err = internal.NewClient(internal.BuildDirectTarget(c.Endpoints), opts...)
   } else if err = c.Etcd.Validate(); err == nil {
   	client, err = internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
   }
   if err != nil {
   	return nil, err
   }

   return &RpcClient{
   	client: client,
   }, nil
}

//先看一下internal.BuildDiscovTarget, 這個方法入參是etcd的hosts和 服務的key, 返回的是一個類似url的東西, 協議是DiscovScheme = "discov"
func BuildDiscovTarget(endpoints []string, key string) string {
   return fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme,
   	strings.Join(endpoints, resolver.EndpointSep), key)
}


//BuildDiscovTarget返回的url類似 : discov://127.0.0.1:2379/user-service, 傳入NewClient中
//這個函數有兩個核心邏輯 一個是grpc.WithBalancerName(p2c.Name)

// NewClient returns a Client.
func NewClient(target string, opts ...ClientOption) (Client, error) {
   var cli client
   opts = append([]ClientOption{WithDialOption(grpc.WithBalancerName(p2c.Name))}, opts...)
   if err := cli.dial(target, opts...); err != nil {
   	return nil, err
   }

   return &cli, nil
}

//WithBalancerName 這方法看名字知道是負載均衡的作用, 通過balancerName獲取的, 對應入參的p2c.Name
func WithBalancerName(balancerName string) DialOption {
   builder := balancer.Get(balancerName)
   if builder == nil {
   	panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
   }
   return newFuncDialOption(func(o *dialOptions) {
   	o.balancerBuilder = builder
   })
}

//跟着p2c.Name進去, github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/balancer/p2c/p2c.go
//在這里注入的負載均衡, 核心的邏輯在Pick中, 大致是一個可選就選一個, 兩個就選擇連接數最小的, 兩個以上就隨機兩個出來進行選擇
//如何選擇的邏輯在choose(c1, c2 *subConn)方法中, 基本上就兩個選連接數小的那個, 
func init() {
   balancer.Register(newBuilder())
}

type p2cPickerBuilder struct{}

func newBuilder() balancer.Builder {
   return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
}

//....
func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
   conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
   p.lock.Lock()
   defer p.lock.Unlock()

   var chosen *subConn
   switch len(p.conns) {
   case 0:
   	return nil, nil, balancer.ErrNoSubConnAvailable
   case 1:
   	chosen = p.choose(p.conns[0], nil)
   case 2:
   	chosen = p.choose(p.conns[0], p.conns[1])
   default:
   	var node1, node2 *subConn
   	for i := 0; i < pickTimes; i++ {
   		a := p.r.Intn(len(p.conns))
   		b := p.r.Intn(len(p.conns) - 1)
   		if b >= a {
   			b++
   		}
   		node1 = p.conns[a]
   		node2 = p.conns[b]
   		if node1.healthy() && node2.healthy() {
   			break
   		}
   	}

   	chosen = p.choose(node1, node2)
   }

   atomic.AddInt64(&chosen.inflight, 1)
   atomic.AddInt64(&chosen.requests, 1)
   return chosen.conn, p.buildDoneFunc(chosen), nil
}

//繼續返回去看NewClient方法中的dial方法, 傳入的是target, 也就是那個url, discov://127.0.0.1:2379/user-service
   if err := cli.dial(target, opts...); err != nil {
   	return nil, err
   }


//github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/client.go
//這個方法本身沒什么, 主要就是調用grpc.DialContext()方法, 這里就進入了grpc的邏輯了, 相當於通過grpc dial 了discov://127.0.0.1:2379/user-service, 繼續進去看
func (c *client) dial(server string, opts ...ClientOption) error {
   options := c.buildDialOptions(opts...)
   timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
   defer cancel()
   conn, err := grpc.DialContext(timeCtx, server, options...)
   if err != nil {
   	service := server
   	if errors.Is(err, context.DeadlineExceeded) {
   		pos := strings.LastIndexByte(server, separator)
   		// len(server) - 1 is the index of last char
   		if 0 < pos && pos < len(server)-1 {
   			service = server[pos+1:]
   		}
   	}
   	return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is alread started",
   		server, err.Error(), service)
   }

   c.conn = conn
   return nil
}

//google.golang.org/grpc@v1.29.1/clientconn.go, 這個代碼邏輯比較多, 我們找到我們關心的部分(服務發現), 就是如何解析discov://127.0.0.1:2379/user-service 成為一個ip:port, 通過分析發現cc.parsedTarget.Scheme, 也就是一開始拼接的discov字符串, 跟這getResolver方法進去
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
   cc := &ClientConn{
   	target:            target,
   	csMgr:             &connectivityStateManager{},
   	conns:             make(map[*addrConn]struct{}),
   	dopts:             defaultDialOptions(),
   	blockingpicker:    newPickerWrapper(),
   	czData:            new(channelzData),
   	firstResolveEvent: grpcsync.NewEvent(),
   }
   //.....
   //發現是通過Scheme去獲取的, 也就是discov, 
   resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
   
   //....
   return cc, nil
}

//google.golang.org/grpc@v1.29.1/resolver/resolver.go
//發現get是用map中讀的, map數據是Register方法注入的, 返回到DiscovScheme = "discov"定義的地方, 看看有沒有調用Register方法
var (
   // m is a map from scheme to resolver builder.
   m = make(map[string]Builder)
   // defaultScheme is the default scheme to use.
   defaultScheme = "passthrough"
)
// registered with the same name, the one registered last will take effect.
func Register(b Builder) {
   m[b.Scheme()] = b
}

// Get returns the resolver builder registered with the given scheme.
//
// If no builder is register with the scheme, nil will be returned.
func Get(scheme string) Builder {
   if b, ok := m[scheme]; ok {
   	return b
   }
   return nil
}


//github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/resolver/resolver.go
//注入的是discovBuilder, 繼續看下discovBuilder的具體實現
func RegisterResolver() {
   resolver.Register(&dirBuilder)
   resolver.Register(&disBuilder)
}

//github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/resolver/discovbuilder.go
//具體邏輯看函數中的知識

func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
   resolver.Resolver, error) {
   hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
   	return r == EndpointSepChar
   })
   //new一個服務發現的客戶端, 里面基本上就是個etcd的封裝, etcd的邏輯在NewSubscriber里面, 比較簡單, 就寫出來了
   sub, err := discov.NewSubscriber(hosts, target.Endpoint)
   if err != nil {
   	return nil, err
   }
   //拿到服務key的所有etcd中的數據
   update := func() {
   	var addrs []resolver.Address
   	for _, val := range subset(sub.Values(), subsetSize) {
   		addrs = append(addrs, resolver.Address{
   			Addr: val,
   		})
   	}
   	cc.UpdateState(resolver.State{
   		Addresses: addrs,
   	})
   }
   //實時監聽etcd數據變化, 然后通過update方法更新數據到grpc的client
   sub.AddListener(update)
   //初始化的時候調用一次
   update()

   return &nopResolver{cc: cc}, nil
}

func (d *discovBuilder) Scheme() string {
   return DiscovScheme
}
//到這里客戶端的服務發現就結束了

//服務注冊的代碼在rpc/userservice.go中, MustNewServer調用NewServer
func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
   server, err := NewServer(c, register)
   if err != nil {
   	log.Fatal(err)
   }

   return server
}

//NewServer調用的是server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, c.ListenOn, internal.WithMetrics(metrics))
//這個方法進去就是調用discov.NewPublishe
   	pubClient := discov.NewPublisher(etcdEndpoints, etcdKey, pubListenOn)
//繼續跟進去, 發現在KeepAlive()中會調用register方法, 用etcd的put方法注冊到etcd中(client.Put)
func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
   resp, err := client.Grant(client.Ctx(), TimeToLive)
   if err != nil {
   	return clientv3.NoLease, err
   }

   lease := resp.ID
   if p.id > 0 {
   	p.fullKey = makeEtcdKey(p.key, p.id)
   } else {
   	p.fullKey = makeEtcdKey(p.key, int64(lease))
   }
   _, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

   return lease, err
}

總結

  1. go-zero的注冊發現代碼比較容易懂, 比較簡單, 可以作為初步閱讀源碼的練手項目
  2. 業務上基本上是夠用的


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM