etcd(實時共享配置信息)


前言

在分布式集群架構中各個組件之間如何解決以下2個關鍵問題?

1.配置共享:共享同一份配置文件,如果這份配置文件更新之后,各個組件如何馬上得知(我就是沖着watch for changes來的....)?

2.服務注冊發現:集群中新增節點如何做到自動發現?

 

etcd簡介

 

etcd是Go語言開發的一個開源的、支持分布式的、高可用的key-value存儲系統。

可用於組冊發現、配置共享中心。

A distributed, reliable key-value store for the most critical data of a distributed system.

是什么優勢讓etcd官網說 for the most critical data?

優勢:

完全復制:etcd集群中的每個節點都可以使用完整的文檔

高可用:防止單點故障有leader和follower的選舉機制

一致性:每次讀取都會返回跨多主機的最新寫入

部署簡單:二進制直接運行

安全:支持TSL

數據可靠:使用Raft算法實現各etcd間存儲的數據強一致性。

 ps:etcd使用raft算法實現了分布式鎖。保證了etcd集群中節點中存儲的數據永遠一致,

也就是說1個Python程序和1個Go程序同時向etcd中put同1個key是有鎖的。

 

架構

 

 

從etcd的架構圖中可以看到,etcd主要分為4個部分。

http server:用戶client發送的API操作請求、以及其他etcd節點的同步和心跳新校區

store:處理etcd支持的各類功能事物,包括索引、節點變更、監控反饋、事件處理和執行,是API請求的具體實現。

Raft:etcd集群中數據一致性的關鍵

Wal(write ahead log):預寫日志,是etcd數據存儲的方式,實現持久化存儲。

 

使用etcd

2379端口:接收client的http請求

2380端口:用於etcd集群節點間通信

 

啟動

root@zhanggen etcd-v3.3.18-linux-amd64]# ./etcd --name master1 --data-dir /data/etcd/ --wal-dir /data/etcd/wal/ --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
2020-05-21 01:35:01.096751 I | etcdmain: etcd Version: 3.3.18
2020-05-21 01:35:01.096801 I | etcdmain: Git SHA: 3c8740a79
2020-05-21 01:35:01.096806 I | etcdmain: Go Version: go1.12.9
2020-05-21 01:35:01.096809 I | etcdmain: Go OS/Arch: linux/amd64
2020-05-21 01:35:01.096812 I | etcdmain: setting maximum number of CPUs to 1, total number of available CPUs is 1
2020-05-21 01:35:01.096820 I | etcdmain: advertising using detected default host "192.168.56.133"
2020-05-21 01:35:01.096852 N | etcdmain: the server is already initialized as member before, starting as etcd member...
2020-05-21 01:35:01.097083 I | embed: listening for peers on http://0.0.0.0:2381
2020-05-21 01:35:01.097118 I | embed: listening for client requests on 0.0.0.0:2371
2020-05-21 01:35:01.102854 I | etcdserver: name = master1
2020-05-21 01:35:01.102869 I | etcdserver: data dir = /data/etcd/
2020-05-21 01:35:01.102877 I | etcdserver: member dir = /data/etcd/member
2020-05-21 01:35:01.102879 I | etcdserver: dedicated WAL dir = /data/etcd/wal/
2020-05-21 01:35:01.102882 I | etcdserver: heartbeat = 100ms
2020-05-21 01:35:01.102885 I | etcdserver: election = 1000ms
2020-05-21 01:35:01.102887 I | etcdserver: snapshot count = 100000
2020-05-21 01:35:01.102895 I | etcdserver: advertise client URLs = http://0.0.0.0:2371
2020-05-21 01:35:01.102899 I | etcdserver: initial advertise peer URLs = http://192.168.56.133:2381
2020-05-21 01:35:01.102905 I | etcdserver: initial cluster = master1=http://192.168.56.133:2381
2020-05-21 01:35:01.112656 I | etcdserver: starting member 36d9af938bdf88c5 in cluster 24d7db2fa7e0796c
2020-05-21 01:35:01.112685 I | raft: 36d9af938bdf88c5 became follower at term 0
2020-05-21 01:35:01.112696 I | raft: newRaft 36d9af938bdf88c5 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0]
2020-05-21 01:35:01.112699 I | raft: 36d9af938bdf88c5 became follower at term 1
2020-05-21 01:35:01.115541 W | auth: simple token is not cryptographically signed
2020-05-21 01:35:01.117263 I | etcdserver: starting server... [version: 3.3.18, cluster version: to_be_decided]
2020-05-21 01:35:01.119207 I | etcdserver: 36d9af938bdf88c5 as single-node; fast-forwarding 9 ticks (election ticks 10)
2020-05-21 01:35:01.119674 I | etcdserver/membership: added member 36d9af938bdf88c5 [http://192.168.56.133:2381] to cluster 24d7db2fa7e0796c
2020-05-21 01:35:01.413844 I | raft: 36d9af938bdf88c5 is starting a new election at term 1
2020-05-21 01:35:01.413947 I | raft: 36d9af938bdf88c5 became candidate at term 2
2020-05-21 01:35:01.414120 I | raft: 36d9af938bdf88c5 received MsgVoteResp from 36d9af938bdf88c5 at term 2
2020-05-21 01:35:01.414179 I | raft: 36d9af938bdf88c5 became leader at term 2
2020-05-21 01:35:01.414206 I | raft: raft.node: 36d9af938bdf88c5 elected leader 36d9af938bdf88c5 at term 2
2020-05-21 01:35:01.415431 I | etcdserver: setting up the initial cluster version to 3.3
2020-05-21 01:35:01.420094 N | etcdserver/membership: set the initial cluster version to 3.3
2020-05-21 01:35:01.420297 I | etcdserver/api: enabled capabilities for version 3.3
2020-05-21 01:35:01.420398 I | etcdserver: published {Name:master1 ClientURLs:[http://0.0.0.0:2371]} to cluster 24d7db2fa7e0796c
2020-05-21 01:35:01.421353 E | etcdmain: forgot to set Type=notify in systemd service file?
2020-05-21 01:35:01.421407 I | embed: ready to serve client requests
2020-05-21 01:35:01.424229 N | embed: serving insecure client requests on [::]:2371, this is strongly discouraged!

 

測試

[root@zhanggen etcd-v3.3.18-linux-amd64]# export ETCDCTL_API=3
[root@zhanggen etcd-v3.3.18-linux-amd64]# ./etcdctl --endpoints=192.168.56.133:2371 put name "Martin"
OK
[root@zhanggen etcd-v3.3.18-linux-amd64]# ./etcdctl --endpoints=192.168.56.133:2371 get name 
name
Martiin
[root@zhanggen etcd-v3.3.18-linux-amd64]# 

 

go連接etcd

D:\goproject\src\jd.com\etcdDemo>go get go.etcd.io/etcd/clientv3
go: downloading google.golang.org/grpc v1.26.0
go: downloading github.com/golang/protobuf v1.3.2
go: extracting github.com/golang/protobuf v1.3.2
go: extracting google.golang.org/grpc v1.26.0

 

 put和get模式

package main

import (
	"context"
	"go.etcd.io/etcd/clientv3"
	"fmt"
	"time"
)

func main() {
	//創建1個連接etcd的連接
	client,err:=clientv3.New(clientv3.Config{
		Endpoints: []string{"192.168.56.133:2371"},
		DialTimeout: 5*time.Second,
	})
	if err!=nil{
		fmt.Println("I try to connect etcd fiald ",err)
	}
	fmt.Println("The connection to etcd was connected!")
	//記得關閉連接
	defer client.Close()

	//put值
	ctx,cancel:=context.WithTimeout(context.Background(),time.Second)
	//設置1秒鍾超時時間
	key:="name"
	_,err=client.Put(ctx,key,"Martin")
	cancel()
	if err!=nil{
		fmt.Println("The instraction put value to etcd was faild.")
	}
	//get值
	ctx,cancel=context.WithTimeout(context.Background(),time.Second)
	//查詢所有以name開頭的key(支持模糊查詢)
	response,err:=client.Get(ctx,key,clientv3.WithPrefix())
	cancel()
	if err!=nil{
		fmt.Println("The instraction get value from etcd was faild.")
	}
	//fmt.Println(response)
	/*
	&{cluster_id:2654831503084648812 member_id:3952383196236056773 revision:3 raft_term:2  [key:"name" create_revision:2 mod_revision:3 version:2 value:",martin"
	] false 1 {} [] 0}
	*/
	for _,ev:=range response.Kvs{
		fmt.Printf("key:%s value:%s\n",ev.Key,ev.Value)
	}

}

  

watch模式

排1個哨兵去監控key發生的事件,馬上通知給我!(watcher模式可以實現配置文件的熱加載!)

package main

import (
	"context"
	"go.etcd.io/etcd/clientv3"
	"fmt"
	"time"
)

func main() {
	//創建1個連接etcd的連接
	client,err:=clientv3.New(clientv3.Config{
		Endpoints: []string{"192.168.56.133:2371"},
		DialTimeout: 5*time.Second,
	})
	if err!=nil{
		fmt.Println("I try to connect etcd fiald ",err)
	}
	fmt.Println("The connection to etcd was connected!")
	//記得關閉連接
	defer client.Close()

	//put值
	ctx,cancel:=context.WithTimeout(context.Background(),time.Second)
	//設置1秒鍾超時時間
	key:="name"
	_,err=client.Put(ctx,key,"Martin")
	cancel()
	if err!=nil{
		fmt.Println("The instraction put value to etcd was faild.")
	}
	//get值
	ctx,cancel=context.WithTimeout(context.Background(),time.Second)
	//查詢所有以name開頭的key(支持模糊查詢)
	response,err:=client.Get(ctx,key,clientv3.WithPrefix())
	cancel()
	if err!=nil{
		fmt.Println("The instraction get value from etcd was faild.")
	}
	//fmt.Println(response)
	/*
	&{cluster_id:2654831503084648812 member_id:3952383196236056773 revision:3 raft_term:2  [key:"name" create_revision:2 mod_revision:3 version:2 value:",martin"
	] false 1 {} [] 0}
	*/
	for _,ev:=range response.Kvs{
		fmt.Printf("key:%s value:%s\n",ev.Key,ev.Value)
	}
	ctx=context.TODO()
	//安插1個探針 監控name key,返回1個通道
	monitorChanel:=client.Watch(ctx,key)
	//不斷從通道中獲取監控事件
	for monitor:= range monitorChanel{
		//獲取監控事件
		for _,event:=range monitor.Events{
			fmt.Printf("type:%v,key:%v,value:%v\n",event.Type,string(event.Kv.Key),string(event.Kv.Value))
		}

	}


}

  

利用etcd的watcher功能實現配置信息熱加載

package taillog

import (
	"fmt"
	"time"
	"jd.com/logagent/etcd"
)

//定義1個全局的taskpool
var tiallpoolObj *tailPool

type tailPool struct {
	//保存從etcd中獲取的所有logAgent配置
	logConfigs            []*etcd.LogEntry
	taskMaping            map[string]*TaillTask
	watchNewConfigChannel chan []*etcd.LogEntry
}

func InitTaskPool(logConfigList []*etcd.LogEntry) {
	//初始化1個taill連接池
	tiallpoolObj = &tailPool{
		logConfigs:            logConfigList, //把當前獲取的配置項保存起來
		taskMaping:            make(map[string]*TaillTask,32),
		watchNewConfigChannel: make(chan []*etcd.LogEntry), //無緩沖區通道(沒有值1一直阻塞)

	}
	//在pool中初始化日志采集task
	for _, cfg := range logConfigList {
		//生成真正的日志采集模塊
		var taiiobj TaillTask
		task,err:=taiiobj.NewTaillTask(cfg.Path, cfg.Topic)
		taskKey := fmt.Sprint(cfg.Path, cfg.Topic)
		tiallpoolObj.taskMaping[taskKey] = task
		if err!=nil {
			fmt.Printf("初始化%s采集日志模塊失敗%s",cfg.Path,err)
		}

	}
	//開啟日志采集池
	go tiallpoolObj.run()

}

//watchNewConfigChannel 配置更新之后,做對應的處理
//1.配置新增
//2.配置刪除
//3.配置變更

func (T *tailPool)seekDifference(confs[]*etcd.LogEntry,confsMap map[string]*TaillTask)(difference []*etcd.LogEntry){
	for _, conf := range confs{
		MK := fmt.Sprint(conf.Path, conf.Topic)
		_, ok := confsMap[MK]
		if !ok {
			fmt.Println("檢查到key發生變化----->:",MK)
			difference= append(difference,conf)
		}
		continue

	}
	return
}

func (T *tailPool) run() {
	for {
		select {
		case newConfSlice := <-T.watchNewConfigChannel:
			fmt.Println("---------新的配置來了---------")
			//增加
			addList:=T.seekDifference(newConfSlice,T.taskMaping)
			for _, conf := range addList {
				var taiiobj TaillTask
				task,err:=taiiobj.NewTaillTask(conf.Path, conf.Topic)
				if err!=nil {
					fmt.Println("初始化采集日志模塊失敗",err)
				}
				taskKey := fmt.Sprint(conf.Path, conf.Topic)
				tiallpoolObj.taskMaping[taskKey] = task
				fmt.Printf("增加%s日志采集模塊成功\n",taskKey)
			}
			//刪除
			newTaskMaping:=make(map[string]*TaillTask,32)
			for _, cnf := range newConfSlice {
				mk:=fmt.Sprint(cnf.Path,cnf.Topic)
				newTaskMaping[mk]=nil
			}
			deleteList:=T.seekDifference(T.logConfigs,newTaskMaping)
			for _, item := range deleteList {
				taskKey := fmt.Sprint(item.Path, item.Topic)
				T.taskMaping[taskKey].exit()
				delete(T.taskMaping,taskKey)
			}
			//更新logConfigs
			tiallpoolObj.logConfigs=newConfSlice
			fmt.Println("最新日志采集任務列表",T.taskMaping)
		default:
			time.Sleep(time.Second)

		}

	}
}

//向外暴露watchNewConfigChannel
func PushNewConfig() chan<- []*etcd.LogEntry {
	return tiallpoolObj.watchNewConfigChannel
}

  

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM