Go的並發


Go的並發

並發和並行

  • 並發:同一時間段內執行多個任務
  • 並行:同一時刻執行多個任務

goroutine類似於線程,屬於用戶態的線程,我們可以根據需要創建成千上萬個goroutine並發工作

goroutine是由Go語言的運行時(runtime)調度完成,而線程是由操作系統調度完成

channel在多個goroutine間進行通信

goroutine

goroutine的概念類似於線程,但 goroutine是由Go的運行時(runtime)調度和管理的

Go程序會智能地將 goroutine 中的任務合理地分配給每個CPU

Go語言在語言層面已經內置了調度和上下文切換的機制

當你需要讓某個任務並發執行的時候,你只需要把這個任務包裝成一個函數,開啟一個goroutine去執行這個函數

使用goroutine

  1. Go語言中使用goroutine非常簡單,只需要在調用函數的時候在前面加上go關鍵字,就可以為一個函數創建一個goroutine
  2. 一個goroutine必定對應一個函數,可以創建多個goroutine去執行相同的函數

啟動單個goroutine

func hello() {
  fmt.Println("Hello")
}
func main() {
  go hello()
  fmt.Println("main goroutine done!")
  time.Sleep(time.Second)
}
  • Go程序就會為main()函數創建一個默認的goroutine
  • main()結束后,在main()函數中啟動的goroutine會一同結束
  • 先打印"main goroutine done!"是因為創建hello函數的goroutine需要一點時間

啟動多個goroutine

使用sync.WaitGroup來實現goroutine的同步

var wg sync.WaitGroup
func hello(i int) {
  defer wg.Done()       // goroutine結束就登記-1
  fmt.Println("Hello:", i)
}
func main() {
  for i:= 0; i < 10; i++ {
    wg.Add(1)          // 啟動一個goroutine就+1
    go hello(i)
  }
  wg.Wait()            // 等所有登記的goroutine都結束
}

每次打印的數字的順序都不一致,這是因為10個goroutine是並發執行的,而goroutine的調度是隨機的

goroutine與線程

可增長的棧

  1. OS線程(操作系統線程)一般都有固定的棧內存(通常為2MB)

  2. 一個goroutine的棧在其生命周期開始時只有很小的棧(典型情況下2KB)

  3. goroutine的棧大小限制可以達到1GB

  4. 在Go語言中一次創建十萬左右的goroutine也是可以的

goroutine調度

GPM是Go語言運行時(runtime)層面的實現,是go語言自己實現的一套調度系統

  • G很好理解,就是個goroutine的,里面除了存放本goroutine信息外 還有與所在P的綁定等信息
  • P管理着一組goroutine隊列,P里面會存儲當前goroutine運行的上下文環境(函數指針,堆棧地址及地址邊界),P會對自己管理的goroutine隊列做一些調度(比如把占用CPU時間較長的goroutine暫停、運行后續的goroutine等等)當自己的隊列消費完了就去全局隊列里取,如果全局隊列里也消費完了會去其他P的隊列里搶任務
  • M(machine)是Go運行時(runtime)對操作系統內核線程的虛擬, M與內核線程一般是一一映射的關系, 一個groutine最終是要放到M上執行的

P與M一般也是一一對應的。他們關系是: P管理着一組G掛載在M上運行。當一個G長久阻塞在一個M上時,runtime會新建一個M,阻塞G所在的P會把其他的G 掛載在新建的M上。當舊的G阻塞完成或者認為其已經死掉時 回收舊的M

goroutine的調度是在用戶態下完成的, 不涉及內核態與用戶態之間的頻繁切換,包括內存的分配與釋放,都是在用戶態維護着一塊大的內存池, 不直接調用系統的malloc函數(除非內存池需要改變),成本比調度OS線程低很多

GOMAXPROCS

Go運行時的調度器使用GOMAXPROCS參數來確定需要使用多少個OS線程來同時執行Go代碼。默認值是機器上的CPU核心數

GOMAXPROCS是m:n調度中的n

Go1.5版本之前,默認使用的是單核心執行。Go1.5版本之后,默認使用全部的CPU邏輯核心數

Go語言中可以通過runtime.GOMAXPROCS()函數設置當前程序並發時占用的CPU邏輯核心數

func a() {
	for i := 1; i < 10; i++ {
		fmt.Println("A:", i)
	}
}

func b() {
	for i := 1; i < 10; i++ {
		fmt.Println("B:", i)
	}
}

func main() {
	runtime.GOMAXPROCS(2)
	go a()
	go b()
	time.Sleep(time.Second)
}

將邏輯核心數設為2,此時兩個任務並行執行

Go語言中的操作系統線程和goroutine的關系

  • 一個操作系統線程對應用戶態多個goroutine
  • go程序可以同時使用多個操作系統線程
  • goroutine和OS線程是多對多的關系,即m:n

channel類型

channel是可以讓一個goroutine發送特定值到另一個goroutine的通信機制

Go 語言中的通道(channel)是一種特殊的類型,一種引用類型

var 變量 chan 元素類型
var ch1 chan int   // 聲明一個傳遞整型的通道
var ch2 chan bool  // 聲明一個傳遞布爾型的通道
var ch3 chan []int // 聲明一個傳遞int切片的通道

創建channel

通道是引用類型,通道類型的空值是nil

聲明的通道后需要使用make函數初始化之后才能使用

make(chan 元素類型, [緩沖大小])
ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)

channel操作

ch := make(chan int)
ch <- 10 // 把10發送到ch中
x := <- ch // 從ch中接收值並賦值給變量x
<-ch       // 從ch中接收值,忽略結果
close(ch)
  • 對一個關閉的通道再發送值就會導致panic
  • 對一個關閉的通道進行接收會一直獲取值直到通道為空
  • 一個關閉的並且沒有值的通道執行接收操作會得到對應類型的零值
  • 關閉一個已經關閉的通道會導致panic

無緩沖通道

func recv(c chan int) {
  ret := <-c
  fmt.Println("接收成功:", ret)
}
func main() {
  ch := make(chan int)
  go recv(ch)    // 先有接收者,ch <- 10才能發送
  ch <- 10
  fmt.Println("發送成功")
}

無緩沖通道上的發送操作會阻塞,直到另一個goroutine在該通道上執行接收操作,這時值才能發送成功,兩個goroutine將繼續執行。相反,如果接收操作先執行,接收方的goroutine將阻塞,直到另一個goroutine在該通道上發送一個值

使用無緩沖通道進行通信將導致發送和接收的goroutine同步化。因此,無緩沖通道也被稱為同步通道

有緩沖通道

func main() {
  ch := make(chan int, 1)
  ch <- 10
  fmt.Println("發送成功")
}

可以使用內置的len函數獲取通道內元素的數量,使用cap函數獲取通道的容量,雖然我們很少會這么做

就像你小區的快遞櫃只有那么個多格子,格子滿了就裝不下了,就阻塞了,等到別人取走一個快遞員就能往里面放一個

for range從通道中取值

func main() {
  ch1 := make(chan int)
  ch2 := make(chan int)
  // 開啟goroutine將0到100的數發乳ch1中
  go func() {
    for i := 0; i < 100; i++ {
      ch1 <- i
    }
    close(ch1)
  }()
  // 開啟goroutine從ch1取值,並將該值的平方發到ch2中
  go func() {
    for {
      i, ok := <-ch1  // 通道關閉后再取值時ok為false
      if !ok{
        break
      }
      ch2 <- i*i
    }
    close(ch2)
  }()
  // 在主goroutine中取ch2的值並打印
  for i := range ch2{     // ch2關閉后會退出for range循環
    fmt.Println(i)
  }
}

有兩種方式在接收值的時候判斷該通道是否被關閉,不過我們通常使用的是for range的方式。使用for range遍歷通道,當通道被關閉的時候就會退出for range

單項通道

有的時候我們會將通道作為參數在多個任務函數間傳遞,很多時候我們在不同的任務函數中使用通道都會對其進行限制,比如限制通道在函數中只能發送或只能接收

func counter(out chan<- int) {
 for i := 0; i < 100; i++ {
   out <- int
 }
 close(out)
}
func squarer(out chan<- int, in <-chan int) {
 for i := range in {
   out <-  i*i
 }
 // in <- 5      // 會報錯,in只能發送,不能接收
 close(out)
}
func printer(in <-chan int) {
 for i := range in {
   fmt.Println(i)
 }
}
func main() {
 ch1 := make(chan int)
 ch2 := make(chan int)
 go count(ch1)
 go count(ch2, ch1)
 printer(ch2)
}
  • chan<- int是一個只能發送的通道,可以發送但是不能接收
  • <-chan int是一個只能接收的通道,可以接收但是不能發送

在函數傳參及任何賦值操作中將雙向通道轉換為單向通道是可以的,但反過來是不可以的

通道總結

channel常見的異常總結

channel nil 非空 空的 滿了 沒滿
接收 阻塞 接收值 阻塞 接收值 接收值
發送 阻塞 發送值 發送值 阻塞 發送值
關閉 pannic 關閉成功,讀完數據后返回零值 關閉成功,返回零值 關閉成功,讀完數據后返回零值 關閉成功,讀完數據后返回零值

worker pool(goroutine池)

在工作中我們通常會使用可以指定啟動的goroutine數量–worker pool模式,控制goroutine的數量,防止goroutine泄漏和暴漲

func worker(id int, jobs <-chan int, results chan<- int) {
  for j := range jobs {
    fmt.Printf("worker:%d start job:%d\n", id, j)
    time.Sleep(time.Second)
    fmt.Printf("worker:%d end job:%d\n", id, j)
    results <- j * 2
  }
}
func main() {
  jobs := make(chan int, 100)
  results := make(chan int, 100)
  // 開啟3個goroutine
  for w := 0; w < 3; w++ {
    go worker(w, jobs, results)
  }
  // 5個任務
  for j := 0; j < 5; j++ {
    jobs <- j
  }
  close(jobs)
  // 輸出結果
  for a := 0; a < 5; a++ {
    <-results
  }
}

select多路復用

在某些場景下我們需要同時從多個通道接收數據。通道在接收數據時,如果沒有數據可以接收將會發生阻塞。你也許會寫出如下代碼使用遍歷的方式來實現

for{
  // 嘗試從ch1接收值
  data, ok1 := <-ch1
  if ok1 {
    fmt.Println(data)
  }
  // 嘗試從ch2接收值
  data, ok2 := <-ch2
  if ok2 {
    fmt.Println(data)
  }
  …
}

這種方式雖然可以實現從多個通道接收值的需求,但是運行性能會差很多 X

select的使用類似於switch語句,它有一系列case分支和一個默認的分支。每個case會對應一個通道的通信(接收或發送)過程

select會一直等待,直到某個case的通信操作完成時,就會執行case分支對應的語句

/*
select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        默認操作
}
*/

func main() {
  ch := make(chan int, 1)
  for i := 0; i < 10; i ++ {
    select {
      case x := <-ch:
      fmt.Println(x)
      case ch <- i:
    }
  }
}
  • 可處理一個或多個channel的發送/接收操作
  • 如果多個case同時滿足,select會隨機選擇一個
  • 對於沒有caseselect{}會一直等待,可用於阻塞main函數

並發安全和鎖

var x int64
var wg sync.WaitGroup
func add() {
  for i := 0; i < 5000; i++ {
    x += 1
  }
  wg.Done()
}
func main() {
  wg.Add(2)
  go add()
  go add()
  wg.Wait()
  fmt.Println(x)
}
// 會發現每次的輸出都不一樣

互斥鎖

互斥鎖是一種常用的控制共享資源訪問的方法,它能夠保證同時只有一個goroutine可以訪問共享資源。Go語言中使用sync包的Mutex類型來實現互斥鎖。 使用互斥鎖來修復上面代碼的問題

var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
  for i := 0; i < 5000; i++ {
    lock.Lock()    // 加鎖
    x += 1
    lock.Unlock()  // 解鎖
  }
  wg.Done()
}
func main() {
  wg.Add(2)
  go add()
  go add()
  wg.Wait()
  fmt.Println(x)
}

使用互斥鎖能夠保證同一時間有且只有一個goroutine進入臨界區,其他的goroutine則在等待鎖;當互斥鎖釋放后,等待的goroutine才可以獲取鎖進入臨界區,多個goroutine同時等待一個鎖時,喚醒的策略是隨機的

讀寫互斥鎖

互斥鎖是完全互斥的,但是有很多實際的場景下是讀多寫少的,當我們並發的去讀取一個資源不涉及資源修改的時候是沒有必要加鎖的,這種場景下使用讀寫鎖是更好的一種選擇。讀寫鎖在Go語言中使用sync包中的RWMutex類型

讀寫鎖分為兩種:讀鎖和寫鎖

當一個goroutine獲取讀鎖之后,其他的goroutine如果是獲取讀鎖會繼續獲得鎖,如果是獲取寫鎖就會等待

當一個goroutine獲取寫鎖之后,其他的goroutine無論是獲取讀鎖還是寫鎖都會等待

var x int64
var wg sync.WaitGroup
var lock sync.Mutex
var rwlock sync.RWMutex
func write() {
  rwlock.Lock()  // 加寫鎖
  x += 1
  time.Sleep(10 * time.Millisecond)   // 假設寫操作用10毫秒
  rwlock.Unlock()
  wg.Done()
}
func read() {
  rwlock.Lock()  // 加讀鎖
  time.Sleep(time.Millisecond)   // 假設讀操作用1毫秒
  rwlock.Unlock()
  wg.Done()
}

func main() {
  start := time.Now()
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go write()
  }
  for i := 0; i < 1000; i++ {
    wg.Add(1)
    go read()
  }
  wg.Wait()
  end := time.Time()
  fmt.Println(end.Sub(start))
}

sync.WaitGroup

在代碼中生硬的使用time.Sleep肯定是不合適的,Go語言中可以使用sync.WaitGroup來實現並發任務的同步

方法名 功能
(wg * WaitGroup) Add(delta int) 計數器+delta
(wg *WaitGroup) Done() 計數器-1
(wg *WaitGroup) Wait() 阻塞直到計數器變為0

sync.WaitGroup內部維護着一個計數器,計數器的值可以增加和減少。例如當我們啟動了N 個並發任務時,就將計數器值增加N。每個任務完成時通過調用Done()方法將計數器減1。通過調用Wait()來等待並發任務執行完,當計數器值為0時,表示所有並發任務已經完成

var wg sync.WaitGroup
func hello() {
  defer wg.Done()
  fmt.Println("Hello Groutine")
}
func main() {
  wg.Add(1)
  go hello()   // 啟動一個goroutine去執行hello函數
  fmt.Println("main gotoutine done")
  wg.Wait()
}

sync.WaitGroup是一個結構體,傳遞的時候要傳遞指針

sync.Once

在編程的很多場景下我們需要確保某些操作在高並發的場景下只執行一次,例如只加載一次配置文件、只關閉一次通道等

func (o *Once) Do(f func()) {}

如果要執行的函數f需要傳遞參數就需要搭配閉包來使用

var icons map[string]image.Image
func loadIcons() {
  icons = map[string]image.Image{
    "left": loadIcon("left.png"),
    "up": loadIcon("up.png"),
    "right": loadIcon("right.png"),
    "down": loadIcon("down.png"),
  }
}
// Icon 被多個goroutine調用時不是並發安全的
func Icon(name string) image.Image {
  if icons == nil{
    loadIcons()
  }
  return icons[name]
}

多個goroutine並發調用Icon函數時不是並發安全的,現代的編譯器和CPU可能會在保證每個goroutine都滿足串行一致的基礎上自由地重排訪問內存的順序。loadIcons函數可能會被重排為以下結果

func loadIcons() {
	icons = make(map[string]image.Image)
	icons["left"] = loadIcon("left.png")
	icons["up"] = loadIcon("up.png")
	icons["right"] = loadIcon("right.png")
	icons["down"] = loadIcon("down.png")
}

在這種情況下就會出現即使判斷了icons不是nil也不意味着變量初始化完成了。考慮到這種情況,我們能想到的辦法就是添加互斥鎖,保證初始化icons的時候不會被其他的goroutine操作,但是這樣做又會引發性能問題

使用sync.Once

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 是並發安全的
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons)
	return icons[name]
}

並發安全的單例模式

package singletion
import "sync"
type singleton struct {}
var instance * singleton
var once sync.Once
func GetInstance() *singleton {
  coce.Do(func() {
    instance = &singleton{}
  })
  return instance
}

sync.Once其實內部包含一個互斥鎖和一個布爾值,互斥鎖保證布爾值和數據的安全,而布爾值用來記錄初始化是否完成。這樣設計就能保證初始化操作的時候是並發安全的並且初始化操作也不會被執行多次

sync.Map

go中的map不是並發安全的

var m = make(map[string]int)
func get(key string) int {
  return m[key]
}
func set(key string, value int) {
  m[key] = value
}
func main() {
  wg := sync.WaitGroup{}
  for i := 0; i < 20; i++ {
    wg.Add(1)
    go func(n int) {
      key := strconv.Iton(n)
      set(key, n)
      fmt.Printf("k=:%v, v:=%v\n", key, get(key))
      wg.Done()
    }(i)
  }
  wg.Wait()
}

上面的代碼開啟少量幾個goroutine的時候可能沒什么問題,當並發多了之后執行上面的代碼就會報fatal error: concurrent map writes錯誤

像這種場景下就需要為map加鎖來保證並發的安全性了,Go語言的sync包中提供了一個開箱即用的並發安全版map–sync.Map。開箱即用表示不用像內置的map一樣使用make函數初始化就能直接使用。同時sync.Map內置了諸如StoreLoadLoadOrStoreDeleteRange等操作方法

var m = sync.Map{}
func main() {
  wg := sync.WaitGruop{}
  for i := 0; i < 20; i++ {
    wg.Add(1)
    go func(n int){
      key := strconv.Itoa(n)
      value, _ := m.Load(key)
      fmt.Printf("k=:%v, v:=%v\n", key, value)
      wg.Done
    }(i)
  }
  wg.Wait()
}

原子操作

代碼中的加鎖操作因為涉及內核態的上下文切換會比較耗時、代價比較高。針對基本數據類型我們還可以使用原子操作來保證並發安全,因為原子操作是Go語言提供的方法它在用戶態就可以完成,因此性能比加鎖操作更好。Go語言中原子操作由內置的標准庫sync/atomic提供

atomic包

方法 解釋
func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64) func LoadUint32(addr *uint32) (val uint32) func LoadUint64(addr *uint64) (val uint64) func LoadUintptr(addr *uintptr) (val uintptr) func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) 讀取操作
func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) 寫入操作
func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) 修改操作
func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) 交換操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) 比較並交換操作

例子

type Count interface {
  Inc()
  Load() int64
}
// 普通版
type CommonCounter struct {
  counter int64
}
func (c CommonCounter) Inc(){
  c.counter++
}
func (c CommonCounter) Load(){
  return c.counter
}

// 互斥鎖版
type MutexCounter struct {
  counter int64
  lock sync.Mutex
}
func (m *MutexCounter) Inc() {
  m.lock.Lock()
  defer m.lock.Lock()
  m.counter++
}

// 原子操作版
type AtomicCounter struct {
  counter int64
}

func (a *AtomicCounter) Inc() {
	atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
	return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			c.Inc()
			wg.Done()
		}()
	}
	wg.Wait()
	end := time.Now()
	fmt.Println(c.Load(), end.Sub(start))
}

func main() {
	c1 := CommonCounter{} // 非並發安全
	test(c1)
	c2 := MutexCounter{} // 使用互斥鎖實現並發安全
	test(&c2)
	c3 := AtomicCounter{} // 並發安全且比互斥鎖效率更高
	test(&c3)
}

atomic包提供了底層的原子級內存操作,對於同步算法的實現很有用。這些函數必須謹慎地保證正確使用。除了某些特殊的底層應用,使用通道或者sync包的函數/類型實現同步更好


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM