在實際go開發中, 需要充分的利用go的語言特色,開啟適當的goroutine, 對於所需的返回值的處理,成為比較有意思的問題,困擾很久,終於解決。
本篇借鑒此博文:http://docs.lvrui.io/2020/03/26/go語言在goroutine中拿到返回值/
執行go協程時, 是沒有返回值的, 這時候需要用到go語言中特色的channel來獲取到返回值. 通過channel拿到返回值有兩種處理形式, 一種形式是具有go風格特色的, 即發送給一個for channel或select channel的獨立goroutine中, 由該獨立的goroutine來處理函數的返回值. 還有一種傳統的做法, 就是將所有goroutine的返回值都集中到當前函數, 然后統一返回給調用函數.
發送給獨立的goroutine處理程序
package main
import (
"fmt"
"sync"
"time"
)
var responseChannel = make(chan string, 15)
func httpGet(url int, limiter chan bool, wg *sync.WaitGroup) {
// 函數執行完畢時 計數器-1
defer wg.Done()
fmt.Println("http get:", url)
responseChannel <- fmt.Sprintf("Hello Go %d", url)
// 釋放一個坑位
<- limiter
}
func ResponseController() {
for rc := range responseChannel {
fmt.Println("response: ", rc)
}
}
func main() {
// 啟動接收response的控制器
go ResponseController()
wg := &sync.WaitGroup{}
// 控制並發數為10
limiter := make(chan bool, 20)
for i := 0; i < 99; i++ {
// 計數器+1
wg.Add(1)
limiter <- true
go httpGet(i, limiter, wg)
}
// 等待所以協程執行完畢
wg.Wait() // 當計數器為0時, 不再阻塞
fmt.Println("所有協程已執行完畢")
}
這種具有Go語言特色的處理方式的關鍵在於, 你需要預先創建一個用於處理返回值的公共管道. 然后定義一個一直在讀取該管道的函數, 該函數需要預先以單獨的goroutine形式啟動.
最后當執行到並發任務時, 每個並發任務得到結果后, 都會將結果通過管道傳遞到之前預先啟動的goroutine中.
在當前函數中聚合返回
package main
import (
"fmt"
"sync"
)
func httpGet(url int,response chan string, limiter chan bool, wg *sync.WaitGroup) {
// 函數執行完畢時 計數器-1
defer wg.Done()
// 將拿到的結果, 發送到參數中傳遞過來的channel中
response <- fmt.Sprintf("http get: %d", url)
// 釋放一個坑位
<- limiter
}
// 將所有的返回結果, 以 []string 的形式返回
func collect(urls []int) []string {
var result []string
wg := &sync.WaitGroup{}
// 控制並發數為10
limiter := make(chan bool, 5)
defer close(limiter)
// 函數內的局部變量channel, 專門用來接收函數內所有goroutine的結果
responseChannel := make(chan string, 20)
// 為讀取結果控制器創建新的WaitGroup, 需要保證控制器內的所有值都已經正確處理完畢, 才能結束
wgResponse := &sync.WaitGroup{}
// 啟動讀取結果的控制器
go func() {
// wgResponse計數器+1
wgResponse.Add(1)
// 讀取結果
for response := range responseChannel {
// 處理結果
result = append(result, response)
}
// 當 responseChannel被關閉時且channel中所有的值都已經被處理完畢后, 將執行到這一行
wgResponse.Done()
}()
for _, url := range urls {
// 計數器+1
wg.Add(1)
limiter <- true
// 這里在啟動goroutine時, 將用來收集結果的局部變量channel也傳遞進去
go httpGet(url,responseChannel, limiter, wg)
}
// 等待所以協程執行完畢
wg.Wait() // 當計數器為0時, 不再阻塞
fmt.Println("所有協程已執行完畢")
// 關閉接收結果channel
close(responseChannel)
// 等待wgResponse的計數器歸零
wgResponse.Wait()
// 返回聚合后結果
return result
}
func main() {
urls := []int{1,2,3,4,5,6,7,8,9,10}
result := collect(urls)
fmt.Println(result)
}
接收多返回值
package main
import (
"fmt"
"sync"
)
func httpGet(url int,response chan string,rep chan string, limiter chan bool, wg *sync.WaitGroup) {
// 函數執行完畢時 計數器-1
defer wg.Done()
// 將拿到的結果, 發送到參數中傳遞過來的channel中
response <- fmt.Sprintf("http get: %d", url)
rep <- fmt.Sprintf("rep get: %d", url)
// 釋放一個坑位
<- limiter
}
// 將所有的返回結果, 以 []string 的形式返回
func collect(urls []int) ([]string ,[]string){
var result []string
var tworesult []string
wg := &sync.WaitGroup{}
// 控制並發數為10
limiter := make(chan bool, 5)
defer close(limiter)
// 函數內的局部變量channel, 專門用來接收函數內所有goroutine的結果
responseChannel := make(chan string, 20)
repChannel := make(chan string, 20)
// 為讀取結果控制器創建新的WaitGroup, 需要保證控制器內的所有值都已經正確處理完畢, 才能結束
wgResponse := &sync.WaitGroup{}
// 啟動讀取結果的控制器
// wgResponse計數器+2
wgResponse.Add(2)
go func() {
// 讀取結果
for response := range responseChannel {
// 處理結果
result = append(result, response)
}
// 當 responseChannel被關閉時且channel中所有的值都已經被處理完畢后, 將執行到這一行
wgResponse.Done()
}()
go func() {
// 讀取結果
for rep := range repChannel {
// 處理結果
tworesult = append(tworesult, rep)
}
// 當 responseChannel被關閉時且channel中所有的值都已經被處理完畢后, 將執行到這一行
wgResponse.Done()
}()
for _, url := range urls {
// 計數器+1
wg.Add(1)
limiter <- true
// 這里在啟動goroutine時, 將用來收集結果的局部變量channel也傳遞進去
go httpGet(url,responseChannel,repChannel, limiter, wg)
}
// 等待所以協程執行完畢
wg.Wait() // 當計數器為0時, 不再阻塞
fmt.Println("所有協程已執行完畢")
// 關閉接收結果channel
close(responseChannel)
close(repChannel)
// 等待wgResponse的計數器歸零
wgResponse.Wait()
// 返回聚合后結果
return result,tworesult
}
func main() {
urls := []int{1,2,3,4,5,6,7,8,9,10}
result,tworesult := collect(urls)
fmt.Println(result)
fmt.Println(tworesult)
}