gRPC接入etcd 實現服務注冊與發現


  本文講解gRPC接入etcd,實現服務注冊與服務發現。
需要先安裝Go語言的etcd客戶端包:

go get go.etcd.io/etcd/clientv3

然后就可以開始操作一波了。
說明:
以下代碼需要根據實際代碼位置對import語句內容進行微調。
我的目錄結構:
$GOPATH/src/go-git/etcd-demo:

 

 

一. 協議制定(proto/greet.proto)

syntax = "proto3";

option go_package = ".;greet";

service Greet {
    rpc Morning(GreetRequest)returns(GreetResponse){}
    rpc Night(GreetRequest)returns(GreetResponse){}
}

message GreetRequest {
    string name = 1;
}

message GreetResponse {
    string message = 1;
    string from = 2;
}

生成代碼:(proto子目錄下執行)

protoc --go_out=plugins=grpc:. *.proto

執行完成,proto子目錄生成文件greet.pb.go。

 

二. 服務端(server/main.go)
服務端主要有以下步驟:
監聽網絡端口
創建gRPC句柄,注冊gRPC服務
將服務地址注冊到etcd
監聽並處理服務請求

這里主要介紹一下將服務地址注冊到etcd的過程(雙保險):
一方面,由於服務端無法保證自身是一直可用的,所以與etcd的租約是有時間期限的,租約一旦過期,服務端存儲在etcd上的服務地址信息就會消失。
另一方面,服務端可用時又必須保證調用方能發現自己,即保證自己在etcd上的服務地址信息不消失,所以需要發送心跳檢測,一旦發現etcd上沒有自己的服務地址時,請求重新添加(續租)。

代碼邏輯:

/**
* etcd demo server
* author: JetWu
* date: 2020.05.01
 */
package main

import (
    "flag"
    "fmt"
    proto "go-git/etcd-demo/proto"
    "net"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"

    "go.etcd.io/etcd/clientv3"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

const schema = "ns"

var host = "127.0.0.1" //服務器主機
var (
    Port        = flag.Int("Port", 3000, "listening port")                           //服務器監聽端口
    ServiceName = flag.String("ServiceName", "greet_service", "service name")        //服務名稱
    EtcdAddr    = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
)
var cli *clientv3.Client

//rpc服務接口
type greetServer struct{}

func (gs *greetServer) Morning(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
    fmt.Printf("Morning 調用: %s\n", req.Name)
    return &proto.GreetResponse{
        Message: "Good morning, " + req.Name,
        From:    fmt.Sprintf("127.0.0.1:%d", *Port),
    }, nil
}

func (gs *greetServer) Night(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
    fmt.Printf("Night 調用: %s\n", req.Name)
    return &proto.GreetResponse{
        Message: "Good night, " + req.Name,
        From:    fmt.Sprintf("127.0.0.1:%d", *Port),
    }, nil
}

//將服務地址注冊到etcd中
func register(etcdAddr, serviceName, serverAddr string, ttl int64) error {
    var err error

    if cli == nil {
        //構建etcd client
        cli, err = clientv3.New(clientv3.Config{
            Endpoints:   strings.Split(etcdAddr, ";"),
            DialTimeout: 15 * time.Second,
        })
        if err != nil {
            fmt.Printf("連接etcd失敗:%s\n", err)
            return err
        }
    }

    //與etcd建立長連接,並保證連接不斷(心跳檢測)
    ticker := time.NewTicker(time.Second * time.Duration(ttl))
    go func() {
        key := "/" + schema + "/" + serviceName + "/" + serverAddr
        for {
            resp, err := cli.Get(context.Background(), key)
            //fmt.Printf("resp:%+v\n", resp)
            if err != nil {
                fmt.Printf("獲取服務地址失敗:%s", err)
            } else if resp.Count == 0 { //尚未注冊
                err = keepAlive(serviceName, serverAddr, ttl)
                if err != nil {
                    fmt.Printf("保持連接失敗:%s", err)
                }
            }
            <-ticker.C
        }
    }()

    return nil
}

//保持服務器與etcd的長連接
func keepAlive(serviceName, serverAddr string, ttl int64) error {
    //創建租約
    leaseResp, err := cli.Grant(context.Background(), ttl)
    if err != nil {
        fmt.Printf("創建租期失敗:%s\n", err)
        return err
    }

    //將服務地址注冊到etcd中
    key := "/" + schema + "/" + serviceName + "/" + serverAddr
    _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID))
    if err != nil {
        fmt.Printf("注冊服務失敗:%s", err)
        return err
    }

    //建立長連接
    ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
    if err != nil {
        fmt.Printf("建立長連接失敗:%s\n", err)
        return err
    }

    //清空keepAlive返回的channel
    go func() {
        for {
            <-ch
        }
    }()
    return nil
}

//取消注冊
func unRegister(serviceName, serverAddr string) {
    if cli != nil {
        key := "/" + schema + "/" + serviceName + "/" + serverAddr
        cli.Delete(context.Background(), key)
    }
}

func main() {
    flag.Parse()

    //監聽網絡
    listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *Port))
    if err != nil {
        fmt.Println("監聽網絡失敗:", err)
        return
    }
    defer listener.Close()

    //創建grpc句柄
    srv := grpc.NewServer()
    defer srv.GracefulStop()

    //將greetServer結構體注冊到grpc服務中
    proto.RegisterGreetServer(srv, &greetServer{})

    //將服務地址注冊到etcd中
    serverAddr := fmt.Sprintf("%s:%d", host, *Port)
    fmt.Printf("greeting server address: %s\n", serverAddr)
    register(*EtcdAddr, *ServiceName, serverAddr, 5)

    //關閉信號處理
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
    go func() {
        s := <-ch
        unRegister(*ServiceName, serverAddr)
        if i, ok := s.(syscall.Signal); ok {
            os.Exit(int(i))
        } else {
            os.Exit(0)
        }
    }()

    //監聽服務
    err = srv.Serve(listener)
    if err != nil {
        fmt.Println("監聽異常:", err)
        return
    }
}

 

 

三. 客戶端(client/main.go)
客戶端首先需要實現接口resolver.Resolver,其中方法Build()用於創建一個etcd解析器,grpc.Dial()會同步調用該方法,解析器需要根據key前綴監聽etcd中服務地址列表的變化並更新本地列表。然后注冊解析器,創建gRPC句柄,使用輪詢負載均衡請求服務。

代碼邏輯:

/**
* etcd demo client
* author: JetWu
* date: 2020.05.02
 */
 package main

import (
    "flag"
    "fmt"
    proto "go-git/etcd-demo/proto"
    "log"
    "strings"
    "time"

    "github.com/coreos/etcd/mvcc/mvccpb"
    "go.etcd.io/etcd/clientv3"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/resolver"
)

const schema = "ns"

var (
    ServiceName = flag.String("ServiceName", "greet_service", "service name")        //服務名稱
    EtcdAddr    = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
)

var cli *clientv3.Client

//etcd解析器
type etcdResolver struct {
    etcdAddr   string
    clientConn resolver.ClientConn
}

//初始化一個etcd解析器
func newResolver(etcdAddr string) resolver.Builder {
    return &etcdResolver{etcdAddr: etcdAddr}
}

func (r *etcdResolver) Scheme() string {
    return schema
}

//watch有變化以后會調用
func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
    log.Println("ResolveNow")
    fmt.Println(rn)
}

//解析器關閉時調用
func (r *etcdResolver) Close() {
    log.Println("Close")
}

//構建解析器 grpc.Dial()同步調用
func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    var err error

    //構建etcd client
    if cli == nil {
        cli, err = clientv3.New(clientv3.Config{
            Endpoints:   strings.Split(r.etcdAddr, ";"),
            DialTimeout: 15 * time.Second,
        })
        if err != nil {
            fmt.Printf("連接etcd失敗:%s\n", err)
            return nil, err
        }
    }

    r.clientConn = clientConn

    go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")

    return r, nil
}

//監聽etcd中某個key前綴的服務地址列表的變化
func (r *etcdResolver) watch(keyPrefix string) {
    //初始化服務地址列表
    var addrList []resolver.Address

    resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
    if err != nil {
        fmt.Println("獲取服務地址列表失敗:", err)
    } else {
        for i := range resp.Kvs {
            addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})
        }
    }

    r.clientConn.NewAddress(addrList)

    //監聽服務地址列表的變化
    rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
    for n := range rch {
        for _, ev := range n.Events {
            addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
            switch ev.Type {
            case mvccpb.PUT:
                if !exists(addrList, addr) {
                    addrList = append(addrList, resolver.Address{Addr: addr})
                    r.clientConn.NewAddress(addrList)
                }
            case mvccpb.DELETE:
                if s, ok := remove(addrList, addr); ok {
                    addrList = s
                    r.clientConn.NewAddress(addrList)
                }
            }
        }
    }
}

func exists(l []resolver.Address, addr string) bool {
    for i := range l {
        if l[i].Addr == addr {
            return true
        }
    }
    return false
}

func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
    for i := range s {
        if s[i].Addr == addr {
            s[i] = s[len(s)-1]
            return s[:len(s)-1], true
        }
    }
    return nil, false
}

func main() {
    flag.Parse()

    //注冊etcd解析器
    r := newResolver(*EtcdAddr)
    resolver.Register(r)

    //客戶端連接服務器(負載均衡:輪詢) 會同步調用r.Build()
    conn, err := grpc.Dial(r.Scheme()+"://author/"+*ServiceName, grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
    if err != nil {
        fmt.Println("連接服務器失敗:", err)
    }
    defer conn.Close()

    //獲得grpc句柄
    c := proto.NewGreetClient(conn)
    ticker := time.NewTicker(1 * time.Second)
    for range ticker.C {
        fmt.Println("Morning 調用...")
        resp1, err := c.Morning(
            context.Background(),
            &proto.GreetRequest{Name: "JetWu"},
        )
        if err != nil {
            fmt.Println("Morning調用失敗:", err)
            return
        }
        fmt.Printf("Morning 響應:%s,來自:%s\n", resp1.Message, resp1.From)

        fmt.Println("Night 調用...")
        resp2, err := c.Night(
            context.Background(),
            &proto.GreetRequest{Name: "JetWu"},
        )
        if err != nil {
            fmt.Println("Night調用失敗:", err)
            return
        }
        fmt.Printf("Night 響應:%s,來自:%s\n", resp2.Message, resp2.From)
    }
}

  

 

四. 運行驗證

啟動etcd,使用3個不同端口運行三個服務端:

啟動客戶端:

可以看到,客戶端使用輪詢的方式對三個服務端進行請求,從而實現負載均衡。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM