golang 詳解協程——errgroup


我們把一個復雜的任務,尤其是依賴多個微服務 rpc 需要聚合數據的任務,分解為依賴和並行,依賴的意思為: 需要上游 a 的數據才能訪問下游 b 的數據進行組合。但是並行的意思為: 分解為多個小任務並行執行,最終等全部執行完畢。

https://pkg.go.dev/golang.org/x/sync/errgroup 核心原理: 利用 sync.Waitgroup 管理並行執行的 goroutine。 3/10.go

並行工作流

錯誤處理 或者 優雅降級

context 傳播和取消

利用局部變量+閉包

為什么要有sync.errgroup
go支持並發,一般采用的是 channel 、 sync.WaitGroup 、context,來實現各個協程之間的流程控制和消息傳遞。
但是對於開啟的成千上萬的協程,如果在每個協程內都自行去打印 錯誤日志的話,會造成日志分散,不好分析。
所以我們要實現一種能統一處理各個協程錯誤的工具

什么是 sync.errgroup
Go團隊在實驗倉庫中添加了一個名為sync.errgroup的新軟件包。 sync.ErrGroup在sync.WaitGroup功能的基礎上,增加了錯誤傳遞,以及在發生不可恢復的錯誤時取消整個goroutine集合,或者等待超時

主要是利用了 waitgroup,context以及sync.Once,對這三個不熟悉的應先去看下相應的知識點

獲取方法

go get golang.org/x/sync

 

errgroup 的功能

1、處理子協程 error

func main() {
    var g errgroup.Group  // 聲明一個group實例
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }
    for _, url := range urls {  // 分別獲取網站內容
        url := url // url是局部變量,for循環中對多個協程傳遞值時,需要重新進行賦值
        g.Go(func() error {  // group 的go方法,啟一個協程去執行代碼
            // Fetch the URL.
            resp, err := http.Get(url)
            if err == nil {
                resp.Body.Close()
            }
            return err
        })
    }
    if err := g.Wait(); err == nil {  // group 的wait方法,等待上面的 g.go的協程執行完成,並且可以接受錯誤
        fmt.Println("Successfully fetched all URLs.")
    }
}

 

 

上面這個例子是簡單的利用 errgroup 進行的 waitGroup和error的處理,下面我們對關鍵的代碼做一個分析,並結合源碼來看

var g errgroup.Group

 

聲明一個 group的實例,我們看下 group 包含哪些東西

type Group struct {
    cancel func()

    wg sync.WaitGroup

    errOnce sync.Once
    err     error
}

 

group是一個結構體,包含四個部分

  • cancel 一個取消的函數,主要來包裝context.WithCancel的CancelFunc
  • wg 借助於WaitGroup實現的
  • errOnce 使用sync.Once實現只輸出第一個err
  • err 記錄下錯誤的信息
g.Go(func() error {}

 

 

啟動goroutine 執行代碼
記錄第一個出錯的goroutine的err信息。我們看下源碼

func (g *Group) Go(f func() error) {
    g.wg.Add(1)  // 和WaitGroup 一樣,每執行一個新的g,通過add方法 加1

    go func() {
        defer g.wg.Done() // 執行結束后 調用 Done方法,減1

        if err := f(); err != nil {  // 執行傳入的匿名函數
            g.errOnce.Do(func() {   // 如果匿名函數返回錯誤,會記錄錯誤信息。注意這里用的 once.Do,只執行一次,僅會記錄第一個出現的err
                g.err = err
                if g.cancel != nil {  // 如果初始化的有 cancel 函數,會調用 cancel退出
                    g.cancel()
                }
            })
        }
    }()
}

 

 

再來看下 g.Wait()

func (g *Group) Wait() error {
    g.wg.Wait()  // 和 WaitGroup 一樣,在主線程調用 wait 方法,阻塞等待所有g執行完成
    if g.cancel != nil {  // 如果初始化了 cancel 函數,就執行
        g.cancel()
    }
    return g.err  // 返回第一個出現的err信息
}

 

2、結合 context 來使用

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    group, errCtx := errgroup.WithContext(ctx)

    for index := 0; index < 3; index++ {
        indexTemp := index

        // 新建子協程
        group.Go(func() error {
            fmt.Printf("indexTemp=%d \n", indexTemp)
            if indexTemp == 0 { // 第一個協程
                fmt.Println("indexTemp == 0 start ")
                fmt.Println("indexTemp == 0 end")
            } else if indexTemp == 1 { // 第二個協程
                fmt.Println("indexTemp == 1 start")
                //這里一般都是某個協程發生異常之后,調用cancel()
                //這樣別的協程就可以通過errCtx獲取到err信息,以便決定是否需要取消后續操作
                cancel() // 第二個協程異常退出
                fmt.Println("indexTemp == 1 err ")
            } else if indexTemp == 2 {
                fmt.Println("indexTemp == 2 begin")

                // 休眠1秒,用於捕獲子協程2的出錯
                time.Sleep(1 * time.Second)

                //檢查 其他協程已經發生錯誤,如果已經發生異常,則不再執行下面的代碼
                err := CheckGoroutineErr(errCtx) // 第三個協程感知第二個協程是否正常
                if err != nil {
                    return err
                }
                fmt.Println("indexTemp == 2 end ")
            }
            return nil
        })
    }

    // 捕獲err
    err := group.Wait()
    if err == nil {
        fmt.Println("都完成了")
    } else {
        fmt.Printf("get error:%v", err)
    }
}

//校驗是否有協程已發生錯誤
func CheckGoroutineErr(errContext context.Context) error {
    select {
    case <-errContext.Done():
        return errContext.Err()
    default:
        return nil
    }
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM