golang中使用etcd


package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
)

func main(){
    var (
        config clientv3.Config
        err error
        client *clientv3.Client
    )
    //配置
    config = clientv3.Config{
        Endpoints:[]string{"192.168.1.188:2379"},
        DialTimeout:time.Second*5,
    }
    //連接
    if client,err = clientv3.New(config);err != nil{
        fmt.Println(err)
        return
    }
    client=client
}

 

 

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main(){
    var (
        config clientv3.Config
        err error
        client *clientv3.Client
        kv clientv3.KV
        putResp *clientv3.PutResponse

    )
    //配置
    config = clientv3.Config{
        Endpoints:[]string{"192.168.1.188:2379"},
        DialTimeout:time.Second*5,
    }
    //連接 床見一個客戶端
    if client,err = clientv3.New(config);err != nil{
        fmt.Println(err)
        return
    }
    //用於讀寫etcd的鍵值對
    kv = clientv3.NewKV(client)
    putResp, err = kv.Put(context.TODO(),"/cron/jobs/job1","bye",clientv3.WithPrevKV())
    if err != nil{
        fmt.Println(err)
    }else{
        //獲取版本信息
        fmt.Println("Revision:",putResp.Header.Revision)
        if putResp.PrevKv != nil{
            fmt.Println("key:",string(putResp.PrevKv.Key))
            fmt.Println("Value:",string(putResp.PrevKv.Value))
            fmt.Println("Version:",string(putResp.PrevKv.Version))
        }
    }
}

 

Revision: 10
key: /cron/jobs/job1
Value: hello
Version: 

get

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main(){
    var (
        config clientv3.Config
        err error
        client *clientv3.Client
        kv clientv3.KV
        getResp *clientv3.GetResponse

    )
    //配置
    config = clientv3.Config{
        Endpoints:[]string{"192.168.1.188:2379"},
        DialTimeout:time.Second*5,
    }
    //連接 床見一個客戶端
    if client,err = clientv3.New(config);err != nil{
        fmt.Println(err)
        return
    }


    //用於讀寫etcd的鍵值對
    kv = clientv3.NewKV(client)

    getResp,err = kv.Get(context.TODO(),"/cron/jobs/job1")
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(getResp.Kvs)
}

 

[key:"/cron/jobs/job1" create_revision:4 mod_revision:11 version:5 value:"bye" ]

 

with用法

//用於讀寫etcd的鍵值對
    kv = clientv3.NewKV(client)

    getResp,err = kv.Get(context.TODO(),"/cron/jobs/job1",clientv3.WithCountOnly())
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(getResp.Kvs,getResp.Count)
[] 1

 

讀取前綴

//用於讀寫etcd的鍵值對
    kv = clientv3.NewKV(client)

    //讀取前綴
    getResp,err = kv.Get(context.TODO(),"/cron/jobs/",clientv3.WithPrefix())
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(getResp.Kvs)

 

[key:"/cron/jobs/job1" create_revision:4 mod_revision:11 version:5 value:"bye"  
key:"/cron/jobs/job2" create_revision:12 mod_revision:12 version:1 value:"byehhhhhh" ]

 

Delete

    //用於讀寫etcd的鍵值對
    kv = clientv3.NewKV(client)

    delResp,err = kv.Delete(context.TODO(),"/cron/jobs/job2",clientv3.WithPrevKV())
    if err != nil{
        fmt.Println(err)
        return
    }else{
        if len(delResp.PrevKvs) > 0 {
            for idx,kvpair = range delResp.PrevKvs{
                idx = idx
                fmt.Println("刪除了",string(kvpair.Key),string(kvpair.Value))
            }
        }
    }

 

byehhhhhh

 

刪除多個key

delResp,err = kv.Delete(context.TODO(),"/cron/jobs",clientv3.WithPrefix())

續租:

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main(){
    var (
        config clientv3.Config
        err error
        client *clientv3.Client
        kv clientv3.KV
        lease clientv3.Lease
        leaseid clientv3.LeaseID
        leaseGrantResp *clientv3.LeaseGrantResponse
        putResp *clientv3.PutResponse
        getResp *clientv3.GetResponse
        //keepresp *clientv3.LeaseKeepAliveResponse
        //keepRestChan <-chan *clientv3.LeaseKeepAliveResponse

    )
    //配置
    config = clientv3.Config{
        Endpoints:[]string{"192.168.1.188:2379"},
        DialTimeout:time.Second*5,
    }
    //連接 床見一個客戶端
    if client,err = clientv3.New(config);err != nil{
        fmt.Println(err)
        return
    }




    //申請一個lease 租約
    lease = clientv3.NewLease(client)

    //申請一個10秒的租約
    if leaseGrantResp, err = lease.Grant(context.TODO(),10);err != nil{
        fmt.Println(err)
        return
    }



    //拿到租約id
    leaseid = leaseGrantResp.ID

    //獲得kv api子集
    kv = clientv3.NewKV(client)


    //put一個kv 讓它與租約關聯起來 從而實現10秒自動過期
    if putResp,err = kv.Put(context.TODO(),"cron/lock/job1","v5",clientv3.WithLease(leaseid));err != nil{
        fmt.Println(err)
        return
    }

    fmt.Println("寫入成功",putResp.Header.Revision)

    //定時的看一下key過期了沒有
    for{
        if getResp,err = kv.Get(context.TODO(),"cron/lock/job1");err != nil{
            fmt.Println(err)
            return
        }
        if getResp.Count == 0{
            fmt.Println("kv過期了")
            break
        }
        fmt.Println("還沒過期:",getResp.Kvs)
        time.Sleep(time.Second*2)
    }
}

 

寫入成功 24
還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
還沒過期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
kv過期了

 

永不過期的租約

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main(){
    var (
        config clientv3.Config
        err error
        client *clientv3.Client
        kv clientv3.KV
        lease clientv3.Lease
        leaseid clientv3.LeaseID
        leaseGrantResp *clientv3.LeaseGrantResponse
        putResp *clientv3.PutResponse
        getResp *clientv3.GetResponse
        keepresp *clientv3.LeaseKeepAliveResponse
        keepRestChan <-chan *clientv3.LeaseKeepAliveResponse

    )
    //配置
    config = clientv3.Config{
        Endpoints:[]string{"192.168.1.188:2379"},
        DialTimeout:time.Second*5,
    }
    //連接 床見一個客戶端
    if client,err = clientv3.New(config);err != nil{
        fmt.Println(err)
        return
    }




    //申請一個lease 租約
    lease = clientv3.NewLease(client)

    //申請一個10秒的租約
    if leaseGrantResp, err = lease.Grant(context.TODO(),10);err != nil{
        fmt.Println(err)
        return
    }

    //拿到租約id
    leaseid = leaseGrantResp.ID

    //獲得kv api子集
    kv = clientv3.NewKV(client)


    //自動續租
    if keepRestChan,err = lease.KeepAlive(context.TODO(),leaseid);err != nil{
        fmt.Println(err)
        return
    }
    //處理續租應答的協程
    go func() {
        for {
            select {
                case keepresp = <-keepRestChan:
                    if keepRestChan == nil{
                        fmt.Println("租約已失效了")
                        goto END
                    }else{//每秒會續租一次,所以就會收到一次應答
                        fmt.Println("收到自動續租的應答")
                    }
            }
        }
        END:
    }()





    //put一個kv 讓它與租約關聯起來 從而實現10秒自動過期
    if putResp,err = kv.Put(context.TODO(),"cron/lock/job1","v5",clientv3.WithLease(leaseid));err != nil{
        fmt.Println(err)
        return
    }

    fmt.Println("寫入成功",putResp.Header.Revision)

    //定時的看一下key過期了沒有
    for{
        if getResp,err = kv.Get(context.TODO(),"cron/lock/job1");err != nil{
            fmt.Println(err)
            return
        }
        if getResp.Count == 0{
            fmt.Println("kv過期了")
            break
        }
        fmt.Println("還沒過期:",getResp.Kvs)
        time.Sleep(time.Second*2)
    }
}
View Code
寫入成功 38
收到自動續租的應答
還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
收到自動續租的應答
還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
收到自動續租的應答
還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
還沒過期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
收到自動續租的應答

 

watch 

  監聽k v變化  常用作與集群中配置下發,狀態同步 非常有價值

 

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
    "github.com/coreos/etcd/mvcc/mvccpb"
)

func main() {
    var (
        config clientv3.Config
        client *clientv3.Client
        err error
        kv clientv3.KV
        watcher clientv3.Watcher
        getResp *clientv3.GetResponse
        watchStartRevision int64
        watchRespChan <-chan clientv3.WatchResponse
        watchResp clientv3.WatchResponse
        event *clientv3.Event
    )

    // 客戶端配置
    config = clientv3.Config{
        Endpoints: []string{"36.111.184.221:2379"},
        DialTimeout: 5 * time.Second,
    }

    // 建立連接
    if client, err = clientv3.New(config); err != nil {
        fmt.Println(err)
        return
    }

    // KV
    kv = clientv3.NewKV(client)

    // 模擬etcd中KV的變化
    go func() {
        for {
            kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")

            kv.Delete(context.TODO(), "/cron/jobs/job7")

            time.Sleep(1 * time.Second)
        }
    }()

    // 先GET到當前的值,並監聽后續變化
    if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {
        fmt.Println(err)
        return
    }

    // 現在key是存在的
    if len(getResp.Kvs) != 0 {
        fmt.Println("當前值:", string(getResp.Kvs[0].Value))
    }

    // 當前etcd集群事務ID, 單調遞增的
    watchStartRevision = getResp.Header.Revision + 1

    // 創建一個watcher
    watcher = clientv3.NewWatcher(client)

    // 啟動監聽
    fmt.Println("從該版本向后監聽:", watchStartRevision)

    ctx, cancelFunc := context.WithCancel(context.TODO())
    time.AfterFunc(5 * time.Second, func() {
        cancelFunc()
    })

    watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))

    // 處理kv變化事件
    for watchResp = range watchRespChan {
        for _, event = range watchResp.Events {
            switch event.Type {
            case mvccpb.PUT:
                fmt.Println("修改為:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
            case mvccpb.DELETE:
                fmt.Println("刪除了", "Revision:", event.Kv.ModRevision)
            }
        }
    }
}

 

當前值: i am job7
從該版本向后監聽: 72
刪除了 key:"/cron/jobs/job7" mod_revision:72 
修改為: key:"/cron/jobs/job7" create_revision:73 mod_revision:73 version:1 value:"i am job7" 
刪除了 key:"/cron/jobs/job7" mod_revision:74 
修改為: key:"/cron/jobs/job7" create_revision:75 mod_revision:75 version:1 value:"i am job7" 
刪除了 key:"/cron/jobs/job7" mod_revision:76 
修改為: key:"/cron/jobs/job7" create_revision:77 mod_revision:77 version:1 value:"i am job7" 
刪除了 key:"/cron/jobs/job7" mod_revision:78 
修改為: key:"/cron/jobs/job7" create_revision:79 mod_revision:79 version:1 value:"i am job7" 
刪除了 key:"/cron/jobs/job7" mod_revision:80 
修改為: key:"/cron/jobs/job7" create_revision:81 mod_revision:81 version:1 value:"i am job7" 
刪除了 key:"/cron/jobs/job7" mod_revision:82 

 

op取代get put delete方法

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    var (
        config clientv3.Config
        client *clientv3.Client
        err error
        kv clientv3.KV
        putOp clientv3.Op
        getOp clientv3.Op
        opResp clientv3.OpResponse
    )

    // 客戶端配置
    config = clientv3.Config{
        Endpoints: []string{"36.111.184.221:2379"},
        DialTimeout: 5 * time.Second,
    }

    // 建立連接
    if client, err = clientv3.New(config); err != nil {
        fmt.Println(err)
        return
    }

    kv = clientv3.NewKV(client)

    // 創建Op: operation
    putOp = clientv3.OpPut("/cron/jobs/job8", "123123123")

    // 執行OP
    if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
        fmt.Println(err)
        return
    }

    // kv.Do(op)

    // kv.Put
    // kv.Get
    // kv.Delete

    fmt.Println("寫入Revision:", opResp.Put().Header.Revision)

    // 創建Op
    getOp = clientv3.OpGet("/cron/jobs/job8")

    // 執行OP
    if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
        fmt.Println(err)
        return
    }

    // 打印
    fmt.Println("數據Revision:", opResp.Get().Kvs[0].ModRevision)    // create rev == mod rev
    fmt.Println("數據value:", string(opResp.Get().Kvs[0].Value))
}
View Code

 

事務txn實現分布式鎖

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    var (
        config clientv3.Config
        client *clientv3.Client
        err error
        lease clientv3.Lease
        leaseGrantResp *clientv3.LeaseGrantResponse
        leaseId clientv3.LeaseID
        keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
        keepResp *clientv3.LeaseKeepAliveResponse
        ctx context.Context
        cancelFunc context.CancelFunc
        kv clientv3.KV
        txn clientv3.Txn
        txnResp *clientv3.TxnResponse
    )

    // 客戶端配置
    config = clientv3.Config{
        Endpoints: []string{"36.111.184.221:2379"},
        DialTimeout: 5 * time.Second,
    }

    // 建立連接
    if client, err = clientv3.New(config); err != nil {
        fmt.Println(err)
        return
    }

    // lease實現鎖自動過期:
    // op操作
    // txn事務: if else then

    // 1, 上鎖 (創建租約, 自動續租, 拿着租約去搶占一個key)
    lease = clientv3.NewLease(client)

    // 申請一個5秒的租約
    if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
        fmt.Println(err)
        return
    }

    // 拿到租約的ID
    leaseId = leaseGrantResp.ID

    // 准備一個用於取消自動續租的context
    ctx, cancelFunc = context.WithCancel(context.TODO())

    // 確保函數退出后, 自動續租會停止
    defer cancelFunc()
    defer lease.Revoke(context.TODO(), leaseId)

    // 5秒后會取消自動續租
    if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
        fmt.Println(err)
        return
    }

    // 處理續約應答的協程
    go func() {
        for {
            select {
            case keepResp = <- keepRespChan:
                if keepRespChan == nil {
                    fmt.Println("租約已經失效了")
                    goto END
                } else {    // 每秒會續租一次, 所以就會受到一次應答
                    fmt.Println("收到自動續租應答:", keepResp.ID)
                }
            }
        }
    END:
    }()

    //  if 不存在key, then 設置它, else 搶鎖失敗
    kv = clientv3.NewKV(client)

    // 創建事務
    txn = kv.Txn(context.TODO())

    // 定義事務

    // 如果key不存在
    txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
        Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
        Else(clientv3.OpGet("/cron/lock/job9")) // 否則搶鎖失敗

    // 提交事務
    if txnResp, err = txn.Commit(); err != nil {
        fmt.Println(err)
        return // 沒有問題
    }

    // 判斷是否搶到了鎖
    if !txnResp.Succeeded {
        fmt.Println("鎖被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
        return
    }

    // 2, 處理業務

    fmt.Println("處理任務")
    time.Sleep(5 * time.Second)

    // 3, 釋放鎖(取消自動續租, 釋放租約)
    // defer 會把租約釋放掉, 關聯的KV就被刪除了
}

 執行結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM