go etcd服務發現


 一.etcd簡介

  etcd 是一個分布式鍵值對存儲系統,由coreos 開發,內部采用 raft 協議作為一致性算法,用於可靠、快速地保存關鍵數據,並提供訪問。通過分布式鎖、leader選舉和寫屏障(write barriers),來實現可靠的分布式協作。etcd集群是為高可用、持久化數據存儲和檢索而准備。

  概念詞匯

    Raft:         etcd所采用的保證分布式系統強一致性的算法。
    Node:       一個Raft狀態機實例。
    Member:  一個etcd實例。它管理着一個Node,並且可以為客戶端請求提供服務。
    Cluster:    由多個Member構成、可以協同工作的etcd集群。
    Peer:        對同一個etcd集群中另外一個Member的稱呼。
    Client:      向etcd集群發送HTTP請求的客戶端。
    WAL:        預寫式日志,etcd用於持久化存儲的日志格式。
    snapshot: etcd防止WAL文件過多而設置的快照,存儲etcd數據狀態。
    Proxy:       etcd的一種模式,為etcd集群提供反向代理服務。
    Leader:     Raft算法中,通過競選而產生的、處理所有數據提交的節點。
    Follower:  競選失敗的節點作為Raft中的從屬節點,為算法提供強一致性保證。
    Candidate:當Follower超過一定時間接收不到Leader的心跳時轉變為Candidate開始競選。
    Term:        某個節點成為Leader到下一次競選時間,稱為一個Term。
    Index:       數據項編號。Raft中通過Term和Index來定位數據

  應用場景

    服務發現

    消息發布與訂閱

    負載均衡

    分布式通知與協調

    分布式鎖、分布式隊列

    集群監控與Leader競選

  etcd與redis

    etcd: 用於共享配置和服務發現的分布式一致鍵值存儲. etcd 是一種分布式鍵值存儲, 它提供了一種跨機器集群存儲數據的可靠方式. etcd 在網絡分區期間優雅地處理 master 選舉, 並且會容忍機器故障.

    redis:  持久化在磁盤上的內存數據庫, Redis 是一個開源、BSD 許可的高級鍵值存儲. 它通常被稱為數據結構服務器, 因為鍵可以包含字符串、散列、列表、集合和排序集合.

二.etcd安裝

  采用二進制安裝,解壓后將etcd和etcdctl二進制文件復制到/user/bin/下即可:

wget https://github.com/coreos/etcd/releases/download/v3.5.1/etcd-v3.5.1-linux-amd64.tar.gz

  版本查看:

root@master ~ > etcdctl version etcdctl version: 3.5.1 API version: 3.5

  etcd常用命令:

COMMANDS:
    alarm disarm        Disarms all alarms
    alarm list        Lists all alarms
    auth disable        Disables authentication
    auth enable        Enables authentication
    auth status        Returns authentication status
    check datascale        Check the memory usage of holding data for different workloads on a given server endpoint.
    check perf        Check the performance of the etcd cluster
    compaction        Compacts the event history in etcd
    defrag            Defragments the storage of the etcd members with given endpoints
    del            Removes the specified key or range of keys [key, range_end)
    elect            Observes and participates in leader election
    endpoint hashkv        Prints the KV history hash for each endpoint in --endpoints
    endpoint health        Checks the healthiness of endpoints specified in `--endpoints` flag
    endpoint status        Prints out the status of endpoints specified in `--endpoints` flag
    get            Gets the key or a range of keys
    help            Help about any command
    lease grant        Creates leases
    lease keep-alive    Keeps leases alive (renew)
    lease list        List all active leases
    lease revoke        Revokes leases
    lease timetolive    Get lease information
    lock            Acquires a named lock
    make-mirror        Makes a mirror at the destination etcd cluster
    member add        Adds a member into the cluster
    member list        Lists all members in the cluster
    member promote        Promotes a non-voting member in the cluster
    member remove        Removes a member from the cluster
    member update        Updates a member in the cluster
    move-leader        Transfers leadership to another etcd cluster member.
    put            Puts the given key into the store
    role add        Adds a new role
    role delete        Deletes a role
    role get        Gets detailed information of a role
    role grant-permission    Grants a key to a role
    role list        Lists all roles
    role revoke-permission    Revokes a key from a role
    snapshot restore    Restores an etcd member snapshot to an etcd directory
    snapshot save        Stores an etcd node backend snapshot to a given file
    snapshot status        [deprecated] Gets backend snapshot status of a given file
    txn            Txn processes all the requests in one transaction
    user add        Adds a new user
    user delete        Deletes a user
    user get        Gets detailed information of a user
    user grant-role        Grants a role to a user
    user list        Lists all users
    user passwd        Changes password of user
    user revoke-role    Revokes a role from a user
    version            Prints the version of etcdctl
    watch            Watches events stream on keys or prefixes

OPTIONS:
      --cacert=""                verify certificates of TLS-enabled secure servers using this CA bundle
      --cert=""                    identify secure client using this TLS certificate file
      --command-timeout=5s            timeout for short running command (excluding dial timeout)
      --debug[=false]                enable client-side debug logging
      --dial-timeout=2s                dial timeout for client connections
  -d, --discovery-srv=""            domain name to query for SRV records describing cluster endpoints
      --discovery-srv-name=""            service name to query when using DNS discovery
      --endpoints=[127.0.0.1:2379]        gRPC endpoints
  -h, --help[=false]                help for etcdctl
      --hex[=false]                print byte strings as hex encoded strings
      --insecure-discovery[=true]        accept insecure SRV records describing cluster endpoints
      --insecure-skip-tls-verify[=false]    skip server certificate verification (CAUTION: this option should be enabled only for testing purposes)
      --insecure-transport[=true]        disable transport security for client connections
      --keepalive-time=2s            keepalive time for client connections
      --keepalive-timeout=6s            keepalive timeout for client connections
      --key=""                    identify secure client using this TLS key file
      --password=""                password for authentication (if this option is used, --user option shouldn't include password)
      --user=""                    username[:password] for authentication (prompt if password is not supplied)
  -w, --write-out="simple"            set the output format (fields, json, protobuf, simple, table)
View Code

  指定ip端口啟動etcd:

etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 --listen-peer-urls http://0.0.0.0:2380

三.go操作etcd

  put和get

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"192.168.79.134:2379"}, // etcd節點,因為使用的單節點,所以這里只有一個
		DialTimeout: 5 * time.Second,                 //超時時間
	})
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("[INFO] connect to etcd success")
	defer cli.Close()
	// put
	// 設置超時時間
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	_, err = cli.Put(ctx, "test", "hello")
	cancel()
	if err != nil {
		fmt.Printf("put to etcd failed, err:%v\n", err)
		return
	}
	// get
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Get(ctx, "test")
	cancel()
	if err != nil {
		fmt.Printf("get from etcd failed, err:%v\n", err)
		return
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s:%s\n", ev.Key, ev.Value)
	}
}

// [INFO] connect to etcd success
// test:hello

  watch

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"192.168.79.134:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Printf("connect to etcd failed, err:%v\n", err)
		return
	}
	fmt.Println("connect to etcd success")
	defer cli.Close()
	// watch 
	// Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
	rch := cli.Watch(context.Background(), "test")
	for wresp := range rch {
		for _, ev := range wresp.Events {
			fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

    監聽test的變化,Watch返回的是一個WatchResponse的管道類型,所以我們可以用for循環取值,每當test發生變化,watch就會發現並做相應操作:

type WatchResponse struct {
Header pb.ResponseHeader
Events []*storagepb.Event
CompactRevision int64
Canceled bool
}
type Event struct {
Type Event_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=storagepb.Event_EventType" json:"type,omitempty"`
Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
}
root@master  ~ > etcdctl put test world
OK
root@master  ~ > etcdctl del test      
1

// $ go run watch.go
// connect to etcd success
// Type: PUT Key:test Value:world
// Type: DELETE Key:test Value: 

四.go中安裝etcd v3的坑

  ***建議直接使用clientv3的3.5版本,與grpc最新版本兼容,安裝: go get go.etcd.io/etcd/client/v3@v3.5.4,詳細版本迭代及操作建議參考官方說明: https://github.com/etcd-io/etcd/tree/main/client/v3

  ***如果受go版本或公司大環境影響不能使用最新版本,可以參考下邊操作

  1.當我們直接使用go get github.com/coreos/etcd/clientv3或者go get go.etcd.io/etcd時,會自動安裝etcd2.3.8版本,一個很久的版本,所以在安裝時一定要指定版本, 如: go get github.com/coreos/etcd/clientv3@v3.3.25

  2.必須安裝有grpc v1.26.0版本. 如果裝有多個版本的grpc,需要在go.mod中需添加下邊代碼或者直接不使用go mod tidy導包,而是通過go get google.golang.org/grpc@v1.26.0和go get github.com/coreos/etcd/clientv3@v3.3.25完成導包

replace google.golang.org/grpc v1.38.0 => google.golang.org/grpc v1.26.0 

五.etcd實現服務注冊和發現

  方法匯總:

    clientv3.New:          創建etcdv3客戶端(func New(cfg Config) (*Client, error))

    clientv3.Config:     創建客戶端時使用的配置

    Grant:                    初始化一個新租約(Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error))

    Put:                        注冊服務並綁定租約

    KeepAlive:              設置續租,定期發送續租請求(KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error))

    Revoke:                  撤銷租約

    Get:                        獲取服務

    Watch:                    監控服務

  實現流程:

                           

  實現代碼:

    1.服務注冊

      注冊一個前綴為/web的服務:

package main

import (
	"context"
	"github.com/coreos/etcd/clientv3"
	"log"
	"time"
)

//ServiceRegister 創建租約注冊服務
type ServiceRegister struct {
	cli     *clientv3.Client //etcd client
	leaseID clientv3.LeaseID //租約ID
	//租約keepalieve相應chan
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	key           string //key
	val           string //value
}

//NewServiceRegister 新建注冊服務
func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	ser := &ServiceRegister{
		cli: cli,
		key: key,
		val: val,
	}

	//申請租約設置時間keepalive並注冊服務
	if err := ser.putKeyWithLease(lease); err != nil {
		return nil, err
	}

	return ser, nil
}

//設置租約
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
	//設置租約時間
	resp, err := s.cli.Grant(context.Background(), lease)
	if err != nil {
		return err
	}
	//注冊服務並綁定租約
	_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
	if err != nil {
		return err
	}
	//設置續租 定期發送需求請求
	leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

	if err != nil {
		return err
	}
	s.leaseID = resp.ID
	log.Println(s.leaseID)
	s.keepAliveChan = leaseRespChan
	log.Printf("Put key:%s  val:%s  success!", s.key, s.val)
	return nil
}

//ListenLeaseRespChan 監聽 續租情況
func (s *ServiceRegister) ListenLeaseRespChan() {
	for leaseKeepResp := range s.keepAliveChan {
		log.Println("續約成功", leaseKeepResp)
	}
	log.Println("關閉續租")
}

// Close 注銷服務
func (s *ServiceRegister) Close() error {
	//撤銷租約
	if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
		return err
	}
	log.Println("撤銷租約")
	return s.cli.Close()
}

func main() {
	var endpoints = []string{"192.168.79.134:2379"}
	ser, err := NewServiceRegister(endpoints, "/web", "192.168.1.51:8000", 5)
	if err != nil {
		log.Fatalln(err)
	}
	//監聽續租相應chan
	go ser.ListenLeaseRespChan()
	select {
	case <-time.After(20 * time.Second):
		ser.Close()
	}
}

    2.服務發現  

      通過/web前綴發現服務,並持續監測/web服務的變化

package main

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
)

//ServiceDiscovery 服務發現
type ServiceDiscovery struct {
	cli        *clientv3.Client  //etcd client
	serverList map[string]string //服務列表
	lock       sync.Mutex
}

//NewServiceDiscovery  新建發現服務
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	return &ServiceDiscovery{
		cli:        cli,
		serverList: make(map[string]string),
	}
}

//WatchService 初始化服務列表和監視
func (s *ServiceDiscovery) WatchService(prefix string) error {
	//根據前綴獲取現有的key
	resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}

	for _, ev := range resp.Kvs {
		s.SetServiceList(string(ev.Key), string(ev.Value))
	}

	//監視前綴,修改變更的server
	go s.watcher(prefix)
	return nil
}

//watcher 監聽前綴
func (s *ServiceDiscovery) watcher(prefix string) {
	rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
	log.Printf("watching prefix:%s now...", prefix)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT: //修改或者新增
				s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
			case mvccpb.DELETE: //刪除
				s.DelServiceList(string(ev.Kv.Key))
			}
		}
	}
}

//SetServiceList 新增服務地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.serverList[key] = string(val)
	log.Println("put key :", key, "val:", val)
}

//DelServiceList 刪除服務地址
func (s *ServiceDiscovery) DelServiceList(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	delete(s.serverList, key)
	log.Println("del key:", key)
}

//GetServices 獲取服務地址
func (s *ServiceDiscovery) GetServices() []string {
	s.lock.Lock()
	defer s.lock.Unlock()
	addrs := make([]string, 0)

	for _, v := range s.serverList {
		addrs = append(addrs, v)
	}
	return addrs
}

//Close 關閉服務
func (s *ServiceDiscovery) Close() error {
	return s.cli.Close()
}

func main() {
	var endpoints = []string{"192.168.79.134:2379"}
	ser := NewServiceDiscovery(endpoints)
	defer ser.Close()
	_ = ser.WatchService("/web")
	for {
		select {
		case <-time.Tick(10 * time.Second):
			log.Println(ser.GetServices())
		}
	}
}

    3.測試

六.grpc注冊etcd集群

  在使用grpc作為服務,並用etcd作為服務發現工具時要注意grpc版本.

      示例鏈接: https://github.com/UUPthub/grpc-etcdCluster

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM