場景
某些函數調用頻繁,但其計算卻非常耗時,為了避免每次調用時都重新計算一遍,我們需要保存函數的計算結果,這樣在對函數進行調用的時候,只需要計算一次,之后的調用可以直接從緩存中返回計算結果。
使用下面的httpGetBody()
作為我們需要緩存的函數樣例。
func httpGetBody(url string) (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body) // ReadAll會返回兩個結果,一個[]byte數組和一個錯誤
}
要求
緩存的設計要求是並發安全的,並且要盡量高效。
版本1:使用互斥量實現並發安全
版本1
// Func 是待緩存的函數(即key)
type Func func(key string) (interface{}, error)
// Result 作為緩存結果(即value)
type result struct {
value interface{}
err error
}
// 緩存通過調用 f 函數得到的結果
type Memo struct {
f Func
cache map[string]result
}
func NewMemo(f Func) *Memo {
memo := &Memo{f, make(map[string]result)}
return memo
}
// Get方法,線程不安全
func (memo *Memo) Get(url string) (interface{}, error) {
res, ok := memo.cache[url]
if !ok { // 如果緩存中不存在,通過調用memo中的f函數計算出結果,並把結果緩存起來
res.value, res.err = memo.f(url)
memo.cache[url] = res
}
return res.value, res.err
}
Memo實例會記錄需要緩存的函數f(類型為Func),以及緩存內容(里面是一個string到result映射的map)。
這是一個最簡單的實現,由於沒有加鎖,是線程不安全的。我們先對其進行簡單的測試。測試函數如下:
var urls = []string {
"https://www.nowcoder.com/",
"https://www.nowcoder.com/contestRoom",
"https://www.nowcoder.com/interview/ai/index",
"https://www.nowcoder.com/courses",
"https://www.nowcoder.com/recommend",
"https://www.nowcoder.com/courses", // 重復的url,測試緩存效果
"https://www.nowcoder.com/contestRoom", // 重復的url,測試緩存效果
}
// 單個goroutine,順序調用
func TestMemoSingle(t *testing.T) {
m := NewMemo(httpGetBody)
totalTime := time.Now()
for _, url := range urls {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Println(err)
}
fmt.Printf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
}
fmt.Printf("total time used: %s\n", time.Since(totalTime))
}
// 並發調用
// 使用 sync.WaitGroup 來等待所有的請求都完成再返回
func TestMemoConcurrency(t *testing.T) {
m := NewMemo(httpGetBody)
var group sync.WaitGroup
totalTime := time.Now()
for _, url := range urls {
group.Add(1)
go func(url string) {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Println(err)
}
fmt.Printf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
group.Done() // equals ==> group.Add(-1)
}(url)
}
group.Wait()
fmt.Printf("total time used: %s\n", time.Since(totalTime))
}
首先測試單個goroutine順序執行的情況,測試結果如下:
$ go test -v -run=TestMemoSingle
=== RUN TestMemoSingle
https://www.nowcoder.com/, 289.8287ms, 95378 bytes
https://www.nowcoder.com/contestRoom, 178.8973ms, 71541 bytes
https://www.nowcoder.com/interview/ai/index, 68.9602ms, 21320 bytes
https://www.nowcoder.com/courses, 148.9146ms, 64304 bytes
https://www.nowcoder.com/recommend, 121.932ms, 90666 bytes
https://www.nowcoder.com/courses, 0s, 64304 bytes // 可以看到,本次調用直接從緩存中獲取結果,耗時為0
https://www.nowcoder.com/contestRoom, 0s, 71541 bytes // 同上
total time used: 809.5305ms
--- PASS: TestMemoSingle (0.81s)
PASS
ok _/D_/workspace/GoRepo/gopl/ch9/memo1 1.546s
可以清楚的看到,當訪問之前已經被訪問過的 url 時,可以立刻從緩存中返回結果。我們再來試試看並發訪問的情況。
$ go test -v -run=TestMemoConcurrency
=== RUN TestMemoConcurrency
https://www.nowcoder.com/interview/ai/index, 252.8542ms, 21320 bytes
https://www.nowcoder.com/, 253.8524ms, 95378 bytes
https://www.nowcoder.com/recommend, 279.8401ms, 90666 bytes
https://www.nowcoder.com/courses, 280.8377ms, 64304 bytes
https://www.nowcoder.com/courses, 318.8194ms, 64304 bytes
https://www.nowcoder.com/contestRoom, 359.7913ms, 71541 bytes
https://www.nowcoder.com/contestRoom, 404.7649ms, 71541 bytes
total time used: 404.7649ms
--- PASS: TestMemoConcurrency (0.40s)
PASS
ok _/D_/workspace/GoRepo/gopl/ch9/memo1 3.034s
並發訪問時(請多測試幾次),可以看到,總的用時比單個gouroutine順序訪問時少了差不多一半。但訪問相同 url 時似乎沒有達到緩存的效果。原因很簡單嘛,我們在實現Get()
方法時,沒有加鎖限制,因此多個goroutine可能同時訪問memo實例,也就是出現了數據競爭。
在 Go 中,我們可以利用-race
標簽,它能幫助我們識別代碼中是否出現了數據競爭。比如:
$ go test -v -race -run=TestMemoConcurrency
=== RUN TestMemoConcurrency
...
==================
WARNING: DATA RACE
Write at 0x00c000078cc0 by goroutine 10: // 在 goroutine 10 中寫入
runtime.mapassign_faststr()
D:/soft/Go/src/runtime/map_faststr.go:202 +0x0
_/D_/workspace/GoRepo/gopl/ch9/memo1.(*Memo).Get()
D:/workspace/GoRepo/gopl/ch9/memo1/memo.go:40 +0x1d5
_/D_/workspace/GoRepo/gopl/ch9/memo1.TestMemoConcurrency.func1()
D:/workspace/GoRepo/gopl/ch9/memo1/memo_test.go:46 +0x96
Previous write at 0x00c000078cc0 by goroutine 7: // 在 goroutine 7 中也出現寫入
runtime.mapassign_faststr()
D:/soft/Go/src/runtime/map_faststr.go:202 +0x0
_/D_/workspace/GoRepo/gopl/ch9/memo1.(*Memo).Get()
D:/workspace/GoRepo/gopl/ch9/memo1/memo.go:40 +0x1d5
_/D_/workspace/GoRepo/gopl/ch9/memo1.TestMemoConcurrency.func1()
D:/workspace/GoRepo/gopl/ch9/memo1/memo_test.go:46 +0x96
...
FAIL
exit status 1
FAIL _/D_/workspace/GoRepo/gopl/ch9/memo1 0.699s
可以看到,memo.go 的第40行(對應memo.cache[url] = res
)出現了2次,說明有兩個goroutine在沒有同步干預的情況下更新了cache map。這表明Get不是並發安全的,存在數據競爭。
OK,那我們就設法對Get()
方法進行加鎖(mutex),最粗暴的方式莫過於如下:
// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
memo.mu.Unlock()
return res.value, res.err
}
這樣做當然實現了所謂的“並發安全”,但是也失去了“並發性”,每次對f的調用期間都會持有鎖,Get將本來可以並行運行的I/O操作串行化了。顯然,這不是我們所希望的。
我們試圖降低鎖的粒度,查找階段獲取一次,如果查找沒有返回任何內容,那么進入更新階段會再次獲取。在這兩次獲取鎖的中間階段,其它goroutine可以隨意使用cache。
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
res, ok := memo.cache[key]
memo.mu.Unlock()
if !ok {
res.value, res.err = memo.f(key)
// Between the two critical sections, several goroutines
// may race to compute f(key) and update the map.
memo.mu.Lock()
memo.cache[key] = res
memo.mu.Unlock()
}
return res.value, res.err
}
這種實現在兩個以上的goroutine同一時刻調用Get來請求同樣的URL時,會導致同樣的url被重復計算。多個goroutine一起查詢cache,發現沒有值,然后一起調用f這個慢不拉嘰的函數。在得到結果后,也都會去更新map。其中一個獲得的結果會覆蓋掉另一個的結果。理想情況下是應該避免掉多余的工作的,這種“避免”工作一般被稱為duplicate suppression(重復抑制/避免)。
版本2:使用“互斥量+channel”實現並發安全機制
該版本的Memo每一個map元素都是指向一個條目的指針。每一個條目包含對函數f的調用結果。與之前不同的是這次entry還包含了一個叫ready的channel。在條目的結果被設置之后,這個channel就會被關閉,以向其它goroutine廣播——“現在去讀取該條目內的結果是安全的了”。
// Func 是待緩存的函數,作為 key
type Func func(key string) (interface{}, error)
// entry 作為緩存的 value, 除了包含一個結果result,還包含一個channel
type entry struct {
res result
ready chan struct{}
}
type result struct {
value interface{}
err error
}
// 緩存通過調用 f 函數得到的結果
type Memo struct {
f Func
mu sync.Mutex
cache map[string]*entry
}
func NewMemo(f Func) *Memo {
memo := &Memo{f: f, cache: make(map[string]*entry)}
return memo
}
// 使用一個互斥量(即 unbuffered channel)來保護多個goroutine調用Get時的共享map變量
func (memo *Memo) Get(url string) (interface{}, error) {
memo.mu.Lock()
e := memo.cache[url]
if e == nil {
// 如果查詢結果為空,說明這是對該url的第一次查詢
// 因此,讓這個goroutine負責計算這個url對應的值
// 當計算好后,再廣播通知所有的其他goroutine,
// 告訴它們這個url對應的緩存已經存在了,可以直接取用
e = &entry{ready: make(chan struct{})}
memo.cache[url] = e // 注意這里只是存入了一個“空的”條目,真正的結果還沒計算出來
memo.mu.Unlock()
e.res.value, e.res.err = memo.f(url)
close(e.ready) // broadcast ready condition
} else {
// 如果查詢到結果非空,則立馬先把鎖給釋放掉
memo.mu.Unlock()
// 但是要注意,這里的非空並不代表馬上就可以返回結果
// 因為有可能是其他goroutine還在計算中
// 因此要等待ready condition
<-e.ready
}
return e.res.value, e.res.err
}
獲取互斥鎖來保護共享變量cache map,查詢map中是否存在指定條目,如果沒有找到,那么分配空間插入一個新條目,釋放互斥鎖。如果條目存在但其值並沒有寫入完成時(也就是有其它的goroutine在調用 f 這個慢函數),goroutine則必須等待ready之后才能讀到條目的結果。ready condition由一個無緩存channel來實現,對無緩存channel的讀取操作(即<-e.ready
)在channel關閉之前一直是阻塞。
如果沒有條目的話,需要向map中插入一個沒有准備好的條目,當前正在調用的goroutine就需要負責調用慢函數、更新條目以及向其它所有goroutine廣播條目已經ready可讀的消息了。
條目中的e.res.value和e.res.err變量是在多個goroutine之間共享的。創建條目的goroutine同時也會設置條目的值,其它goroutine在收到"ready"的廣播消息之后立刻會去讀取條目的值。盡管會被多個goroutine同時訪問,但卻並不需要互斥鎖。ready channel的關閉一定會發生在其它goroutine接收到廣播事件之前,因此第一個goroutine對這些變量的寫操作是一定發生在這些讀操作之前的。不會發生數據競爭。
版本3:通過goroutine通信實現並發安全
在版本2的實現中,我們使用了一個互斥量來保護多個goroutine調用Get時的共享變量map。在Go中,還有另外一種設計方案——把共享變量map限制在一個單獨的goroutine中(我們稱這樣的goroutine為monitor goroutine),對緩存的查詢和寫入均通過monitor goroutine進行。
Func、result和entry的聲明和之前保持一致,這里不再重復。Memo類型的定義則做了很大的改動,只包含了一個叫做requests
的channel,Get的調用者用這個channel來和monitor goroutine通信。
type request struct {
url string
// 負責發送響應結果, 只發送, 不接收
response chan<- result
}
type Memo struct {
requests chan request
}
func NewMemo(f Func) *Memo {
memo := &Memo{requests:make(chan request)}
go memo.server(f)
return memo
}
func (memo *Memo) Get(url string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{url, response}
res := <-response
return res.value, res.err
}
func (memo *Memo) Close() {
close(memo.requests)
}
上面的Get方法,會創建一個response channel,把它放進request結構中,然后發送給monitor goroutine,然后馬上又會接收它。
cache變量被限制在了monitor goroutine中,即server()
函數,下面會看到。monitor會在循環中一直讀取請求,直到request channel被Close方法關閉。每一個請求都會去查詢cache,如果沒有找到條目的話,那么就會創建/插入一個新的條目。
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
for req := range memo.requests {
e := cache[req.url]
if e == nil {
// this is the first request for this url
e = &entry{ready: make(chan struct{})}
cache[req.url] = e
go e.call(f,req.url)
}
go e.deliver(req.response)
}
}
func (e *entry) call(f Func, url string) {
// Evaluate the function.
e.res.value, e.res.err = f(url)
// broadcast ready condition
close(e.ready)
}
func (e *entry) deliver(response chan<- result) {
// wait for the ready condition
<-e.ready
// send the result to the client
response <- e.res
}
和基於互斥量的版本類似,第一個對某個key的請求需要負責去調用函數f並傳入這個key,將結果存在條目里,並關閉ready channel來廣播條目的ready消息。使用(*entry).call
來完成上述工作。
緊接着對同一個key的請求會發現map中已經有了存在的條目,然后會等待結果變為ready,並將結果從response發送給客戶端的goroutien。上述工作是用(*entry).deliver
來完成的。對call和deliver方法的調用必須讓它們在自己的goroutine中進行以確保monitor goroutines不會因此而被阻塞住而沒法處理新的請求。
總結
在Go中,我們可以通過使用互斥量(加鎖),或者通信來建立並發程序。后者實現起來會難一些,初學也比較難理解。我也理解不深,暫記錄於此。
本文是對《The Go Programming Language》 9.7 節的學習筆記,大家去看原文吧~