本篇文章開篇會介紹一下Go-YCSB是如何使用,然后按照慣例會分析一下它是如何做基准測試,看看它有什么優缺點。
轉載請聲明出處哦~,本篇文章發布於luozhiyun的博客: https://www.luozhiyun.com/archives/634
最近我們在做數據庫的技術選型,要做選型的話難免需要對數據庫進行一個基准測試,以便可以橫向對比不同數據庫性能。
YCSB,全稱為“Yahoo!Cloud Serving Benchmark”,是雅虎開發的用來對雲服務進行基礎測試的工具,其內部涵蓋了常見的NoSQL數據庫產品,如Cassandra、MongoDB、HBase、Redis等等。
作為一名go開發人員,所以我們使用 pingcap 開發的Go YCSB來進行基准測試。
安裝
首先要保證本地 Go 版本不低於 1.16,然后下載編譯:
git clone https://github.com/pingcap/go-ycsb.git
cd go-ycsb
make
在 bin 文件夾里面就放着我們編譯好的程序 go-ycsb。
我們先來看一下 workloads 文件夾,目錄下有各種workload的模板,可以基於workload模板進行自定義修改。默認的6種測試場景如下:
- workloada:讀寫均衡型,50%/50%,Reads/Writes
- workloadb:讀多寫少型,95%/5%,Reads/Writes
- workloadc:只讀型,100%,Reads
- workloadd:讀最近寫入記錄型,95%/5%,Reads/insert
- workloade:掃描小區間型,95%/5%,scan/insert
- workloadf:讀寫入記錄均衡型,50%/50%,Reads/insert
- workload_template:參數列表模板。
所以我們可以依據不同的 workload 多維度的對系統進行測試。workload里面的操作主要包括:
- Insert:插入一條新的記錄
- Update:更新一條記錄的某一個或者所有 fields
- Read:讀取一條記錄的某一個或者所有 fields
- Scan:隨機從一個 key 開始順序掃描隨機條記錄
在測試的時候,我們還需要根據不同的業務場景來模擬測試,那么可以通過 requestdistribution 控制:
- uniform:隨機選擇一個記錄;
- sequential:按順序選擇記錄;
- zipfian:根據 Zipfian 分布來選擇記錄。大致意思就是互聯網常說的80/20原則,也就是20%的key,會占有80%的訪問量;
- latest:和 Zipfian 類似,但是傾向於訪問新數據明顯多於老數據;
- hotspot:熱點分布訪問;
- exponential:指數分布訪問;
下面我們看一下workload里面可以填哪些參數:
# 目前只實現了這一種
workload=core
# 總記錄數
recordcount=1000000
# 測試階段被操作的記錄數,如果設置了 threadcount,那么每個線程操作的記錄數=operationcount/threadcount
operationcount=3000000
# 線程數
threadcount=500
# 如果一個表里面已經有記錄數了,那么load的數據的時候從這個記錄數開始
insertstart=0
# 一行數據的字段數
fieldcount=10
# 每個字段大小
fieldlength=100
# 是否應該讀取所有字段
readallfields=true
# 是否應該更新所有字段
writeallfields=false
# 字段長度分布
fieldlengthdistribution=constant
#fieldlengthdistribution=uniform
#fieldlengthdistribution=zipfian
# 讀操作概率
readproportion=0.95
# 更新操作概率
updateproportion=0.05
# 插入操作概率
insertproportion=0
# 先讀后寫操作同一條記錄概率
readmodifywriteproportion=0
# 范圍操作的概率
scanproportion=0
# 范圍操作,最大的可操作的記錄數
maxscanlength=1000
# 用來選擇掃描時訪問的記錄數量分布情況
scanlengthdistribution=uniform
#scanlengthdistribution=zipfian
# 記錄應按順序插入還是偽隨機插入
insertorder=hashed
#insertorder=ordered
# 以什么方式模擬測試
requestdistribution=zipfian
#requestdistribution=uniform
#requestdistribution=latest
# 下面這兩種方式時針對requestdistribution為hotspot的時候
# 構成熱點集的數據項的百分比
hotspotdatafraction=0.2
# 訪問熱點集的數據操作百分比
hotspotopnfraction=0.8
# 操作數據的表名
table=usertable
# 延遲測量結果展現形式,暫時沒實現
measurementtype=histogram
測試
比如我們現在要測試 redis 的性能,先寫一個 workload:
recordcount=1000000
operationcount=1000000
workload=core
readallfields=true
readmodifywriteproportion=1
requestdistribution=uniform
redis.addr=127.0.0.1:6379
threadcount=50
上面的這個 workload 表示在 load 的時候會插入100萬條數據到庫里面,操作的數據量也是100萬,但是有50個線程,也就是每個線程實際操作2萬行記錄;
測試方式使用 readmodifywriteproportion,先讀后寫,操作記錄采用 uniform 也就是隨機方式進行。
先 load 數據:
./bin/go-ycsb load redis -P workloads/workloada
再運行測試:
./bin/go-ycsb run redis -P workloads/workloada
返回:
READ_MODIFY_WRITE - Takes(s): 18.8, Count: 499312, OPS: 26539.8, Avg(us): 1388, Min(us): 107, Max(us): 42760, 99th(us): 3000, 99.9th(us): 7000, 99.99th(us): 26000
- Takes(s) :表示測試總共耗時;
- Count:操作記錄數;
- OPS:Operates Per Second,一般是操作次數,與qps區別不大;
- Avg、Min、Max:平均、最小、最大單條記錄操作耗時;
- 99th、99.9th、99.99th:P99、P99.9、P99.99時延;
代碼實現分析
當然對於我來說,肯定還是要看一下它的代碼是怎么做的,學習一下大佬是如何寫代碼的對我們工作也是很有幫助。
對於 Go YCSB 來說,它總共有這么幾個組成部分:
- workload:加載初始化配置文件,創建線程執行測試;
- client:封裝了 workload ,配置參數,DB等,用來運行測試;
- db:配置了一堆可被執行的數據庫 client,執行具體的讀寫數據庫;
- measurement:數據統計模塊,統計執行次數,時延等;
我們以 redis 為例先看一下,如果要測試自己的 Database 該怎么辦。
定義 DB
在 Go YCSB 中,所有的 DB 都放在 db 這個目錄下面:
所以,我們可以在這個文件夾下面創建自己的db,然后構造一個 struct ,實現 DB 這個接口:
type DB interface {
ToSqlDB() *sql.DB
Close() error
InitThread(ctx context.Context, threadID int, threadCount int) context.Context
CleanupThread(ctx context.Context)
Read(ctx context.Context, table string, key string, fields []string) (map[string][]byte, error)
Scan(ctx context.Context, table string, startKey string, count int, fields []string) ([]map[string][]byte, error)
Update(ctx context.Context, table string, key string, values map[string][]byte) error
Insert(ctx context.Context, table string, key string, values map[string][]byte) error
Delete(ctx context.Context, table string, key string) error
}
里面定義了具體的 DB 操作。
然后需要定義一個工廠,用來創建這個 DB struct,實現DBCreator接口:
type DBCreator interface {
Create(p *properties.Properties) (DB, error)
}
然后需要定義一個 init 函數,在啟動的時候進行 DBCreator 注冊:
func init() {
ycsb.RegisterDBCreator("redis", redisCreator{})
}
var dbCreators = map[string]DBCreator{}
func RegisterDBCreator(name string, creator DBCreator) {
_, ok := dbCreators[name]
if ok {
panic(fmt.Sprintf("duplicate register database %s", name))
}
dbCreators[name] = creator
}
RegisterDBCreator 會在初始化的時候被調用。用來獲取 init 方法注冊過的 DB。通過這種方式 Go YCSB 實現了 DB 的自定義化。
全局參數初始化
首先 Go YCSB 在運行的時候會使用 cobra 根據傳入的是 load 還是 run 執行到下面兩個不同的方法:
func runLoadCommandFunc(cmd *cobra.Command, args []string) {
runClientCommandFunc(cmd, args, false)
}
func runTransCommandFunc(cmd *cobra.Command, args []string) {
runClientCommandFunc(cmd, args, true)
}
這里會調用到 runClientCommandFunc 函數中:
func runClientCommandFunc(cmd *cobra.Command, args []string, doTransactions bool) {
dbName := args[0]
// 初始化全局參數
initialGlobal(dbName, func() {
doTransFlag := "true"
if !doTransactions {
doTransFlag = "false"
}
globalProps.Set(prop.DoTransactions, doTransFlag)
if cmd.Flags().Changed("threads") {
// We set the threadArg via command line.
globalProps.Set(prop.ThreadCount, strconv.Itoa(threadsArg))
}
if cmd.Flags().Changed("target") {
globalProps.Set(prop.Target, strconv.Itoa(targetArg))
}
if cmd.Flags().Changed("interval") {
globalProps.Set(prop.LogInterval, strconv.Itoa(reportInterval))
}
})
fmt.Println("***************** properties *****************")
for key, value := range globalProps.Map() {
fmt.Printf("\"%s\"=\"%s\"\n", key, value)
}
fmt.Println("**********************************************")
// 初始化 client
c := client.NewClient(globalProps, globalWorkload, globalDB)
start := time.Now()
// 運行測試
c.Run(globalContext)
fmt.Printf("Run finished, takes %s\n", time.Now().Sub(start))
// 測試結果輸出
measurement.Output()
}
參數的初始化主要是在 initialGlobal 里面做的:
func initialGlobal(dbName string, onProperties func()) {
...
go func() {
http.ListenAndServe(addr, nil)
}()
//初始化 measurement
measurement.InitMeasure(globalProps)
if len(tableName) == 0 {
tableName = globalProps.GetString(prop.TableName, prop.TableNameDefault)
}
// 獲取 WorkloadCreator
workloadName := globalProps.GetString(prop.Workload, "core")
workloadCreator := ycsb.GetWorkloadCreator(workloadName)
//創建Workload
var err error
if globalWorkload, err = workloadCreator.Create(globalProps); err != nil {
util.Fatalf("create workload %s failed %v", workloadName, err)
}
// 獲取要被測試的 db
dbCreator := ycsb.GetDBCreator(dbName)
if dbCreator == nil {
util.Fatalf("%s is not registered", dbName)
}
// 創建 db
if globalDB, err = dbCreator.Create(globalProps); err != nil {
util.Fatalf("create db %s failed %v", dbName, err)
}
globalDB = client.DbWrapper{globalDB}
}
這里最主要的是創建 Workload 和 DB。Workload 里面會初始化很多配置文件里面的信息。
運行測試
runClientCommandFunc 里面會調用 client 的 Run 方法執行測試:
func (c *Client) Run(ctx context.Context) {
var wg sync.WaitGroup
threadCount := c.p.GetInt(prop.ThreadCount, 1)
wg.Add(threadCount)
measureCtx, measureCancel := context.WithCancel(ctx)
measureCh := make(chan struct{}, 1)
go func() {
defer func() {
measureCh <- struct{}{}
}()
// 這里很有意思,因為有時候我們做數據庫是需要初始化數據到緩存里面的
// 所以開始的一段時間我們不能計入測試統計中,這里有隔預熱時間,可以通過 warmuptime 配置
if c.p.GetBool(prop.DoTransactions, true) {
dur := c.p.GetInt64(prop.WarmUpTime, 0)
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(dur) * time.Second):
}
}
// 預熱完畢
measurement.EnableWarmUp(false)
dur := c.p.GetInt64(prop.LogInterval, 10)
t := time.NewTicker(time.Duration(dur) * time.Second)
defer t.Stop()
for {
select {
// 在運行的時候每隔 10 秒輸出一次統計信息
case <-t.C:
measurement.Output()
case <-measureCtx.Done():
return
}
}
}()
// 做一些初始化的工作,如mysql需要創建表
if err := c.workload.Init(c.db); err != nil {
fmt.Printf("Initialize workload fail: %v\n", err)
return
}
// 根據 threadCount 創建多個線程操作數據庫
for i := 0; i < threadCount; i++ {
go func(threadId int) {
defer wg.Done()
// 初始化 worker
w := newWorker(c.p, threadId, threadCount, c.workload, c.db)
ctx := c.workload.InitThread(ctx, threadId, threadCount)
ctx = c.db.InitThread(ctx, threadId, threadCount)
// 開始跑測試
w.run(ctx)
// 跑完測試做清理工作
c.db.CleanupThread(ctx)
c.workload.CleanupThread(ctx)
}(i)
}
// 等待測試跑完
wg.Wait()
measureCancel()
<-measureCh
}
這里分為兩個部分:第一部分是創建一個線程,這個線程會控制是否開始測試統計,然后會每隔10秒輸出一次統計信息;第二部分是根據設置的 threadcount 創建線程,運行 Worker 運行測試;
newWorker 的時候會根據 operationcount 設置 totalOpCount 表示總共需要執行次數,用 totalOpCount / int64(threadCount)
設置 opCount 表示 單線程操作的記錄數。
func (w *worker) run(ctx context.Context) {
// 將線程操作分散開來,這樣它們就不會同時擊中DB了。
if w.targetOpsPerMs > 0.0 && w.targetOpsPerMs <= 1.0 {
time.Sleep(time.Duration(rand.Int63n(w.targetOpsTickNs)))
}
startTime := time.Now()
// 循環直到操作數達到 opsDone
for w.opCount == 0 || w.opsDone < w.opCount {
var err error
opsCount := 1
// 這里是執行基准測試
if w.doTransactions {
if w.doBatch {
err = w.workload.DoBatchTransaction(ctx, w.batchSize, w.workDB)
opsCount = w.batchSize
} else {
err = w.workload.DoTransaction(ctx, w.workDB)
}
// 這里是執行 load 數據
} else {
if w.doBatch {
err = w.workload.DoBatchInsert(ctx, w.batchSize, w.workDB)
opsCount = w.batchSize
} else {
err = w.workload.DoInsert(ctx, w.workDB)
}
}
// 預熱完了會進行操作次數的統計
if measurement.IsWarmUpFinished() {
w.opsDone += int64(opsCount)
w.throttle(ctx, startTime)
}
select {
case <-ctx.Done():
return
default:
}
}
}
基准測試的具體執行是交給 workload 的 DoTransaction 方法來判斷執行。
func (c *core) DoTransaction(ctx context.Context, db ycsb.DB) error {
state := ctx.Value(stateKey).(*coreState)
r := state.r
// 根據會根據不同的測試場景,進入到不同的測試分支
// Next 方法會根據設置的 readproportion、updateproportion、 scanproportion等概率來獲取相應操作類型
operation := operationType(c.operationChooser.Next(r))
switch operation {
case read:
return c.doTransactionRead(ctx, db, state)
case update:
return c.doTransactionUpdate(ctx, db, state)
case insert:
return c.doTransactionInsert(ctx, db, state)
case scan:
return c.doTransactionScan(ctx, db, state)
default:
return c.doTransactionReadModifyWrite(ctx, db, state)
}
}
這里會調用 operationChooser 的 Next 方法來判斷該執行那個指令,執行指令的概率是我們在配置文件里面設置好的。
這個算法很簡單,在初始化 operationChooser 會將設置的參數readproportion、updateproportion、 scanproportion的值以數組的形式 add 到 operationChooser 的 values 里面,然后隨機一個 0~1的小數,檢查這個隨機數落在哪個范圍就好了:
func (d *Discrete) Next(r *rand.Rand) int64 {
sum := float64(0)
for _, p := range d.values {
sum += p.Weight
}
// 隨機一個 0~1的小數
val := r.Float64()
for _, p := range d.values {
pw := p.Weight / sum
if val < pw {
d.SetLastValue(p.Value)
return p.Value
}
val -= pw
}
panic("oops, should not get here.")
}
在代碼實現上就是按照上面說的,將所有 values 的值加起來得到 sum,然后計算每個 value 的占比是否達到隨機數值。
最后我們再來看看 doTransactionRead 是怎么執行的:
func (c *core) doTransactionRead(ctx context.Context, db ycsb.DB, state *coreState) error {
r := state.r
// 根據我們設置的 requestdistribution 獲取一個 key 值
keyNum := c.nextKeyNum(state)
keyName := c.buildKeyName(keyNum)
//被讀取的字段
var fields []string
if !c.readAllFields {
// 如果不是讀取所有字段,那么根據fieldChooser字段選擇器選擇一個字段執行
fieldName := state.fieldNames[c.fieldChooser.Next(r)]
fields = append(fields, fieldName)
} else {
fields = state.fieldNames
}
//調用 db 的read方法
values, err := db.Read(ctx, c.table, keyName, fields)
if err != nil {
return err
}
//校驗數據完整性
if c.dataIntegrity {
c.verifyRow(state, keyName, values)
}
return nil
}
這里首先會調用 nextKeyNum 去獲取 key 值,這里的 key 會根據我們設置的 requestdistribution 參數根據一定的規則獲取到。然后校驗完需要讀哪些字段后調用 DbWrapper 的 Read 方法讀取數據。
func (db DbWrapper) Read(ctx context.Context, table string, key string, fields []string) (_ map[string][]byte, err error) {
start := time.Now()
defer func() {
// 進行測試數據統計
measure(start, "READ", err)
}()
return db.DB.Read(ctx, table, key, fields)
}
DbWrapper 會封裝一層,用 defer 方法調用 measure 進行統計。
不過這里我有問題是在讀取數據的時候通過還會根據傳入的 fields 來進行解析,這樣也會損耗一些性能,不知是否合理,如redis 的 Read 方法:
func (r *redis) Read(ctx context.Context, table string, key string, fields []string) (map[string][]byte, error) {
data := make(map[string][]byte, len(fields))
res, err := r.client.Get(table + "/" + key).Result()
if err != nil {
return nil, err
}
// 反序列化
err = json.Unmarshal([]byte(res), &data)
if err != nil {
return nil, err
}
// TODO: filter by fields
return data, err
}
數據統計
每一次操作完畢之后都會調用到 measure 方法,進行測試數據統計。
func measure(start time.Time, op string, err error) {
// 計算耗時
lan := time.Now().Sub(start)
if err != nil {
measurement.Measure(fmt.Sprintf("%s_ERROR", op), lan)
return
}
measurement.Measure(op, lan)
}
統計信息由於是會有多個線程同時操作,所以需要使用線程安全的方式進行操作:
func (h *histogram) Measure(latency time.Duration) {
// 這里是 us 微秒
n := int64(latency / time.Microsecond)
atomic.AddInt64(&h.sum, n)
atomic.AddInt64(&h.count, 1)
// 這里轉為毫秒ms
bound := int(n / h.boundInterval)
// boundCounts 是一個並發map,用來統計每個時間段(單位:ms)中有多少次操作
h.boundCounts.Upsert(bound, 1, func(ok bool, existedValue int64, newValue int64) int64 {
if ok {
return existedValue + newValue
}
return newValue
})
// 設置最小時延
for {
oldMin := atomic.LoadInt64(&h.min)
if n >= oldMin {
break
}
if atomic.CompareAndSwapInt64(&h.min, oldMin, n) {
break
}
}
// 設置最大時延
for {
oldMax := atomic.LoadInt64(&h.max)
if n <= oldMax {
break
}
if atomic.CompareAndSwapInt64(&h.max, oldMax, n) {
break
}
}
}
統計每個時間段(單位:ms)內操作的次數是使用 boundCounts,它是 Go YCSB 自己實現的 ConcurrentMap 保證線程安全,用來統計單位時間內操作的次數;
最大和最小時延是通過 CAS 進行操作的,也是為了保證線程安全。
統計完之后會調用 getInfo 計算耗時:
func (h *histogram) getInfo() map[string]interface{} {
min := atomic.LoadInt64(&h.min)
max := atomic.LoadInt64(&h.max)
sum := atomic.LoadInt64(&h.sum)
count := atomic.LoadInt64(&h.count)
bounds := h.boundCounts.Keys()
sort.Ints(bounds)
avg := int64(float64(sum) / float64(count))
per99 := 0
per999 := 0
per9999 := 0
opCount := int64(0)
// 計算 P99,P99.9,P99.99
// 這里實際上是統計一個占比
// bound 里面會保存每毫秒有多少次操作
for _, bound := range bounds {
boundCount, _ := h.boundCounts.Get(bound)
opCount += boundCount
per := float64(opCount) / float64(count)
// 這里是 99% 的操作是落在哪個時間區間內
if per99 == 0 && per >= 0.99 {
per99 = (bound + 1) * 1000
}
if per999 == 0 && per >= 0.999 {
per999 = (bound + 1) * 1000
}
if per9999 == 0 && per >= 0.9999 {
per9999 = (bound + 1) * 1000
}
}
// 計算整個測試耗時
elapsed := time.Now().Sub(h.startTime).Seconds()
// 計算單位耗時內操作次數
qps := float64(count) / elapsed
res := make(map[string]interface{})
res[ELAPSED] = elapsed
res[COUNT] = count
res[QPS] = qps
res[AVG] = avg
res[MIN] = min
res[MAX] = max
res[PER99TH] = per99
res[PER999TH] = per999
res[PER9999TH] = per9999
return res
}
這里的 per99、per999、per9999 實際上精度只有毫秒,是為了做直方圖導出而設計的(然后作者在這個項目已經過去3年了,還沒加上這個功能)。
總結
通過上面的分析可以發現, Go YCSB 設計還是很精妙的,通過很少的代碼就可以進行 DB 的擴展;配置也是相當靈活,可以根據不同的 requestdistribution 提供了不同的測試環境,並且在測試中也可以隨意的調整讀寫概率,保證可以盡可能的模擬線上的環境。
但是它也有很多不足,一方面是文檔很不充分,基本上就寫了幾個參數配置;另一方面就是很多功能都沒有實現,線上測試的時候經常會出現ERROR,去代碼一看結果是沒有實現。三年前作者的博客中就說要實現測試結果導出功能,結果現在還沒實現。我已經給作者 tl@pingcap.com 發郵件了,等待回復。
Reference
https://github.com/pingcap/go-ycsb
https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload