Go實現線程安全的緩存


場景

某些函數調用頻繁,但其計算卻非常耗時,為了避免每次調用時都重新計算一遍,我們需要保存函數的計算結果,這樣在對函數進行調用的時候,只需要計算一次,之后的調用可以直接從緩存中返回計算結果。

使用下面的httpGetBody()作為我們需要緩存的函數樣例。

func httpGetBody(url string) (interface{}, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body) // ReadAll會返回兩個結果,一個[]byte數組和一個錯誤
}

要求

緩存的設計要求是並發安全的,並且要盡量高效。

版本1:使用互斥量實現並發安全

版本1

// Func 是待緩存的函數(即key)
type Func func(key string) (interface{}, error)
// Result 作為緩存結果(即value)
type result struct {
	value interface{}
	err error
}
// 緩存通過調用 f 函數得到的結果
type Memo struct {
	f Func
	cache map[string]result
}

func NewMemo(f Func) *Memo {
	memo := &Memo{f, make(map[string]result)}
	return memo
}

// Get方法,線程不安全
func (memo *Memo) Get(url string) (interface{}, error) {
	res, ok := memo.cache[url]
	if !ok { // 如果緩存中不存在,通過調用memo中的f函數計算出結果,並把結果緩存起來
		res.value, res.err = memo.f(url)
		memo.cache[url] = res
	}
	return res.value, res.err
}

Memo實例會記錄需要緩存的函數f(類型為Func),以及緩存內容(里面是一個string到result映射的map)。

這是一個最簡單的實現,由於沒有加鎖,是線程不安全的。我們先對其進行簡單的測試。測試函數如下:

var urls = []string {
	"https://www.nowcoder.com/",
	"https://www.nowcoder.com/contestRoom",
	"https://www.nowcoder.com/interview/ai/index",
	"https://www.nowcoder.com/courses",
	"https://www.nowcoder.com/recommend",
	"https://www.nowcoder.com/courses",     // 重復的url,測試緩存效果
	"https://www.nowcoder.com/contestRoom", // 重復的url,測試緩存效果
}

// 單個goroutine,順序調用
func TestMemoSingle(t *testing.T) {
	m := NewMemo(httpGetBody)
	totalTime := time.Now()
	for _, url := range urls {
		start := time.Now()
		value, err := m.Get(url)
		if err != nil {
			log.Println(err)
		}
		fmt.Printf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
	}
	fmt.Printf("total time used: %s\n", time.Since(totalTime))
}

// 並發調用
// 使用 sync.WaitGroup 來等待所有的請求都完成再返回
func TestMemoConcurrency(t *testing.T) {
	m := NewMemo(httpGetBody)
	var group sync.WaitGroup
	totalTime := time.Now()
	for _, url := range urls {
		group.Add(1)

		go func(url string) {
			start := time.Now()
			value, err := m.Get(url)
			if err != nil {
				log.Println(err)
			}
			fmt.Printf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))

			group.Done() // equals ==> group.Add(-1)
		}(url)
	}
	group.Wait()
	fmt.Printf("total time used: %s\n", time.Since(totalTime))
}

首先測試單個goroutine順序執行的情況,測試結果如下:

$ go test -v -run=TestMemoSingle
=== RUN   TestMemoSingle
https://www.nowcoder.com/, 289.8287ms, 95378 bytes
https://www.nowcoder.com/contestRoom, 178.8973ms, 71541 bytes
https://www.nowcoder.com/interview/ai/index, 68.9602ms, 21320 bytes
https://www.nowcoder.com/courses, 148.9146ms, 64304 bytes
https://www.nowcoder.com/recommend, 121.932ms, 90666 bytes
https://www.nowcoder.com/courses, 0s, 64304 bytes     // 可以看到,本次調用直接從緩存中獲取結果,耗時為0
https://www.nowcoder.com/contestRoom, 0s, 71541 bytes // 同上
total time used: 809.5305ms
--- PASS: TestMemoSingle (0.81s)
PASS
ok      _/D_/workspace/GoRepo/gopl/ch9/memo1    1.546s

可以清楚的看到,當訪問之前已經被訪問過的 url 時,可以立刻從緩存中返回結果。我們再來試試看並發訪問的情況。

$ go test -v -run=TestMemoConcurrency
=== RUN   TestMemoConcurrency
https://www.nowcoder.com/interview/ai/index, 252.8542ms, 21320 bytes
https://www.nowcoder.com/, 253.8524ms, 95378 bytes
https://www.nowcoder.com/recommend, 279.8401ms, 90666 bytes
https://www.nowcoder.com/courses, 280.8377ms, 64304 bytes
https://www.nowcoder.com/courses, 318.8194ms, 64304 bytes
https://www.nowcoder.com/contestRoom, 359.7913ms, 71541 bytes
https://www.nowcoder.com/contestRoom, 404.7649ms, 71541 bytes
total time used: 404.7649ms
--- PASS: TestMemoConcurrency (0.40s)
PASS
ok      _/D_/workspace/GoRepo/gopl/ch9/memo1    3.034s

並發訪問時(請多測試幾次),可以看到,總的用時比單個gouroutine順序訪問時少了差不多一半。但訪問相同 url 時似乎沒有達到緩存的效果。原因很簡單嘛,我們在實現Get()方法時,沒有加鎖限制,因此多個goroutine可能同時訪問memo實例,也就是出現了數據競爭

在 Go 中,我們可以利用-race標簽,它能幫助我們識別代碼中是否出現了數據競爭。比如:

$ go test -v -race -run=TestMemoConcurrency
=== RUN   TestMemoConcurrency
...
==================
WARNING: DATA RACE
Write at 0x00c000078cc0 by goroutine 10: // 在 goroutine 10 中寫入
  runtime.mapassign_faststr()
      D:/soft/Go/src/runtime/map_faststr.go:202 +0x0
  _/D_/workspace/GoRepo/gopl/ch9/memo1.(*Memo).Get()
      D:/workspace/GoRepo/gopl/ch9/memo1/memo.go:40 +0x1d5
  _/D_/workspace/GoRepo/gopl/ch9/memo1.TestMemoConcurrency.func1()
      D:/workspace/GoRepo/gopl/ch9/memo1/memo_test.go:46 +0x96

Previous write at 0x00c000078cc0 by goroutine 7: // 在 goroutine 7 中也出現寫入
  runtime.mapassign_faststr()
      D:/soft/Go/src/runtime/map_faststr.go:202 +0x0
  _/D_/workspace/GoRepo/gopl/ch9/memo1.(*Memo).Get()
      D:/workspace/GoRepo/gopl/ch9/memo1/memo.go:40 +0x1d5
  _/D_/workspace/GoRepo/gopl/ch9/memo1.TestMemoConcurrency.func1()
      D:/workspace/GoRepo/gopl/ch9/memo1/memo_test.go:46 +0x96
...

FAIL
exit status 1
FAIL    _/D_/workspace/GoRepo/gopl/ch9/memo1    0.699s

可以看到,memo.go 的第40行(對應memo.cache[url] = res)出現了2次,說明有兩個goroutine在沒有同步干預的情況下更新了cache map。這表明Get不是並發安全的,存在數據競爭。

OK,那我們就設法對Get()方法進行加鎖(mutex),最粗暴的方式莫過於如下:

// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    memo.mu.Unlock()
    return res.value, res.err
}

這樣做當然實現了所謂的“並發安全”,但是也失去了“並發性”,每次對f的調用期間都會持有鎖,Get將本來可以並行運行的I/O操作串行化了。顯然,這不是我們所希望的。

我們試圖降低鎖的粒度,查找階段獲取一次,如果查找沒有返回任何內容,那么進入更新階段會再次獲取。在這兩次獲取鎖的中間階段,其它goroutine可以隨意使用cache。

func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    memo.mu.Unlock()
    if !ok {
        res.value, res.err = memo.f(key)

        // Between the two critical sections, several goroutines
        // may race to compute f(key) and update the map.
        memo.mu.Lock()
        memo.cache[key] = res
        memo.mu.Unlock()
    }
    return res.value, res.err
}

這種實現在兩個以上的goroutine同一時刻調用Get來請求同樣的URL時,會導致同樣的url被重復計算。多個goroutine一起查詢cache,發現沒有值,然后一起調用f這個慢不拉嘰的函數。在得到結果后,也都會去更新map。其中一個獲得的結果會覆蓋掉另一個的結果。理想情況下是應該避免掉多余的工作的,這種“避免”工作一般被稱為duplicate suppression(重復抑制/避免)。

版本2:使用“互斥量+channel”實現並發安全機制

該版本的Memo每一個map元素都是指向一個條目的指針。每一個條目包含對函數f的調用結果。與之前不同的是這次entry還包含了一個叫ready的channel。在條目的結果被設置之后,這個channel就會被關閉,以向其它goroutine廣播——“現在去讀取該條目內的結果是安全的了”。

// Func 是待緩存的函數,作為 key
type Func func(key string) (interface{}, error)

// entry 作為緩存的 value, 除了包含一個結果result,還包含一個channel
type entry struct {
	res result
	ready chan struct{}
}

type result struct {
	value interface{}
	err   error
}

// 緩存通過調用 f 函數得到的結果
type Memo struct {
	f     Func
	mu    sync.Mutex
	cache map[string]*entry
}

func NewMemo(f Func) *Memo {
	memo := &Memo{f: f, cache: make(map[string]*entry)}
	return memo
}

// 使用一個互斥量(即 unbuffered channel)來保護多個goroutine調用Get時的共享map變量
func (memo *Memo) Get(url string) (interface{}, error) {
	memo.mu.Lock()
	e := memo.cache[url]
	if e == nil {
		// 如果查詢結果為空,說明這是對該url的第一次查詢
		// 因此,讓這個goroutine負責計算這個url對應的值
		// 當計算好后,再廣播通知所有的其他goroutine,
		// 告訴它們這個url對應的緩存已經存在了,可以直接取用
		e = &entry{ready: make(chan struct{})}
		memo.cache[url] = e // 注意這里只是存入了一個“空的”條目,真正的結果還沒計算出來
		memo.mu.Unlock()

		e.res.value, e.res.err = memo.f(url)

		close(e.ready) // broadcast ready condition
	} else {
		// 如果查詢到結果非空,則立馬先把鎖給釋放掉
		memo.mu.Unlock()

		// 但是要注意,這里的非空並不代表馬上就可以返回結果
		// 因為有可能是其他goroutine還在計算中
		// 因此要等待ready condition
		<-e.ready
	}
	return e.res.value, e.res.err
}

獲取互斥鎖來保護共享變量cache map,查詢map中是否存在指定條目,如果沒有找到,那么分配空間插入一個新條目,釋放互斥鎖。如果條目存在但其值並沒有寫入完成時(也就是有其它的goroutine在調用 f 這個慢函數),goroutine則必須等待ready之后才能讀到條目的結果。ready condition由一個無緩存channel來實現,對無緩存channel的讀取操作(即<-e.ready)在channel關閉之前一直是阻塞。

如果沒有條目的話,需要向map中插入一個沒有准備好的條目,當前正在調用的goroutine就需要負責調用慢函數、更新條目以及向其它所有goroutine廣播條目已經ready可讀的消息了。

條目中的e.res.value和e.res.err變量是在多個goroutine之間共享的。創建條目的goroutine同時也會設置條目的值,其它goroutine在收到"ready"的廣播消息之后立刻會去讀取條目的值。盡管會被多個goroutine同時訪問,但卻並不需要互斥鎖。ready channel的關閉一定會發生在其它goroutine接收到廣播事件之前,因此第一個goroutine對這些變量的寫操作是一定發生在這些讀操作之前的。不會發生數據競爭。

版本3:通過goroutine通信實現並發安全

在版本2的實現中,我們使用了一個互斥量來保護多個goroutine調用Get時的共享變量map。在Go中,還有另外一種設計方案——把共享變量map限制在一個單獨的goroutine中(我們稱這樣的goroutine為monitor goroutine),對緩存的查詢和寫入均通過monitor goroutine進行。

Func、result和entry的聲明和之前保持一致,這里不再重復。Memo類型的定義則做了很大的改動,只包含了一個叫做requests的channel,Get的調用者用這個channel來和monitor goroutine通信。

type request struct {
	url string
	// 負責發送響應結果, 只發送, 不接收
	response chan<- result 
}

type Memo struct {
	requests chan request
}

func NewMemo(f Func) *Memo {
	memo := &Memo{requests:make(chan request)}
	go memo.server(f)
	return memo
}

func (memo *Memo) Get(url string) (interface{}, error) {
	response := make(chan result)
	memo.requests <- request{url, response}
	res := <-response
	return res.value, res.err
}

func (memo *Memo) Close() {
	close(memo.requests)
}

上面的Get方法,會創建一個response channel,把它放進request結構中,然后發送給monitor goroutine,然后馬上又會接收它。

cache變量被限制在了monitor goroutine中,即server()函數,下面會看到。monitor會在循環中一直讀取請求,直到request channel被Close方法關閉。每一個請求都會去查詢cache,如果沒有找到條目的話,那么就會創建/插入一個新的條目。

func (memo *Memo) server(f Func) {
	cache := make(map[string]*entry)
	for req := range memo.requests {
		e := cache[req.url]
		if e == nil {
			// this is the first request for this url
			e = &entry{ready: make(chan struct{})}
			cache[req.url] = e
			go e.call(f,req.url)
		}
		go e.deliver(req.response)
	}
}

func (e *entry) call(f Func, url string) {
	// Evaluate the function.
	e.res.value, e.res.err = f(url)
	// broadcast ready condition
	close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
	// wait for the ready condition
	<-e.ready
	// send the result to the client
	response <- e.res
}

和基於互斥量的版本類似,第一個對某個key的請求需要負責去調用函數f並傳入這個key,將結果存在條目里,並關閉ready channel來廣播條目的ready消息。使用(*entry).call來完成上述工作。

緊接着對同一個key的請求會發現map中已經有了存在的條目,然后會等待結果變為ready,並將結果從response發送給客戶端的goroutien。上述工作是用(*entry).deliver來完成的。對call和deliver方法的調用必須讓它們在自己的goroutine中進行以確保monitor goroutines不會因此而被阻塞住而沒法處理新的請求。

總結

在Go中,我們可以通過使用互斥量(加鎖),或者通信來建立並發程序。后者實現起來會難一些,初學也比較難理解。我也理解不深,暫記錄於此。


本文是對《The Go Programming Language》 9.7 節的學習筆記,大家去看原文吧~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM