配置文件熱加載的go語言實現


通常我們更新應用程序的配置文件,都需要手動重啟程序或手動重新加載配置。假設一組服務部署在10台機器上,你需要借助批量運維工具執行重啟命令,而且10台同時重啟可能還會造成服務短暫不可用。要是更新配置后,服務自動刷新配置多好...今天我們就用go實現配置文件熱加載的小功能,以后更新配置再也不用手動重啟了...

1 基本思路

通常應用程序啟動的流程:加載配置,然后run()。我們怎么做到熱加載呢?我們的思路是這樣的:

【1】在加載配置文件之后,啟動一個線程

【2】該線程定時監聽這個配置文件是否有改動

【3】如果配置文件有變動,就重新加載一下

【4】重新加載之后通知需要使用這些配置的應用程序(進程或線程),實際上就是刷新內存中配置

2 加載配置

首先我們要實現加載配置功能。假設配置文件是k=v格式的,如下:

那我們得寫一個解析配置的包了...讓我們一起面向對象:

type Config struct{
	filename string
	data map[string]string
	lastModifyTime int64
	rwLock sync.RWMutex
	notifyList []Notifyer
}

filename string 配置文件名稱

data map[string]string  將配置文件中的k/v解析存放到map中

lastModifyTime int64   記錄配置文件上一次更改時間

rwLock sync.RWMutex   讀寫鎖,處理這樣一種競爭情況:更新這個結構體時其他線程正在讀取改結構體中的內容,后續用到的時候會講

notifyList []Notifyer  存放所有觀察者,此處我們用到了觀察者模式,也就是需要用到這個配置的對象,我們就把它加到這個切片。當配置更新之后,通知切片中的對象配置更新了。

接下來我們可以給這個結構體添加一些方法了:

2.1 構造函數

func NewConfig(file string)(conf *Config, err error){
	conf = &Config{
		filename: file,
		data: make(map[string]string, 1024),
	}

	m, err := conf.parse()
	if err != nil {
		fmt.Printf("parse conf error:%v\n", err)
		return
	}

	// 將解析配置文件后的數據更新到結構體的map中,寫鎖
	conf.rwLock.Lock()
	conf.data = m
	conf.rwLock.Unlock()

	// 啟一個后台線程去檢測配置文件是否更改
	go conf.reload()
	return
}

構造函數做了三件事:【1】初始化Config 【2】調用parse()函數,解析配置文件,並把解析后的map更新到Config 【3】啟動一個線程,准確說是啟動一個goroutine,即reload() 

注意此處更新map時加了寫鎖了,目的在於不影響擁有讀鎖的線程讀取數據。

2.2 parse()

解析函數比較簡單,主要是讀取配置文件,一行行解析,數據存放在map中。

func (c *Config) parse() (m map[string]string, err error) {
	// 如果在parse()中定義一個map,這樣就是一個新的map不用加鎖
	m = make(map[string]string, 1024)

	f, err := os.Open(c.filename)
	if err != nil {
		return
	}
	defer f.Close()

	reader := bufio.NewReader(f)
	// 聲明一個變量存放讀取行數
	var lineNo int
	for {
		line, errRet := reader.ReadString('\n')
		if errRet == io.EOF {
			// 這里有一個坑,最后一行如果不是\n結尾會漏讀
			lineParse(&lineNo, &line, &m)
			break
		}
		if errRet != nil {
			err = errRet
			return
		}
		
		lineParse(&lineNo, &line, &m)
	}

	return 
}

func lineParse(lineNo *int, line *string, m *map[string]string) {
		*lineNo++

		l := strings.TrimSpace(*line)
		// 如果空行 或者 是注釋 跳過
		if len(l) == 0 || l[0] =='\n' || l[0]=='#' || l[0]==';' {
			return
		}

		itemSlice := strings.Split(l, "=")
		// =
		if len(itemSlice) == 0 {
			fmt.Printf("invalid config, line:%d", lineNo)
			return
		}

		key := strings.TrimSpace(itemSlice[0])
		if len(key) == 0 {
			fmt.Printf("invalid config, line:%d", lineNo)
			return
		}
		if len(key) == 1 {
			(*m)[key] = ""
			return
		}

		value := strings.TrimSpace(itemSlice[1])
		(*m)[key] = value	

		return
}

這里我寫了兩個函數。lineParse()是解析每一行配置的。parse()就是解析的主函數,在parse()中我調用了兩次lineParse()。原因是在使用bufio按行讀取配置文件的時候,有時候會出現這樣的情況:有的人在寫配置文件的時候,最后一行沒有換行,也就是沒有‘\n’,然后我們就直接讀到io.EOF了,這時候如果直接break就會導致最后一行丟失。所以這種情況下我們應該在break之前調用lineParse()把最后一行處理了。

3 封裝接口

上面我們已經實現了讀取配置文件,並放到一個Config示例中,我們需要為這個Config封裝一些接口,方便用戶通過接口訪問Config的內容。這步比較簡單:

func (c *Config) GetInt(key string)(value int, err error){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	str, ok := c.data[key]
	if !ok {
		err = fmt.Errorf("key [%s] not found", key)
	}
	value, err = strconv.Atoi(str)
	return
}

func (c *Config) GetIntDefault(key string, defaultInt int)(value int){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	str, ok := c.data[key]
	if !ok {
		value = defaultInt
		return
	}
	value, err := strconv.Atoi(str)
	if err != nil {
		value = defaultInt
	}
	return
}

func (c *Config) GetString(key string)(value string, err error){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	value, ok := c.data[key]
	if !ok {
		err = fmt.Errorf("key [%s] not found", key)
	}
	return
}

func (c *Config) GetIStringDefault(key string, defaultStr string)(value string){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	value, ok := c.data[key]
	if !ok {
		value = defaultStr
		return
	}
	return
}

如上,一共封裝了4個接口:

GetInt(key string)(value int, err error)   通過key獲取value,並將value轉成int類型

GetIntDefault(key string, defaultInt int)(value int)    通過key獲取value,並將value轉成int類型;如果獲取失敗,使用默認值

GetString(key string)(value string, err error)    通過key獲取value,默認value為string類型

GetIStringDefault(key string, defaultStr string)(value string)   通過key獲取value,默認value為string類型;如果獲取失敗,使用默認值

注意:四個接口都用了讀鎖

4 reload()

上面我們已經實現了解析,加載配置文件,並為Config封裝了比較友好的接口。接下來,我們可以仔細看一下我們之前啟動的goroutine了,即reload()方法。

func (c *Config) reload(){
	// 定時器
	ticker := time.NewTicker(time.Second * 5) 
	for _ = range ticker.C {
		// 打開文件
		// 為什么使用匿名函數? 當匿名函數退出時可用defer去關閉文件
		// 如果不用匿名函數,在循環中不好關閉文件,一不小心就內存泄露
		func (){
			f, err := os.Open(c.filename)
			if err != nil {
				fmt.Printf("open file error:%s\n", err)
				return
			}
			defer f.Close()

			fileInfo, err := f.Stat()
			if err != nil {
				fmt.Printf("stat file error:%s\n", err)
				return
			}
			// 或取當前文件修改時間
			curModifyTime := fileInfo.ModTime().Unix() 
			if curModifyTime > c.lastModifyTime {
				// 重新解析時,要考慮應用程序正在讀取這個配置因此應該加鎖
				m, err := c.parse()
				if err != nil {
					fmt.Printf("parse config error:%v\n", err)
					return
				}

				c.rwLock.Lock()
				c.data = m
				c.rwLock.Unlock()

				c.lastModifyTime = curModifyTime

				// 配置更新通知所有觀察者
				for _, n := range c.notifyList {
					n.Callback(c)
				}
			}
		}()
	}
}

reload()函數中做了這幾件事:

【1】用time.NewTicker每隔5秒去檢查一下配置文件

【2】如果配置文件的修改時間比上一次修改時間大,我們認為配置文件更新了。那么我們調用parse()解析配置文件,並更新conf實例中的數據。並且更新配置文件的修改時間。

【3】通知所有觀察者,即通知所有使用配置文件的程序、進程或實例,配置更新了。

5 觀察者模式

我們反復提到觀察者,反復提到通知所有觀察者配置文件更新。那么我們就要實現這個觀察者:

type Notifyer  interface {
	Callback(*Config)
}  

定義這樣一個Notifyer接口,只要實現了Callback方法的對象,就都實現了這個Notifyer接口。實現了這個接口的對象,如果都需要被通知配置文件更新,那這些對象都可以加入到前面定義的notifyList []Notifyer這個切片中,等待被通知配置文件更新。

好了,此處我們是否少寫了添加觀察者的方法呢??是的,馬上寫:

// 添加觀察者
func (c *Config) AddObserver(n Notifyer) {
	c.notifyList = append(c.notifyList, n)
}

6 測試

經過上面一番折騰,咱們的熱加載就快實現了,我們來測一測:

通常我們在應用程序中怎么使用配置文件?【1】加載配置文件,加載之后數據放在一個全局結構體中 【2】run()

也就是run()中我們要使用全局的結構體,但是這個全局結構體會因為配置文件的更改被更新。此時又存在需要加鎖的情況了。我擦,是不是很麻煩。。不用擔心,我們用一個原子操作搞定。

假設我們的配置文件中存放的是hostname/port/kafkaAddr/kafkaPort這幾個字段。。

type AppConfig struct {
	hostname string
	port int
	kafkaAddr string
	kafkaPort int
}  

接下來我們要用原子操作保證數據一致性了:

// reload()協程寫 和 for循環的讀,都是對Appconfig對象,因此有讀寫沖突
type AppConfigMgr struct {
	config atomic.Value
}

// 初始化結構體
var appConfigMgr = &AppConfigMgr{}  

atomic.Value能保證存放數據和讀取出數據不會有沖突。所以當我們更新數據時存放到atomic.Value中,我們使用數據從atomic.Value加載出來,這樣不用加鎖就能保證數據的一致性了。完美~~

我們需要AppConfigMgr實現Callback方法,即實現Notifyer接口,這樣才能被通知配置更新:

func (a *AppConfigMgr)Callback(conf *reconf.Config) {
	appConfig := &AppConfig{}
	hostname, err := conf.GetString("hostname")
	if err != nil {
		fmt.Printf("get hostname err: %v\n", err)
		return
	}
	appConfig.hostname = hostname

	kafkaPort, err := conf.GetInt("kafkaPort")
	if err != nil {
		fmt.Printf("get kafkaPort err: %v\n", err)
		return
	}
	appConfig.kafkaPort = kafkaPort

	appConfigMgr.config.Store(appConfig)

}

這個Callback實現功能是:當被通知配置更新時,馬上讀取更新的數據並存放到config   atomic.Value 中。

好了,我們要寫主函數了。

func initConfig(file string) {
	// [1] 打開配置文件
	conf, err := reconf.NewConfig(file)
	if err != nil {
		fmt.Printf("read config file err: %v\n", err)
		return
	}

	// 添加觀察者
	conf.AddObserver(appConfigMgr)

	// [2]第一次讀取配置文件
	var appConfig AppConfig
	appConfig.hostname, err = conf.GetString("hostname")
	if err != nil {
		fmt.Printf("get hostname err: %v\n", err)
		return
	}
	fmt.Println("Hostname:", appConfig.hostname)

	appConfig.kafkaPort, err = conf.GetInt("kafkaPort")
	if err != nil {
		fmt.Printf("get kafkaPort err: %v\n", err)
		return
	}
	fmt.Println("kafkaPort:", appConfig.kafkaPort)

	// [3] 把讀取到的配置文件數據存儲到atomic.Value
	appConfigMgr.config.Store(&appConfig)
	fmt.Println("first load sucess.")

}

func run(){
	for {
		appConfig := appConfigMgr.config.Load().(*AppConfig)

		fmt.Println("Hostname:", appConfig.hostname)
		fmt.Println("kafkaPort:", appConfig.kafkaPort)
		fmt.Printf("%v\n", "--------------------")
		time.Sleep(5 * time.Second)
	}
}

func main() { 
	confFile := "../parseConfig/test.cfg"
	initConfig(confFile)
	// 應用程序 很多配置已經不是存在文件中而是etcd
	run() 
}

主函數中調用了initConfig()和run()。

initConfig()中:reconf.NewConfig(file)的時候我們已經第一次解析配置,並啟動線程不斷更新配置了。此外initConfig()還做了一些事,就是通過Config提供的接口,將配置文件中的數據讀取到appConfig 中,然后再將appConfig 存儲到 atomic.Value中。

run()就是模擬應用程序在運行過程中使用配置的過程:run()中獲取配置信息就是從 atomic.Value加載出來,這樣保證數據一致性。

編譯運行,然后不斷更改配置文件中kafkaAddr,測試結果如下:

這樣配置文集熱加載就實現了。

附一下所有代碼:

reconf/reconf.go:

package reconf

// 實現一個解析配置文件的包
import (
	"time"
	"io"
	"fmt"
	"os"
	"bufio"
	"strings"
	"strconv"
	"sync"
)

type Config struct{
	filename string
	data map[string]string
	lastModifyTime int64
	rwLock sync.RWMutex
	notifyList []Notifyer
}

func NewConfig(file string)(conf *Config, err error){
	conf = &Config{
		filename: file,
		data: make(map[string]string, 1024),
	}

	m, err := conf.parse()
	if err != nil {
		fmt.Printf("parse conf error:%v\n", err)
		return
	}

	// 將解析配置文件后的數據更新到結構體的map中,寫鎖
	conf.rwLock.Lock()
	conf.data = m
	conf.rwLock.Unlock()

	// 啟一個后台線程去檢測配置文件是否更改
	go conf.reload()
	return
}

// 添加觀察者
func (c *Config) AddObserver(n Notifyer) {
	c.notifyList = append(c.notifyList, n)
}

func (c *Config) reload(){
	// 定時器
	ticker := time.NewTicker(time.Second * 5) 
	for _ = range ticker.C {
		// 打開文件
		// 為什么使用匿名函數? 當匿名函數退出時可用defer去關閉文件
		// 如果不用匿名函數,在循環中不好關閉文件,一不小心就內存泄露
		func (){
			f, err := os.Open(c.filename)
			if err != nil {
				fmt.Printf("open file error:%s\n", err)
				return
			}
			defer f.Close()

			fileInfo, err := f.Stat()
			if err != nil {
				fmt.Printf("stat file error:%s\n", err)
				return
			}
			// 或取當前文件修改時間
			curModifyTime := fileInfo.ModTime().Unix() 
			if curModifyTime > c.lastModifyTime {
				// 重新解析時,要考慮應用程序正在讀取這個配置因此應該加鎖
				m, err := c.parse()
				if err != nil {
					fmt.Printf("parse config error:%v\n", err)
					return
				}

				c.rwLock.Lock()
				c.data = m
				c.rwLock.Unlock()

				c.lastModifyTime = curModifyTime

				// 配置更新通知所有觀察者
				for _, n := range c.notifyList {
					n.Callback(c)
				}
			}
		}()
	}
}

func (c *Config) parse() (m map[string]string, err error) {
	// 如果在parse()中定義一個map,這樣就是一個新的map不用加鎖
	m = make(map[string]string, 1024)

	f, err := os.Open(c.filename)
	if err != nil {
		return
	}
	defer f.Close()

	reader := bufio.NewReader(f)
	// 聲明一個變量存放讀取行數
	var lineNo int
	for {
		line, errRet := reader.ReadString('\n')
		if errRet == io.EOF {
			// 這里有一個坑,最后一行如果不是\n結尾會漏讀
			lineParse(&lineNo, &line, &m)
			break
		}
		if errRet != nil {
			err = errRet
			return
		}
		
		lineParse(&lineNo, &line, &m)
	}

	return 
}

func lineParse(lineNo *int, line *string, m *map[string]string) {
		*lineNo++

		l := strings.TrimSpace(*line)
		// 如果空行 或者 是注釋 跳過
		if len(l) == 0 || l[0] =='\n' || l[0]=='#' || l[0]==';' {
			return
		}

		itemSlice := strings.Split(l, "=")
		// =
		if len(itemSlice) == 0 {
			fmt.Printf("invalid config, line:%d", lineNo)
			return
		}

		key := strings.TrimSpace(itemSlice[0])
		if len(key) == 0 {
			fmt.Printf("invalid config, line:%d", lineNo)
			return
		}
		if len(key) == 1 {
			(*m)[key] = ""
			return
		}

		value := strings.TrimSpace(itemSlice[1])
		(*m)[key] = value	

		return
}


func (c *Config) GetInt(key string)(value int, err error){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	str, ok := c.data[key]
	if !ok {
		err = fmt.Errorf("key [%s] not found", key)
	}
	value, err = strconv.Atoi(str)
	return
}

func (c *Config) GetIntDefault(key string, defaultInt int)(value int){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	str, ok := c.data[key]
	if !ok {
		value = defaultInt
		return
	}
	value, err := strconv.Atoi(str)
	if err != nil {
		value = defaultInt
	}
	return
}

func (c *Config) GetString(key string)(value string, err error){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	value, ok := c.data[key]
	if !ok {
		err = fmt.Errorf("key [%s] not found", key)
	}
	return
}

func (c *Config) GetIStringDefault(key string, defaultStr string)(value string){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	value, ok := c.data[key]
	if !ok {
		value = defaultStr
		return
	}
	return
}

reconf/notify.go:

package reconf

// 通知應用程序文件改變

type Notifyer  interface {
	Callback(*Config)
}

reconf_test/main.go:

package main

import (
	"time"
	"go_dev/s23_conf/reconf"
	"sync/atomic"
	"fmt"
)

type AppConfig struct {
	hostname string
	port int
	kafkaAddr string
	kafkaPort int
}

// reload()協程寫 和 for循環的讀,都是對Appconfig對象,因此有讀寫沖突
type AppConfigMgr struct {
	config atomic.Value
}

// 初始化結構體
var appConfigMgr = &AppConfigMgr{}

func (a *AppConfigMgr)Callback(conf *reconf.Config) {
	appConfig := &AppConfig{}
	hostname, err := conf.GetString("hostname")
	if err != nil {
		fmt.Printf("get hostname err: %v\n", err)
		return
	}
	appConfig.hostname = hostname

	kafkaPort, err := conf.GetInt("kafkaPort")
	if err != nil {
		fmt.Printf("get kafkaPort err: %v\n", err)
		return
	}
	appConfig.kafkaPort = kafkaPort

	appConfigMgr.config.Store(appConfig)

}

func initConfig(file string) {
	// [1] 打開配置文件
	conf, err := reconf.NewConfig(file)
	if err != nil {
		fmt.Printf("read config file err: %v\n", err)
		return
	}

	// 添加觀察者
	conf.AddObserver(appConfigMgr)

	// [2]第一次讀取配置文件
	var appConfig AppConfig
	appConfig.hostname, err = conf.GetString("hostname")
	if err != nil {
		fmt.Printf("get hostname err: %v\n", err)
		return
	}
	fmt.Println("Hostname:", appConfig.hostname)

	appConfig.kafkaPort, err = conf.GetInt("kafkaPort")
	if err != nil {
		fmt.Printf("get kafkaPort err: %v\n", err)
		return
	}
	fmt.Println("kafkaPort:", appConfig.kafkaPort)

	// [3] 把讀取到的配置文件數據存儲到atomic.Value
	appConfigMgr.config.Store(&appConfig)
	fmt.Println("first load sucess.")

}

func run(){
	for {
		appConfig := appConfigMgr.config.Load().(*AppConfig)

		fmt.Println("Hostname:", appConfig.hostname)
		fmt.Println("kafkaPort:", appConfig.kafkaPort)
		fmt.Printf("%v\n", "--------------------")
		time.Sleep(5 * time.Second)
	}
}

func main() { 
	confFile := "../parseConfig/test.cfg"
	initConfig(confFile)
	// 應用程序 很多配置已經不是存在文件中而是etcd
	run() 
}  

本篇所有代碼都上傳到github上了https://github.com/zingp/goclub/tree/master/go_dev/s23_conf。參見該目錄下的reconf 和reconf_test兩個目錄。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM