Golang協程和管道


協程(goroutine)

  • 基本介紹

並發和並行

  1. 多線程程序在單核上運行,就是並發
  2. 多線程程序在多核上運行,就是並行
    並發:因為是在一一個cpu上,比如有10個線程,每個線程執行10毫秒(進行輪詢操作),從人的角度看,好像這10個線程都在運行,但是從微觀上看,在某一個時間點看,其實只有一一個線程在執行,這就是並發。
    並行:因為是在多個cpu上(比如有10個cpu),比如有10個線程,每個線程執行10毫秒(各自在不同cpu.上執行),從人的角度看,這10個線程都在運行,但是從微觀上看,在某一個時間點看,也同時有10個線程在執行,這就是並行
  • Go 協程和 Go 主線程
  1. Go 主線程(有程序員直接稱為線程/也可以理解成進程): 一個 Go 線程上,可以起多個協程,你可以這樣理解,協程是輕量級的線程[編譯器做優化]
  2. Go 協程的特點
  1. 有獨立的棧空間
  2. 共享程序堆空間
  3. 調度由用戶控制
  4. 協程是輕量級的線程
    //編寫一個函數,每隔1秒輸出 "hello,world"
    func test() {
        for i := 1; i <= 10; i++ {
            fmt.Println("tesst () hello,world " + strconv.Itoa(i))
            time.Sleep(time.Second)
        }
    }

    func main() {

        go test() // go關鍵字開啟了一個協程

        for i := 1; i <= 10; i++ {
            fmt.Println(" main() hello,golang" + strconv.Itoa(i))
            time.Sleep(time.Second)
        }
    }
  • 小結
  1. 主線程是一個物理線程,直接作用在 cpu 上的。是重量級的,非常耗費 cpu 資源。
  2. 協程從主線程開啟的,是輕量級的線程,是邏輯態。對資源消耗相對小。
  3. Golang 的協程機制是重要的特點,可以輕松的開啟上萬個協程。其它編程語言的並發機制是一般基於線程的,開啟過多的線程,資源耗費大,這里就突顯 Golang 在並發上的優勢了

goroutine 的調度模型

  • MPG 模式基本介紹
  1. M: 操作系統的主線程(是物理線程)
  2. P: 協程執行需要的上下文
  3. G: 協程
  • 設置 Golang 運行的 cpu 數
  1. go1.8后, 默認讓程序運行在多個核上,,可以不用設置了
  2. go1.8前, 還是要設置一下,可以更高效的利益cpu
    func main() {
        cpuNum := runtime.NumCPU()
        fmt.Println("cpuNum=", cpuNum)

        //可以自己設置使用多個cpu
        runtime.GOMAXPROCS(cpuNum - 1)
        fmt.Println("ok")
    }
  • goroutine 之間通訊
  1. 全局變量的互斥鎖 (Go 語言提供兩類鎖: 互斥鎖(Mutex)和讀寫鎖(RWMutex)。其中讀寫鎖(RWMutex)是基於互斥鎖(Mutex)實現的)
    import (
        "sync"
    )
    var ( 
        //聲明一個全局的互斥鎖
        //lock 是一個全局的互斥鎖, 
        //sync 是包: synchornized 同步
        //Mutex : 是互斥
        lock sync.Mutex
    )
    func funcName ( * args[]) {
        //加鎖
        lock.Lock()

        ........    //協程

        //解鎖
        lock.Unlock()
    }
  1. 使用管道 channel 來解決
  1. 前面使用全局變量加鎖同步來解決 goroutine 的通訊,但不完美
  2. 主線程在等待所有 goroutine 全部完成的時間很難確定,我們這里設置 10 秒,僅僅是估算。
  3. 如果主線程休眠時間長了,會加長等待時間,如果等待時間短了,可能還有 goroutine 處於工作狀態,這時也會隨主線程的退出而銷毀
  4. 通過全局變量加鎖同步來實現通訊,也並不利用多個協程對全局變量的讀寫操作。

管道(channel)

  1. channle 本質就是一個數據結構-隊列
  2. 數據是先進先出【FIFO : first in first out】
  3. 線程安全,多 goroutine 訪問時,不需要加鎖,就是說 channel 本身就是線程安全的
  4. channel 有類型的,一個 string 的 channel 只能存放 string 類型數據。
  • 定義/聲明 channel
    var 變量名 chan 數據類型
    舉例:
    var intChan chan int (intChan 用於存放 int 數據)
    var mapChan chan map[int]string (mapChan 用於存放 map[int]string 類型)
    var perChan chan Person
    var perChan2 chan *Person
    ...

    說明
    1. channel 是引用類型
    2. channel 必須初始化才能寫入數據, 即 make 后才能使用
    3. 管道是有類型的,intChan 只能寫入 整數 int
  • 基本操作
    func main() {
        //1. 創建一個可以存放3個int類型的管道
        var intChan chan int
        intChan = make(chan int, 3)

        //2. intChan是什么
        fmt.Printf("intChan 的值=%v intChan本身的地址=%p\n", intChan, &intChan)
        //intChan 的值=0xc00001c100 intChan本身的地址=0xc00000e028

        //3. 向管道寫入數據
        intChan<- 10
        num := 211
        intChan<- num
        intChan<- 50
        /*
            //如果從channel取出數據后,可以繼續放入
            <-intChan
            intChan<- 98//注意點, 當我們給管寫入數據時,不能超過其容量
        */

        //4. 管道的長度和cap(容量)
        fmt.Printf("channel len= %v cap=%v \n", len(intChan), cap(intChan)) // 3, 3

        //5. 從管道中讀取數據
        var num2 int
        num2 = <-intChan 
        fmt.Println("num2=", num2)
        fmt.Printf("channel len= %v cap=%v \n", len(intChan), cap(intChan))  // 2, 3

        //6. 在沒有使用協程的情況下,如果我們的管道數據已經全部取出,再取就會報告 deadlock

        num3 := <-intChan
        num4 := <-intChan

        //num5 := <-intChan

        fmt.Println("num3=", num3, "num4=", num4/*, "num5=", num5*/)
    }
  • 注意事項
  1. channel 中只能存放指定的數據類型
  2. channle 的數據放滿后,就不能再放入了
  3. 如果從 channel 取出數據后,可以繼續放入
  4. 在沒有使用協程的情況下,如果 channel 數據取完了,再取,就會報 dead lock
  • channel 的關閉
    使用內置函數 close 可以關閉 channel, 當 channel 關閉后,就不能再向 channel 寫數據了,但是仍然可以從該 channel 讀取數據
func main() {
    intChan := make(chan int, 3)
    intChan <- 10
    intChan <- 20
    // 關閉 channel
    close(intChan)
    // 關閉 channel 后,無法將數據寫入到 channel 中,讀取數據是可以的
    num := <- intChan
    fmt.Println(num) // 10
}
  • channel 的遍歷
    channel 支持 for--range 的方式進行遍歷,請注意兩個細節
  1. 在遍歷時,如果 channel 沒有關閉,則回出現 deadlock 的錯誤
  2. 在遍歷時,如果 channel 已經關閉,則會正常遍歷數據,遍歷完后,就會退出遍歷。
    func main() {
        ch := make(chan int, 3)
        ch <- 10
        ch <- 20
        ch <- 30
        // 關閉 channel
        close(ch)
        // 遍歷 channel
        for v := range ch {
            fmt.Println(v)
        }
    }
  • 應用實例:協程與管道協同工作
  1. 開啟一個 writeData 協程,向管道中寫入30個整數;
  2. 開啟一個 readData 協程,從管道中讀取writeData寫入的數據;
  3. writeData 和 readData 操作的是同一個管道;
  4. 主線程需要等待這兩個協程都完成工作才能退出。
//write Data
func writeData(intChan chan int) {
	for i := 1; i <= 50; i++ {
		//放入數據
		intChan<- i //
		fmt.Println("writeData ", i)
	}
	close(intChan) //關閉
}

//read data
func readData(intChan chan int, exitChan chan bool) {

	for {
		v, ok := <-intChan
		if !ok {
			break
		}
		fmt.Printf("readData 讀到數據=%v\n", v) 
	}
	//readData 讀取完數據后,即任務完成
	exitChan<- true
	close(exitChan)

}

func main() {
	//創建兩個管道
	intChan := make(chan int, 50)
	exitChan := make(chan bool, 1)
	
	go writeData(intChan)
	go readData(intChan, exitChan)

	for {
		_, ok := <-exitChan //會輪詢兩次,第二次為false
		if !ok {
			break
		}
	}
}
  • 應用實例:阻塞
  1. 如果編譯器(運行),發現一個管道只有寫,沒有讀,則該管道會阻塞。
  2. 如果寫管道和讀管道的頻率不一致,無所謂。
  3. 如果channel沒有關閉, 也會死鎖阻塞。
func writeData(intChan chan int) {
	for i := 1; i <= 3; i++ {
		//放入數據
		intChan<- i //
		fmt.Println("writeData ", i)
	}
	close(intChan) //關閉
}

func readData(intChan chan int, exitChan chan bool) {
	for {
		v, ok := <-intChan
		if !ok {
			break
		}
        2. 讀取比寫入頻率慢,不會死鎖
		time.Sleep(time.Second*10)
		fmt.Printf("readData 讀到數據=%v\n", v) 
	}
	//readData 讀取完數據后,即任務完成
	exitChan<- true
    
	//close(exitChan)

}

func main() {

	//創建兩個管道
	intChan := make(chan int, 3)
	exitChan := make(chan bool, 1)
	
    1. 只有寫沒有讀,會死鎖
	go writeData(intChan)
	//go readData(intChan, exitChan)

	for {
        3. channel沒有關閉,這里會死鎖
		_, ok := <-exitChan
		if !ok {
			break
		}
	}

}
  • 應用實例:多管道同時運行
  1. 不要在讀取端關閉 channel ,因為寫入端無法知道 channel 是否已經關閉,往已關閉的 channel 寫數據會 panic ;
  2. 有多個寫入端時,不要在寫入端關閉 channle ,因為其他寫入端無法知道 channel 是否已經關閉,關閉已經關閉的 channel 會發生 panic ;
  3. 如果只有一個寫入端,可以在這個寫入端放心關閉 channel 。
func putNum(intChan chan int) {
	for i := 1; i <= 80000; i++ {    
		intChan<- i
	}
	close(intChan)
}

func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
	var num int //從intChan取出的數字
	var flag bool //素數標識
	for {
		num, ok := <-intChan
		if !ok {  //intChan 取不到
			break
		}
		flag = true //假設是素數
		//判斷num是不是素數
		for i := 2; i < num; i++ {
			if num % i == 0 {//說明該num不是素數
				flag = false
				break
			}
		}
		if flag {
			//將這個數就放入到primeChan
			primeChan<- num
		}
	}

	fmt.Println("一個primeNum 協程退出")
	//這里我們還不能關閉 primeChan
	//向 exitChan 寫入true
	exitChan<- true	
}

func main() {
	intChan := make(chan int , 1000)
	primeChan := make(chan int, 20000)//過濾后放入結果
	exitChan := make(chan bool, 8) ////標識退出的管道

	start := time.Now().Unix()
	
	//開啟一個協程,向 intChan放入 1-8000個數
	go putNum(intChan)

	//開啟4個協程,從 intChan取出數據,並判斷是否為素數,如果是,就放入到primeChan
	for i := 0; i < 6; i++ {
		go primeNum(intChan, primeChan, exitChan)
	}

	//這里我們主線程,進行處理
	//直接
	go func(){
		for i := 0; i < 6; i++ {
			<-exitChan
		}

		end := time.Now().Unix()
		fmt.Println("使用協程耗時=", end - start)

		//當我們從exitChan 取出了4個結果,就可以放心的關閉 prprimeChan
		close(primeChan)
	}()


	//遍歷我們的 primeChan ,把結果取出
	for {
		_, ok := <-primeChan
		if !ok{
			break
		}
		//將結果輸出
		//fmt.Printf("素數=%d\n", res)
	}

	fmt.Println("main線程退出")
}
  • 注意事項
  1. channel 可以聲明為只讀,或者只寫性質
	func main() {
		//1. 在默認情況下下,管道是雙向
		//var chan1 chan int //可讀可寫
		
		//2 聲明為只寫
		var chan2 chan<- int
		chan2 = make(chan int, 3)
		chan2<- 20
		//num := <-chan2 //error
		fmt.Println("chan2=", chan2)

		//3. 聲明為只讀
		var chan3 <-chan int
		num2 := <-chan3
		//chan3<- 30 //err
		fmt.Println("num2", num2)

	}
  1. 使用 select 可以解決從管道取數據的阻塞問題
	func main() {
		//1.定義一個管道 10個數據int
		intChan := make(chan int, 10)
		for i := 0; i < 10; i++ {
			intChan<- i
		}
		//2.定義一個管道 5個數據string
		stringChan := make(chan string, 5)
		for i := 0; i < 5; i++ {
			stringChan <- "hello" + fmt.Sprintf("%d", i)
		}

		//傳統的方法在遍歷管道時,如果不關閉會阻塞而導致 deadlock
		//在實際開發中,不確定要關閉該管道,且不希望阻塞,可以使用select 方式可以解決
		//label:
		for {
			select {
				//注意: 這里,如果intChan一直沒有關閉,不會一直阻塞而deadlock,會自動到下一個case匹配
				case v := <-intChan : 
					fmt.Printf("從intChan讀取的數據%d\n", v)
					time.Sleep(time.Second)
				case v := <-stringChan :
					fmt.Printf("從stringChan讀取的數據%s\n", v)
					time.Sleep(time.Second)
				default :
					fmt.Printf("都取不到了,不玩了, 程序員可以加入邏輯\n")
					time.Sleep(time.Second)
					return 
					//break label 一般不建議使用;return后后續代碼不會執行,而break標簽只是跳出當前循環,繼續執行下面的代碼
			}
		}
	}
  1. goroutine 中使用 recover,解決協程中出現 panic,導致程序崩潰問題

如果我們起了一個協程,但是這個協程出現了panic,如果我們沒有捕獲這個panic,就會造成整個程序崩潰,這時我們可以在goroutine中使用recover來捕獲panic,進行處理,這樣即使這個協程發生的問題,但是主線程仍然不受影響,可以繼續執行。

	//函數
	func sayHello() {
		for i := 0; i < 10; i++ {
			time.Sleep(time.Second)
			fmt.Println("hello,world")
		}
	}
	//函數
	func test() {
		//這里我們可以使用defer + recover
		defer func() {
			//捕獲test拋出的panic
			if err := recover(); err != nil {
				fmt.Println("test() 發生錯誤", err)
			}
		}()
		//定義了一個map
		var myMap map[int]string
		myMap[0] = "golang" //error 切片沒有mark
	}

	func main() {
		go sayHello()
		go test()

		for i := 0; i < 10; i++ { //防止協程沒有執行結束,程序就已經退出,隨便找點事做
			fmt.Println("main() ok=", i)
			time.Sleep(time.Second)
		}

	}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM