go學習筆記 sync/once源碼 和 sync/WaitGroup源碼


sync.Once.Do(f func())是一個挺有趣的東西,能保證once只執行一次,無論你是否更換once.Do(xx)這里的方法,這個sync.Once塊只會執行一次。

package sync
 
import (
    "sync/atomic"
)
 
// Once is an object that will perform exactly one action.
type Once struct {
    // done indicates whether the action has been performed.
    // It is first in the struct because it is used in the hot path.
    // The hot path is inlined at every call site.
    // Placing done first allows more compact instructions on some architectures (amd64/x86),
    // and fewer instructions (to calculate offset) on other architectures.
    done uint32 // 初始值為0表示還未執行過,1表示已經執行過
    m    Mutex
}
 
// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
//     var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
//     config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *Once) Do(f func()) {
    // Note: Here is an incorrect implementation of Do:
    //
    //    if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
    //        f()
    //    }
    //
    // Do guarantees that when it returns, f has finished.
    // This implementation would not implement that guarantee:
    // given two simultaneous calls, the winner of the cas would
    // call f, and the second would return immediately, without
    // waiting for the first's call to f to complete.
    // This is why the slow path falls back to a mutex, and why
    // the atomic.StoreUint32 must be delayed until after f returns.
    // 每次一進來先讀標識位 0 標識沒有被執行過,1 標識已經被執行過
    if atomic.LoadUint32(&o.done) == 0 {
        // Outlined slow-path to allow inlining of the fast-path.
        o.doSlow(f)
    }
}
 
func (o *Once) doSlow(f func()) {
    o.m.Lock() // 施加互斥鎖
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

從上面我們可以看出,once只有一個 Do 方法;once的結構體中只定義了兩個字段:一個mutex的m,一個代表標識位的done。

下面我們來看看Do方法的流程:

WaitGroup用於等待一組線程的結束。父線程調用Add 方法來設定應等待的線程數量。每個被等待的線程在結束時應調用Done方法。同時,主線程里可以調用wait方法阻塞至所有線程結束。 注意:Add和創建協程的數量一定要匹配,否則會產出panic
主要函數:
func (wg *WaitGroup) Add(delta int):等待協程的數量。
func (wg *WaitGroup) Done(): 減少waitgroup線程等待線程數量的值,一般在協程完成之后執行。
func (wg *WaitGroup) Wait():wait方法一般在主線程調用,阻塞直到group計數減少為0。

 
package sync
 
import (
    "internal/race"
    "sync/atomic"
    "unsafe"
)
 
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
    noCopy noCopy // noCopy可以嵌入到結構中,在第一次使用后不可復制
 
    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers do not ensure it. So we allocate 12 bytes and then use
    // the aligned 8 bytes in them as state, and the other 4 as storage
    // for the sema.
    // 64 bit:高32 bit是計數器,低32位是 阻塞的goroutine計數。
    // 64位的原子操作需要64位的對齊,但是32位。
    // 編譯器不能確保它,所以分配了12個byte對齊的8個byte作為狀態。其他4個作為信號量
    state1 [3]uint32
}
 
// uintptr和unsafe.Pointer的區別就是:unsafe.Pointer只是單純的通用指針類型,用於轉換不同類型指針,它不可以參與指針運算;
// 而uintptr是用於指針運算的,GC 不把 uintptr 當指針,也就是說 uintptr 無法持有對象,uintptr類型的目標會被回收。
// state()函數可以獲取到wg.state1數組中元素組成的二進制對應的十進制的值 和信號量
// 根據編譯器位數,獲得標志位和等待次數的數據域
// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
       // 是否是 64位機器:因為64位機器站高8位 信號量在后面
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
        // 如果是 32位機器,型號量在最前面
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}
 
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
    // 獲取到wg.state1數組中元素組成的二進制對應的十進制的值的指針 和信號量
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        if delta < 0 {
            // Synchronize decrements with Wait.
            race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()
    }
    // 將標記為加delta 因為高32位是計數器 所以把 delta的值左移32位,並從數組的首元素處開始賦值
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32) // 獲取計數器的值:轉int32
    //獲得調用 wait()等待次數:轉uint32
    w := uint32(state)
    if race.Enabled && delta > 0 && v == int32(delta) {
        // The first increment must be synchronized with Wait.
        // Need to model this as a read, because there can be
        // several concurrent wg.counter transitions from 0.
        race.Read(unsafe.Pointer(semap))
    }
    // 計數器為負數,報panic
    //標記位不能小於0(done過多或者Add()負值太多)
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    // 不能Add 與Wait 同時調用
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // Add 完畢
    if v > 0 || w == 0 {
        return
    }
    // This goroutine has set counter to 0 when waiters > 0.
    // Now there can't be concurrent mutations of state:
    // - Adds must not happen concurrently with Wait,
    // - Wait does not increment waiters if it sees counter == 0.
    // Still do a cheap sanity check to detect WaitGroup misuse.
    // 當等待計數器> 0時,而goroutine將設置為0。
    // 此時不可能有同時發生的狀態突變:
    // - Add()不能與 Wait() 同時發生,
    // - 如果計數器counter == 0,不再增加等待計數器
    // 不能Add 與Wait 同時調用
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // Reset waiters count to 0.
    *statep = 0 // 所有狀態位清零
    for ; w != 0; w-- {
        // 目的是作為一個簡單的wakeup原語,以供同步使用。true為喚醒排在等待隊列的第一個goroutine
        runtime_Semrelease(semap, false, 0)
    }
}
 
// Done decrements the WaitGroup counter by one.
// Done方法其實就是Add(-1)
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}
 
// Wait blocks until the WaitGroup counter is zero.
// Wait 會一直阻塞到 計數器值為0為止
func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        race.Disable()
    }
    //循環檢查計數器V啥時候等於0
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
        if v == 0 {
            // Counter is 0, no need to wait.
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // Increment waiters count.
        // 尚有未執行完的go程,等待標志位+1(直接在低位處理,無需移位)
        // 增加等待goroution計數,對低32位加1,不需要移位
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                // Wait must be synchronized with the first Add.
                // Need to model this is as a write to race with the read in Add.
                // As a consequence, can do the write only for the first waiter,
                // otherwise concurrent Waits will race with each other.
                race.Write(unsafe.Pointer(semap))
            }
            // 目的是作為一個簡單的sleep原語,以供同步使用
            runtime_Semacquire(semap)
            // 在上一次Wait返回之前重新使用WaitGroup,即在之前的Done 中沒有清空 計數量就會有問題
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
    }
}

Add:

Wait:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM