Golang並發+並發數據交換+並發安全(gorutine+channel+sync)


前言

CPU最小執行單位是線程,后台開發人員一直在費盡心思得解決大並發問題 從單線程----->多線程(切換)-------->協程(上下文開銷小),無非是在尋找1種相對完美的方案當1個線程遇到IO阻塞時可以讓OS以最小的開銷把另1個線程調度到CPU上繼續執行。規避IO、最大限度地把所有物理CPU利用起來。

gorutine就是來自Goole的免費解決方案。

gorutine不是被os調度的線程而是由golang的rutime調度的用戶態的微線程(協程)。因為它是微線程所以它的上下文切換開銷小。執行速度也快!

當然CPU最小的執行單位還是線程,golang的rutime最終也會把這些協程 映射到線程池中的線程上。

 

Goroutines are part of making concurrency easy to use.

1.讓程序員可以輕松得使用實現大並發效果。

The idea, which has been around for a while, is to multiplex independently executing functions—coroutines—onto a set of threads.

2.Gorutine是建立在線程池之上來實現的

When a coroutine blocks, such as by calling a blocking system call, the run-time automatically moves other coroutines on the same operating system thread

to a different, runnable thread so they won't be blocked.

3.當1個Gorutine遇到IO阻塞時Golang的run-time 會自動得把另一個Gorutine調度到另1個沒有被阻塞的線程里面。(永遠不會阻塞)

The programmer sees none of this, which is the point. The result, which we call goroutines, can be very cheap: unless they spend a lot of time in long-running system calls, they cost little more than the memory for the stack, which is just a few kilobytes.

5.Gorutin開銷非常小,非常輕量級(can be very cheap)大概幾千字節。

To make the stacks small, Go's run-time uses segmented stacks. A newly minted goroutine is given a few kilobytes, which is almost always enough.When it isn't, the run-time allocates (and frees) extension segments automatically.

6.為了使Gorutin的stack更小,Golang的runtime使用了segmented stacks(把1個棧划分成一小塊一小塊的)當Gorutine的棧空間不足時還能自動擴展/縮容。666

The overhead(開銷) averages about three cheap instructions per function call. It is practical to create hundreds of thousands of goroutines in the same address space. If goroutines were just threads, system resources would run out at a much smaller number.

7.創建幾千個Gorutine也是可行的!

 

 

並發與並行

並發:同一時間段內執行多個任務。

並行:同一時刻執行多個任務。

Go語言的並發通過goroutine實現。goroutine不同於os層的線程,它屬於用戶態的線程,所以gorutine比較輕量級所以我們可以根據需要創建成千上萬個goroutine並發工作。

goroutine是由Go語言的運行時(runtime)調度,而線程是由操作系統(os)調度完成的,所以它的執行時間也比較快。

Go語言還提供channel在多個goroutine間進行通信

goroutinechannel是 Go 語言秉承的 CSP(Communicating Sequential Process)並發模式的重要實現基礎。

 

goroutine簡介

在java/c++中我們要實現並發編程的時候,我們通常需要自己維護一個線程池,並且需要自己去包裝一個又一個的任務。

同時需要自己去調度線程執行任務並維護上下文切換,這一切通常會耗費程序員大量的心智。

那么能不能有一種機制,程序員只需要定義很多個任務,讓系統去幫助我們把這些任務分配到CPU上實現並發執行呢?

 

我們原來的實現使用線程實現並發的方案流程是:

程序----》os線程池-----》os調度線程----->cpu

在Golang中

程序---》gorutine------》go's runtime調度gorutines--------》線程池-------》os線程接口-----》os調度線程----->cpu

 

 

Go語言中的goroutine就是這樣一種線程封裝機制gorutine是用戶態的線程。

但 goroutine是由Go的運行時(runtime)調度和管理的。Go程序中的runtime會智能地將 goroutine 中的任務合理地分配給每個CPU

Go語言之所以被稱為現代化的編程語言,就是因為Golang在語言層面已經內置了調度和上下文切換的機制

在Go語言編程中你不需要去自己寫進程、線程、協程,你的技能包里只有一個技能–goroutine,當你需要讓某個任務並發執行的時候,你只需要把這個任務包裝成一個函數,開啟一個goroutine去執行這個函數就可以了,就是這么簡單。

 

 

使用gorutine

使用gorutine非常簡單就是在定義的函數前加go關鍵字就行

gorutine有1個特性當main函數結束后,由main函數啟動的gorutine也會全部消失。

mian函數結束相當於進程(資源單位)結束了,皮之不存毛將焉附?

package main

import "fmt"

func hello() {
	fmt.Println("hello")
}
//程序啟動之后會主動創建1個main gorutine
func main() {
	go hello() //開啟1個獨立的gorutine
	fmt.Println("main")
	//main函數結束之后由main函數啟動的gorutine也全部結束
}

 

sync.WaitGroup保證多個gorutine執行順序

sync.WaitGroup實現main gorutin等待所有goritines結束
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

//waitGroup協調gortines順序
var wg sync.WaitGroup

func f() {
	//在go中生成隨機數字(要加seed種子)
	rand.Seed(time.Now().UnixNano())
	for i := 0; i < 5; i++ {
		n1 := rand.Intn(11)
		fmt.Println(n1)
	}

}

func f1(i int) {
	// goroutine結束就登記-1
	defer wg.Done()
	//開啟1個gorutine:睡300毫秒
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)))
	fmt.Printf("goroutine%d\n",i)
}

func main() {
	for i := 0; i < 10; i++ {
		// 啟動一個goroutine就登記+1
		wg.Add(1)
		go f1(i)
	}
	//如何等待10個gorutines全部完成,main函數再結束。
	wg.Wait()//wg.Wait()等待計數器減為0

}

  

 

GOMAXPROCS 並行

Go運行時的調度器使用GOMAXPROCS參數來確定需要使用多少個OS線程來同時執行Go的gorutine。

GOMAXPROCS默認值是機器上的CPU核心數。(跑滿所有cup)例如在一個8核心的機器上,調度器會把Go代碼同時調度到8個OS線程上(GOMAXPROCS是m:n調度中的n)。

也就是說你使用的是Go1.5之后的版本,開啟多個gorutine之后,它們的執行順序 默認就是並行的    go可沒有Python中的GIL鎖啊! 多個cpu可同1時刻跑多個os線程。

但是有時候我們也需要玩得佛系一點!

比如我們寫得 go程序 主要是采集一些監控信息入庫.....一些IO密集型操作,跑滿所有cup反而會增加rutine的調度頻率,影響到服務器上其他的業務!

Go語言中可以通過runtime.GOMAXPROCS()函數設置當前程序並發時占用的CPU邏輯核心數。

 

package main

import (
	"fmt"
	"runtime"
	"sync"
)

var wg sync.WaitGroup

func f1() {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		fmt.Printf("f1:%d\n", i)
	}
}

func f2() {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		fmt.Printf("f2:%d\n", i)
	}
}

func main() {
	//默認go的runtime會把gorutine調度到機器所有cpu上
	//不過我們可設置gorutine可以被調度到幾個CUP上
	runtime.GOMAXPROCS(8)
	wg.Add(2)
	go f1()
	go f2()
	wg.Wait()
}

  

Go語言中os線程和goroutine的關系

cpu執行的最小單位是os線程,所有的gorutine最終都需要被runtime調度、映射到真正的os線程上,才能被CPU執行。

1個os線程對應用戶態N個goroutine。

1個go程序可以同時使用N個os線程。

goroutine和os線程是多對多的映射關系,即m:n。

 

gorutine的調用模型(GMP)

GPM是Go語言運行時(runtime)層面的實現的,其目的是把M個gorutine映射成N個os線程,然后再被os調度到cpu執行。

  • G(goroutine)里面除了存放本goroutine信息外 還有與所在P的綁定等信息。
  • P(processor) 管理着一組goroutine隊列,P里面會存儲當前goroutine運行的上下文環境(函數指針,堆棧地址及地址邊界),P會對自己管理的goroutine隊列做一些調度(比如把占用CPU時間較長的goroutine暫停、運行后續的goroutine等等)當自己的隊列消費完了就去全局隊列里取,如果全局隊列里也消費完了會去其他P的隊列里搶任務。
  • M(machine)是Go運行時(runtime)對操作系統內核線程的虛擬, M與內核線程一般是1:1映射的關系, 一個groutine最終是要放到M上執行的;

P與M一般也是1:1對應的。

他們關系是:

P管理着一組G掛載在M上運行。

Processor里其中1個Gorutine長久阻塞在一個Machine上時,runtime會新建一個Machine,阻塞Gorutine所在的Processor會把剩余的Gorutine掛載在新建的Mechine上。

當被阻塞的Gorutie阻塞完成或者認為其已經死掉時 回收舊的Machine。

推理得出:1Processor管理N個gorutines-------->1個processor又被掛載1個mechine運行----------》1個mechine映射到1個os線程--------->被cpu執行掉。

 

P的個數是通過runtime.GOMAXPROCS設定(最大256),Go1.5版本之后默認為物理線程數。 在並發量大的時候會增加一些P和M,但不會太多,切換太頻繁的話得不償失。

單從線程調度講,Go語言相比起其他語言的優勢在於OS線程是由OS內核來調度的,goroutine則是由Go運行時(runtime)自己的調度器調度的,這個調度器使用一個稱為m:n調度的技術(復用/調度m個goroutine到n個OS線程)。 其一大特點是goroutine的調度是在用戶態下完成的, 不涉及內核態與用戶態之間的頻繁切換,包括內存的分配與釋放,都是在用戶態維護着一塊大的內存池, 不直接調用系統的malloc函數(除非內存池需要改變),成本比調度OS線程低很多。 另一方面充分利用了多核的硬件資源,近似的把若干goroutine均分在物理線程上, 再加上本身goroutine的超輕量,以上種種保證了go調度方面的性能。

點我了解更多

 

 

channel實現gorutine之間數據共享

Go語言的並發模型是CSP(Communicating Sequential Processes)通信順序進程,就是通過通信的方式實現程序(gorutine)之間的順序執行

當每個獨立的gorutine並發執行之后,它們之間的如何交換數據?多個gorurine協作起來系統得完成1個系統?

channel是golang中的1種數據類型,它專用於gorutine之間數據交換,充當隊列角色。

 

特點

單雙工通信、數據流向為先進先出

數據不能反復讀取讀完了就沒有了

有緩沖區channel實現同步通信

無緩沖區channel有了存儲數據的功能實現了異步通信

 

 

定義channel

package main

import "fmt"

//需要指定channel中存放元素的類型
var channel chan int

func main() {
	//channel是引用類型需要make()開辟內存
	fmt.Println(channel)//<nil>
	//初始化channel1設置它的緩存取大小為5個int
	channel=make(chan int,5)
	fmt.Println(channel)
}

 

channel使用注意細節

channel的capacity是固定了,length是動態的。

只要在初始化階段固定了channel的長度(N)。

生產者只能投放N個數據,不能讓channel超載數據。

同理channel中的(N)個數據讀取(消費)完了,消費者也無法再消費。

 計划經濟:不能產能過剩、不能超前消費。否則會引起panic deadlock(死鎖)

1.channel不支持在線擴容

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

//需要指定channel中存放元素的類型
var channel chan int
func main() {
	//channel是引用類型需要make()開辟內存
	// fmt.Println(channel) //<nil>
	//初始化channel1設置它的緩存取大小為2個int
	channel = make(chan int,2)
	channel <- 1
	fmt.Println("1發送到了通道channel1中")
	channel <- 2
	fmt.Println("2發送到了通道channel1中")
	//fatal error: all goroutines are asleep - deadlock!
	//你給我定義時說讓我存2個int,我不能超載!!!!
	channel <- 3
	fmt.Println("3發送到了通道channel1中")
	close(channel)


}

  

2.channel里面的元素讀取完了就不能再獲取了

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

//需要指定channel中存放元素的類型
var channel chan int
func main() {
	//channel是引用類型需要make()開辟內存
	// fmt.Println(channel) //<nil>
	//初始化channel1設置它的緩存取大小為2個int
	channel = make(chan int,2)
	channel <- 1
	fmt.Println("1發送到了通道channel1中")
	channel <- 2
	fmt.Println("2發送到了通道channel1中")
	v1,_:=<-channel
	fmt.Printf("從管道channel1中獲取到了%d\n",v1)
	v2,_:=<-channel
	fmt.Printf("從管道channel1中獲取到了%d\n",v2)
	//fatal error: all goroutines are asleep - deadlock!
	//管道不超載,所以不要多取值
	v3,_:=<-channel
	fmt.Printf("從管道channel1中獲取到了%d\n",v3)
	
	

}

 

3.close(channel)

channel關閉之后有1個好處就是 可以隨便從channel中讀取數據不會引起pannic
但是寫入數據會報錯。
package main

import "fmt"

var intchannel chan int

func main() {
	intchannel = make(chan int, 3)
	intchannel <- 1
	intchannel <- 2
	//channel關閉之后:可以隨便從channel中讀取數據不會引起pannic,但是寫入數據會報錯。
	close(intchannel)
	//可以隨便讀
	<-intchannel
	<-intchannel
	<-intchannel
	<-intchannel
	//值讀完了之后返回:false 0(默認值)
	v1,ok := <-intchannel
	fmt.Println(ok,v1)
	//不能寫panic: send on closed channel
	// intchannel <- 3

}

 

4.channel的遍歷 

我們可以使用for range 循環遍歷已經關閉的channel。

package main

import "fmt"

var intchannel chan int

func main() {
	intchannel = make(chan int, 3)
	intchannel <- 1
	intchannel <- 2
	intchannel <- 3
	//channel關閉之后:可以隨便從channel中讀取數據不會引起pannic,但是寫入數據會報錯。
	close(intchannel)
	//遍歷已經close的channel不會報錯
	for v := range intchannel {
		fmt.Println(v)
	}

}

 

5.生產者和消費者步調不一致時

只要在channel的兩端有供(生產者)消(消費者)存在,程序會阻塞,但不永遠會造成程序死鎖! 

 當生產者在channel中send的數據小於/大於消費者可消費的數目時或者消費者消費的數據量大於生產者生產的數目都不會引起程序死鎖!

看到這樣的報錯心里也就有數了!

fatal error: all goroutines are asleep - deadlock!

無非是你寫的代碼哪個環節?channel中有值但是沒人消費!或者有消費但是沒值了! 

你只能在那里一直等導致main gorutine無法退出,一旦主 gorutine無法退出,所以的子gorutines也全部會被阻塞,從而導致deadlock.

 

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func senNum(ch chan<- int) {
	for {
		n := rand.Intn(10)
		time.Sleep(5 * time.Second)
		ch <- n
	}

}

func reciveNum(ch <-chan int) {
	for {
		time.Sleep(1 * time.Second)
		ok, n := <-ch 
		/*
		等(阻塞)4秒才能接受到值,因為senNum()每隔5秒鍾才會給chanek中send1個值
		*/
		fmt.Println(ok, n)
	}

}

func main() {
	ch := make(chan int, 1)
	go senNum(ch)
	reciveNum(ch)
}

  

 

操作channel

channel 解決了多個並發的gorutines之間有序傳送數據的問題,但是使用它需要注意一下幾點。

 

 那些我使用channel遇到的坑

package main

import (
	"fmt"
	"time"
)

var ch1 chan int

// var once sync.Once

func readChannle(channel chan int) {

	for v := range channel {
		fmt.Println(v)
	}

}

func main() {
	ch1 = make(chan int, 2)
	ch1 <- 1
	ch1 <- 2
	//ch1 <- 3
	/* bug 1
	會阻塞:因為ch1的緩沖區大小為2個int,你卻還給它send值,
	它已經裝不下第3個值了!(除非有消費者在另一端消費)
	*/
	<-ch1
	<-ch1
	//<-ch1 //deadlock死鎖
	/*
		bug2
		會阻塞:因為ch1里面的值全部被獲取完了
		除非有生產者給channel中寫值 main函數不能能一直在這里等值 或者你是gorutine可以一直等
	*/

	close(ch1)
	readChannle(ch1)
	<-ch1
	/*
		benefit 1
		關閉了channel之后即便channel的值被取光了,再從channel中獲取值
		程序也不會阻塞造成死鎖
	*/

	//ch1<-1
	/*
		bug2
		send on closed channel
		但是channel關閉之后就不能在send數據了
	*/

	//close(ch1)
	/*
		bug3 channel不能被close兩次
	*/

	/*
	Warnning 如何在gorutine中close(channel)
	讓gorutine外面的main函數可以獲取值呢?
	gorutine可是並行的,g1關閉了 g1就不能寫和再關閉了
	解決:once.Do(func() { close(ch2) })互斥鎖

	*/
	go readChannle(ch1)
	go readChannle(ch1)
	go readChannle(ch1)
	go readChannle(ch1)
	go readChannle(ch1)
	time.Sleep(2 * time.Second)

}

  

 

發送

channel <- 1

  

接收

value, ok := <-channel

 

練習1 

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

//生產者生產10個包子
func producer(channel1 chan string) {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		channel1 <- fmt.Sprintf("包子%d\n", i)
	}
	//關閉channel以便於別人來取chanel1取值值時不會報錯啊!
	close(channel1)
}

//消費者
func consumer(channel1 chan string) {
	defer wg.Done()
	for v := range channel1 {
		fmt.Println(v)
	}
}


func main() {
	wg.Add(4)
	channal1 := make(chan string, 10)
	go producer(channal1) 
	go consumer(channal1)//消費者1
	go consumer(channal1)//
	go consumer(channal1)//
	wg.Wait()
	fmt.Println("main結束")
}

  

練習2

/*
1.啟動1個gorutine1,生成100個數字寫入channel1中
2.啟動1個gorutine2,從gorutine中獲取到數字,計算其平方寫入channel2。
3.main函數從channel2中查詢結果
*/
package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup
var once sync.Once

func producer(chanel1 chan int) {
	defer wg.Done()
	for i := 0; i < 100; i++ {
		chanel1 <- i
	}
	close(chanel1)

}

func consumer(ch1 chan int, ch2 chan int) {
	defer wg.Done()
	for v := range ch1 {
		ch2 <- v * v
	}
	//確保某個close操作被gorutines搶到后只被 close 1次
	once.Do(func() { close(ch2) })

}

func main() {
	wg.Add(6)
	ch1 := make(chan int, 100)
	ch2 := make(chan int, 100)
	go producer(ch1)
	go consumer(ch1, ch2)
	go consumer(ch1, ch2)
	go consumer(ch1, ch2)
	go consumer(ch1, ch2)
	go consumer(ch1, ch2)
	wg.Wait()
	for v := range ch2 {
		fmt.Println(v)

	}

}

  

單向通道 

我們默認使用的channel是雙向通道(既可以向channel中 send值,也可以向通道中read值),單向通道就是限制channel僅能sen/read。

單向通道明確了通道的使用功能,限制了用戶操作通道的權限。可以避免在並發場景下讀寫channel沖突導致deadlock。

 

應用場景

單向通道多出現於函數的參數、返回值起到限制函數操作channel的權限。

 

只可以讀的channel

<-chan int

 

只寫的channel

chan<- int

  

 

func Tick(d Duration) <-chan Time {
	if d <= 0 {
		return nil
	}
	return NewTicker(d).C
}

  

 

gorutine pool

在go可以輕松啟動多個gorutine物極必反,無論我們啟動多少個gorutine最終干活的還是os線程。

1個8核的服務器可以同時啟動16個線程,但是golang中啟動了1000個goritine。16個os線程划分1000個gorutine無疑是增加了go runtime調度頻率。並沒有加速程序執行速度。

 

package main

import (
	"fmt"
	"time"
)


//worker
func worker(id int, jobs <-chan int, results chan<- int) {
	for v := range jobs {
		fmt.Printf("work:%d satrt,job:%d\n", id, v)
		time.Sleep(time.Second)
		results <- v * 2
		fmt.Printf("work:%d end,job:%d\n", id, v)

	}
}

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	//開啟3個gorutines
	for i := 0; i < 3; i++ {
		go worker(i, jobs, results)
	}
	//消費50個任務
	for i := 0; i < 5; i++ {
		jobs <- i
	}
	close(jobs)
	
	// 輸出5個results結果
	for j := 1; j <= 5; j++ {
		<-results
	}

}

  

練習 使用struct作為channel的存放的元素

一般在golang的channel里面存儲的數據體量比較大的數據時,可以使用struct  pointer類型節省存儲空間。

ps:

如果某1個文件的的權限是755,如 計算該文件 屬主+屬組+其他用戶權限的總和。(7+5+5)

package main
import "fmt"
func main() {
	n := 755
	sum := 0
	for n > 0 {
		//每次除以10
		sum += n % 10 
		n = n / 10    
	}
	fmt.Println(sum)

}

 

 使用goroutinechannel實現一個計算int64隨機數各位數和的程序。

  1. 開啟一個goroutine循環生成int64類型的隨機數,發送到jobChan
  2. 開啟24個goroutinejobChan中取出隨機數計算各位數的和,將結果發送到resultChan
  3. goroutineresultChan取出結果並打印到終端輸出
//
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

/*
開啟一個goroutine循環生成int64類型的隨機數,發送到jobChan
開啟24個goroutine從jobChan中取出隨機數計算各位數的和,將結果發送到resultChan
主goroutine從resultChan取出結果並打印到終端輸出

*/

type job struct {
	value int64
}

type result struct {
	job *job
	sum int64
}

var jobsChannel = make(chan *job, 100)
var resultChanel = make(chan *result, 100)
var wg sync.WaitGroup

func producer(jobs chan<- *job) {
	defer wg.Done()
	for {
		v := rand.Int63()
		newJob := &job{
			value: v,
		}
		jobs <- newJob
		//休眠500毫秒
		time.Sleep(time.Millisecond * 500)

	}

}

func consumer(jobs <-chan *job, results chan<- *result) {
	defer wg.Done()
	for {
		job := <-jobs
		sum := int64(0)
		jobValue := job.value
		for jobValue > 0 {
			sum += jobValue % int64(10)
			jobValue = jobValue / int64(10)
		}
		newResult := &result{
			job: job,
			sum: sum,
		}
		results <- newResult

	}
}

func main() {
	wg.Add(1)
	go producer(jobsChannel)
	wg.Add(24)
	for i := 0; i < 24; i++ {
		go consumer(jobsChannel, resultChanel)
	}

	for v := range resultChanel {
		fmt.Printf("%d---->%d\n",v.job.value, v.sum)
	}
	wg.Wait()

}

  

 

select關鍵字

在某些場景下我們需要同時從多個通道接收數據。

通道在接收數據時,如果沒有數據可以接收將會發生阻塞。

Go內置了select關鍵字,可以循環監聽通道的可讀和可寫的狀態, 同時響應多個通道的操作。

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	for i := 0; i < 10; i++ {
		//select自動監聽到channel是否為可讀/可寫狀態!
		select {
		case x := <-ch:
			fmt.Printf("第%d循環:ch通道能讀值啦!獲取%d\n",i, x)
		case ch <- i:
			fmt.Printf("第%d循環:ch通道能寫值啦!寫入%d\n", i, i)

		}
	}
}

  

 

sync包

 

golang除了提供channel 這種CSP機制達到gorutines之間共性數據目的之外,還提供了1個sync包實現並發安全。

sync包中提供了Mutex(互斥鎖)、once(一次性操作)、waigroup(主線程等待所有gorutine結束再推出)、RWMutex(讀寫相互斥鎖)等功能,幫助我們實現並發安全

 

gorutine資源爭用現象

我們知道MySQL客戶端用到的數據放在mysqld服務端的數據庫中當多個客戶端連接數據庫時有事會需要加鎖操作保證數據安全。

程序中用到變量數據在內存里,我開多個gorutine去同時對同1個全局變量進行修改,相當於多個MySQL的客戶端同時對數據庫同1條數據進行修改。

var wg sync.WaitGroup

//定義1個全局變量
var number int64

//對全局變量進行+1操作
func add1() {
    for i := 0; i < 5000; i++ {
        //1.從內存中找到number變量對應的值
        //2.進行+1操作
        //3.把結果賦值給number寫到內存
        number++
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add1()
    go add1()
    //fmt.Println(number)
    wg.Wait()
    fmt.Println(number) //每次執行結果都不一致
}

sync.Mutex

Mutex防止同1時刻,同1資源(全局變量)被多個gorutine操作。

// A Mutex must not be copied after first use.
type Mutex struct {
	state int32
	sema  uint32
}

 

Mextex是使用struct實現的而在golang中struct屬於value類型。

需要注意的是在使用sync.Mutex時如果把它當成參數傳入到函數里面,mutax就會被copy生成2把不同的mutex。

 

互斥鎖

互斥鎖 不區分是讀、寫操作,只要有1個goruitne拿到Mutax,其余的所有gorutines,無論是讀還寫,只能等待。

var lock sync.Mutex
lock.Lock()   //加鎖
lock.Lock()   //加鎖

  

 

1個公共資源被N個gorutines 操作引發的問題 

package main

import (
	"fmt"
	"sync"
)

//鎖
var x = 0
var wg sync.WaitGroup

//每次執行add增加5000
func add() {
	defer wg.Done()
	for i := 0; i < 5000; i++ {
		x++
	}

}

func main() {
	wg.Add(2)
	//開啟2個gorutines同時對x+1
	go add()
	go add()
	/*2個gorutines如果同1時刻都去獲取公共變量x=50,
	然后在獨自的棧中對x+1改變了x都=51
	就少+了1次,導致結果計算不准!
	*/
	wg.Wait()
	fmt.Println(x)
}

 

使用互斥鎖
A Mutex must not be copied after first use.
在使用互斥鎖一定要確保該鎖 不是復制品(作為參數傳遞時一定要傳指針
package main

import (
	"fmt"
	"sync"
)

//鎖
var x = 0
var wg sync.WaitGroup

/*
A Mutex must not be copied after first use.
使用互斥鎖一定要確保該鎖不是復制品(作為參數傳遞時一定要傳指針)
*/

//互斥鎖
var lock sync.Mutex

//每次執行add增加5000
func add() {
	defer wg.Done()
	for i := 0; i < 5000; i++ {
		lock.Lock()   //加鎖
		x++           //操作同1資源
		lock.Unlock() //釋放鎖
	}

}

func main() {
	wg.Add(2)
	//開啟2個gorutines同時對x+1
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

  

RWMutex(讀/寫互斥鎖)

使用數據庫時我們大部分的場景都是讀的頻率高於寫的頻率,所以我們可以使用2個數據庫,1個叫主庫另1個叫從庫,主庫支持寫操作,從庫支持度操作,主從之間通過bin log同步數據。

如果現在數據在內存中放着也是讀變量的頻率遠遠高於修改變量的頻率。我們可以使用RWmutex

互斥鎖是完全互斥的,但是有很多實際的場景下是讀多寫少的,當我們並發的去讀取一個資源不涉及資源修改的時候是沒有必要加鎖的,這種場景下使用讀寫鎖是更好的一種選擇。讀寫鎖在Go語言中使用sync包中的RWMutex類型。

讀寫鎖分為兩種:

讀鎖:當一個goroutine獲取讀鎖之后,其他的goroutine如果是獲取讀鎖會繼續獲得鎖,如果是獲取寫鎖就會等待;

寫鎖:當一個goroutine獲取寫鎖之后,其他的goroutine無論是獲取讀鎖還是寫鎖都會等待;

 

var rwlock sync.RWMutex
//讀鎖
rwlock.RLock()
rwlock.RUnlock()
//寫鎖
rwlock.Lock()
rwlock.Unlock()

  

Rwmutex區分gorutine讀、寫操作,僅在寫時資源被lock,讀的gorutines等(讀並發、寫串行)

應用場景:所以使用RWMutex之后,在讀操作大於寫操作次數的場景下並發執行效率會比Mutex更快。

如果讀和寫的操作差別不大,讀寫鎖的優勢就發揮不出來。

package main

import (
	"fmt"
	"sync"
	"time"
)

var x = 0
var lock sync.Mutex
var rwlock sync.RWMutex
var wg sync.WaitGroup

//rwlock
func read() {
	defer wg.Done()
	//加普通互斥鎖
	// lock.Lock()
	//加讀鎖
	rwlock.RLock()
	fmt.Println(x)
	time.Sleep(time.Millisecond)
	//釋放普通互斥鎖
	// lock.Unlock()
	//釋放讀鎖
	rwlock.RUnlock()
}

func write() {
	defer wg.Done()
	// lock.Lock()
	//加寫鎖
	rwlock.Lock()
	x++
	time.Sleep(10 * time.Millisecond)
	// lock.Unlock()
	//釋放寫鎖
	rwlock.Unlock()
}

func main() {

	start := time.Now()

	for i := 0; i < 10; i++ {
		go write()
		wg.Add(1)
	}
	//讀的次數一定要大於寫的次數
	for i := 0; i < 1000; i++ {
		go read()
		wg.Add(1)
	}
	wg.Wait()
	fmt.Println(time.Now().Sub(start))

	//Mutex:1.205s
	//RWMutex 194ms
}

  

sync.WaitGroup

var wg sync.WaitGroup
wg.Add(2)
wg.Done(2)
wg.Wait()

  

主gorutine結束之后,又它開啟的其他gorutines會自動結束!!     

如何做到讓main gorutine等待它開啟的gorutines結束之后,再結束呢?

main gorutine執行time.Sleep(duration)肯定是不合適,因為我們無法精確預測出 gorutines到底會執行多久?

方法名 功能
(wg * WaitGroup) Add(delta int) 計數器+delta
(wg *WaitGroup) Done() 計數器-1
(wg *WaitGroup) Wait() 阻塞直到計數器變為0

 

var wg sync.WaitGroup

func hello() {
	defer wg.Done()
	fmt.Println("Hello Goroutine!")
}
func main() {
	wg.Add(1)
	go hello() // 啟動另外一個goroutine去執行hello函數
	fmt.Println("main goroutine done!")
	wg.Wait()
}

  

 

sync.Once

 如何確保某些操作在並發的場景下只執行1次,例如只加載一次配置文件、只執行1次close(channel)等。

func (o *Once) Do(f func()) {}

Onece的Do方法只能接受1個沒有參數的函數作為它的參數,  如果要傳遞的func參數是有參數的func, 就需要搭配閉包來使用。  

 

下面是借助sync.Once實現的並發安全的單例模式:

package singleton

import (
    "sync"
)

type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}

  

sync.Map

var syncMap sync.Map
//新增
syncMap.Store(key, n)
//刪除
syncMap.Delete(key)
//改
syncMap.LoadOrStore(key)
//遍歷
syncMap.Range(walk)

  

golang中的map在並發情況下: 只讀是線程安全的,但是寫線程不安全,所以為了並發安全 & 高效,官方幫我們實現了另1個sync.map。

 

fatal error: concurrent map writes  //go內置的map只能支持20個並發寫!
package main

import (
	"fmt"
	"strconv"
	"sync"
)

var m = make(map[string]int)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			//設置1個值
			set(key, n)
			//獲取1個值
			fmt.Printf("k=:%v,v:=%v\n", key, get(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
}

  

就支持20個並發也太少了!

Go語言的sync包中提供了一個開箱即用的並發安全版map–sync.Map。開箱即用表示不用像內置的map一樣使用make函數初始化就能直接使用。

同時sync.Map內置了諸如StoreLoadLoadOrStoreDeleteRange等操作方法。

 

package main

import (
	"fmt"
	"strconv"
	"sync"
)

var syncMap sync.Map
var wg sync.WaitGroup



func walk(key, value interface{}) bool {
	fmt.Println("即將刪除Key =", key, "Value =", value)
	syncMap.Delete(key)
	return true
}

func main() {
	for i := 0; i < 200; i++ {
		//開啟20個協程去syncMap並發寫操作,也是可以順利寫進去的的!
		key := strconv.Itoa(i)
		wg.Add(1)
		go func(n int) {
			//設置key
			syncMap.Store(key, n)
			//通過key獲取value
			value, ok := syncMap.Load(key)
			if !ok {
				fmt.Println("沒有該key", key)
			}
			fmt.Println(value)
			wg.Done()
		}(i)

	}
	//使用for 循環或者 for range 循環無法遍歷所有syncMap只能使用syncMap.Range()
	//不幸運的Go沒有提供sync.Map的Length的方法,需要自己實現!!
	syncMap.Range(walk)
	wg.Wait()
}

  

  

 

atomic包

 

代碼中的加鎖操作因為涉及內核態的上下文切換會比較耗時、代價比較高

針對基本數據類型我們還可以使用原子操作來保證並發安全,因為原子操作是Go語言提供的方法它在用戶態就可以完成,因此性能比加鎖操作更好

Go語言中原子操作由內置的標准庫sync/atomic提供

方法 解釋
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
讀取操作
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
寫入操作
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
修改操作
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
交換操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
比較並交換操作

 

ackage main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Counter interface {
	Inc()
	Load() int64
}

// 普通版
type CommonCounter struct {
	counter int64
}

func (c CommonCounter) Inc() {
	c.counter++
}

func (c CommonCounter) Load() int64 {
	return c.counter
}

// 互斥鎖版
type MutexCounter struct {
	counter int64
	lock    sync.Mutex
}

func (m *MutexCounter) Inc() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.counter++
}

func (m *MutexCounter) Load() int64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.counter
}

// 原子操作版
type AtomicCounter struct {
	counter int64
}

func (a *AtomicCounter) Inc() {
	atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
	return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			c.Inc()
			wg.Done()
		}()
	}
	wg.Wait()
	end := time.Now()
	fmt.Println(c.Load(), end.Sub(start))
}

func main() {
	c1 := CommonCounter{} // 非並發安全
	test(c1)
	c2 := MutexCounter{} // 使用互斥鎖實現並發安全
	test(&c2)
	c3 := AtomicCounter{} // 並發安全且比互斥鎖效率更高
	test(&c3)
}

  

 

 

 

 

 

 

 

 

 

 

 

https://morsmachine.dk/go-scheduler

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM