二、Golang代碼操作etcd
2.1、etcd安裝
# 官網:
https://github.com/etcd-io/etcd/tree/main/client/v3
https://pkg.go.dev/github.com/coreos/etcd/clientv3#pkg-index
# 安裝依賴
go get go.etcd.io/etcd/client/v3
# 安裝etcd
[root@node01 ~]# yum install -y etcd
# 設置開機自啟動
systemctl enable etcd
# 啟動etcd
systemctl start etcd
# 查看etcd運行狀態
systemctl status etcd
# systemd配置
從systemctl status etcd命令的輸出可以看到,etcd的 systemd配置文件位於/usr/lib/systemd/system/etcd.service,該配置文件的內容如下:
$ cat /usr/lib/systemd/system/etcd.service
[Unit]
Description=Etcd Server
After=network.target
After=network-online.target
Wants=network-online.target
[Service]
Type=notify
WorkingDirectory=/var/lib/etcd/
EnvironmentFile=-/etc/etcd/etcd.conf
User=etcd
# set GOMAXPROCS to number of processors
ExecStart=/bin/bash -c "GOMAXPROCS=$(nproc) /usr/bin/etcd --name=\"${ETCD_NAME}\" --data-dir=\"${ETCD_DATA_DIR}\" --listen-client-urls=\"${ETCD_LISTEN_CLIENT_URLS}\""
Restart=on-failure
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
# 從上面的配置中可以看到,etcd的配置文件位於/etc/etcd/etcd.conf,如果我們想要修改某些配置項,可以編輯該文件。
# 遠程訪問
etcd安裝完成后,默認只能本地訪問,如果需要開啟遠程訪問,還需要修改/etc/etcd/etcd.conf中的配置。例如,本實例中我安裝etcd的機器IP是10.103.18.41,我嘗試通過自己的機器遠程訪問10.103.18.41上安裝的etcd的2379端口,結果訪問被拒絕:
# 修改/etc/etcd/etcd.conf配置:
ETCD_LISTEN_CLIENT_URLS="http://10.103.18.41:2379,http://localhost:2379"
# 然后重啟
systemctl restart etcd
2.2、代碼操作
- 連接etcd
package main
import (
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
client *clientv3.Client
err error
)
func main() {
// ETCD客戶端連接信息
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"}, // 節點信息
DialTimeout: 5 * time.Second, // 超時時間
}
// 建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
fmt.Println(client)
}
- 操作etcd
- 相關理論
-
-
Revision
- 作用域為集群,邏輯時間戳,全局單調遞增,任何 key 修改都會使其自增
-
-
-
CreateRevision
- 作用域為 key, 等於創建這個 key 時的 Revision, 直到刪除前都保持不變
-
-
-
ModRevision
- 作用域為 key, 等於修改這個 key 時的 Revision, 只要這個 key 更新都會改變
-
-
-
Version
- 作用域為 key, 某一個 key 的修改次數(從創建到刪除),與以上三個 Revision 無關
-
關於 watch 哪個版本:
- watch 某一個 key 時,想要從歷史記錄開始就用 CreateRevision,最新一條(這一條直接返回)開始就用
ModRevision
- watch 某個前綴,就必須使用
Revision
- 增加一個key、查詢一個key、刪除一個key
package main
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
client *clientv3.Client
putResp *clientv3.PutResponse
getResp *clientv3.GetResponse
delResp *clientv3.DeleteResponse
kv clientv3.KV
err error
)
func main() {
// ETCD客戶端連接信息
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"}, // 節點信息
DialTimeout: 5 * time.Second, // 超時時間
}
// 建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
// 用於讀寫ETCD的鍵值對
kv = clientv3.NewKV(client)
// 操作etcd,context.TODO() 這是一個上下文,如果這上下文不知道選那種,就選這個萬精油;clientv3.WithPrevKV()加這參數獲取前一個kv的值
if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "1008611", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
return
}
// Revision: 作用域為集群,邏輯時間戳,全局單調遞增,任何 key 修改都會使其自增
fmt.Println("Revision is:", putResp.Header.Revision)
if putResp.PrevKv != nil {
// 查看被更新的K V
fmt.Println("更新的Key是:", string(putResp.PrevKv.Key))
fmt.Println("被更新的Value是:", string(putResp.PrevKv.Value))
}
// 讀取ETCD數據
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1"); err != nil {
fmt.Println(err)
return
}
fmt.Println(getResp.Kvs)
// 讀取ETCD數據,獲取前綴相同的WithPrefix()
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil {
fmt.Println(err)
return
}
fmt.Println(getResp.Kvs)
// 刪除ETCD數據;WithPrevKV--->賦值數據給delResp.PrevKvs,方便后續判斷
// 刪除多個key:kv.Delete(context.TODO(), "/cron/jobs/", clientv3.WithPrefix())
if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
return
}
// 打印被刪除之前的kv
if len(delResp.PrevKvs) != 0 {
for _, kvpx := range delResp.PrevKvs {
fmt.Println("被刪除的數據是: ", string(kvpx.Key), string(kvpx.Value))
}
}
}
- 租約、自動租約、lease
package main
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
leaseID clientv3.LeaseID
client *clientv3.Client
LeaseGrantResp *clientv3.LeaseGrantResponse
putResp *clientv3.PutResponse
getResp *clientv3.GetResponse
keepResp *clientv3.LeaseKeepAliveResponse
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse // 只讀管道
kv clientv3.KV
err error
)
func main() {
// 連接客戶端配置文件
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"},
DialTimeout: 5 * time.Second,
}
// 建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Printf("conect to etcd faild, err:%v\n", err)
return
} else {
fmt.Println("connect to etcd success")
}
// 獲取kv API子集
kv = clientv3.NewKV(client)
// 申請一個租約 lease
lease := clientv3.Lease(client)
// 申請一個10s的租約
if LeaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
fmt.Println("租約申請失敗", err)
return
}
// 租約ID
leaseID = LeaseGrantResp.ID
// 自動續租
if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseID); err != nil {
fmt.Println("自動續租失敗", err)
return
}
/*
10s后自動過期
ctx, canceFunc := context.WithCancel(context.TODO())
// 自動續租
if keepRespChan, err = lease.KeepAlive(ctx, leaseID); err != nil {
fmt.Println("自動續租失敗", err)
return
}
canceFunc()
*/
// 處理續約應答的協程 消費keepRespChan
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {
fmt.Println("租約已經失效了")
goto END
} else {
// KeepAlive每秒會續租一次,所以就會收到一次應答
fmt.Println("收到應答,租約ID是:", keepResp.ID)
}
}
}
END:
}()
// put一個kv,讓他與租約關聯起來,從而實現10s后自動過期,key就會被刪除; 關聯用的是clientv3.WithLease(leaseID)
if putResp, err = kv.Put(context.TODO(), "/cron/lock/job3", "3", clientv3.WithLease(leaseID)); err != nil {
fmt.Println(err)
return
}
fmt.Println("寫入成功:", putResp.Header.Revision)
// 判斷key是否過期
for {
if getResp, err = kv.Get(context.TODO(), "/cron/lock/job3"); err != nil {
fmt.Println(err)
return
}
// 如果等於0,說明過期了
if getResp.Count == 0 {
fmt.Println("kv過期了")
break
} else {
fmt.Println("沒過期", getResp.Kvs)
}
time.Sleep(2 * time.Second)
}
}
-
watch操作
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
leaseID clientv3.LeaseID
watcher clientv3.Watcher
kv clientv3.KV
watchResp clientv3.WatchResponse
event *clientv3.Event
client *clientv3.Client
LeaseGrantResp *clientv3.LeaseGrantResponse
putResp *clientv3.PutResponse
getResp *clientv3.GetResponse
keepResp *clientv3.LeaseKeepAliveResponse
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse // 只讀管道
watchRespChan <-chan clientv3.WatchResponse
watchStartRevision int64
err error
)
func main() {
// 連接客戶端配置文件
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"},
DialTimeout: 5 * time.Second,
}
// 建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Printf("conect to etcd faild, err:%v\n", err)
return
} else {
fmt.Println("connect to etcd success")
}
// 獲取kv API子集
kv = clientv3.NewKV(client)
// 模擬etcd中數據的變化
go func() {
for {
kv.Put(context.TODO(), "/cron/jobs/job18", "I am 18")
kv.Delete(context.TODO(), "/cron/jobs/job18")
time.Sleep(1 * time.Second)
}
}()
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job18"); err != nil {
fmt.Printf("getResp err:%v\n", err)
return
}
if len(getResp.Kvs) != 0 {
fmt.Println(getResp.Kvs[0].Value)
}
// 當前etcd集群事務ID,單調遞增的
watchStartRevision = getResp.Header.Revision + 1
// 創建個 watcher
watcher = clientv3.NewWatcher(client)
// 啟動監聽
fmt.Println("從該Revision版本向后監聽:", watchStartRevision)
// 一直監聽
// watchRespChan = watcher.Watch(context.TODO(), "/cron/jobs/job18", clientv3.WithRev(watchStartRevision))
// 自動關閉監聽,調用canceFunc()函數即可取消
xtc, canceFunc := context.WithCancel(context.TODO())
// xx秒后干什么事--->time.AfterFunc,執行匿名函數
time.AfterFunc(5*time.Second, func() {
canceFunc()
})
//啟動監聽
watchRespChan = watcher.Watch(xtc, "/cron/jobs/job18", clientv3.WithRev(watchStartRevision))
// 處理kv變化事件
for watchResp = range watchRespChan {
for _, event = range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改為:", string(event.Kv.Value), "CreateRevision is:", event.Kv.CreateRevision, "ModRevision is:", event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("刪除了:", "Revision is", event.Kv.ModRevision)
}
}
}
}
// xx秒后干什么事--->time.AfterFunc,執行匿名函數
time.AfterFunc(5*time.Second, func() {
fmt.Println("1")
})
-
OP的方式PUT、GET數據
package main
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
kv clientv3.KV
putOp clientv3.Op
getOp clientv3.Op
opResp clientv3.OpResponse
client *clientv3.Client
err error
)
func main() {
// 連接客戶端配置文件
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"},
DialTimeout: 5 * time.Second,
}
// 建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Printf("conect to etcd faild, err:%v\n", err)
return
} else {
fmt.Println("connect to etcd success")
}
// 獲取kv API子集
kv = clientv3.NewKV(client)
// 創建OP---> k v 對象
putOp = clientv3.OpPut("/cron/jobs/job19", "19")
// 執行OP
if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
fmt.Printf("執行OP faild, err:%v\n", err)
return
}
fmt.Println("Revision is:", opResp.Put().Header.Revision)
// 創建OP---> k v 對象
getOp = clientv3.OpGet("/cron/jobs/job19")
// 執行OP
if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
fmt.Printf("執行OP faild, err:%v\n", err)
return
}
// 打印數據
fmt.Println("數據ModRevision", opResp.Get().Kvs[0].ModRevision)
fmt.Println("數據Value", string(opResp.Get().Kvs[0].Value))
}
-
分布式鎖
- 同時運行兩次,驗證代碼
package main
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
leaseID clientv3.LeaseID
ctx context.Context
canceFunc context.CancelFunc
txn clientv3.Txn
client *clientv3.Client
LeaseGrantResp *clientv3.LeaseGrantResponse
keepResp *clientv3.LeaseKeepAliveResponse
txnResp *clientv3.TxnResponse
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse // 只讀管道
kv clientv3.KV
err error
)
/*
lease實現鎖自動過期
op操着
txn事務:if else then
*/
func main() {
// 連接客戶端配置文件
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"},
DialTimeout: 5 * time.Second,
}
// 建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Printf("conect to etcd faild, err:%v\n", err)
return
} else {
fmt.Println("connect to etcd success")
}
// 1、上鎖(創建租約、自動續租、拿着租約去搶占一個key)
// 申請一個租約 lease
lease := clientv3.Lease(client)
// 申請一個5s的租約
if LeaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
fmt.Println("租約申請失敗", err)
return
}
// 租約ID
leaseID = LeaseGrantResp.ID
// 准備一個用於取消的自動續租的context; cancanceFunc 取消續租調用這個函數即可
ctx, canceFunc = context.WithCancel(context.TODO())
// 確保函數退出后,自動續約會停止
defer canceFunc()
defer lease.Revoke(context.TODO(), leaseID)
// 自動續租
if keepRespChan, err = lease.KeepAlive(ctx, leaseID); err != nil {
fmt.Println("自動續租失敗", err)
return
}
// 判斷續約應答的協程
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {
fmt.Println("租約已經失效了")
goto END
} else {
// KeepAlive每秒會續租一次,所以就會收到一次應答
fmt.Println("收到應答,租約ID是:", keepResp.ID)
}
}
}
END:
}()
// ***拿着租約去搶占一個key***
// 獲取kv API子集
kv = clientv3.NewKV(client)
// 創建事務
txn = kv.Txn(context.TODO())
// 定義事務
// 如果key不存在;關聯用的是clientv3.WithLease(leaseID)
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job19"), "=", 0)).
// 不存在就put一個key
Then(clientv3.OpPut("/cron/lock/job19", "xxx", clientv3.WithLease(leaseID))).
// 否則槍鎖失敗
Else(clientv3.OpGet("/cron/lock/job19"))
// 提交事務
if txnResp, err = txn.Commit(); err != nil {
fmt.Println("txn err", err)
return
}
// 判斷釋放搶到鎖
if !txnResp.Succeeded {
fmt.Println("鎖被占用", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2、處理業務
fmt.Println("處理任務")
time.Sleep(50 * time.Second)
// 3、釋放鎖(取消自動續租、釋放租約)
/*
defer canceFunc()
defer lease.Revoke(context.TODO(), leaseID)
上面這個釋放了租約,關聯的kv會被刪除,從而達到釋放鎖
*/
}