一次讀鎖重入導致的死鎖故障


在兩天前第一次遇到自己的程序出現死鎖, 我一直非常的小心使用鎖,了解死鎖導致的各種可能性,
這次的經歷讓我未來會更加小心,下面來回顧一下死鎖發生的過程與代碼演進的過程吧。

簡述業務背景及代碼演進過程

我的程序中有一塊緩存,數據會組織好放到內存中,會根據數據源(MySQL)更新而刷新緩存,是讀多寫少的應用場景。
內存中有一個很大數據列表,緩存模塊會按數據維度進行分組,每次訪問根據維度查找到這個列表里面的所有數據。
業務模塊拿到數據后會根據業務需要再做一次篩選,選出N個符合條件的數據(具體多少個由業務模塊的規則決定)。

以下是簡化的代碼:

package cache

import "sync"

type Cache struct {
	lock sync.RWMutex
	data []int // 實際數據比這個復雜很多有很多維度
}

func (c *Cache) Get() []int {
	c.lock.RLock()
	defer c.lock.RUnlock()

	var res []int

	// 篩選數據, 簡單寫一個篩選過程
	for i := range c.data {
		if c.data[i] > 10 {
			res = append(res, c.data[i])
		}
	}

	return res
}

這個方法返回的數據會很多,可實際業務需要的數據只有幾個而已,那做一個優化吧,利用 gochan 實現一個迭代生成器,每次只返回一個數據,業務端找到需要的數據后立即終止。

調整后的方法大致像下面這樣:

package cache

import "sync"

type Cache struct {
	lock sync.RWMutex
	data []int // 實際數據比這個復雜很多有很多維度
}

func (c *Cache) Get(next chan struct{}) chan int {
	ch := make(chan int, 1)

	go func() {
		c.lock.RLock()
		defer c.lock.RUnlock()
		defer close(ch)

		// 篩選數據, 簡單寫一個篩選過程
		for i := range c.data {
			if c.data[i] > 10 {

				ch <- i

				if _, ok := <-next; !ok {
					return
				}
			}
		}
	}()

	return ch
}

調用端的代碼類似下面這樣:

data := make([]int, 0, 10)
c := Cache{}
next := make(chan struct{})
for i := range c.Get(next) {
    data = append(data, i)
    if len(data) >= 10 {
        close(next)
        break
    }

    next <- struct{}{}
}

這樣調整后查看程序的內存分配顯著降低,而且平安無事在生產環境運行了半個月_,當然截止當前還不會出現死鎖的情況。
有一天業務調整了,在 cache 模塊有另外一個方法,公用這個鎖(實際我緩存模塊為了統一,都使用一個鎖,方便管理),下面的代碼也寫到這個 cache 組件里面。

以下代碼只增加了改變的部分,.... 保持原來的代碼不變。

package cache

import "sync"

type Cache struct {
	....
	x int
}

func (c *Cache) XX(i int) int{
	c.lock.RLock()
	defer c.lock.RUnlock()
    
	if  i >c.x {
		return i
	}
	return 0
} 

....

添加一個方法怎么就導致死鎖了呢,主要是調用端的業務代碼也發生變化了,更改如下:

data := make([]int, 0, 10)
c := Cache{}
next := make(chan struct{})
for i := range c.Get(next) {
    data = append(data, i)
    if c.XX(i) != i  { // 在這里調用了緩存模塊的另一個方法
        close(next)
        break
    }

    next <- struct{}{}
}

修改后的代碼上線存活了5天就掛了,實際是當時業務訂單需求很少,只是有很多流量請求,並沒有頻繁訪問這個方法,否者會在極短的時間導致死鎖,
通過這塊簡化的代碼,也很難分析出會導致死鎖,真實的業務代碼很多,而且調用關系比較復雜,我們通過代碼審核並沒有發現任何問題。

事故現場分析排查問題

上線5天后突然接到服務無法響應的報警,事故發生立即查看了 grafana 的監控數據,發現在極段時間內服務器資源消耗極速增長,然后就立即沒有響應了

通過業務監控發現服務在極端的時間打開近10萬個 goroutine 之后持續了很長一段時間,
cpu 占用和 gc 都很正常, 內存方面可以看出短時間內分配了很多內存,但是沒有被釋放,gc 沒法回收說明一直被占用,

看到這里我心里在想可能是有個 goroutine 因為什么原因導致無法結束造成的事故吧,
然后我再往下看(實際頁面是在需要滾動屏幕,第一屏只顯示了上面6個模塊),發現 open files 和 goroutine 的情況一致,並且之后的數據突然中斷,
中斷是因為服務無法影響,也就無法采集服務的信息了。

goroutine 並不會占用 open files,一個http服務導致這種情況大概只能是網絡連接過多,我們遭受攻擊了嗎……
顯然是沒有的不然cpu不能很正常,那就是有可能請求無法響應,什么原因導致呢?

使用 lsof -n | grep dsp | wc -l 命令去服務器查找服務打開文件數,確實在六萬五千多,
通過 cat /proc/30717/limits 發現 Max open files 65535 65535 files
配置的最大打開文件數只有 65535,使用 lsof -n | grep dsp |grep TCP | wc -l 發現數據和之前接近,只小了幾個,那是日志文件占用的。

查看日志發現大量 http: Accept error: accept tcp 172.17.191.231:8090: accept4: too many open files; retrying in 1s 錯誤。

這些數據幫助我快速定位確實是有請求發送到服務器,服務器無法響應導致短時間內占用很多文件打開數,導致系統限制無法建立新的連接。
這里要說一下,即使客戶端斷開連接了,服務器連接還是沒有辦法關閉,因為 goroutine 沒有辦法關閉, 除非自己退出。

找到原因了,服務沒法響應,沒法通過現場查找問題了,先重新啟動一下服務,恢復業務在查找代碼問題。

接下來就是查找代碼問題了,期間又出現了一次故障,立即重啟服務,恢復業務。

分析解決問題

通過幾個小時分析代碼邏輯,終於有了進展,發現上面的示例代碼邏輯塊導致讀鎖重入,存在死鎖風險,這種死鎖的碰撞概率非常低,
之前說過我們的緩存是讀多寫少的場景,如果只是讀取數據,上面的代碼不會有任何問題,我們一天刷新緩存的次數也不過百余次而已。

看一下究竟發生了什么導致的死鎖吧:

  • 程序執行 cache.Get 獲取一個 chan, 在 cache.Get 里面有一個 goroutine 讀取數據只有加了讀寫鎖,只有 goroutine 關閉才會釋放
  • for i := range c.Get(next) { 遍歷 changoroutine 不會結束,也就說讀鎖沒有被釋放
  • 遍歷時執行了 c.XX(i) 方法,在該方面里面也加了讀鎖, 形成了讀鎖重入的場景,但是該放執行周期很短,執行完就會馬上釋放

好吧,這樣的流程並沒有形成死鎖,什么情況下導致的死鎖呢,接着看一下一個場景:

  • 程序執行 cache.Get 獲取一個 chan, 在 cache.Get 里面有一個 goroutine 讀取數據只有加了讀寫鎖,只有 goroutine 關閉才會釋放
  • for i := range c.Get(next) { 遍歷 changoroutine 不會結束,也就說讀鎖沒有被釋放
  • 數據發生了改變,觸發了緩存刷新,申請獨占鎖(寫鎖),等待所有讀鎖釋放
  • 遍歷時執行 c.XX(i) 方法,該方法申請讀鎖,因為寫鎖在等待,所以任何讀鎖都將等待寫鎖釋放后才能添加成功
  • for 循環被阻塞, cache.Get 里面的 goroutine 無法退出,無法釋放讀鎖
  • 寫鎖等待所有讀鎖釋放
  • c.XX(i) 等待寫鎖釋放
  • ....

重點看第三步,這里是關鍵,因為在兩個嵌套的讀鎖中間申請寫鎖,導致死鎖發生,找到原因修復起來很簡單的,

調整 cache.Get 加鎖的方法,把 c.data 賦值給一個臨時變量 data, 在這段代碼前后加鎖和釋放鎖,鎖的代碼塊更小,時間更短

c.data 單獨拷貝是安全的,那怕是指針數據,因為每次刷新緩存都會給 c.data 重新賦值,分配新的內存空間。

package cache

import "sync"

type Cache struct {
	lock sync.RWMutex
	data []int // 實際數據比這個復雜很多有很多維度
	x int
}
    
func (c *Cache) XX(i int) int{
    c.lock.RLock()
    defer c.lock.RUnlock()
    
    if  i >c.x {
        return i
    }
    return 0
} 

func (c *Cache) Get(next chan struct{}) chan int {
	ch := make(chan int, 1)

	go func() {
		defer close(ch)

		c.lock.RLock()
		data := c.data
		c.lock.RUnlock()
		
		// 篩選數據, 簡單寫一個篩選過程
		for i := range data {
			if data[i] > 10 {

				ch <- i

				if _, ok := <-next; !ok {
					return
				}
			}
		}
	}()

	return ch
}

修復之后的業務狀態:

復現問題

用程序復現一下上面的場景可以嗎,好像有點難,我寫了一個簡單的復現代碼,如下:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

var l = sync.RWMutex{}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	c := make(chan int)
	go func() {
		l.RLock() // 讀鎖1
		defer l.RUnlock()
		fmt.Println(1)
		c <- 1
		fmt.Println(2)
		runtime.Gosched()
		fmt.Println(3)
		b()
		fmt.Println(4)
		wg.Done()
	}()

	go func() {
		fmt.Println(5)
		<-c
		fmt.Println(6)
		l.Lock()
		fmt.Println(7)
		fmt.Println(8)
		defer l.Unlock()
		fmt.Println(9)
		wg.Done()
	}()

	go func() {
		i := 1
		for {
			i++
		}
	}()
	wg.Wait()
}

func b() {
	fmt.Println(10)
	l.RLock() // 讀鎖2
	fmt.Println(11)
	defer l.RUnlock()
	fmt.Println(12)
}

這段程序的輸出(受 goroutine 運行時影響在輸出數字3之前會有些許差異):

1
5
6
2
3
10

分析一下這個運行流程吧:

  • 首先加上讀鎖1,就是 fmt.Println(1) 之前, 狀態加讀鎖1
  • 另外一個 goroutine 啟動,fmt.Println(5), 狀態加讀鎖1
  • 發送數據 c <- 1 , 狀態加讀鎖1
  • 接受到數據 <-c fmt.Println(6), 狀態加讀鎖1
  • 輸出 2 fmt.Println(2), 狀態加讀鎖1
  • 暫停當前 goroutine runtime.Gosched() , 狀態加讀鎖1
  • 申請寫鎖 l.Lock(), 等待讀鎖1釋放, 狀態加讀鎖1、寫鎖等待
  • 切換 goroutine 執行 fmt.Println(3)b(), 狀態加讀鎖1、寫鎖等待
  • 輸出10 fmt.Println(10), 申請讀鎖2,等待寫鎖釋放, 狀態加讀鎖1、寫鎖等待、讀鎖2等待
  • 支持程序永久阻塞……

分析讀寫鎖實現

func (rw *RWMutex) RLock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}

申請寫鎖時會在 rw.readerCount 讀數量變量上自增加 1,如果結果小於 0,當前讀鎖進入修改等待讀鎖喚醒信號,
單獨看着一個方法會比較懵,為啥讀的數量會小於0呢,接着看寫鎖。

func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}

申請寫鎖時會先加上互斥鎖,也就是有其它寫的客戶端的話會等待寫鎖釋放才能加上,具體實現看互斥鎖的代碼,
然后在 rw.readerCount 上自增一個極大的負數 1 << 30 , 讀寫鎖這里也就限制了我們的同時讀的進程不能超過這個值。
然后在結果上加上 rwmutexMaxReaders 也就是 atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders 得到實際讀客戶端的數量
如果讀的客戶端不等於0,就在 rw.readerWait 自增讀客戶端的數量,之后陷入睡眠,等待 rw.writerSem 喚醒。

分析了這兩段代碼我們就能明白,寫鎖等待或者添加時,讀鎖沒法添加上

func (rw *RWMutex) RUnlock() {
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		if r+1 == 0 || r+1 == -rwmutexMaxReaders {
			race.Enable()
			throw("sync: RUnlock of unlocked RWMutex")
		}
		// A writer is pending.
		if atomic.AddInt32(&rw.readerWait, -1) == 0 {
			// The last reader unblocks the writer.
			runtime_Semrelease(&rw.writerSem, false)
		}
	}
	if race.Enabled {
		race.Enable()
	}
}

釋放讀鎖,先在 rw.readerCount 減 1,然后檢查讀客戶端是否小於0,如果小於0說明有寫鎖在等待,
rw.readerWait 上減1,這個變量記錄的是寫等待讀客戶端的數量,如果沒有需要等待的讀客戶端了,就通知 rw.writerSem 喚醒寫鎖

func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Disable()
	}

	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
	if race.Enabled {
		race.Enable()
	}
}

寫鎖在釋放時會給 rw.readerCount 自增 rwmutexMaxReaders 還原真實讀客戶端數量。
for i := 0; i < int(r); i++ { 用來喚醒所有的讀客戶端,因為在寫鎖的時候,申請讀鎖的客戶端會被計數,但是都會陷入睡眠狀態。

總結

以前特別強調過讀鎖重入導致死鎖的問題,而且這個問題非常難在業務代碼里面復現,觸發幾率很低,
編譯和運行時都無法檢測這種情況,所以千萬不能陷入讀鎖重入的嵌套使用的情況,否者問題非常難以排查。

關於加鎖的幾個小經驗:

  • 運行時離開當前邏輯就釋放鎖。
  • 鎖的粒度越小越好,加鎖后盡快釋放鎖。
  • 盡量不用 defer 釋放鎖。
  • 讀鎖不要嵌套。

轉載


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM