Golang代碼操作etcd


二、Golang代碼操作etcd

2.1、etcd安裝

# 官網:
https://github.com/etcd-io/etcd/tree/main/client/v3
https://pkg.go.dev/github.com/coreos/etcd/clientv3#pkg-index

# 安裝依賴
go get go.etcd.io/etcd/client/v3

# 安裝etcd
[root@node01 ~]# yum install -y etcd
# 設置開機自啟動
systemctl enable etcd
# 啟動etcd
systemctl start etcd
# 查看etcd運行狀態
systemctl status etcd

# systemd配置
從systemctl status etcd命令的輸出可以看到,etcd的 systemd配置文件位於/usr/lib/systemd/system/etcd.service,該配置文件的內容如下:

$ cat /usr/lib/systemd/system/etcd.service
[Unit]
Description=Etcd Server
After=network.target
After=network-online.target
Wants=network-online.target

[Service]
Type=notify
WorkingDirectory=/var/lib/etcd/
EnvironmentFile=-/etc/etcd/etcd.conf
User=etcd
# set GOMAXPROCS to number of processors
ExecStart=/bin/bash -c "GOMAXPROCS=$(nproc) /usr/bin/etcd --name=\"${ETCD_NAME}\" --data-dir=\"${ETCD_DATA_DIR}\" --listen-client-urls=\"${ETCD_LISTEN_CLIENT_URLS}\""
Restart=on-failure
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target

# 從上面的配置中可以看到,etcd的配置文件位於/etc/etcd/etcd.conf,如果我們想要修改某些配置項,可以編輯該文件。

# 遠程訪問
etcd安裝完成后,默認只能本地訪問,如果需要開啟遠程訪問,還需要修改/etc/etcd/etcd.conf中的配置。例如,本實例中我安裝etcd的機器IP是10.103.18.41,我嘗試通過自己的機器遠程訪問10.103.18.41上安裝的etcd的2379端口,結果訪問被拒絕:

# 修改/etc/etcd/etcd.conf配置:
ETCD_LISTEN_CLIENT_URLS="http://10.103.18.41:2379,http://localhost:2379"

# 然后重啟
systemctl restart etcd

2.2、代碼操作

  • 連接etcd
package main

import (
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

var (
	config clientv3.Config
	client *clientv3.Client
	err    error
)

func main() {

	// ETCD客戶端連接信息
	config = clientv3.Config{
		Endpoints:   []string{"192.168.1.210:2379"}, // 節點信息
		DialTimeout: 5 * time.Second,                // 超時時間
	}

	// 建立連接
	if client, err = clientv3.New(config); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(client)
}
  • 操作etcd
  • 相關理論
  1. Revision
    作用域為集群,邏輯時間戳,全局單調遞增,任何 key 修改都會使其自增
  2. CreateRevision
    作用域為 key, 等於創建這個 key 時的 Revision, 直到刪除前都保持不變
  3. ModRevision
    作用域為 key, 等於修改這個 key 時的 Revision, 只要這個 key 更新都會改變
  4. Version
    作用域為 key, 某一個 key 的修改次數(從創建到刪除),與以上三個 Revision 無關

關於 watch 哪個版本:

  1. watch 某一個 key 時,想要從歷史記錄開始就用 CreateRevision,最新一條(這一條直接返回)開始就用 ModRevision
  2. watch 某個前綴,就必須使用 Revision
  • 增加一個key、查詢一個key、刪除一個key
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

var (
	config  clientv3.Config
	client  *clientv3.Client
	putResp *clientv3.PutResponse
	getResp *clientv3.GetResponse
	delResp *clientv3.DeleteResponse
	kv      clientv3.KV
	err     error
)

func main() {

	// ETCD客戶端連接信息
	config = clientv3.Config{
		Endpoints:   []string{"192.168.1.210:2379"}, // 節點信息
		DialTimeout: 5 * time.Second,                // 超時時間
	}

	// 建立連接
	if client, err = clientv3.New(config); err != nil {
		fmt.Printf("connect to etcd failed, err:%v\n", err)
		return
	}
	fmt.Println("connect to etcd success")


	// 用於讀寫ETCD的鍵值對
	kv = clientv3.NewKV(client)
	// 操作etcd,context.TODO() 這是一個上下文,如果這上下文不知道選那種,就選這個萬精油;clientv3.WithPrevKV()加這參數獲取前一個kv的值
	if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "1008611", clientv3.WithPrevKV()); err != nil {
		fmt.Println(err)
		return
	}
	// Revision: 作用域為集群,邏輯時間戳,全局單調遞增,任何 key 修改都會使其自增
	fmt.Println("Revision is:", putResp.Header.Revision)
	if putResp.PrevKv != nil {
		// 查看被更新的K V
		fmt.Println("更新的Key是:", string(putResp.PrevKv.Key))
		fmt.Println("被更新的Value是:", string(putResp.PrevKv.Value))
	}

	// 讀取ETCD數據
	if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1"); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(getResp.Kvs)

	// 讀取ETCD數據,獲取前綴相同的WithPrefix()
	if getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(getResp.Kvs)

	// 刪除ETCD數據;WithPrevKV--->賦值數據給delResp.PrevKvs,方便后續判斷
	// 刪除多個key:kv.Delete(context.TODO(), "/cron/jobs/", clientv3.WithPrefix())
	if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV()); err != nil {
		fmt.Println(err)
		return
	}
	// 打印被刪除之前的kv
	if len(delResp.PrevKvs) != 0 {
		for _, kvpx := range delResp.PrevKvs {
			fmt.Println("被刪除的數據是: ", string(kvpx.Key), string(kvpx.Value))
		}
	}
}
  • 租約、自動租約、lease
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

var (
	config         clientv3.Config
	leaseID        clientv3.LeaseID
	client         *clientv3.Client
	LeaseGrantResp *clientv3.LeaseGrantResponse
	putResp        *clientv3.PutResponse
	getResp        *clientv3.GetResponse
	keepResp       *clientv3.LeaseKeepAliveResponse
	keepRespChan   <-chan *clientv3.LeaseKeepAliveResponse // 只讀管道
	kv             clientv3.KV
	err            error
)

func main() {
	// 連接客戶端配置文件
	config = clientv3.Config{
		Endpoints:   []string{"192.168.1.210:2379"},
		DialTimeout: 5 * time.Second,
	}

	// 建立連接
	if client, err = clientv3.New(config); err != nil {
		fmt.Printf("conect to etcd faild, err:%v\n", err)
		return
	} else {
		fmt.Println("connect to etcd success")
	}

	// 獲取kv API子集
	kv = clientv3.NewKV(client)

	// 申請一個租約 lease
	lease := clientv3.Lease(client)

	// 申請一個10s的租約
	if LeaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
		fmt.Println("租約申請失敗", err)
		return
	}

	// 租約ID
	leaseID = LeaseGrantResp.ID

	// 自動續租
	if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseID); err != nil {
		fmt.Println("自動續租失敗", err)
		return
	}

    /* 
      10s后自動過期
      ctx, canceFunc := context.WithCancel(context.TODO())
	  // 自動續租
	  if keepRespChan, err = lease.KeepAlive(ctx, leaseID); err != nil {
	  	  fmt.Println("自動續租失敗", err)
		  return
	  }
	  canceFunc()
    
    */
    
	// 處理續約應答的協程  消費keepRespChan
	go func() {
		for {
			select {
			case keepResp = <-keepRespChan:
				if keepRespChan == nil {
					fmt.Println("租約已經失效了")
					goto END
				} else {
					// KeepAlive每秒會續租一次,所以就會收到一次應答
					fmt.Println("收到應答,租約ID是:", keepResp.ID)
				}
			}
		}
	END:
	}()

	// put一個kv,讓他與租約關聯起來,從而實現10s后自動過期,key就會被刪除; 關聯用的是clientv3.WithLease(leaseID)
	if putResp, err = kv.Put(context.TODO(), "/cron/lock/job3", "3", clientv3.WithLease(leaseID)); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("寫入成功:", putResp.Header.Revision)

	// 判斷key是否過期
	for {
		if getResp, err = kv.Get(context.TODO(), "/cron/lock/job3"); err != nil {
			fmt.Println(err)
			return
		}
		// 如果等於0,說明過期了
		if getResp.Count == 0 {
			fmt.Println("kv過期了")
			break
		} else {
			fmt.Println("沒過期", getResp.Kvs)
		}
		time.Sleep(2 * time.Second)
	}
}
  • watch操作

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
)

var (
	config             clientv3.Config
	leaseID            clientv3.LeaseID
	watcher            clientv3.Watcher
	kv                 clientv3.KV
	watchResp          clientv3.WatchResponse
	event              *clientv3.Event
	client             *clientv3.Client
	LeaseGrantResp     *clientv3.LeaseGrantResponse
	putResp            *clientv3.PutResponse
	getResp            *clientv3.GetResponse
	keepResp           *clientv3.LeaseKeepAliveResponse
	keepRespChan       <-chan *clientv3.LeaseKeepAliveResponse // 只讀管道
	watchRespChan      <-chan clientv3.WatchResponse
	watchStartRevision int64
	err                error
)

func main() {
	// 連接客戶端配置文件
	config = clientv3.Config{
		Endpoints:   []string{"192.168.1.210:2379"},
		DialTimeout: 5 * time.Second,
	}

	// 建立連接
	if client, err = clientv3.New(config); err != nil {
		fmt.Printf("conect to etcd faild, err:%v\n", err)
		return
	} else {
		fmt.Println("connect to etcd success")
	}

	// 獲取kv API子集
	kv = clientv3.NewKV(client)

	// 模擬etcd中數據的變化
	go func() {
		for {
			kv.Put(context.TODO(), "/cron/jobs/job18", "I am 18")

			kv.Delete(context.TODO(), "/cron/jobs/job18")

			time.Sleep(1 * time.Second)
		}
	}()

	if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job18"); err != nil {
		fmt.Printf("getResp err:%v\n", err)
		return
	}

	if len(getResp.Kvs) != 0 {
		fmt.Println(getResp.Kvs[0].Value)
	}

	// 當前etcd集群事務ID,單調遞增的
	watchStartRevision = getResp.Header.Revision + 1

	// 創建個 watcher
	watcher = clientv3.NewWatcher(client)

	// 啟動監聽
	fmt.Println("從該Revision版本向后監聽:", watchStartRevision)

	// 一直監聽
	// watchRespChan = watcher.Watch(context.TODO(), "/cron/jobs/job18", clientv3.WithRev(watchStartRevision))

	// 自動關閉監聽,調用canceFunc()函數即可取消
	xtc, canceFunc := context.WithCancel(context.TODO())

	// xx秒后干什么事--->time.AfterFunc,執行匿名函數
	time.AfterFunc(5*time.Second, func() {
		canceFunc()
	})

	//啟動監聽
	watchRespChan = watcher.Watch(xtc, "/cron/jobs/job18", clientv3.WithRev(watchStartRevision))

	// 處理kv變化事件
	for watchResp = range watchRespChan {
		for _, event = range watchResp.Events {
			switch event.Type {
			case mvccpb.PUT:
				fmt.Println("修改為:", string(event.Kv.Value), "CreateRevision is:", event.Kv.CreateRevision, "ModRevision is:", event.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("刪除了:", "Revision is", event.Kv.ModRevision)
			}
		}
	}
}
// xx秒后干什么事--->time.AfterFunc,執行匿名函數
	time.AfterFunc(5*time.Second, func() {
        fmt.Println("1")
	})
  • OP的方式PUT、GET數據

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

var (
	config clientv3.Config
	kv     clientv3.KV
	putOp  clientv3.Op
	getOp  clientv3.Op
	opResp clientv3.OpResponse
	client *clientv3.Client
	err    error
)

func main() {
	// 連接客戶端配置文件
	config = clientv3.Config{
		Endpoints:   []string{"192.168.1.210:2379"},
		DialTimeout: 5 * time.Second,
	}

	// 建立連接
	if client, err = clientv3.New(config); err != nil {
		fmt.Printf("conect to etcd faild, err:%v\n", err)
		return
	} else {
		fmt.Println("connect to etcd success")
	}

	// 獲取kv API子集
	kv = clientv3.NewKV(client)

	// 創建OP---> k v 對象
	putOp = clientv3.OpPut("/cron/jobs/job19", "19")

	// 執行OP
	if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
		fmt.Printf("執行OP faild, err:%v\n", err)
		return
	}

	fmt.Println("Revision is:", opResp.Put().Header.Revision)

	// 創建OP---> k v 對象
	getOp = clientv3.OpGet("/cron/jobs/job19")

	// 執行OP
	if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
		fmt.Printf("執行OP faild, err:%v\n", err)
		return
	}

	// 打印數據
	fmt.Println("數據ModRevision", opResp.Get().Kvs[0].ModRevision)
	fmt.Println("數據Value", string(opResp.Get().Kvs[0].Value))
}
  • 分布式鎖

    • 同時運行兩次,驗證代碼
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

var (
	config         clientv3.Config
	leaseID        clientv3.LeaseID
	ctx            context.Context
	canceFunc      context.CancelFunc
	txn            clientv3.Txn
	client         *clientv3.Client
	LeaseGrantResp *clientv3.LeaseGrantResponse
	keepResp       *clientv3.LeaseKeepAliveResponse
	txnResp        *clientv3.TxnResponse
	keepRespChan   <-chan *clientv3.LeaseKeepAliveResponse // 只讀管道
	kv             clientv3.KV
	err            error
)

/*
	lease實現鎖自動過期
	op操着
	txn事務:if else then
*/

func main() {
	// 連接客戶端配置文件
	config = clientv3.Config{
		Endpoints:   []string{"192.168.1.210:2379"},
		DialTimeout: 5 * time.Second,
	}

	// 建立連接
	if client, err = clientv3.New(config); err != nil {
		fmt.Printf("conect to etcd faild, err:%v\n", err)
		return
	} else {
		fmt.Println("connect to etcd success")
	}

	// 1、上鎖(創建租約、自動續租、拿着租約去搶占一個key)
	// 申請一個租約 lease
	lease := clientv3.Lease(client)

	// 申請一個5s的租約
	if LeaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
		fmt.Println("租約申請失敗", err)
		return
	}

	// 租約ID
	leaseID = LeaseGrantResp.ID

	// 准備一個用於取消的自動續租的context; cancanceFunc 取消續租調用這個函數即可
	ctx, canceFunc = context.WithCancel(context.TODO())

	// 確保函數退出后,自動續約會停止
	defer canceFunc()
	defer lease.Revoke(context.TODO(), leaseID)

	// 自動續租
	if keepRespChan, err = lease.KeepAlive(ctx, leaseID); err != nil {
		fmt.Println("自動續租失敗", err)
		return
	}

	// 判斷續約應答的協程
	go func() {
		for {
			select {
			case keepResp = <-keepRespChan:
				if keepRespChan == nil {
					fmt.Println("租約已經失效了")
					goto END
				} else {
					// KeepAlive每秒會續租一次,所以就會收到一次應答
					fmt.Println("收到應答,租約ID是:", keepResp.ID)
				}
			}
		}
	END:
	}()

	// ***拿着租約去搶占一個key***
	// 獲取kv API子集
	kv = clientv3.NewKV(client)

	// 創建事務
	txn = kv.Txn(context.TODO())

	// 定義事務
	// 如果key不存在;關聯用的是clientv3.WithLease(leaseID)
	txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job19"), "=", 0)).
		// 不存在就put一個key
		Then(clientv3.OpPut("/cron/lock/job19", "xxx", clientv3.WithLease(leaseID))).
		// 否則槍鎖失敗
		Else(clientv3.OpGet("/cron/lock/job19"))

	// 提交事務
	if txnResp, err = txn.Commit(); err != nil {
		fmt.Println("txn err", err)
		return
	}

	// 判斷釋放搶到鎖
	if !txnResp.Succeeded {
		fmt.Println("鎖被占用", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
		return
	}

	// 2、處理業務
	fmt.Println("處理任務")
	time.Sleep(50 * time.Second)

	// 3、釋放鎖(取消自動續租、釋放租約)
	/*
		defer canceFunc()
		defer lease.Revoke(context.TODO(), leaseID)
		上面這個釋放了租約,關聯的kv會被刪除,從而達到釋放鎖
	*/
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM