探究Go-YCSB做數據庫基准測試


本篇文章開篇會介紹一下Go-YCSB是如何使用,然后按照慣例會分析一下它是如何做基准測試,看看它有什么優缺點。

轉載請聲明出處哦~,本篇文章發布於luozhiyun的博客: https://www.luozhiyun.com/archives/634

最近我們在做數據庫的技術選型,要做選型的話難免需要對數據庫進行一個基准測試,以便可以橫向對比不同數據庫性能。

YCSB,全稱為“Yahoo!Cloud Serving Benchmark”,是雅虎開發的用來對雲服務進行基礎測試的工具,其內部涵蓋了常見的NoSQL數據庫產品,如Cassandra、MongoDB、HBase、Redis等等。

作為一名go開發人員,所以我們使用 pingcap 開發的Go YCSB來進行基准測試。

安裝

首先要保證本地 Go 版本不低於 1.16,然后下載編譯:

git clone https://github.com/pingcap/go-ycsb.git
cd go-ycsb
make

在 bin 文件夾里面就放着我們編譯好的程序 go-ycsb。

我們先來看一下 workloads 文件夾,目錄下有各種workload的模板,可以基於workload模板進行自定義修改。默認的6種測試場景如下:

  • workloada:讀寫均衡型,50%/50%,Reads/Writes
  • workloadb:讀多寫少型,95%/5%,Reads/Writes
  • workloadc:只讀型,100%,Reads
  • workloadd:讀最近寫入記錄型,95%/5%,Reads/insert
  • workloade:掃描小區間型,95%/5%,scan/insert
  • workloadf:讀寫入記錄均衡型,50%/50%,Reads/insert
  • workload_template:參數列表模板。

所以我們可以依據不同的 workload 多維度的對系統進行測試。workload里面的操作主要包括:

  • Insert:插入一條新的記錄
  • Update:更新一條記錄的某一個或者所有 fields
  • Read:讀取一條記錄的某一個或者所有 fields
  • Scan:隨機從一個 key 開始順序掃描隨機條記錄

在測試的時候,我們還需要根據不同的業務場景來模擬測試,那么可以通過 requestdistribution 控制:

  • uniform:隨機選擇一個記錄;
  • sequential:按順序選擇記錄;
  • zipfian:根據 Zipfian 分布來選擇記錄。大致意思就是互聯網常說的80/20原則,也就是20%的key,會占有80%的訪問量;
  • latest:和 Zipfian 類似,但是傾向於訪問新數據明顯多於老數據;
  • hotspot:熱點分布訪問;
  • exponential:指數分布訪問;

下面我們看一下workload里面可以填哪些參數:

# 目前只實現了這一種
workload=core
 
# 總記錄數
recordcount=1000000
 
# 測試階段被操作的記錄數,如果設置了 threadcount,那么每個線程操作的記錄數=operationcount/threadcount
operationcount=3000000
 
# 線程數
threadcount=500 
 
# 如果一個表里面已經有記錄數了,那么load的數據的時候從這個記錄數開始
insertstart=0
 
# 一行數據的字段數
fieldcount=10
 
# 每個字段大小
fieldlength=100
 
# 是否應該讀取所有字段
readallfields=true
 
# 是否應該更新所有字段
writeallfields=false
 
# 字段長度分布
fieldlengthdistribution=constant
#fieldlengthdistribution=uniform
#fieldlengthdistribution=zipfian
 
# 讀操作概率
readproportion=0.95 
# 更新操作概率
updateproportion=0.05
 
# 插入操作概率
insertproportion=0 

# 先讀后寫操作同一條記錄概率
readmodifywriteproportion=0
 
# 范圍操作的概率
scanproportion=0
 
# 范圍操作,最大的可操作的記錄數
maxscanlength=1000
 
# 用來選擇掃描時訪問的記錄數量分布情況
scanlengthdistribution=uniform
#scanlengthdistribution=zipfian
 
# 記錄應按順序插入還是偽隨機插入
insertorder=hashed
#insertorder=ordered
 
# 以什么方式模擬測試
requestdistribution=zipfian
#requestdistribution=uniform
#requestdistribution=latest
 
# 下面這兩種方式時針對requestdistribution為hotspot的時候
# 構成熱點集的數據項的百分比
hotspotdatafraction=0.2
 
# 訪問熱點集的數據操作百分比
hotspotopnfraction=0.8
 
# 操作數據的表名
table=usertable
 
# 延遲測量結果展現形式,暫時沒實現
measurementtype=histogram
 

測試

比如我們現在要測試 redis 的性能,先寫一個 workload:

recordcount=1000000
operationcount=1000000
workload=core 
readallfields=true 
readmodifywriteproportion=1 
requestdistribution=uniform 
redis.addr=127.0.0.1:6379 
threadcount=50

上面的這個 workload 表示在 load 的時候會插入100萬條數據到庫里面,操作的數據量也是100萬,但是有50個線程,也就是每個線程實際操作2萬行記錄;

測試方式使用 readmodifywriteproportion,先讀后寫,操作記錄采用 uniform 也就是隨機方式進行。

先 load 數據:

./bin/go-ycsb load redis  -P workloads/workloada

再運行測試:

./bin/go-ycsb run redis  -P workloads/workloada

返回:

READ_MODIFY_WRITE - Takes(s): 18.8, Count: 499312, OPS: 26539.8, Avg(us): 1388, Min(us): 107, Max(us): 42760, 99th(us): 3000, 99.9th(us): 7000, 99.99th(us): 26000
  • Takes(s) :表示測試總共耗時;
  • Count:操作記錄數;
  • OPS:Operates Per Second,一般是操作次數,與qps區別不大;
  • Avg、Min、Max:平均、最小、最大單條記錄操作耗時;
  • 99th、99.9th、99.99th:P99、P99.9、P99.99時延;

代碼實現分析

當然對於我來說,肯定還是要看一下它的代碼是怎么做的,學習一下大佬是如何寫代碼的對我們工作也是很有幫助。

對於 Go YCSB 來說,它總共有這么幾個組成部分:

  • workload:加載初始化配置文件,創建線程執行測試;
  • client:封裝了 workload ,配置參數,DB等,用來運行測試;
  • db:配置了一堆可被執行的數據庫 client,執行具體的讀寫數據庫;
  • measurement:數據統計模塊,統計執行次數,時延等;

我們以 redis 為例先看一下,如果要測試自己的 Database 該怎么辦。

定義 DB

在 Go YCSB 中,所有的 DB 都放在 db 這個目錄下面:

所以,我們可以在這個文件夾下面創建自己的db,然后構造一個 struct ,實現 DB 這個接口:

type DB interface { 
	ToSqlDB() *sql.DB 
	Close() error 
	InitThread(ctx context.Context, threadID int, threadCount int) context.Context 
	CleanupThread(ctx context.Context) 
	Read(ctx context.Context, table string, key string, fields []string) (map[string][]byte, error) 
	Scan(ctx context.Context, table string, startKey string, count int, fields []string) ([]map[string][]byte, error) 
	Update(ctx context.Context, table string, key string, values map[string][]byte) error 
	Insert(ctx context.Context, table string, key string, values map[string][]byte) error 
	Delete(ctx context.Context, table string, key string) error
}

里面定義了具體的 DB 操作。

然后需要定義一個工廠,用來創建這個 DB struct,實現DBCreator接口:

type DBCreator interface {
	Create(p *properties.Properties) (DB, error)
}

然后需要定義一個 init 函數,在啟動的時候進行 DBCreator 注冊:

func init() {
	ycsb.RegisterDBCreator("redis", redisCreator{})
}

var dbCreators = map[string]DBCreator{}
 
func RegisterDBCreator(name string, creator DBCreator) {
	_, ok := dbCreators[name]
	if ok {
		panic(fmt.Sprintf("duplicate register database %s", name))
	}

	dbCreators[name] = creator
}

RegisterDBCreator 會在初始化的時候被調用。用來獲取 init 方法注冊過的 DB。通過這種方式 Go YCSB 實現了 DB 的自定義化。

全局參數初始化

首先 Go YCSB 在運行的時候會使用 cobra 根據傳入的是 load 還是 run 執行到下面兩個不同的方法:

func runLoadCommandFunc(cmd *cobra.Command, args []string) {
	runClientCommandFunc(cmd, args, false)
}

func runTransCommandFunc(cmd *cobra.Command, args []string) {
	runClientCommandFunc(cmd, args, true)
}

這里會調用到 runClientCommandFunc 函數中:

func runClientCommandFunc(cmd *cobra.Command, args []string, doTransactions bool) {
	dbName := args[0]
	// 初始化全局參數
	initialGlobal(dbName, func() {
		doTransFlag := "true"
		if !doTransactions {
			doTransFlag = "false"
		}
		globalProps.Set(prop.DoTransactions, doTransFlag)

		if cmd.Flags().Changed("threads") {
			// We set the threadArg via command line.
			globalProps.Set(prop.ThreadCount, strconv.Itoa(threadsArg))
		}

		if cmd.Flags().Changed("target") {
			globalProps.Set(prop.Target, strconv.Itoa(targetArg))
		}

		if cmd.Flags().Changed("interval") {
			globalProps.Set(prop.LogInterval, strconv.Itoa(reportInterval))
		}
	})

	fmt.Println("***************** properties *****************")
	for key, value := range globalProps.Map() {
		fmt.Printf("\"%s\"=\"%s\"\n", key, value)
	}
	fmt.Println("**********************************************")
	// 初始化 client
	c := client.NewClient(globalProps, globalWorkload, globalDB)
	start := time.Now()
	// 運行測試
	c.Run(globalContext)

	fmt.Printf("Run finished, takes %s\n", time.Now().Sub(start))
	// 測試結果輸出
	measurement.Output()
}

參數的初始化主要是在 initialGlobal 里面做的:

func initialGlobal(dbName string, onProperties func()) {
	...
	go func() {
		http.ListenAndServe(addr, nil)
	}()
	//初始化 measurement
	measurement.InitMeasure(globalProps)

	if len(tableName) == 0 {
		tableName = globalProps.GetString(prop.TableName, prop.TableNameDefault)
	}
	// 獲取 WorkloadCreator
	workloadName := globalProps.GetString(prop.Workload, "core")
	workloadCreator := ycsb.GetWorkloadCreator(workloadName)
	//創建Workload
	var err error
	if globalWorkload, err = workloadCreator.Create(globalProps); err != nil {
		util.Fatalf("create workload %s failed %v", workloadName, err)
	}
	// 獲取要被測試的 db
	dbCreator := ycsb.GetDBCreator(dbName)
	if dbCreator == nil {
		util.Fatalf("%s is not registered", dbName)
	}
	// 創建 db 
	if globalDB, err = dbCreator.Create(globalProps); err != nil {
		util.Fatalf("create db %s failed %v", dbName, err)
	}
	globalDB = client.DbWrapper{globalDB}
}

這里最主要的是創建 Workload 和 DB。Workload 里面會初始化很多配置文件里面的信息。

運行測試

runClientCommandFunc 里面會調用 client 的 Run 方法執行測試:

func (c *Client) Run(ctx context.Context) {
	var wg sync.WaitGroup
	threadCount := c.p.GetInt(prop.ThreadCount, 1)

	wg.Add(threadCount)
	measureCtx, measureCancel := context.WithCancel(ctx)
	measureCh := make(chan struct{}, 1)
	go func() {
		defer func() {
			measureCh <- struct{}{}
		}() 
		// 這里很有意思,因為有時候我們做數據庫是需要初始化數據到緩存里面的
		// 所以開始的一段時間我們不能計入測試統計中,這里有隔預熱時間,可以通過 warmuptime 配置 
		if c.p.GetBool(prop.DoTransactions, true) {
			dur := c.p.GetInt64(prop.WarmUpTime, 0)
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Duration(dur) * time.Second):
			}
		}
		// 預熱完畢
		measurement.EnableWarmUp(false)

		dur := c.p.GetInt64(prop.LogInterval, 10)
		t := time.NewTicker(time.Duration(dur) * time.Second)
		defer t.Stop()

		for {
			select {
			// 在運行的時候每隔 10 秒輸出一次統計信息
			case <-t.C:
				measurement.Output()
			case <-measureCtx.Done():
				return
			}
		}
	}()
	// 做一些初始化的工作,如mysql需要創建表
	if err := c.workload.Init(c.db); err != nil {
		fmt.Printf("Initialize workload fail: %v\n", err)
		return
	}
	// 根據 threadCount 創建多個線程操作數據庫
	for i := 0; i < threadCount; i++ {
		go func(threadId int) {
			defer wg.Done()
			// 初始化 worker
			w := newWorker(c.p, threadId, threadCount, c.workload, c.db)
			ctx := c.workload.InitThread(ctx, threadId, threadCount)
			ctx = c.db.InitThread(ctx, threadId, threadCount)
			// 開始跑測試
			w.run(ctx)
			// 跑完測試做清理工作
			c.db.CleanupThread(ctx)
			c.workload.CleanupThread(ctx)
		}(i)
	}
	// 等待測試跑完
	wg.Wait() 
	measureCancel()
	<-measureCh
}

這里分為兩個部分:第一部分是創建一個線程,這個線程會控制是否開始測試統計,然后會每隔10秒輸出一次統計信息;第二部分是根據設置的 threadcount 創建線程,運行 Worker 運行測試;

newWorker 的時候會根據 operationcount 設置 totalOpCount 表示總共需要執行次數,用 totalOpCount / int64(threadCount)設置 opCount 表示 單線程操作的記錄數。

func (w *worker) run(ctx context.Context) { 
	// 將線程操作分散開來,這樣它們就不會同時擊中DB了。
	if w.targetOpsPerMs > 0.0 && w.targetOpsPerMs <= 1.0 {
		time.Sleep(time.Duration(rand.Int63n(w.targetOpsTickNs)))
	}

	startTime := time.Now()
	// 循環直到操作數達到 opsDone
	for w.opCount == 0 || w.opsDone < w.opCount {
		var err error
		opsCount := 1
		// 這里是執行基准測試
		if w.doTransactions {
			if w.doBatch {
				err = w.workload.DoBatchTransaction(ctx, w.batchSize, w.workDB)
				opsCount = w.batchSize
			} else {
				err = w.workload.DoTransaction(ctx, w.workDB)
			}
			//	這里是執行 load 數據
		} else {
			if w.doBatch {
				err = w.workload.DoBatchInsert(ctx, w.batchSize, w.workDB)
				opsCount = w.batchSize
			} else {
				err = w.workload.DoInsert(ctx, w.workDB)
			}
		}
		// 預熱完了會進行操作次數的統計
		if measurement.IsWarmUpFinished() {
			w.opsDone += int64(opsCount)
			w.throttle(ctx, startTime)
		}

		select {
		case <-ctx.Done():
			return
		default:
		}
	}
}

基准測試的具體執行是交給 workload 的 DoTransaction 方法來判斷執行。

func (c *core) DoTransaction(ctx context.Context, db ycsb.DB) error {
	state := ctx.Value(stateKey).(*coreState)
	r := state.r
	// 根據會根據不同的測試場景,進入到不同的測試分支
	// Next 方法會根據設置的 readproportion、updateproportion、 scanproportion等概率來獲取相應操作類型
	operation := operationType(c.operationChooser.Next(r))
	switch operation {
	case read:
		return c.doTransactionRead(ctx, db, state)
	case update:
		return c.doTransactionUpdate(ctx, db, state)
	case insert:
		return c.doTransactionInsert(ctx, db, state)
	case scan:
		return c.doTransactionScan(ctx, db, state)
	default:
		return c.doTransactionReadModifyWrite(ctx, db, state)
	}
}

這里會調用 operationChooser 的 Next 方法來判斷該執行那個指令,執行指令的概率是我們在配置文件里面設置好的。

這個算法很簡單,在初始化 operationChooser 會將設置的參數readproportion、updateproportion、 scanproportion的值以數組的形式 add 到 operationChooser 的 values 里面,然后隨機一個 0~1的小數,檢查這個隨機數落在哪個范圍就好了:

func (d *Discrete) Next(r *rand.Rand) int64 {
	sum := float64(0) 
	for _, p := range d.values {
		sum += p.Weight
	}
	// 隨機一個 0~1的小數
	val := r.Float64() 
	for _, p := range d.values {
		pw := p.Weight / sum
		if val < pw {
			d.SetLastValue(p.Value)
			return p.Value
		} 
		val -= pw
	} 
	panic("oops, should not get here.")
}

在代碼實現上就是按照上面說的,將所有 values 的值加起來得到 sum,然后計算每個 value 的占比是否達到隨機數值。

最后我們再來看看 doTransactionRead 是怎么執行的:

func (c *core) doTransactionRead(ctx context.Context, db ycsb.DB, state *coreState) error {
	r := state.r
	// 根據我們設置的 requestdistribution 獲取一個 key 值
	keyNum := c.nextKeyNum(state)
	keyName := c.buildKeyName(keyNum)

	//被讀取的字段
	var fields []string
	if !c.readAllFields {
		// 如果不是讀取所有字段,那么根據fieldChooser字段選擇器選擇一個字段執行
		fieldName := state.fieldNames[c.fieldChooser.Next(r)]
		fields = append(fields, fieldName)
	} else {
		fields = state.fieldNames
	}
	//調用 db 的read方法
	values, err := db.Read(ctx, c.table, keyName, fields)
	if err != nil {
		return err
	}
	//校驗數據完整性
	if c.dataIntegrity {
		c.verifyRow(state, keyName, values)
	}

	return nil
}

這里首先會調用 nextKeyNum 去獲取 key 值,這里的 key 會根據我們設置的 requestdistribution 參數根據一定的規則獲取到。然后校驗完需要讀哪些字段后調用 DbWrapper 的 Read 方法讀取數據。

func (db DbWrapper) Read(ctx context.Context, table string, key string, fields []string) (_ map[string][]byte, err error) {
	start := time.Now()
	defer func() {
		// 進行測試數據統計
		measure(start, "READ", err)
	}()

	return db.DB.Read(ctx, table, key, fields)
}

DbWrapper 會封裝一層,用 defer 方法調用 measure 進行統計。

不過這里我有問題是在讀取數據的時候通過還會根據傳入的 fields 來進行解析,這樣也會損耗一些性能,不知是否合理,如redis 的 Read 方法:

func (r *redis) Read(ctx context.Context, table string, key string, fields []string) (map[string][]byte, error) {
	data := make(map[string][]byte, len(fields))

	res, err := r.client.Get(table + "/" + key).Result()

	if err != nil {
		return nil, err
	}
	// 反序列化
	err = json.Unmarshal([]byte(res), &data)
	if err != nil {
		return nil, err
	} 
	// TODO: filter by fields 
	return data, err
}

數據統計

每一次操作完畢之后都會調用到 measure 方法,進行測試數據統計。

func measure(start time.Time, op string, err error) {
	// 計算耗時
	lan := time.Now().Sub(start)
	if err != nil {
		measurement.Measure(fmt.Sprintf("%s_ERROR", op), lan)
		return
	}
	measurement.Measure(op, lan)
}

統計信息由於是會有多個線程同時操作,所以需要使用線程安全的方式進行操作:

func (h *histogram) Measure(latency time.Duration) {
	// 這里是 us 微秒
	n := int64(latency / time.Microsecond)

	atomic.AddInt64(&h.sum, n)
	atomic.AddInt64(&h.count, 1)
	// 這里轉為毫秒ms
	bound := int(n / h.boundInterval)
	// boundCounts 是一個並發map,用來統計每個時間段(單位:ms)中有多少次操作
	h.boundCounts.Upsert(bound, 1, func(ok bool, existedValue int64, newValue int64) int64 {
		if ok {
			return existedValue + newValue
		}
		return newValue
	})
	// 設置最小時延
	for {
		oldMin := atomic.LoadInt64(&h.min)
		if n >= oldMin {
			break
		}

		if atomic.CompareAndSwapInt64(&h.min, oldMin, n) {
			break
		}
	}
	// 設置最大時延
	for {
		oldMax := atomic.LoadInt64(&h.max)
		if n <= oldMax {
			break
		}

		if atomic.CompareAndSwapInt64(&h.max, oldMax, n) {
			break
		}
	}
}

統計每個時間段(單位:ms)內操作的次數是使用 boundCounts,它是 Go YCSB 自己實現的 ConcurrentMap 保證線程安全,用來統計單位時間內操作的次數;

最大和最小時延是通過 CAS 進行操作的,也是為了保證線程安全。

統計完之后會調用 getInfo 計算耗時:

func (h *histogram) getInfo() map[string]interface{} {
	min := atomic.LoadInt64(&h.min)
	max := atomic.LoadInt64(&h.max)
	sum := atomic.LoadInt64(&h.sum)
	count := atomic.LoadInt64(&h.count)

	bounds := h.boundCounts.Keys()
	sort.Ints(bounds)

	avg := int64(float64(sum) / float64(count))
	per99 := 0
	per999 := 0
	per9999 := 0

	opCount := int64(0)
	// 計算 P99,P99.9,P99.99
	// 這里實際上是統計一個占比
	// bound 里面會保存每毫秒有多少次操作
	for _, bound := range bounds {
		boundCount, _ := h.boundCounts.Get(bound)
		opCount += boundCount
		per := float64(opCount) / float64(count)
		// 這里是 99% 的操作是落在哪個時間區間內
		if per99 == 0 && per >= 0.99 {
			per99 = (bound + 1) * 1000
		}

		if per999 == 0 && per >= 0.999 {
			per999 = (bound + 1) * 1000
		}

		if per9999 == 0 && per >= 0.9999 {
			per9999 = (bound + 1) * 1000
		}
	}
	// 計算整個測試耗時
	elapsed := time.Now().Sub(h.startTime).Seconds()
	// 計算單位耗時內操作次數 
	qps := float64(count) / elapsed
	res := make(map[string]interface{})
	res[ELAPSED] = elapsed
	res[COUNT] = count
	res[QPS] = qps
	res[AVG] = avg
	res[MIN] = min
	res[MAX] = max
	res[PER99TH] = per99
	res[PER999TH] = per999
	res[PER9999TH] = per9999

	return res
}

這里的 per99、per999、per9999 實際上精度只有毫秒,是為了做直方圖導出而設計的(然后作者在這個項目已經過去3年了,還沒加上這個功能)。

總結

通過上面的分析可以發現, Go YCSB 設計還是很精妙的,通過很少的代碼就可以進行 DB 的擴展;配置也是相當靈活,可以根據不同的 requestdistribution 提供了不同的測試環境,並且在測試中也可以隨意的調整讀寫概率,保證可以盡可能的模擬線上的環境。

但是它也有很多不足,一方面是文檔很不充分,基本上就寫了幾個參數配置;另一方面就是很多功能都沒有實現,線上測試的時候經常會出現ERROR,去代碼一看結果是沒有實現。三年前作者的博客中就說要實現測試結果導出功能,結果現在還沒實現。我已經給作者 tl@pingcap.com 發郵件了,等待回復。

Reference

https://github.com/pingcap/go-ycsb

https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM