Golang 實現 Redis(4): AOF 持久化與AOF重寫


本文是使用 golang 實現 redis 系列的第四篇文章,將介紹如何使用 golang 實現 Append Only File 持久化及 AOF 文件重寫。

本文完整源代碼在作者Githubgithub.com/hdt3213/godis

AOF 文件

AOF 持久化是典型的異步任務,主協程(goroutine) 可以使用 channel 將數據發送到異步協程由異步協程執行持久化操作。

在 DB 中定義相關字段:

type DB struct {
	// 主線程使用此channel將要持久化的命令發送到異步協程
	aofChan	 chan *reply.MultiBulkReply 
	// append file 文件描述符
	aofFile	 *os.File  
	// append file 路徑
	aofFilename string 

	// aof 重寫需要的緩沖區,將在AOF重寫一節詳細介紹
	aofRewriteChan chan *reply.MultiBulkReply 
	// 在必要的時候使用此字段暫停持久化操作
	pausingAof	 sync.RWMutex 
}

在進行持久化時需要注意兩個細節:

  1. get 之類的讀命令並不需要進行持久化
  2. expire 命令要用等效的 expireat 命令替換。舉例說明,10:00 執行 expire a 3600 表示鍵 a 在 11:00 過期,在 10:30 載入AOF文件時執行 expire a 3600 就成了 11:30 過期與原數據不符。

我們在命令處理方法中返回 AOF 需要的額外信息:

type extra struct {
	// 表示該命令是否需要持久化
	toPersist  bool 
	// 如上文所述 expire 之類的命令不能直接持久化
	// 若 specialAof == nil 則將命令原樣持久化,否則持久化 specialAof 中的指令
   specialAof []*reply.MultiBulkReply 
}

type CmdFunc func(db *DB, args [][]byte) (redis.Reply, *extra)

以 SET 命令為例:

func Set(db *DB, args [][]byte) (redis.Reply, *extra) {
	//....
	var result int
	switch policy {
	case upsertPolicy:
		result = db.Put(key, entity)
	case insertPolicy:
		result = db.PutIfAbsent(key, entity)
	case updatePolicy:
		result = db.PutIfExists(key, entity)
	}
	extra := &extra{toPersist: result > 0} // 若實際寫入了數據則toPresist=true, 若因為XX或NX選項沒有實際寫入數據則toPresist=false
	if result > 0 {
		if ttl != unlimitedTTL { // 使用了 EX 或 NX 選項
			expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
			db.Expire(key, expireTime)
			// 持久化時使用 set key value 和 pexpireat 命令代替 set key value EX ttl 命令
			extra.specialAof = []*reply.MultiBulkReply{ 
				reply.MakeMultiBulkReply([][]byte{
					[]byte("SET"),
					args[0],
					args[1],
				}),
				makeExpireCmd(key, expireTime),
			}
		} else {
			db.Persist(key) // override ttl
		}
	}
	return &reply.OkReply{}, extra
}

var pExpireAtCmd = []byte("PEXPIREAT")

func makeExpireCmd(key string, expireAt time.Time) *reply.MultiBulkReply {
  args := make([][]byte, 3)
  args[0] = pExpireAtCmd
  args[1] = []byte(key)
  args[2] = []byte(strconv.FormatInt(expireAt.UnixNano()/1e6, 10))
  return reply.MakeMultiBulkReply(args)
}

在異步協程中寫入命令:

func (handler *Handler) handleAof() {
	handler.currentDB = 0
	for p := range handler.aofChan {
		// 使用鎖保證每次都會寫入一條完整的命令
		handler.pausingAof.RLock() 
		// 每個客戶端都可以選擇自己的數據庫,所以 payload 中要保存客戶端選擇的數據庫
		// 選擇的數據庫與 aof 文件中最新的數據庫不一致時寫入一條 Select 命令
		if p.dbIndex != handler.currentDB {
			// select db
			data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
			_, err := handler.aofFile.Write(data)
			if err != nil {
				logger.Warn(err)
				continue // skip this command
			}
			handler.currentDB = p.dbIndex
		}
		// 寫入命令內容
		data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
		_, err := handler.aofFile.Write(data)
		if err != nil {
			logger.Warn(err)
		}
		handler.pausingAof.RUnlock()
	}
	// 關閉過程中主協程會先關閉 handler.aofChan,然后使用 <-handler.aofFinished 等待緩沖區中的命令落盤
	// 通過 handler.aofFinished 通知主協程 aof 緩沖區處理完畢
	handler.aofFinished <- struct{}{}
}

讀取時復用了協議解析器一節中實現的解析器:

func (db *DB) loadAof(maxBytes int) {
	// delete aofChan to prevent write again
	aofChan := db.aofChan
	db.aofChan = nil
	defer func(aofChan chan *reply.MultiBulkReply) {
		db.aofChan = aofChan
	}(aofChan)

	file, err := os.Open(db.aofFilename)
	if err != nil {
		if _, ok := err.(*os.PathError); ok {
			return
		}
		logger.Warn(err)
		return
	}
	defer file.Close()

	reader := utils.NewLimitedReader(file, maxBytes)
	ch := parser.ParseStream(reader)
	for p := range ch {
		if p.Err != nil {
			if p.Err == io.EOF {
				break
			}
			logger.Error("parse error: " + p.Err.Error())
			continue
		}
		if p.Data == nil {
			logger.Error("empty payload")
			continue
		}
		r, ok := p.Data.(*reply.MultiBulkReply)
		if !ok {
			logger.Error("require multi bulk reply")
			continue
		}
		cmd := strings.ToLower(string(r.Args[0]))
		command, ok := cmdTable[cmd]
		if ok {
			handler := command.executor
			handler(db, r.Args[1:])
		}
	}
}

AOF 重寫

若我們對鍵a賦值100次會在AOF文件中產生100條指令但只有最后一條指令是有效的,為了減少持久化文件的大小需要進行AOF重寫以刪除無用的指令。

重寫必須在固定不變的數據集上進行,不能直接使用內存中的數據。Redis 重寫的實現方式是進行 fork 並在子進程中遍歷數據庫內的數據重新生成AOF文件。由於 golang 不支持 fork 操作,我們只能采用讀取AOF文件生成副本的方式來代替fork。

在進行AOF重寫操作時需要滿足兩個要求:

  1. 若 AOF 重寫失敗或被中斷,AOF 文件需保持重寫之前的狀態不能丟失數據
  2. 進行 AOF 重寫期間執行的命令必須保存到新的AOF文件中, 不能丟失

因此我們設計了一套比較復雜的流程:

  1. 暫停AOF寫入 -> 更改狀態為重寫中 -> 准備重寫 -> 恢復AOF寫入
  2. 重寫協程讀取 AOF 文件中的前一部分(重寫開始前的數據,不包括讀寫過程中寫入的數據)並重寫到臨時文件(tmp.aof)中
  3. 暫停AOF寫入 -> 將重寫過程中產生的新數據寫入tmp.aof -> 使用臨時文件tmp.aof覆蓋AOF文件(使用文件系統的mv命令保證安全 -> 恢復AOF寫入

在不阻塞在線服務的同時進行其它操作是一項必需的能力,AOF重寫的思路在解決這類問題時具有重要的參考價值。比如Mysql Online DDL: gh-ost采用了類似的策略保證數據一致。

首先准備開始重寫操作:

func (handler *Handler) StartRewrite() (*rewriteCtx, error) {
	// 暫停 aof 寫入, 數據會在 aofChan 中暫時堆積
	handler.pausingAof.Lock() // pausing aof
	defer handler.pausingAof.Unlock()

	// 調用 fsync 將緩沖區中的數據落盤,防止 aof 文件不完整造成錯誤
	err := handler.aofFile.Sync()
	if err != nil {
		logger.Warn("fsync failed")
		return nil, err
	}

	// 獲得當前 aof 文件大小,用於判斷哪些數據是 aof 重寫過程中產生的
	// handleAof 會保證每次寫入完整的一條指令
	fileInfo, _ := os.Stat(handler.aofFilename)
	filesize := fileInfo.Size()

	// 創建臨時文件供重寫使用
	file, err := ioutil.TempFile("", "*.aof")
	if err != nil {
		logger.Warn("tmp file create failed")
		return nil, err
	}
	return &rewriteCtx{
		tmpFile:  file,
		fileSize: filesize,
		dbIdx:	handler.currentDB, // 重寫開始時 aof 文件選中的數據庫
	}, nil
}

執行重寫:

func (handler *Handler) DoRewrite(ctx *rewriteCtx) error {
	tmpFile := ctx.tmpFile

	// 將重寫開始前的數據加載到內存
	tmpAof := handler.newRewriteHandler()
	tmpAof.LoadAof(int(ctx.fileSize))

	// 將內存中的數據寫入臨時文件
	for i := 0; i < config.Properties.Databases; i++ {
		// select db
		data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes()
		_, err := tmpFile.Write(data)
		if err != nil {
			return err
		}
		// dump db
		tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool {
			cmd := EntityToCmd(key, entity)
			if cmd != nil {
				_, _ = tmpFile.Write(cmd.ToBytes())
			}
			if expiration != nil {
				cmd := MakeExpireCmd(key, *expiration)
				if cmd != nil {
					_, _ = tmpFile.Write(cmd.ToBytes())
				}
			}
			return true
		})
	}
	return nil
}

結束重寫的過程最為復雜:

func (handler *Handler) FinishRewrite(ctx *rewriteCtx) {
	// 同樣暫停 handleAof 的寫入
	handler.pausingAof.Lock() 
	defer handler.pausingAof.Unlock()

	// 打開線上 aof 文件並 seek 到重寫開始的位置
	tmpFile := ctx.tmpFile
	src, err := os.Open(handler.aofFilename)
	if err != nil {
		logger.Error("open aofFilename failed: " + err.Error())
		return
	}
	defer func() {
		_ = src.Close()
	}()
	_, err = src.Seek(ctx.fileSize, 0)
	if err != nil {
		logger.Error("seek failed: " + err.Error())
		return
	}

	// 寫入一條 Select 命令,使 tmpAof 選中重寫開始時刻線上 aof 文件選中的數據庫
	data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes()
	_, err = tmpFile.Write(data)
	if err != nil {
		logger.Error("tmp file rewrite failed: " + err.Error())
		return
	}
	// 對齊數據庫后就可以把重寫過程中產生的數據復制到 tmpAof 文件了
	_, err = io.Copy(tmpFile, src)
	if err != nil {
		logger.Error("copy aof filed failed: " + err.Error())
		return
	}

	// 使用 mv 命令用 tmpAof 代替線上 aof 文件
	_ = handler.aofFile.Close()
	_ = os.Rename(tmpFile.Name(), handler.aofFilename)

	// 重新打開線上 aof 
	aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
	if err != nil {
		panic(err)
	}
	handler.aofFile = aofFile

	// 重新寫入一次 select 指令保證 aof 中的數據庫與 handler.currentDB 一致
	data = reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(handler.currentDB))).ToBytes()
	_, err = handler.aofFile.Write(data)
	if err != nil {
		panic(err)
	}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM