通常我們更新應用程序的配置文件,都需要手動重啟程序或手動重新加載配置。假設一組服務部署在10台機器上,你需要借助批量運維工具執行重啟命令,而且10台同時重啟可能還會造成服務短暫不可用。要是更新配置后,服務自動刷新配置多好...今天我們就用go實現配置文件熱加載的小功能,以后更新配置再也不用手動重啟了...
1 基本思路
通常應用程序啟動的流程:加載配置,然后run()。我們怎么做到熱加載呢?我們的思路是這樣的:
【1】在加載配置文件之后,啟動一個線程
【2】該線程定時監聽這個配置文件是否有改動
【3】如果配置文件有變動,就重新加載一下
【4】重新加載之后通知需要使用這些配置的應用程序(進程或線程),實際上就是刷新內存中配置
2 加載配置
首先我們要實現加載配置功能。假設配置文件是k=v格式的,如下:

那我們得寫一個解析配置的包了...讓我們一起面向對象:
type Config struct{
filename string
data map[string]string
lastModifyTime int64
rwLock sync.RWMutex
notifyList []Notifyer
}
filename string 配置文件名稱
data map[string]string 將配置文件中的k/v解析存放到map中
lastModifyTime int64 記錄配置文件上一次更改時間
rwLock sync.RWMutex 讀寫鎖,處理這樣一種競爭情況:更新這個結構體時其他線程正在讀取改結構體中的內容,后續用到的時候會講
notifyList []Notifyer 存放所有觀察者,此處我們用到了觀察者模式,也就是需要用到這個配置的對象,我們就把它加到這個切片。當配置更新之后,通知切片中的對象配置更新了。
接下來我們可以給這個結構體添加一些方法了:
2.1 構造函數
func NewConfig(file string)(conf *Config, err error){
conf = &Config{
filename: file,
data: make(map[string]string, 1024),
}
m, err := conf.parse()
if err != nil {
fmt.Printf("parse conf error:%v\n", err)
return
}
// 將解析配置文件后的數據更新到結構體的map中,寫鎖
conf.rwLock.Lock()
conf.data = m
conf.rwLock.Unlock()
// 啟一個后台線程去檢測配置文件是否更改
go conf.reload()
return
}
構造函數做了三件事:【1】初始化Config 【2】調用parse()函數,解析配置文件,並把解析后的map更新到Config 【3】啟動一個線程,准確說是啟動一個goroutine,即reload()
注意此處更新map時加了寫鎖了,目的在於不影響擁有讀鎖的線程讀取數據。
2.2 parse()
解析函數比較簡單,主要是讀取配置文件,一行行解析,數據存放在map中。
func (c *Config) parse() (m map[string]string, err error) {
// 如果在parse()中定義一個map,這樣就是一個新的map不用加鎖
m = make(map[string]string, 1024)
f, err := os.Open(c.filename)
if err != nil {
return
}
defer f.Close()
reader := bufio.NewReader(f)
// 聲明一個變量存放讀取行數
var lineNo int
for {
line, errRet := reader.ReadString('\n')
if errRet == io.EOF {
// 這里有一個坑,最后一行如果不是\n結尾會漏讀
lineParse(&lineNo, &line, &m)
break
}
if errRet != nil {
err = errRet
return
}
lineParse(&lineNo, &line, &m)
}
return
}
func lineParse(lineNo *int, line *string, m *map[string]string) {
*lineNo++
l := strings.TrimSpace(*line)
// 如果空行 或者 是注釋 跳過
if len(l) == 0 || l[0] =='\n' || l[0]=='#' || l[0]==';' {
return
}
itemSlice := strings.Split(l, "=")
// =
if len(itemSlice) == 0 {
fmt.Printf("invalid config, line:%d", lineNo)
return
}
key := strings.TrimSpace(itemSlice[0])
if len(key) == 0 {
fmt.Printf("invalid config, line:%d", lineNo)
return
}
if len(key) == 1 {
(*m)[key] = ""
return
}
value := strings.TrimSpace(itemSlice[1])
(*m)[key] = value
return
}
這里我寫了兩個函數。lineParse()是解析每一行配置的。parse()就是解析的主函數,在parse()中我調用了兩次lineParse()。原因是在使用bufio按行讀取配置文件的時候,有時候會出現這樣的情況:有的人在寫配置文件的時候,最后一行沒有換行,也就是沒有‘\n’,然后我們就直接讀到io.EOF了,這時候如果直接break就會導致最后一行丟失。所以這種情況下我們應該在break之前調用lineParse()把最后一行處理了。
3 封裝接口
上面我們已經實現了讀取配置文件,並放到一個Config示例中,我們需要為這個Config封裝一些接口,方便用戶通過接口訪問Config的內容。這步比較簡單:
func (c *Config) GetInt(key string)(value int, err error){
c.rwLock.RLock()
defer c.rwLock.RUnlock()
str, ok := c.data[key]
if !ok {
err = fmt.Errorf("key [%s] not found", key)
}
value, err = strconv.Atoi(str)
return
}
func (c *Config) GetIntDefault(key string, defaultInt int)(value int){
c.rwLock.RLock()
defer c.rwLock.RUnlock()
str, ok := c.data[key]
if !ok {
value = defaultInt
return
}
value, err := strconv.Atoi(str)
if err != nil {
value = defaultInt
}
return
}
func (c *Config) GetString(key string)(value string, err error){
c.rwLock.RLock()
defer c.rwLock.RUnlock()
value, ok := c.data[key]
if !ok {
err = fmt.Errorf("key [%s] not found", key)
}
return
}
func (c *Config) GetIStringDefault(key string, defaultStr string)(value string){
c.rwLock.RLock()
defer c.rwLock.RUnlock()
value, ok := c.data[key]
if !ok {
value = defaultStr
return
}
return
}
如上,一共封裝了4個接口:
GetInt(key string)(value int, err error) 通過key獲取value,並將value轉成int類型
GetIntDefault(key string, defaultInt int)(value int) 通過key獲取value,並將value轉成int類型;如果獲取失敗,使用默認值
GetString(key string)(value string, err error) 通過key獲取value,默認value為string類型
GetIStringDefault(key string, defaultStr string)(value string) 通過key獲取value,默認value為string類型;如果獲取失敗,使用默認值
注意:四個接口都用了讀鎖
4 reload()
上面我們已經實現了解析,加載配置文件,並為Config封裝了比較友好的接口。接下來,我們可以仔細看一下我們之前啟動的goroutine了,即reload()方法。
func (c *Config) reload(){
// 定時器
ticker := time.NewTicker(time.Second * 5)
for _ = range ticker.C {
// 打開文件
// 為什么使用匿名函數? 當匿名函數退出時可用defer去關閉文件
// 如果不用匿名函數,在循環中不好關閉文件,一不小心就內存泄露
func (){
f, err := os.Open(c.filename)
if err != nil {
fmt.Printf("open file error:%s\n", err)
return
}
defer f.Close()
fileInfo, err := f.Stat()
if err != nil {
fmt.Printf("stat file error:%s\n", err)
return
}
// 或取當前文件修改時間
curModifyTime := fileInfo.ModTime().Unix()
if curModifyTime > c.lastModifyTime {
// 重新解析時,要考慮應用程序正在讀取這個配置因此應該加鎖
m, err := c.parse()
if err != nil {
fmt.Printf("parse config error:%v\n", err)
return
}
c.rwLock.Lock()
c.data = m
c.rwLock.Unlock()
c.lastModifyTime = curModifyTime
// 配置更新通知所有觀察者
for _, n := range c.notifyList {
n.Callback(c)
}
}
}()
}
}
reload()函數中做了這幾件事:
【1】用time.NewTicker每隔5秒去檢查一下配置文件
【2】如果配置文件的修改時間比上一次修改時間大,我們認為配置文件更新了。那么我們調用parse()解析配置文件,並更新conf實例中的數據。並且更新配置文件的修改時間。
【3】通知所有觀察者,即通知所有使用配置文件的程序、進程或實例,配置更新了。
5 觀察者模式
我們反復提到觀察者,反復提到通知所有觀察者配置文件更新。那么我們就要實現這個觀察者:
type Notifyer interface {
Callback(*Config)
}
定義這樣一個Notifyer接口,只要實現了Callback方法的對象,就都實現了這個Notifyer接口。實現了這個接口的對象,如果都需要被通知配置文件更新,那這些對象都可以加入到前面定義的notifyList []Notifyer這個切片中,等待被通知配置文件更新。
好了,此處我們是否少寫了添加觀察者的方法呢??是的,馬上寫:
// 添加觀察者
func (c *Config) AddObserver(n Notifyer) {
c.notifyList = append(c.notifyList, n)
}
6 測試
經過上面一番折騰,咱們的熱加載就快實現了,我們來測一測:
通常我們在應用程序中怎么使用配置文件?【1】加載配置文件,加載之后數據放在一個全局結構體中 【2】run()
也就是run()中我們要使用全局的結構體,但是這個全局結構體會因為配置文件的更改被更新。此時又存在需要加鎖的情況了。我擦,是不是很麻煩。。不用擔心,我們用一個原子操作搞定。
假設我們的配置文件中存放的是hostname/port/kafkaAddr/kafkaPort這幾個字段。。
type AppConfig struct {
hostname string
port int
kafkaAddr string
kafkaPort int
}
接下來我們要用原子操作保證數據一致性了:
// reload()協程寫 和 for循環的讀,都是對Appconfig對象,因此有讀寫沖突
type AppConfigMgr struct {
config atomic.Value
}
// 初始化結構體
var appConfigMgr = &AppConfigMgr{}
atomic.Value能保證存放數據和讀取出數據不會有沖突。所以當我們更新數據時存放到atomic.Value中,我們使用數據從atomic.Value加載出來,這樣不用加鎖就能保證數據的一致性了。完美~~
我們需要AppConfigMgr實現Callback方法,即實現Notifyer接口,這樣才能被通知配置更新:
func (a *AppConfigMgr)Callback(conf *reconf.Config) {
appConfig := &AppConfig{}
hostname, err := conf.GetString("hostname")
if err != nil {
fmt.Printf("get hostname err: %v\n", err)
return
}
appConfig.hostname = hostname
kafkaPort, err := conf.GetInt("kafkaPort")
if err != nil {
fmt.Printf("get kafkaPort err: %v\n", err)
return
}
appConfig.kafkaPort = kafkaPort
appConfigMgr.config.Store(appConfig)
}
這個Callback實現功能是:當被通知配置更新時,馬上讀取更新的數據並存放到config atomic.Value 中。
好了,我們要寫主函數了。
func initConfig(file string) {
// [1] 打開配置文件
conf, err := reconf.NewConfig(file)
if err != nil {
fmt.Printf("read config file err: %v\n", err)
return
}
// 添加觀察者
conf.AddObserver(appConfigMgr)
// [2]第一次讀取配置文件
var appConfig AppConfig
appConfig.hostname, err = conf.GetString("hostname")
if err != nil {
fmt.Printf("get hostname err: %v\n", err)
return
}
fmt.Println("Hostname:", appConfig.hostname)
appConfig.kafkaPort, err = conf.GetInt("kafkaPort")
if err != nil {
fmt.Printf("get kafkaPort err: %v\n", err)
return
}
fmt.Println("kafkaPort:", appConfig.kafkaPort)
// [3] 把讀取到的配置文件數據存儲到atomic.Value
appConfigMgr.config.Store(&appConfig)
fmt.Println("first load sucess.")
}
func run(){
for {
appConfig := appConfigMgr.config.Load().(*AppConfig)
fmt.Println("Hostname:", appConfig.hostname)
fmt.Println("kafkaPort:", appConfig.kafkaPort)
fmt.Printf("%v\n", "--------------------")
time.Sleep(5 * time.Second)
}
}
func main() {
confFile := "../parseConfig/test.cfg"
initConfig(confFile)
// 應用程序 很多配置已經不是存在文件中而是etcd
run()
}
主函數中調用了initConfig()和run()。
initConfig()中:reconf.NewConfig(file)的時候我們已經第一次解析配置,並啟動線程不斷更新配置了。此外initConfig()還做了一些事,就是通過Config提供的接口,將配置文件中的數據讀取到appConfig 中,然后再將appConfig 存儲到 atomic.Value中。
run()就是模擬應用程序在運行過程中使用配置的過程:run()中獲取配置信息就是從 atomic.Value加載出來,這樣保證數據一致性。
編譯運行,然后不斷更改配置文件中kafkaAddr,測試結果如下:

這樣配置文集熱加載就實現了。
附一下所有代碼:
reconf/reconf.go:
package reconf
// 實現一個解析配置文件的包
import (
"time"
"io"
"fmt"
"os"
"bufio"
"strings"
"strconv"
"sync"
)
type Config struct{
filename string
data map[string]string
lastModifyTime int64
rwLock sync.RWMutex
notifyList []Notifyer
}
func NewConfig(file string)(conf *Config, err error){
conf = &Config{
filename: file,
data: make(map[string]string, 1024),
}
m, err := conf.parse()
if err != nil {
fmt.Printf("parse conf error:%v\n", err)
return
}
// 將解析配置文件后的數據更新到結構體的map中,寫鎖
conf.rwLock.Lock()
conf.data = m
conf.rwLock.Unlock()
// 啟一個后台線程去檢測配置文件是否更改
go conf.reload()
return
}
// 添加觀察者
func (c *Config) AddObserver(n Notifyer) {
c.notifyList = append(c.notifyList, n)
}
func (c *Config) reload(){
// 定時器
ticker := time.NewTicker(time.Second * 5)
for _ = range ticker.C {
// 打開文件
// 為什么使用匿名函數? 當匿名函數退出時可用defer去關閉文件
// 如果不用匿名函數,在循環中不好關閉文件,一不小心就內存泄露
func (){
f, err := os.Open(c.filename)
if err != nil {
fmt.Printf("open file error:%s\n", err)
return
}
defer f.Close()
fileInfo, err := f.Stat()
if err != nil {
fmt.Printf("stat file error:%s\n", err)
return
}
// 或取當前文件修改時間
curModifyTime := fileInfo.ModTime().Unix()
if curModifyTime > c.lastModifyTime {
// 重新解析時,要考慮應用程序正在讀取這個配置因此應該加鎖
m, err := c.parse()
if err != nil {
fmt.Printf("parse config error:%v\n", err)
return
}
c.rwLock.Lock()
c.data = m
c.rwLock.Unlock()
c.lastModifyTime = curModifyTime
// 配置更新通知所有觀察者
for _, n := range c.notifyList {
n.Callback(c)
}
}
}()
}
}
func (c *Config) parse() (m map[string]string, err error) {
// 如果在parse()中定義一個map,這樣就是一個新的map不用加鎖
m = make(map[string]string, 1024)
f, err := os.Open(c.filename)
if err != nil {
return
}
defer f.Close()
reader := bufio.NewReader(f)
// 聲明一個變量存放讀取行數
var lineNo int
for {
line, errRet := reader.ReadString('\n')
if errRet == io.EOF {
// 這里有一個坑,最后一行如果不是\n結尾會漏讀
lineParse(&lineNo, &line, &m)
break
}
if errRet != nil {
err = errRet
return
}
lineParse(&lineNo, &line, &m)
}
return
}
func lineParse(lineNo *int, line *string, m *map[string]string) {
*lineNo++
l := strings.TrimSpace(*line)
// 如果空行 或者 是注釋 跳過
if len(l) == 0 || l[0] =='\n' || l[0]=='#' || l[0]==';' {
return
}
itemSlice := strings.Split(l, "=")
// =
if len(itemSlice) == 0 {
fmt.Printf("invalid config, line:%d", lineNo)
return
}
key := strings.TrimSpace(itemSlice[0])
if len(key) == 0 {
fmt.Printf("invalid config, line:%d", lineNo)
return
}
if len(key) == 1 {
(*m)[key] = ""
return
}
value := strings.TrimSpace(itemSlice[1])
(*m)[key] = value
return
}
func (c *Config) GetInt(key string)(value int, err error){
c.rwLock.RLock()
defer c.rwLock.RUnlock()
str, ok := c.data[key]
if !ok {
err = fmt.Errorf("key [%s] not found", key)
}
value, err = strconv.Atoi(str)
return
}
func (c *Config) GetIntDefault(key string, defaultInt int)(value int){
c.rwLock.RLock()
defer c.rwLock.RUnlock()
str, ok := c.data[key]
if !ok {
value = defaultInt
return
}
value, err := strconv.Atoi(str)
if err != nil {
value = defaultInt
}
return
}
func (c *Config) GetString(key string)(value string, err error){
c.rwLock.RLock()
defer c.rwLock.RUnlock()
value, ok := c.data[key]
if !ok {
err = fmt.Errorf("key [%s] not found", key)
}
return
}
func (c *Config) GetIStringDefault(key string, defaultStr string)(value string){
c.rwLock.RLock()
defer c.rwLock.RUnlock()
value, ok := c.data[key]
if !ok {
value = defaultStr
return
}
return
}
reconf/notify.go:
package reconf
// 通知應用程序文件改變
type Notifyer interface {
Callback(*Config)
}
reconf_test/main.go:
package main
import (
"time"
"go_dev/s23_conf/reconf"
"sync/atomic"
"fmt"
)
type AppConfig struct {
hostname string
port int
kafkaAddr string
kafkaPort int
}
// reload()協程寫 和 for循環的讀,都是對Appconfig對象,因此有讀寫沖突
type AppConfigMgr struct {
config atomic.Value
}
// 初始化結構體
var appConfigMgr = &AppConfigMgr{}
func (a *AppConfigMgr)Callback(conf *reconf.Config) {
appConfig := &AppConfig{}
hostname, err := conf.GetString("hostname")
if err != nil {
fmt.Printf("get hostname err: %v\n", err)
return
}
appConfig.hostname = hostname
kafkaPort, err := conf.GetInt("kafkaPort")
if err != nil {
fmt.Printf("get kafkaPort err: %v\n", err)
return
}
appConfig.kafkaPort = kafkaPort
appConfigMgr.config.Store(appConfig)
}
func initConfig(file string) {
// [1] 打開配置文件
conf, err := reconf.NewConfig(file)
if err != nil {
fmt.Printf("read config file err: %v\n", err)
return
}
// 添加觀察者
conf.AddObserver(appConfigMgr)
// [2]第一次讀取配置文件
var appConfig AppConfig
appConfig.hostname, err = conf.GetString("hostname")
if err != nil {
fmt.Printf("get hostname err: %v\n", err)
return
}
fmt.Println("Hostname:", appConfig.hostname)
appConfig.kafkaPort, err = conf.GetInt("kafkaPort")
if err != nil {
fmt.Printf("get kafkaPort err: %v\n", err)
return
}
fmt.Println("kafkaPort:", appConfig.kafkaPort)
// [3] 把讀取到的配置文件數據存儲到atomic.Value
appConfigMgr.config.Store(&appConfig)
fmt.Println("first load sucess.")
}
func run(){
for {
appConfig := appConfigMgr.config.Load().(*AppConfig)
fmt.Println("Hostname:", appConfig.hostname)
fmt.Println("kafkaPort:", appConfig.kafkaPort)
fmt.Printf("%v\n", "--------------------")
time.Sleep(5 * time.Second)
}
}
func main() {
confFile := "../parseConfig/test.cfg"
initConfig(confFile)
// 應用程序 很多配置已經不是存在文件中而是etcd
run()
}
本篇所有代碼都上傳到github上了:https://github.com/zingp/goclub/tree/master/go_dev/s23_conf。參見該目錄下的reconf 和reconf_test兩個目錄。
