go-zero 服務注冊和發現
在沒有服務注冊和發現的時候, 沒新上一個服務, 或者沒部署一個新的節點, 都要改所有調用方的配置文件, 簡直就是配置噩夢, 還容易配置錯誤
分析一個go-zero的服務注冊和發現,
接着上面的代碼, go-zero實戰, 看看rpc客戶端怎么尋址到rpc服務端的
//logic調用的代碼
regRsp, err := l.svcCtx.UserServiceRpc.Register(l.ctx, in)
//rpc/userserviceclient/userservice.go
func NewUserService(cli zrpc.Client) UserService {
return &defaultUserService{
cli: cli,
}
}
// 注冊
func (m *defaultUserService) Register(ctx context.Context, in *RegisterRequest) (*RegisterResponse, error) {
//發起調用, 使用的是上面NewUserService里的zrpc.Client
client := userService.NewUserServiceClient(m.cli.Conn())
return client.Register(ctx, in)
}
//api/internal/svc/servicecontext.go 中調用的NewUserService
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
Model: model.NewUserinfoModel(sqlx.NewMysql(c.DataSource), c.Cache),
UserServiceRpc: userserviceclient.NewUserService(zrpc.MustNewClient(c.Rpc)), //初始化rpcClient
}
}
//先看一下zrpc.MustNewClient 這個方法, 傳入的配置文件中的etcd 的hosts和服務key, 跟進去看下這個方法
//github.com/tal-tech/go-zero/zrpc/client.go
//這個方法沒啥, 繼續往下面走
func MustNewClient(c RpcClientConf, options ...ClientOption) Client {
cli, err := NewClient(c, options...)
if err != nil {
log.Fatal(err)
}
return cli
}
//方法中主要方法是 internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
var opts []ClientOption
if c.HasCredential() {
opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
App: c.App,
Token: c.Token,
})))
}
if c.Timeout > 0 {
opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
}
opts = append(opts, options...)
var client Client
var err error
if len(c.Endpoints) > 0 {
client, err = internal.NewClient(internal.BuildDirectTarget(c.Endpoints), opts...)
} else if err = c.Etcd.Validate(); err == nil {
client, err = internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
}
if err != nil {
return nil, err
}
return &RpcClient{
client: client,
}, nil
}
//先看一下internal.BuildDiscovTarget, 這個方法入參是etcd的hosts和 服務的key, 返回的是一個類似url的東西, 協議是DiscovScheme = "discov"
func BuildDiscovTarget(endpoints []string, key string) string {
return fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme,
strings.Join(endpoints, resolver.EndpointSep), key)
}
//BuildDiscovTarget返回的url類似 : discov://127.0.0.1:2379/user-service, 傳入NewClient中
//這個函數有兩個核心邏輯 一個是grpc.WithBalancerName(p2c.Name)
// NewClient returns a Client.
func NewClient(target string, opts ...ClientOption) (Client, error) {
var cli client
opts = append([]ClientOption{WithDialOption(grpc.WithBalancerName(p2c.Name))}, opts...)
if err := cli.dial(target, opts...); err != nil {
return nil, err
}
return &cli, nil
}
//WithBalancerName 這方法看名字知道是負載均衡的作用, 通過balancerName獲取的, 對應入參的p2c.Name
func WithBalancerName(balancerName string) DialOption {
builder := balancer.Get(balancerName)
if builder == nil {
panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
}
return newFuncDialOption(func(o *dialOptions) {
o.balancerBuilder = builder
})
}
//跟着p2c.Name進去, github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/balancer/p2c/p2c.go
//在這里注入的負載均衡, 核心的邏輯在Pick中, 大致是一個可選就選一個, 兩個就選擇連接數最小的, 兩個以上就隨機兩個出來進行選擇
//如何選擇的邏輯在choose(c1, c2 *subConn)方法中, 基本上就兩個選連接數小的那個,
func init() {
balancer.Register(newBuilder())
}
type p2cPickerBuilder struct{}
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
}
//....
func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
p.lock.Lock()
defer p.lock.Unlock()
var chosen *subConn
switch len(p.conns) {
case 0:
return nil, nil, balancer.ErrNoSubConnAvailable
case 1:
chosen = p.choose(p.conns[0], nil)
case 2:
chosen = p.choose(p.conns[0], p.conns[1])
default:
var node1, node2 *subConn
for i := 0; i < pickTimes; i++ {
a := p.r.Intn(len(p.conns))
b := p.r.Intn(len(p.conns) - 1)
if b >= a {
b++
}
node1 = p.conns[a]
node2 = p.conns[b]
if node1.healthy() && node2.healthy() {
break
}
}
chosen = p.choose(node1, node2)
}
atomic.AddInt64(&chosen.inflight, 1)
atomic.AddInt64(&chosen.requests, 1)
return chosen.conn, p.buildDoneFunc(chosen), nil
}
//繼續返回去看NewClient方法中的dial方法, 傳入的是target, 也就是那個url, discov://127.0.0.1:2379/user-service
if err := cli.dial(target, opts...); err != nil {
return nil, err
}
//github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/client.go
//這個方法本身沒什么, 主要就是調用grpc.DialContext()方法, 這里就進入了grpc的邏輯了, 相當於通過grpc dial 了discov://127.0.0.1:2379/user-service, 繼續進去看
func (c *client) dial(server string, opts ...ClientOption) error {
options := c.buildDialOptions(opts...)
timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
conn, err := grpc.DialContext(timeCtx, server, options...)
if err != nil {
service := server
if errors.Is(err, context.DeadlineExceeded) {
pos := strings.LastIndexByte(server, separator)
// len(server) - 1 is the index of last char
if 0 < pos && pos < len(server)-1 {
service = server[pos+1:]
}
}
return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is alread started",
server, err.Error(), service)
}
c.conn = conn
return nil
}
//google.golang.org/grpc@v1.29.1/clientconn.go, 這個代碼邏輯比較多, 我們找到我們關心的部分(服務發現), 就是如何解析discov://127.0.0.1:2379/user-service 成為一個ip:port, 通過分析發現cc.parsedTarget.Scheme, 也就是一開始拼接的discov字符串, 跟這getResolver方法進去
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
//.....
//發現是通過Scheme去獲取的, 也就是discov,
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
//....
return cc, nil
}
//google.golang.org/grpc@v1.29.1/resolver/resolver.go
//發現get是用map中讀的, map數據是Register方法注入的, 返回到DiscovScheme = "discov"定義的地方, 看看有沒有調用Register方法
var (
// m is a map from scheme to resolver builder.
m = make(map[string]Builder)
// defaultScheme is the default scheme to use.
defaultScheme = "passthrough"
)
// registered with the same name, the one registered last will take effect.
func Register(b Builder) {
m[b.Scheme()] = b
}
// Get returns the resolver builder registered with the given scheme.
//
// If no builder is register with the scheme, nil will be returned.
func Get(scheme string) Builder {
if b, ok := m[scheme]; ok {
return b
}
return nil
}
//github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/resolver/resolver.go
//注入的是discovBuilder, 繼續看下discovBuilder的具體實現
func RegisterResolver() {
resolver.Register(&dirBuilder)
resolver.Register(&disBuilder)
}
//github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/resolver/discovbuilder.go
//具體邏輯看函數中的知識
func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
return r == EndpointSepChar
})
//new一個服務發現的客戶端, 里面基本上就是個etcd的封裝, etcd的邏輯在NewSubscriber里面, 比較簡單, 就寫出來了
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}
//拿到服務key的所有etcd中的數據
update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
cc.UpdateState(resolver.State{
Addresses: addrs,
})
}
//實時監聽etcd數據變化, 然后通過update方法更新數據到grpc的client
sub.AddListener(update)
//初始化的時候調用一次
update()
return &nopResolver{cc: cc}, nil
}
func (d *discovBuilder) Scheme() string {
return DiscovScheme
}
//到這里客戶端的服務發現就結束了
//服務注冊的代碼在rpc/userservice.go中, MustNewServer調用NewServer
func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
server, err := NewServer(c, register)
if err != nil {
log.Fatal(err)
}
return server
}
//NewServer調用的是server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, c.ListenOn, internal.WithMetrics(metrics))
//這個方法進去就是調用discov.NewPublishe
pubClient := discov.NewPublisher(etcdEndpoints, etcdKey, pubListenOn)
//繼續跟進去, 發現在KeepAlive()中會調用register方法, 用etcd的put方法注冊到etcd中(client.Put)
func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
resp, err := client.Grant(client.Ctx(), TimeToLive)
if err != nil {
return clientv3.NoLease, err
}
lease := resp.ID
if p.id > 0 {
p.fullKey = makeEtcdKey(p.key, p.id)
} else {
p.fullKey = makeEtcdKey(p.key, int64(lease))
}
_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
return lease, err
}
總結
- go-zero的注冊發現代碼比較容易懂, 比較簡單, 可以作為初步閱讀源碼的練手項目
- 業務上基本上是夠用的
