Consul 入門-gRPC 服務注冊與發現


前言

假如我有錢,我想買一個降噪耳機,我應該哪里買? 答案很簡單,可以去京東或者線下實體店。
那如果把這個問題映射到微服務架構中:我打開京東,選中某款耳機進入詳情頁瀏覽,我可以看到這款耳機的價格、庫存、規格、評價等。以我的理解,這個鏈路應該是這樣的:

暫定這個系統由3個微服務組成:商品詳情服務、庫存服務、評價服務。

  • 商品詳情服務:聚合端上用戶看到的所有信息
  • 庫存服務:維護商品的庫存信息、規格信息、價格信息
  • 評價服務:維護用戶對商品的評價

微服務的目的是為了基於松耦合高內聚將單體服務進行拆分,然后將個服務進行多副本部署(我們甚至不知道它會被部署到哪里,實體機?虛擬機?容器?雲上?)以達到高可用的目的。這也要付出點代價,商品詳情服務需要知道:庫存服務和評價服務在哪里?

由此,我們將繼續學習 Consul 這款不錯的服務發現工具,通前面的學習,我們已經對 Consul 的原理、使用、搭建有了認知。本次將學習:Consul 如何在 gRPC 構建的微服務網絡環境中做一名合格的“指路人”。

編寫一個 Go gRPC 服務

gRPC 是由 Google 開發並開源的RPC框架,詳見官網。我們將通過官網的指導來編寫一個簡單的 go gRPC 服務

獲取樣例代碼

  1. 克隆grpc-go倉庫
$ git clone -b v1.29.1.0 https://github.com/grpc/grpc-go
  1. 切換到樣例代碼目錄
$ cd cd grpc-go/examples/helloworld

目錄結構如下:

├── greeter_client
│   └── main.go
├── helloword
│   └── helloword.proto
└── greeter_server
|   └── main.go

運行樣例代碼

  1. 編譯執行 server 代碼:
$ go run greeter_server/main.go
  1. 在新開一個終端,編譯執行 client 代碼,可以看到輸出:
$ go run greeter_client/main.go
2021/09/11 16:28:29 Greeting: Hello world

gRPC 的 Banlancer

greeter_client/main.go中,是通過指定 server 地址的方式來實現訪問到目標服務的

...
const (
    address = "localhost:50051"
)

func main () {
    conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
    ...
}

但這種方式在生產環境是不可行的,因為我們並不知道目標服務的地址(目標服務的地址也有可能不只一個)。實際上,gRPC 已經為我們提供來解決方案:Balancer。

首先,看一下 gRPC 客戶端負載均衡實現的官方架構圖:

從圖中,可以看到 Balancer 均衡器位於架構的最右方,內置一個 Picker 模塊,Balancer 主要完成下面幾個功能:

  • 與 Rersovler 通信(維持通信機制),接收 Resovler 通知的服務端列表更新,維護 Connection Pool 及每個連接的狀態
  • 對上一步獲取的服務端列表,調用newSubConn異步建立長連接(每個 Backend 一個長連接),同時,監控連接的狀態,及時更新 Connection Pool
  • 創建 Picker,Picker 執行的算法就是真正的 LB 邏輯,當客戶端使用conn初始化 PRC 方法時,通過 Picker 選擇一個存活的連接,返回給客戶端,然后調用 UpdatePicker 更新 LB 算法的內置狀態,為下一次調用做准備
  • Balaner 是 gRPC 負載均衡最核心的模塊

據此,我們可用通過自定義的 Balancer,在 Balaner 基礎上通過實現自定義的naming.Resolver來達到使用 Consul 看發現服務的功能。

大概流程是:

  1. grpc 在 Dial 的時候通過 WithBalancer 傳入 Balancer
  2. Balaner 會通過 naming.Resolver 去解析 (Resovle)Dial 傳入的 target 得到一個nameing.Watcher
  3. naming.Watcher持續監視 target 解析到地址列表的變更並通過 Next 返回給 Balancer

實現 Consul Resolver

grpc-go/naming/naming.go中可以看到Resolver接口的聲明

type Resolver interface {
	// Resolve creates a Watcher for target.
	Resolve(target string) (Watcher, error)
}

需要實現一個Consul Resolver,在里面返回可用的服務端地址列表,在examples目錄下新建grpcresolver文件夾,在該文件夾下新建consul.go文件:

package grpcresolver

import (
	"fmt"
	"net"
	"strconv"
	"sync"
	"sync/atomic"

	"github.com/hashicorp/consul/api"
	"google.golang.org/grpc/naming"
)

type watchEntry struct {
	addr string
	modi uint64
	last uint64
}

type consulWatcher struct {
	down      int32
	c         *api.Client
	service   string
	mu        sync.Mutex
	watched   map[string]*watchEntry
	lastIndex uint64
}

func (w *consulWatcher) Close() {
	atomic.StoreInt32(&w.down, 1)
}

func (w *consulWatcher) Next() ([]*naming.Update, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	watched := w.watched
	lastIndex := w.lastIndex
retry:
        // 訪問 Consul, 獲取可用的服務列表
	services, meta, err := w.c.Catalog().Service(w.service, "", &api.QueryOptions{
		WaitIndex: lastIndex,
	})
	if err != nil {
		return nil, err
	}
	if lastIndex == meta.LastIndex {
		if atomic.LoadInt32(&w.down) != 0 {
			return nil, nil
		}
		goto retry
	}
	lastIndex = meta.LastIndex
	var updating []*naming.Update
	for _, s := range services {
		ws := watched[s.ServiceID]
		fmt.Println(s.ServiceAddress, s.ServicePort)
		if ws == nil {
                        // 如果是新注冊的服務
			ws = &watchEntry{
				addr: net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort)),
				modi: s.ModifyIndex,
			}
			watched[s.ServiceID] = ws
                          
			updating = append(updating, &naming.Update{
				Op:   naming.Add,
				Addr: ws.addr,
			})
		} else if ws.modi != s.ModifyIndex {
                        // 如果是原來的服務
			updating = append(updating, &naming.Update{
				Op:   naming.Delete,
				Addr: ws.addr,
			})
			ws.addr = net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort))
			ws.modi = s.ModifyIndex
			updating = append(updating, &naming.Update{
				Op:   naming.Add,
				Addr: ws.addr,
			})
		}
		ws.last = lastIndex
	}
	for id, ws := range watched {
		if ws.last != lastIndex {
			delete(watched, id)
			updating = append(updating, &naming.Update{
				Op:   naming.Delete,
				Addr: ws.addr,
			})
		}
	}
	w.watched = watched
	w.lastIndex = lastIndex
	return updating, nil
}

type consulResolver api.Client

func (r *consulResolver) Resolve(target string) (naming.Watcher, error) {
	return &consulWatcher{
		c:       (*api.Client)(r),
		service: target,
		watched: make(map[string]*watchEntry),
	}, nil
}

func ForConsul(reg *api.Client) naming.Resolver {
	return (*consulResolver)(reg)
}

server 端通過 Consul 注冊服務

修改examples/helloword/greeter_server/main.go,在啟動服務前,將服務的信息注冊到 Consul

package main

import (
	"context"
	"encoding/hex"
	"flag"
	"fmt"
	"log"
	"math/rand"
	"net"
	"strconv"
	"time"

	"github.com/hashicorp/consul/api"
	"google.golang.org/grpc"
	pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

const (
	// host = "192.168.10.102"
	// port = 50051
	ttl = 30 * time.Second
)

// server is used to implement helloworld.GreeterServer.
type server struct {
	pb.UnimplementedGreeterServer
	port int
}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("Received: %v", in.GetName())
	return &pb.HelloReply{Message: fmt.Sprintf("Hello %s, from %d", in.GetName(), s.port)}, nil
}

func main() {

	host := flag.String("h", "127.0.0.1", "host")
	port := flag.Int("p", 50051, "port")
	flag.Parse()

	lis, err := net.Listen("tcp", net.JoinHostPort(*host, strconv.Itoa(*port)))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Consul Client
	registry, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatalln(err)
	}

	var h [16]byte
	rand.Read(h[:])
	// 生成一個全局ID
	id := fmt.Sprintf("helloserver-%s-%d", hex.EncodeToString(h[:]), *port)
	fmt.Println(id)
	// 注冊到 Consul,包含地址、端口信息,以及健康檢查
	err = registry.Agent().ServiceRegister(&api.AgentServiceRegistration{
		ID:      id,
		Name:    "helloserver",
		Port:    *port,
		Address: *host,
		Check: &api.AgentServiceCheck{
			TTL:     (ttl + time.Second).String(),
			Timeout: time.Minute.String(),
		},
	})
	if err != nil {
		log.Fatalln(err)
	}
	go func() {
		checkid := "service:" + id
		for range time.Tick(ttl) {
			err := registry.Agent().PassTTL(checkid, "")
			if err != nil {
				log.Fatalln(err)
			}
		}
	}()

	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{port: *port})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

client 端通過 Consul 發現服務

package main

import (
	"context"
	"log"
	"time"

	"github.com/hashicorp/consul/api"
	"google.golang.org/grpc"
	"google.golang.org/grpc/examples/grpcresolver"
	pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

const (
	address     = "localhost:50051"
	defaultName = "world"
)

func main() {
	// consul
	registry, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatalln(err)
	}

	// 自定義 LB,並使用剛才寫的 Consul Resolver
	lbrr := grpc.RoundRobin(grpcresolver.ForConsul(registry))

	// Set up a connection to the server.
	conn, err := grpc.Dial("helloserver", grpc.WithInsecure(), grpc.WithBalancer(lbrr))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// 調用 server 端 RPC,通過響應觀察負載均衡
	for range time.Tick(time.Second) {
		name := defaultName
		r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name})
		if err != nil {
			log.Fatalf("could not greet: %v", err)
			continue
		}
		log.Printf("server reply: %s", r.GetMessage())
	}
}

啟動 & 查看

  1. 啟動兩個 Server,設置不同的啟動端口
# 啟動 server1
$ go run grpc-go\examples\helloworld\greeter_server\main.go -p 50015
helloserver-52fdfc072182654f163f5f0f9a621d72-50015
# 啟動 server2
$ go run grpc-go\examples\helloworld\greeter_server\main.go -p 50014
helloserver-52fdfc072182654f163f5f0f9a621d72-50014

通過 Consul Web UI 查看,兩個 instance 均是健康的

  1. 啟動 Client
$ go run grpc-go\examples\helloworld\greeter_client\main.go
2021/09/12 16:42:39 server reply: Hello world, from 50014
2021/09/12 16:42:40 server reply: Hello world, from 50015
2021/09/12 16:42:41 server reply: Hello world, from 50014
2021/09/12 16:42:42 server reply: Hello world, from 50015
2021/09/12 16:42:43 server reply: Hello world, from 50014
2021/09/12 16:42:44 server reply: Hello world, from 50015
2021/09/12 16:42:45 server reply: Hello world, from 50014

可以看到是均勻對兩個 server 發起調用,當我們將其中一個 instance server2 關掉(模擬不可用的情況),流量全面全部轉移到另一台上了

說明失敗轉移也是正常的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM