帶你入門etcd


etcd是什么?

etcd是一個用Go語言寫的,用於分布式系統下高性能的鍵值(K-V)存儲、服務發現、負載均衡、、分布式鎖、配置管理等場景地應用,類似於Java的zookeeper。基於Raft協議,能保證數據的一致性。

官方地址

[etcd.io] https://etcd.io

[github.com]https://github.com/etcd-io/etcd

etcd的安裝

有兩種方式安裝,可以通過編譯好的二進制或源碼安裝,我這里用的是二進制安裝的,另一個源碼安裝的我沒有試過。

另外,這里的安裝也不是集群方式安裝的,用的單結點的,只是一些基本的操作。

一、二進制安裝

  1. 點擊下面的地址,根椐自已的系統,選擇不同的壓縮包下載,如 MacOs下載etcd-v3.4.4-darwin-amd64.zip

[https://etcd.io/docs/v3.4.0/integrations/] https://etcd.io/docs/v3.4.0/integrations/
2. 將下載的壓縮包,解壓出來,見如下內容

 $ ls ~/Applications/etcd
Documentation       README-etcdctl.md   README.md           READMEv2-etcdctl.md etcd     etcdctl
  1. 上面的文件中,etcd是服務端; etcdctl是客戶端,里面集成了常用一些Api。

二、源碼安裝

因為是Go寫的,可直接使用 go get https://github.com/etcd-io/etcd 下載就行了,不過這個etcd包有點大,如果網絡不好,還不一定能下載下來,凡正我是沒整下來。

所以,如果下載不下來,可以嘗試用github的zip打包下載試試,如果也還是不行,那么可用國內的下載加速。

我在網上找到一個 [https://www.newbe.pro/Mirrors/Mirrors-Etcd/] https://www.newbe.pro/Mirrors/Mirrors-Etcd/,從這里面直接把壓縮包下載下來,放到你的Gopath下就行了。

接下來進入到etcd的目錄下面,執行 ./build構建就行了。

etcd啟動

不管用那種方式,要啟動etcd,先找到etcd這個文件,然后執行它.默認在你本機的2379端口,如果你要在公網開啟,指定下參數就行了.

$ ~/Applications/etcd/etcd  --advertise-client-urls="https://123.33.44.34:2379"  --listen-client-urls="https://123.33.44.34:2379"

本地如下方式啟動

$ ~/Applications/etcd/etcd
[WARNING] Deprecated '--logger=capnslog' flag is set; use '--logger=zap' flag instead
2020-03-11 12:55:38.289362 I | etcdmain: etcd Version: 3.4.4
2020-03-11 12:55:38.289461 I | etcdmain: Git SHA: c65a9e2dd
2020-03-11 12:55:38.289468 I | etcdmain: Go Version: go1.12.12
2020-03-11 12:55:38.289476 I | etcdmain: Go OS/Arch: darwin/amd64
2020-03-11 12:55:38.289482 I | etcdmain: setting maximum number of CPUs to 4, total number of available CPUs is 4
2020-03-11 12:55:38.289492 N | etcdmain: failed to detect default host (default host not supported on darwin_amd64)
2020-03-11 12:55:38.289500 W | etcdmain: no data-dir provided, using default data-dir ./default.etcd
[WARNING] Deprecated '--logger=capnslog' flag is set; use '--logger=zap' flag instead
2020-03-11 12:55:38.292027 I | embed: name = default
2020-03-11 12:55:38.292038 I | embed: data dir = default.etcd
2020-03-11 12:55:38.292045 I | embed: member dir = default.etcd/member
2020-03-11 12:55:38.292050 I | embed: heartbeat = 100ms
2020-03-11 12:55:38.292054 I | embed: election = 1000ms
2020-03-11 12:55:38.292058 I | embed: snapshot count = 100000
2020-03-11 12:55:38.292079 I | embed: advertise client URLs = http://localhost:2379
2020-03-11 12:55:38.407474 I | etcdserver: starting member 8e9e05c52164694d in cluster cdf818194e3a8c32

好了,看到上面的輸出,服務端就啟動好了,如果要測試,可以用etcdctl測試下。

etcdctl put  "task/task1" "task任務"
OK

etcdctl get  "task/task1"
task/task1
task任務

代碼操作

這里用Go對etcd進行一些簡單的操作,go的客戶端可以在官網上去找[https://etcd.io/docs/v3.4.0/integrations/] https://etcd.io/docs/v3.4.0/integrations/

我這里用的是clientv3這個包, 這個包在上面源碼安裝時,下載的包里面就有,那個包是服務端和客戶端代碼都有的。

Get操作

package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

func main(){
	var (
		config clientv3.Config
		client *clientv3.Client
		kv clientv3.KV
		resp *clientv3.GetResponse
		err error
	)
	config = clientv3.Config{
		Endpoints:[]string{"localhost:2379"},
		DialTimeout:5 * time.Second,
	}

	if client,err = clientv3.New(config); err != nil {
		fmt.Println(err)
		return
	}

	kv = clientv3.NewKV(client)

	if resp,  err = kv.Get(context.TODO(),"cron/jobs/",clientv3.WithPrefix()); err != nil {
		fmt.Println(err)
		return
	}
	for k,v := range resp.Kvs {
		fmt.Println(k,v,string(v.Key), string(v.Value))
	}
}

續約操作

package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

func main(){
	var (
		config clientv3.Config
		client *clientv3.Client
		Lease clientv3.Lease
		leaseGrantResp *clientv3.LeaseGrantResponse
		leaseId clientv3.LeaseID
		leaseKeepResp *clientv3.LeaseKeepAliveResponse
		leaseKeepRespChan <-chan *clientv3.LeaseKeepAliveResponse
		getResp *clientv3.GetResponse
		putResp *clientv3.PutResponse

		kv clientv3.KV
		err error
	)
	config = clientv3.Config{
		Endpoints:[]string{"localhost:2379"},
		DialTimeout:5 * time.Second,
	}

	if client,err = clientv3.New(config); err != nil {
		fmt.Println(err)
		return
	}

	Lease = clientv3.NewLease(client)

	if leaseGrantResp, err = Lease.Grant(context.TODO(), 10); err != nil {
		fmt.Println(err)
	}

	leaseId = leaseGrantResp.ID

	ctx, _ := context.WithTimeout(context.TODO(), 10 * time.Second) //10秒停止續約

	if leaseKeepRespChan, err = Lease.KeepAlive(ctx,leaseId); err != nil {
		fmt.Println(err)
	}

	//啟動協程,進行續約
	go func(){
		for {
			select {
			   case leaseKeepResp  = <-leaseKeepRespChan:
			   	if leaseKeepResp == nil{
			   		fmt.Println("租約失效了")
			   		goto END
				} else {
					fmt.Println("收到續約", leaseKeepResp)
				}
			}
		}
		END:
	}()

	kv = clientv3.NewKV(client)

	if putResp ,err = kv.Put(context.TODO(),"cron/jobs/job3","job3",clientv3.WithLease(leaseId)); err != nil {
		fmt.Println(err)
	} else {
		fmt.Println("寫入成功", putResp.Header.Revision)
	}

	for {
		if getResp, err = kv.Get(context.TODO(),"cron/jobs/job3"); err != nil {
			fmt.Println(err)
		}

		if getResp.Count != 0 {
			fmt.Println(getResp.Kvs)
		} else {
			fmt.Println(" 過期了")
			break
		}

		time.Sleep(2 * time.Second)
	}


}

Op操作

package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

func main(){
	var (
		config clientv3.Config
		client *clientv3.Client
		opPut clientv3.Op
		opGet clientv3.Op
		opResp  clientv3.OpResponse
		kv clientv3.KV

		err error
	)
	config = clientv3.Config{
		Endpoints:[]string{"localhost:2379"},
		DialTimeout:5 * time.Second,
	}

	if client,err = clientv3.New(config); err != nil {
		fmt.Println(err)
		return
	}

	kv = clientv3.NewKV(client)

	opPut = clientv3.OpPut("cron/jobs/job3","job3333")

	if opResp, err  = kv.Do(context.TODO(),opPut); err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println("獲取revision", opResp.Put().Header.Revision)


	opGet = clientv3.OpGet("cron/jobs/job3")
	if opResp ,err = kv.Do(context.TODO(),opGet); err != nil {
		fmt.Println(err)
		return
	}

	if len(opResp.Get().Kvs) != 0 {
		fmt.Println("獲取值:", string(opResp.Get().Kvs[0].Value))
	}

}

簡單分布式鎖

package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

//簡單分布式鎖

func main(){
	var (
		config clientv3.Config
		client *clientv3.Client
		/*opPut clientv3.Op
		opGet clientv3.Op
		opResp  clientv3.OpResponse*/
		kv clientv3.KV
		txn clientv3.Txn
		txnResp *clientv3.TxnResponse

		Lease clientv3.Lease
		leaseGrantResp *clientv3.LeaseGrantResponse
		leaseId clientv3.LeaseID
		leaseKeepResp *clientv3.LeaseKeepAliveResponse
		leaseKeepRespChan <-chan *clientv3.LeaseKeepAliveResponse

		ctx context.Context
		cancelFunc context.CancelFunc
		err error
	)
	config = clientv3.Config{
		Endpoints:[]string{"localhost:2379"},
		DialTimeout:5 * time.Second,
	}

	if client,err = clientv3.New(config); err != nil {
		fmt.Println(err)
		return
	}

	//租約操作
	Lease = clientv3.NewLease(client)

	if leaseGrantResp, err = Lease.Grant(context.TODO(), 5); err != nil {
		fmt.Println(err)
	}

	leaseId = leaseGrantResp.ID

	ctx, cancelFunc = context.WithCancel(context.TODO())

	defer cancelFunc()
	defer Lease.Revoke(context.TODO(),leaseId)

	if leaseKeepRespChan, err = Lease.KeepAlive(ctx,leaseId); err != nil {
		fmt.Println(err)
	}

	//啟動協程,進行續約
	go func(){
		for {
			select {
			case leaseKeepResp  = <-leaseKeepRespChan:
				if leaseKeepResp == nil{
					fmt.Println("租約失效了")
					goto END
				} else {
					fmt.Println("收到續約", leaseKeepResp)
				}
			}
		}
	END:
	}()



	kv = clientv3.NewKV(client)

	//開啟事務
	txn = kv.Txn(context.TODO())

	txn.If(clientv3.Compare(clientv3.CreateRevision("cron/lock/job3"),"=", 0)).
		Then(clientv3.OpPut("cron/lock/job3","123",clientv3.WithLease(leaseId))).
		Else(clientv3.OpGet("cron/lock/job3"))

	if txnResp,err  = txn.Commit(); err != nil {
		fmt.Println(err)
		return
	}

	if !txnResp.Succeeded {
		fmt.Println("OOH,鎖被占用了:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
		return
	} else {
		fmt.Println("搶到了")
	}

	fmt.Println("處理業務")
	time.Sleep( 10 * time.Second)



}

以上就是記錄我在學習etcd的基本入門和一些常用的操作,希望大家能喜歡,期待和大家的一些進步。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM