本文講解gRPC接入etcd,實現服務注冊與服務發現。
需要先安裝Go語言的etcd客戶端包:
go get go.etcd.io/etcd/clientv3
然后就可以開始操作一波了。
說明:
以下代碼需要根據實際代碼位置對import語句內容進行微調。
我的目錄結構:
$GOPATH/src/go-git/etcd-demo:
一. 協議制定(proto/greet.proto)
syntax = "proto3";
option go_package = ".;greet";
service Greet {
rpc Morning(GreetRequest)returns(GreetResponse){}
rpc Night(GreetRequest)returns(GreetResponse){}
}
message GreetRequest {
string name = 1;
}
message GreetResponse {
string message = 1;
string from = 2;
}
生成代碼:(proto子目錄下執行)
protoc --go_out=plugins=grpc:. *.proto
執行完成,proto子目錄生成文件greet.pb.go。
二. 服務端(server/main.go)
服務端主要有以下步驟:
監聽網絡端口
創建gRPC句柄,注冊gRPC服務
將服務地址注冊到etcd
監聽並處理服務請求
這里主要介紹一下將服務地址注冊到etcd的過程(雙保險):
一方面,由於服務端無法保證自身是一直可用的,所以與etcd的租約是有時間期限的,租約一旦過期,服務端存儲在etcd上的服務地址信息就會消失。
另一方面,服務端可用時又必須保證調用方能發現自己,即保證自己在etcd上的服務地址信息不消失,所以需要發送心跳檢測,一旦發現etcd上沒有自己的服務地址時,請求重新添加(續租)。
代碼邏輯:
/**
* etcd demo server
* author: JetWu
* date: 2020.05.01
*/
package main
import (
"flag"
"fmt"
proto "go-git/etcd-demo/proto"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"
"go.etcd.io/etcd/clientv3"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
const schema = "ns"
var host = "127.0.0.1" //服務器主機
var (
Port = flag.Int("Port", 3000, "listening port") //服務器監聽端口
ServiceName = flag.String("ServiceName", "greet_service", "service name") //服務名稱
EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
)
var cli *clientv3.Client
//rpc服務接口
type greetServer struct{}
func (gs *greetServer) Morning(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
fmt.Printf("Morning 調用: %s\n", req.Name)
return &proto.GreetResponse{
Message: "Good morning, " + req.Name,
From: fmt.Sprintf("127.0.0.1:%d", *Port),
}, nil
}
func (gs *greetServer) Night(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
fmt.Printf("Night 調用: %s\n", req.Name)
return &proto.GreetResponse{
Message: "Good night, " + req.Name,
From: fmt.Sprintf("127.0.0.1:%d", *Port),
}, nil
}
//將服務地址注冊到etcd中
func register(etcdAddr, serviceName, serverAddr string, ttl int64) error {
var err error
if cli == nil {
//構建etcd client
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(etcdAddr, ";"),
DialTimeout: 15 * time.Second,
})
if err != nil {
fmt.Printf("連接etcd失敗:%s\n", err)
return err
}
}
//與etcd建立長連接,並保證連接不斷(心跳檢測)
ticker := time.NewTicker(time.Second * time.Duration(ttl))
go func() {
key := "/" + schema + "/" + serviceName + "/" + serverAddr
for {
resp, err := cli.Get(context.Background(), key)
//fmt.Printf("resp:%+v\n", resp)
if err != nil {
fmt.Printf("獲取服務地址失敗:%s", err)
} else if resp.Count == 0 { //尚未注冊
err = keepAlive(serviceName, serverAddr, ttl)
if err != nil {
fmt.Printf("保持連接失敗:%s", err)
}
}
<-ticker.C
}
}()
return nil
}
//保持服務器與etcd的長連接
func keepAlive(serviceName, serverAddr string, ttl int64) error {
//創建租約
leaseResp, err := cli.Grant(context.Background(), ttl)
if err != nil {
fmt.Printf("創建租期失敗:%s\n", err)
return err
}
//將服務地址注冊到etcd中
key := "/" + schema + "/" + serviceName + "/" + serverAddr
_, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID))
if err != nil {
fmt.Printf("注冊服務失敗:%s", err)
return err
}
//建立長連接
ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
fmt.Printf("建立長連接失敗:%s\n", err)
return err
}
//清空keepAlive返回的channel
go func() {
for {
<-ch
}
}()
return nil
}
//取消注冊
func unRegister(serviceName, serverAddr string) {
if cli != nil {
key := "/" + schema + "/" + serviceName + "/" + serverAddr
cli.Delete(context.Background(), key)
}
}
func main() {
flag.Parse()
//監聽網絡
listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *Port))
if err != nil {
fmt.Println("監聽網絡失敗:", err)
return
}
defer listener.Close()
//創建grpc句柄
srv := grpc.NewServer()
defer srv.GracefulStop()
//將greetServer結構體注冊到grpc服務中
proto.RegisterGreetServer(srv, &greetServer{})
//將服務地址注冊到etcd中
serverAddr := fmt.Sprintf("%s:%d", host, *Port)
fmt.Printf("greeting server address: %s\n", serverAddr)
register(*EtcdAddr, *ServiceName, serverAddr, 5)
//關閉信號處理
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
s := <-ch
unRegister(*ServiceName, serverAddr)
if i, ok := s.(syscall.Signal); ok {
os.Exit(int(i))
} else {
os.Exit(0)
}
}()
//監聽服務
err = srv.Serve(listener)
if err != nil {
fmt.Println("監聽異常:", err)
return
}
}
三. 客戶端(client/main.go)
客戶端首先需要實現接口resolver.Resolver,其中方法Build()用於創建一個etcd解析器,grpc.Dial()會同步調用該方法,解析器需要根據key前綴監聽etcd中服務地址列表的變化並更新本地列表。然后注冊解析器,創建gRPC句柄,使用輪詢負載均衡請求服務。
代碼邏輯:
/**
* etcd demo client
* author: JetWu
* date: 2020.05.02
*/
package main
import (
"flag"
"fmt"
proto "go-git/etcd-demo/proto"
"log"
"strings"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
)
const schema = "ns"
var (
ServiceName = flag.String("ServiceName", "greet_service", "service name") //服務名稱
EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
)
var cli *clientv3.Client
//etcd解析器
type etcdResolver struct {
etcdAddr string
clientConn resolver.ClientConn
}
//初始化一個etcd解析器
func newResolver(etcdAddr string) resolver.Builder {
return &etcdResolver{etcdAddr: etcdAddr}
}
func (r *etcdResolver) Scheme() string {
return schema
}
//watch有變化以后會調用
func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
log.Println("ResolveNow")
fmt.Println(rn)
}
//解析器關閉時調用
func (r *etcdResolver) Close() {
log.Println("Close")
}
//構建解析器 grpc.Dial()同步調用
func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
var err error
//構建etcd client
if cli == nil {
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(r.etcdAddr, ";"),
DialTimeout: 15 * time.Second,
})
if err != nil {
fmt.Printf("連接etcd失敗:%s\n", err)
return nil, err
}
}
r.clientConn = clientConn
go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
return r, nil
}
//監聽etcd中某個key前綴的服務地址列表的變化
func (r *etcdResolver) watch(keyPrefix string) {
//初始化服務地址列表
var addrList []resolver.Address
resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
if err != nil {
fmt.Println("獲取服務地址列表失敗:", err)
} else {
for i := range resp.Kvs {
addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})
}
}
r.clientConn.NewAddress(addrList)
//監聽服務地址列表的變化
rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
for n := range rch {
for _, ev := range n.Events {
addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
switch ev.Type {
case mvccpb.PUT:
if !exists(addrList, addr) {
addrList = append(addrList, resolver.Address{Addr: addr})
r.clientConn.NewAddress(addrList)
}
case mvccpb.DELETE:
if s, ok := remove(addrList, addr); ok {
addrList = s
r.clientConn.NewAddress(addrList)
}
}
}
}
}
func exists(l []resolver.Address, addr string) bool {
for i := range l {
if l[i].Addr == addr {
return true
}
}
return false
}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
for i := range s {
if s[i].Addr == addr {
s[i] = s[len(s)-1]
return s[:len(s)-1], true
}
}
return nil, false
}
func main() {
flag.Parse()
//注冊etcd解析器
r := newResolver(*EtcdAddr)
resolver.Register(r)
//客戶端連接服務器(負載均衡:輪詢) 會同步調用r.Build()
conn, err := grpc.Dial(r.Scheme()+"://author/"+*ServiceName, grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
if err != nil {
fmt.Println("連接服務器失敗:", err)
}
defer conn.Close()
//獲得grpc句柄
c := proto.NewGreetClient(conn)
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
fmt.Println("Morning 調用...")
resp1, err := c.Morning(
context.Background(),
&proto.GreetRequest{Name: "JetWu"},
)
if err != nil {
fmt.Println("Morning調用失敗:", err)
return
}
fmt.Printf("Morning 響應:%s,來自:%s\n", resp1.Message, resp1.From)
fmt.Println("Night 調用...")
resp2, err := c.Night(
context.Background(),
&proto.GreetRequest{Name: "JetWu"},
)
if err != nil {
fmt.Println("Night調用失敗:", err)
return
}
fmt.Printf("Night 響應:%s,來自:%s\n", resp2.Message, resp2.From)
}
}
四. 運行驗證
啟動etcd,使用3個不同端口運行三個服務端:
啟動客戶端:
可以看到,客戶端使用輪詢的方式對三個服務端進行請求,從而實現負載均衡。