gRPC負載均衡(客戶端負載均衡)


前言

上篇介紹了如何使用etcd實現服務發現,本篇將基於etcd的服務發現前提下,介紹如何實現gRPC客戶端負載均衡。

gRPC負載均衡

gRPC官方文檔提供了關於gRPC負載均衡方案Load Balancing in gRPC,此方案是為gRPC設計的,下面我們對此進行分析。

1、對每次調用進行負載均衡

gRPC中的負載平衡是以每次調用為基礎,而不是以每個連接為基礎。換句話說,即使所有的請求都來自一個客戶端,我們仍希望它們在所有的服務器上實現負載平衡。

2、負載均衡的方法

  • 集中式(Proxy Model)

在服務消費者和服務提供者之間有一個獨立的負載均衡(LB),通常是專門的硬件設備如 F5,或者基於軟件如 LVS,HAproxy等實現。LB上有所有服務的地址映射表,通常由運維配置注冊,當服務消費方調用某個目標服務時,它向LB發起請求,由LB以某種策略,比如輪詢(Round-Robin)做負載均衡后將請求轉發到目標服務。LB一般具備健康檢查能力,能自動摘除不健康的服務實例。

該方案主要問題:服務消費方、提供方之間增加了一級,有一定性能開銷,請求量大時,效率較低。

可能有讀者會認為集中式負載均衡存在這樣的問題,一旦負載均衡服務掛掉,那整個系統將不能使用。
解決方案:可以對負載均衡服務進行DNS負載均衡,通過對一個域名設置多個IP地址,每次DNS解析時輪詢返回負載均衡服務地址,從而實現簡單的DNS負載均衡。

  • 客戶端負載(Balancing-aware Client)

針對第一個方案的不足,此方案將LB的功能集成到服務消費方進程里,也被稱為軟負載或者客戶端負載方案。服務提供方啟動時,首先將服務地址注冊到服務注冊表,同時定期報心跳到服務注冊表以表明服務的存活狀態,相當於健康檢查,服務消費方要訪問某個服務時,它通過內置的LB組件向服務注冊表查詢,同時緩存並定期刷新目標服務地址列表,然后以某種負載均衡策略選擇一個目標服務地址,最后向目標服務發起請求。LB和服務發現能力被分散到每一個服務消費者的進程內部,同時服務消費方和服務提供方之間是直接調用,沒有額外開銷,性能比較好。

該方案主要問題:要用多種語言、多個版本的客戶端編寫和維護負載均衡策略,使客戶端的代碼大大復雜化。

  • 獨立LB服務(External Load Balancing Service)

該方案是針對第二種方案的不足而提出的一種折中方案,原理和第二種方案基本類似。

不同之處是將LB和服務發現功能從進程內移出來,變成主機上的一個獨立進程。主機上的一個或者多個服務要訪問目標服務時,他們都通過同一主機上的獨立LB進程做服務發現和負載均衡。該方案也是一種分布式方案沒有單點問題,服務調用方和LB之間是進程內調用性能好,同時該方案還簡化了服務調用方,不需要為不同語言開發客戶庫。

本篇將介紹第二種負載均衡方法,客戶端負載均衡。

實現gRPC客戶端負載均衡

gRPC已提供了簡單的負載均衡策略(如:Round Robin),我們只需實現它提供的BuilderResolver接口,就能完成gRPC客戶端負載均衡。

type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
	Scheme() string
}

Builder接口:創建一個resolver(本文稱之服務發現),用於監視名稱解析更新。
Build方法:為給定目標創建一個新的resolver,當調用grpc.Dial()時執行。
Scheme方法:返回此resolver支持的方案,Scheme定義可參考:https://github.com/grpc/grpc/blob/master/doc/naming.md

type Resolver interface {
	ResolveNow(ResolveNowOption)
	Close()
}

Resolver接口:監視指定目標的更新,包括地址更新和服務配置更新。
ResolveNow方法:被 gRPC 調用,以嘗試再次解析目標名稱。只用於提示,可忽略該方法。
Close方法:關閉resolver

根據以上兩個接口,我們把服務發現的功能寫在Build方法中,把獲取到的負載均衡服務地址返回到客戶端,並監視服務更新情況,以修改客戶端連接。
修改服務發現代碼,discovery.go

package etcdv3

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/coreos/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/clientv3"
	"google.golang.org/grpc/resolver"
)

const schema = "grpclb"

//ServiceDiscovery 服務發現
type ServiceDiscovery struct {
	cli        *clientv3.Client //etcd client
	cc         resolver.ClientConn
	serverList map[string]resolver.Address //服務列表
	lock       sync.Mutex
}

//NewServiceDiscovery  新建發現服務
func NewServiceDiscovery(endpoints []string) resolver.Builder {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	return &ServiceDiscovery{
		cli: cli,
	}
}

//Build 為給定目標創建一個新的`resolver`,當調用`grpc.Dial()`時執行
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	log.Println("Build")
	s.cc = cc
	s.serverList = make(map[string]resolver.Address)
	prefix := "/" + target.Scheme + "/" + target.Endpoint + "/"
	//根據前綴獲取現有的key
	resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	for _, ev := range resp.Kvs {
		s.SetServiceList(string(ev.Key), string(ev.Value))
	}
	s.cc.NewAddress(s.getServices())
	//監視前綴,修改變更的server
	go s.watcher(prefix)
	return s, nil
}

// ResolveNow 監視目標更新
func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOption) {
	log.Println("ResolveNow")
}

//Scheme return schema
func (s *ServiceDiscovery) Scheme() string {
	return schema
}

//Close 關閉
func (s *ServiceDiscovery) Close() {
	log.Println("Close")
	s.cli.Close()
}

//watcher 監聽前綴
func (s *ServiceDiscovery) watcher(prefix string) {
	rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
	log.Printf("watching prefix:%s now...", prefix)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT: //新增或修改
				s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
			case mvccpb.DELETE: //刪除
				s.DelServiceList(string(ev.Kv.Key))
			}
		}
	}
}

//SetServiceList 新增服務地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.serverList[key] = resolver.Address{Addr: val}
	s.cc.NewAddress(s.getServices())
	log.Println("put key :", key, "val:", val)
}

//DelServiceList 刪除服務地址
func (s *ServiceDiscovery) DelServiceList(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	delete(s.serverList, key)
	s.cc.NewAddress(s.getServices())
	log.Println("del key:", key)
}

//GetServices 獲取服務地址
func (s *ServiceDiscovery) getServices() []resolver.Address {
	addrs := make([]resolver.Address, 0, len(s.serverList))

	for _, v := range s.serverList {
		addrs = append(addrs, v)
	}
	return addrs
}

代碼主要修改以下地方:

  1. 把獲取的服務地址轉成resolver.Address,供gRPC客戶端連接。

  2. 根據schema的定義規則,修改key格式。

服務注冊主要修改key存儲格式,register.go

package etcdv3

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

//ServiceRegister 創建租約注冊服務
type ServiceRegister struct {
	cli     *clientv3.Client //etcd client
	leaseID clientv3.LeaseID //租約ID
	//租約keepalieve相應chan
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	key           string //key
	val           string //value
}

//NewServiceRegister 新建注冊服務
func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	ser := &ServiceRegister{
		cli: cli,
		key: "/" + schema + "/" + serName + "/" + addr,
		val: addr,
	}

	//申請租約設置時間keepalive
	if err := ser.putKeyWithLease(lease); err != nil {
		return nil, err
	}

	return ser, nil
}

//設置租約
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
	//設置租約時間
	resp, err := s.cli.Grant(context.Background(), lease)
	if err != nil {
		return err
	}
	//注冊服務並綁定租約
	_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
	if err != nil {
		return err
	}
	//設置續租 定期發送需求請求
	leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

	if err != nil {
		return err
	}
	s.leaseID = resp.ID
	s.keepAliveChan = leaseRespChan
	log.Printf("Put key:%s  val:%s  success!", s.key, s.val)
	return nil
}

//ListenLeaseRespChan 監聽 續租情況
func (s *ServiceRegister) ListenLeaseRespChan() {
	for leaseKeepResp := range s.keepAliveChan {
		log.Println("續約成功", leaseKeepResp)
	}
	log.Println("關閉續租")
}

// Close 注銷服務
func (s *ServiceRegister) Close() error {
	//撤銷租約
	if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
		return err
	}
	log.Println("撤銷租約")
	return s.cli.Close()
}

客戶端修改gRPC連接服務的部分代碼即可:

func main() {
	r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
	resolver.Register(r)
	// 連接服務器
	conn, err := grpc.Dial(r.Scheme()+"://8.8.8.8/simple_grpc", grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
	if err != nil {
		log.Fatalf("net.Connect err: %v", err)
	}
	defer conn.Close()

	// 建立gRPC連接
	grpcClient = pb.NewSimpleClient(conn)

gRPC內置了簡單的負載均衡策略round_robin,根據負載均衡地址,以輪詢的方式進行調用服務。

服務端啟動時,把服務地址注冊到etcd中即可:

func main() {
	// 監聽本地端口
	listener, err := net.Listen(Network, Address)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	log.Println(Address + " net.Listing...")
	// 新建gRPC服務器實例
	grpcServer := grpc.NewServer()
	// 在gRPC服務器注冊我們的服務
	pb.RegisterSimpleServer(grpcServer, &SimpleService{})
	//把服務注冊到etcd
	ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 5)
	if err != nil {
		log.Fatalf("register service err: %v", err)
	}
	defer ser.Close()
	//用服務器 Serve() 方法以及我們的端口信息區實現阻塞等待,直到進程被殺死或者 Stop() 被調用
	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("grpcServer.Serve err: %v", err)
	}
}

運行效果

我們先啟動並注冊三個服務

然后客戶端進行調用

看服務端接收到的請求

關閉localhost:8000服務,剩余localhost:8001localhost:8002服務接收請求

重新打開localhost:8000服務

可以看到,gRPC客戶端負載均衡運行良好。

總結

本文介紹了gRPC客戶端負載均衡的實現,它簡單實現了gRPC負載均衡的功能。但在對接其他語言時候比較麻煩,需要每種語言都實現一套服務發現和負載策略。

目前官方只提供了取第一個地址pick_first和輪詢round_robin兩種負載均衡策略。下篇將介紹如何自定義負載均衡策略。

有興趣了解第三種負載均衡方法External Load Balancing Service的,可以參考這個項目:https://github.com/bsm/grpclb

源碼地址:https://github.com/Bingjian-Zhu/etcd-example

參考:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM