package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" ) func main(){ var ( config clientv3.Config err error client *clientv3.Client ) //配置 config = clientv3.Config{ Endpoints:[]string{"192.168.1.188:2379"}, DialTimeout:time.Second*5, } //連接 if client,err = clientv3.New(config);err != nil{ fmt.Println(err) return } client=client }
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main(){ var ( config clientv3.Config err error client *clientv3.Client kv clientv3.KV putResp *clientv3.PutResponse ) //配置 config = clientv3.Config{ Endpoints:[]string{"192.168.1.188:2379"}, DialTimeout:time.Second*5, } //連接 床見一個客戶端 if client,err = clientv3.New(config);err != nil{ fmt.Println(err) return } //用於讀寫etcd的鍵值對 kv = clientv3.NewKV(client) putResp, err = kv.Put(context.TODO(),"/cron/jobs/job1","bye",clientv3.WithPrevKV()) if err != nil{ fmt.Println(err) }else{ //獲取版本信息 fmt.Println("Revision:",putResp.Header.Revision) if putResp.PrevKv != nil{ fmt.Println("key:",string(putResp.PrevKv.Key)) fmt.Println("Value:",string(putResp.PrevKv.Value)) fmt.Println("Version:",string(putResp.PrevKv.Version)) } } }
Revision: 10 key: /cron/jobs/job1 Value: hello Version:
get
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main(){ var ( config clientv3.Config err error client *clientv3.Client kv clientv3.KV getResp *clientv3.GetResponse ) //配置 config = clientv3.Config{ Endpoints:[]string{"192.168.1.188:2379"}, DialTimeout:time.Second*5, } //連接 床見一個客戶端 if client,err = clientv3.New(config);err != nil{ fmt.Println(err) return } //用於讀寫etcd的鍵值對 kv = clientv3.NewKV(client) getResp,err = kv.Get(context.TODO(),"/cron/jobs/job1") if err != nil { fmt.Println(err) return } fmt.Println(getResp.Kvs) }
[key:"/cron/jobs/job1" create_revision:4 mod_revision:11 version:5 value:"bye" ]
with用法
//用於讀寫etcd的鍵值對 kv = clientv3.NewKV(client) getResp,err = kv.Get(context.TODO(),"/cron/jobs/job1",clientv3.WithCountOnly()) if err != nil { fmt.Println(err) return } fmt.Println(getResp.Kvs,getResp.Count)
[] 1
讀取前綴
//用於讀寫etcd的鍵值對 kv = clientv3.NewKV(client) //讀取前綴 getResp,err = kv.Get(context.TODO(),"/cron/jobs/",clientv3.WithPrefix()) if err != nil { fmt.Println(err) return } fmt.Println(getResp.Kvs)
[key:"/cron/jobs/job1" create_revision:4 mod_revision:11 version:5 value:"bye" key:"/cron/jobs/job2" create_revision:12 mod_revision:12 version:1 value:"byehhhhhh" ]
Delete
//用於讀寫etcd的鍵值對 kv = clientv3.NewKV(client) delResp,err = kv.Delete(context.TODO(),"/cron/jobs/job2",clientv3.WithPrevKV()) if err != nil{ fmt.Println(err) return }else{ if len(delResp.PrevKvs) > 0 { for idx,kvpair = range delResp.PrevKvs{ idx = idx fmt.Println("刪除了",string(kvpair.Key),string(kvpair.Value)) } } }
byehhhhhh
刪除多個key
// Delete several keys at once: WithPrefix removes every key whose name starts
// with "/cron/jobs". The prefix has no trailing slash, so any key beginning
// with that exact string matches, not only those under "/cron/jobs/".
delResp,err = kv.Delete(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
續租:
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main(){ var ( config clientv3.Config err error client *clientv3.Client kv clientv3.KV lease clientv3.Lease leaseid clientv3.LeaseID leaseGrantResp *clientv3.LeaseGrantResponse putResp *clientv3.PutResponse getResp *clientv3.GetResponse //keepresp *clientv3.LeaseKeepAliveResponse //keepRestChan <-chan *clientv3.LeaseKeepAliveResponse ) //配置 config = clientv3.Config{ Endpoints:[]string{"192.168.1.188:2379"}, DialTimeout:time.Second*5, } //連接 床見一個客戶端 if client,err = clientv3.New(config);err != nil{ fmt.Println(err) return } //申請一個lease 租約 lease = clientv3.NewLease(client) //申請一個10秒的租約 if leaseGrantResp, err = lease.Grant(context.TODO(),10);err != nil{ fmt.Println(err) return } //拿到租約id leaseid = leaseGrantResp.ID //獲得kv api子集 kv = clientv3.NewKV(client) //put一個kv 讓它與租約關聯起來 從而實現10秒自動過期 if putResp,err = kv.Put(context.TODO(),"cron/lock/job1","v5",clientv3.WithLease(leaseid));err != nil{ fmt.Println(err) return } fmt.Println("寫入成功",putResp.Header.Revision) //定時的看一下key過期了沒有 for{ if getResp,err = kv.Get(context.TODO(),"cron/lock/job1");err != nil{ fmt.Println(err) return } if getResp.Count == 0{ fmt.Println("kv過期了") break } fmt.Println("還沒過期:",getResp.Kvs) time.Sleep(time.Second*2) } }
寫入成功 24 還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ] 還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ] 還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ] 還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ] 還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ] 還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ] kv過期了
永不過期的租約
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main(){ var ( config clientv3.Config err error client *clientv3.Client kv clientv3.KV lease clientv3.Lease leaseid clientv3.LeaseID leaseGrantResp *clientv3.LeaseGrantResponse putResp *clientv3.PutResponse getResp *clientv3.GetResponse keepresp *clientv3.LeaseKeepAliveResponse keepRestChan <-chan *clientv3.LeaseKeepAliveResponse ) //配置 config = clientv3.Config{ Endpoints:[]string{"192.168.1.188:2379"}, DialTimeout:time.Second*5, } //連接 床見一個客戶端 if client,err = clientv3.New(config);err != nil{ fmt.Println(err) return } //申請一個lease 租約 lease = clientv3.NewLease(client) //申請一個10秒的租約 if leaseGrantResp, err = lease.Grant(context.TODO(),10);err != nil{ fmt.Println(err) return } //拿到租約id leaseid = leaseGrantResp.ID //獲得kv api子集 kv = clientv3.NewKV(client) //自動續租 if keepRestChan,err = lease.KeepAlive(context.TODO(),leaseid);err != nil{ fmt.Println(err) return } //處理續租應答的協程 go func() { for { select { case keepresp = <-keepRestChan: if keepRestChan == nil{ fmt.Println("租約已失效了") goto END }else{//每秒會續租一次,所以就會收到一次應答 fmt.Println("收到自動續租的應答") } } } END: }() //put一個kv 讓它與租約關聯起來 從而實現10秒自動過期 if putResp,err = kv.Put(context.TODO(),"cron/lock/job1","v5",clientv3.WithLease(leaseid));err != nil{ fmt.Println(err) return } fmt.Println("寫入成功",putResp.Header.Revision) //定時的看一下key過期了沒有 for{ if getResp,err = kv.Get(context.TODO(),"cron/lock/job1");err != nil{ fmt.Println(err) return } if getResp.Count == 0{ fmt.Println("kv過期了") break } fmt.Println("還沒過期:",getResp.Kvs) time.Sleep(time.Second*2) } }
寫入成功 38 收到自動續租的應答 還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ] 還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ] 收到自動續租的應答 還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ] 還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ] 收到自動續租的應答 還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ] 還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ] 收到自動續租的應答
watch
監聽 kv 的變化,常用於集群中的配置下發與狀態同步,非常有價值。
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" "github.com/coreos/etcd/mvcc/mvccpb" ) func main() { var ( config clientv3.Config client *clientv3.Client err error kv clientv3.KV watcher clientv3.Watcher getResp *clientv3.GetResponse watchStartRevision int64 watchRespChan <-chan clientv3.WatchResponse watchResp clientv3.WatchResponse event *clientv3.Event ) // 客戶端配置 config = clientv3.Config{ Endpoints: []string{"36.111.184.221:2379"}, DialTimeout: 5 * time.Second, } // 建立連接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } // KV kv = clientv3.NewKV(client) // 模擬etcd中KV的變化 go func() { for { kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7") kv.Delete(context.TODO(), "/cron/jobs/job7") time.Sleep(1 * time.Second) } }() // 先GET到當前的值,並監聽后續變化 if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil { fmt.Println(err) return } // 現在key是存在的 if len(getResp.Kvs) != 0 { fmt.Println("當前值:", string(getResp.Kvs[0].Value)) } // 當前etcd集群事務ID, 單調遞增的 watchStartRevision = getResp.Header.Revision + 1 // 創建一個watcher watcher = clientv3.NewWatcher(client) // 啟動監聽 fmt.Println("從該版本向后監聽:", watchStartRevision) ctx, cancelFunc := context.WithCancel(context.TODO()) time.AfterFunc(5 * time.Second, func() { cancelFunc() }) watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision)) // 處理kv變化事件 for watchResp = range watchRespChan { for _, event = range watchResp.Events { switch event.Type { case mvccpb.PUT: fmt.Println("修改為:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision) case mvccpb.DELETE: fmt.Println("刪除了", "Revision:", event.Kv.ModRevision) } } } }
當前值: i am job7 從該版本向后監聽: 72 刪除了 key:"/cron/jobs/job7" mod_revision:72 修改為: key:"/cron/jobs/job7" create_revision:73 mod_revision:73 version:1 value:"i am job7" 刪除了 key:"/cron/jobs/job7" mod_revision:74 修改為: key:"/cron/jobs/job7" create_revision:75 mod_revision:75 version:1 value:"i am job7" 刪除了 key:"/cron/jobs/job7" mod_revision:76 修改為: key:"/cron/jobs/job7" create_revision:77 mod_revision:77 version:1 value:"i am job7" 刪除了 key:"/cron/jobs/job7" mod_revision:78 修改為: key:"/cron/jobs/job7" create_revision:79 mod_revision:79 version:1 value:"i am job7" 刪除了 key:"/cron/jobs/job7" mod_revision:80 修改為: key:"/cron/jobs/job7" create_revision:81 mod_revision:81 version:1 value:"i am job7" 刪除了 key:"/cron/jobs/job7" mod_revision:82
op取代get put delete方法
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main() { var ( config clientv3.Config client *clientv3.Client err error kv clientv3.KV putOp clientv3.Op getOp clientv3.Op opResp clientv3.OpResponse ) // 客戶端配置 config = clientv3.Config{ Endpoints: []string{"36.111.184.221:2379"}, DialTimeout: 5 * time.Second, } // 建立連接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } kv = clientv3.NewKV(client) // 創建Op: operation putOp = clientv3.OpPut("/cron/jobs/job8", "123123123") // 執行OP if opResp, err = kv.Do(context.TODO(), putOp); err != nil { fmt.Println(err) return } // kv.Do(op) // kv.Put // kv.Get // kv.Delete fmt.Println("寫入Revision:", opResp.Put().Header.Revision) // 創建Op getOp = clientv3.OpGet("/cron/jobs/job8") // 執行OP if opResp, err = kv.Do(context.TODO(), getOp); err != nil { fmt.Println(err) return } // 打印 fmt.Println("數據Revision:", opResp.Get().Kvs[0].ModRevision) // create rev == mod rev fmt.Println("數據value:", string(opResp.Get().Kvs[0].Value)) }
事務txn實現分布式鎖
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main() { var ( config clientv3.Config client *clientv3.Client err error lease clientv3.Lease leaseGrantResp *clientv3.LeaseGrantResponse leaseId clientv3.LeaseID keepRespChan <-chan *clientv3.LeaseKeepAliveResponse keepResp *clientv3.LeaseKeepAliveResponse ctx context.Context cancelFunc context.CancelFunc kv clientv3.KV txn clientv3.Txn txnResp *clientv3.TxnResponse ) // 客戶端配置 config = clientv3.Config{ Endpoints: []string{"36.111.184.221:2379"}, DialTimeout: 5 * time.Second, } // 建立連接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } // lease實現鎖自動過期: // op操作 // txn事務: if else then // 1, 上鎖 (創建租約, 自動續租, 拿着租約去搶占一個key) lease = clientv3.NewLease(client) // 申請一個5秒的租約 if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil { fmt.Println(err) return } // 拿到租約的ID leaseId = leaseGrantResp.ID // 准備一個用於取消自動續租的context ctx, cancelFunc = context.WithCancel(context.TODO()) // 確保函數退出后, 自動續租會停止 defer cancelFunc() defer lease.Revoke(context.TODO(), leaseId) // 5秒后會取消自動續租 if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil { fmt.Println(err) return } // 處理續約應答的協程 go func() { for { select { case keepResp = <- keepRespChan: if keepRespChan == nil { fmt.Println("租約已經失效了") goto END } else { // 每秒會續租一次, 所以就會受到一次應答 fmt.Println("收到自動續租應答:", keepResp.ID) } } } END: }() // if 不存在key, then 設置它, else 搶鎖失敗 kv = clientv3.NewKV(client) // 創建事務 txn = kv.Txn(context.TODO()) // 定義事務 // 如果key不存在 txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)). Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))). 
Else(clientv3.OpGet("/cron/lock/job9")) // 否則搶鎖失敗 // 提交事務 if txnResp, err = txn.Commit(); err != nil { fmt.Println(err) return // 沒有問題 } // 判斷是否搶到了鎖 if !txnResp.Succeeded { fmt.Println("鎖被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value)) return } // 2, 處理業務 fmt.Println("處理任務") time.Sleep(5 * time.Second) // 3, 釋放鎖(取消自動續租, 釋放租約) // defer 會把租約釋放掉, 關聯的KV就被刪除了 }
執行結果:

