Go並發編程(goroutine)


Go並發

並發編程里面一個非常重要的概念, go語言在語言層面天生支持並發, 這也是Go語言流行的一個重要的原因

Go語言中的並發編程

並發與並行

並發:同一時間段內執行多個任務(你在用微信和兩個人聊天)

並行:同一時刻執行多個任務 (你和你的朋友 都在用微信和 你們的一個朋友聊天)

Go語言的並發通過goroutine 實現 , goroutine 是比線程更加輕量級的協程 。goroutine是由Go語言的運行時(runtime)調度完成,而線程是由操作系統調度完成

Go語言還提供channel在多個goroutine間進行通信,goroutine和channel是Go語言秉承的CSP並發模式的重要實現基礎

package main

import (
"fmt"
"sync"
)

//func Person() {
// fmt.Println(12356)
//}

func main() {
    var wg sync.WaitGroup
    defer wg.Done()
    for i:=0; i<10000; i++ {
    //go Person() // 開啟一個單獨的goroutine取執行hello函數(任務)
    go func(i int) {
    wg.Add(1)
    fmt.Println(12355, i)
    }(i)
    fmt.Println("main") // 如果main打印出來 說明整個線程都死了 go就執行不了Person了
    }
    wg.Wait()
}

  • goroutine 通過sync.waitgroup節省負載

package main

import (
	"fmt"
	"math/rand"
	"sync"
)

var wg sync.WaitGroup

func main() {
	for i := 0; i < 1000; i++ {

		go func(i int) {
			wg.Add(1)
			fmt.Println(rand.Intn(1000))
			// rand.Intn(1000) 1000 以內的隨機數
			defer wg.Done()
		}(i)
		fmt.Println("main")
	}
	wg.Wait()
}


  • goroutine調度

GMP是Go語言運行時(runtime)層面實現的, 是go語言自己實現的一套調度系統. 區別於操作系統調度OS線程。

G:就是goroutine里除了存放goroutine信息外還有所在P的綁定信息

M:machine 是Go運行時對操作系統內核線程的虛擬, M與內核線程一般是一一映射的關系,一個goroutine最終是要放到M上運行的

P:管理者一組goroutine隊列,P里面會存儲當前goroutine運行的上下文環境, 自己隊列執行完成,就回去全局隊列取任務,全局完成,就去別的P隊列搶任務 干活。活活的活雷鋒

P的個數是通過runtime.GOMAXPROCS設定最大256 。go1.5版本之后默認為物理線程數, 在並發量大的時候會增加一些p和m但是不會太多,不會太多,切換太頻繁會得不償失

Go語言中的操作系統線程和goroutine的關系

  • 一個操作系統線程對應用戶態多個goroutine
  • go程序可以同時使用多個操作系統線程
  • goroutine和OS線程是多對多的關系 m:n。將m個goroutine分配給n個os的線程去執行

Channel通道

單純的將函數並發執行意義沒有多大的,函數與函數之間是需要傳參交換數據才能體現出並發函數的意義

Go語言的並發模型提倡通過通信共享內存 而不是通過共享內存而實現通信

Go語言中的通道是一種特殊的類型。通道像一個傳送帶或着隊列,總是遵循先進先出的規則,保證收發數據的順序


package main

import "fmt"

func main() {
	var ch chan interface{}  聲明通道
	ch = make(chan interface{})  // 通道初始化
  ch := make(chan interface{}, 16)  // 帶緩沖區的通道初始化
  ch <- 10                         // <- 發送值 和接收值 都是這個符號
	res := <-ch                      // 接收值
	close(ch)                        //關閉通道

	fmt.Println(ch)

}
  • Channel 練習
var wg sync.WaitGroup

func c1(ch1 chan interface{}) {
	defer wg.Done()
	for i := 0; i < 100; i++ {
		ch1 <- rand.Intn(100)

	}
	close(ch1)   // 必須關閉 否則 會出現死鎖

}

func c2(ch1, ch2 chan interface{}) {
	defer wg.Done()

	for value := range ch1{
		ch2 <- value.(int) * value.(int)
	}
	//for {
	//	i,ok := <-ch1
	//	if !ok {
	//		break
	//	}
	//	ch2 <- i.(int) * i.(int)
	//}

	close(ch2)  // 必須關閉否則會出現死鎖
	//fmt.Println(ch2)

}

func main() {
	v1 := make(chan interface{}, 100)
	v2 := make(chan interface{}, 100)
	wg.Add(2)
	go c1(v1)
	go c2(v1, v2)
	fmt.Println(v2)
	for i := range v2{
		fmt.Println(i)
	}
	wg.Wait()



}

  • 單向通道
ch1 chan <- int  只能存
ch1 <- chan int  只能取

  • worker pool(goroutine池)

編寫代碼實現一個計算隨機數的被一個位置數字子和的程序 ,使用goroutine和channel 構建模型

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func worker(id int, jobs <-chan int, list chan<- int) {
	defer wg.Done()
	for item := range jobs {

		fmt.Println(id, "start", item)
		time.Sleep(1 * time.Second)
		fmt.Println(id, "ending", item)

		list <- item * 2

	}
}

func main() {
	jobs := make(chan int, 100)
	list := make(chan int, 100)

	for i := 0; i < 3; i++ {
		go worker(i, jobs, list)
		wg.Add(1)
	}

	for i := 0; i < 5; i++ {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
	//for value := range list {
	//	fmt.Println(value)
	//}
	//time.Sleep(3 * time.Second)
}
執行結果:
2 start 0
1 start 2
0 start 1
0 ending 1
0 start 3
2 ending 0
1 ending 2
2 start 4
2 ending 4
0 ending 3


  • 復雜一點的channel_goroutine
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

//import (
//	"fmt"
//	"sync"
//	"time"
//)
//
//var wg sync.WaitGroup
//
//func worker(id int, jobs <-chan int, list chan<- int) {
//	defer wg.Done()
//	wg.Add(1)
//	for item := range jobs {
//
//		fmt.Println(id, "start", item)
//		time.Sleep(1 * time.Second)
//		fmt.Println(id, "ending", item)
//
//		list <- item * 2
//
//	}
//}
//
//func main() {
//	jobs := make(chan int, 1000)
//	list := make(chan int, 1000)
//
//	for i := 0; i < 64; i++ {
//		go worker(i, jobs, list)
//
//	}
//
//	for i := 0; i < 1000; i++ {
//		jobs <- i
//	}
//	close(jobs)
//	//close(list)
//	//for value := range list {
//	//	fmt.Println(value)
//	//}
//	wg.Wait()
//
//
//}

/*
1.開啟一個goroutine循環生成int64的所計數 發送到jobChan
2.開啟24個goroutine從jobChan中取出隨機數並計算各位數的和 將結果流入res
3.主goroutine從res取出結果並打印
*/

type Job struct {
	x int64
}

type Res struct {
	job *Job
	result int64
}

var wg sync.WaitGroup

func worker(job chan<- *Job) {
	for {
		job <- &Job{x: rand.Int63n(1000000000)}
		time.Sleep(500 * time.Millisecond)
	}
}

func rework(job <-chan *Job, res chan<- *Res) {
	defer wg.Done()
	for {
		data := <-job
		sum := int64(0)
		n := data.x

		for n > 0 {
			sum += n % 10
			n = n / 10
		}


		res <- &Res{job: data, result: sum}
	}

}

func main() {
	wg.Add(1)
	job := make(chan *Job, 100)
	res := make(chan *Res, 100)
	go worker(job)
	wg.Add(24)

	for i := 0; i < 24; i++ {
		go rework(job, res)
	}

	for item := range res{
		fmt.Println(item, item.result, item.job.x)
	}
	wg.Wait()
}


  • Select 多路復用

某些場景下我們需要同時從多個聽到接收數據。通道接收數據時,如果沒有數據可以接收

Go HTTP 包

package main

import (
	"encoding/json"
	"fmt"
	"github.com/julienschmidt/httprouter"
	"io"
	"log"
	"net/http"
)

//var once sync.Once

func helloHandler(w http.ResponseWriter, req *http.Request) {
	io.WriteString(w, "hello, world!\n")

	log.Println(req.RequestURI, req.RemoteAddr,req.Host, req.Method)
}

func main() {
  // 創建路由
	router := httprouter.New()
  // 設置路徑
	router.POST("/v1", posthello)
	router.GET("/", gethtml)
	// http.HandleFunc("/", helloHandler)
	fmt.Println("[+++++++++++++++]")

	_ = http.ListenAndServe(":12345", router)

}

func posthello(w http.ResponseWriter, req *http.Request, _ httprouter.Params)  {
	io.WriteString(w,"123post")
}

func gethtml(w http.ResponseWriter, req *http.Request, _ httprouter.Params)  {
	// io.WriteString(w, "v1 get")
	// json序列化
	text,_ := json.Marshal(map[string]string{"text":"ccn"})
	w.Write(text)
}



GO語言單元測試

測試函數的覆蓋率: > 90%

測試整體代碼覆蓋率: > 60%

單元測試的文件名必須以_test.go 結尾

測試的函數名必須以Test開頭

func TestSplit(t *testing.T)

go test -cover -coverprofile=cover.out  // 生成cover文件
go tool cover -html=cover.out   // 通過瀏覽器打開可視化界面


  • 基准測試

基准測試就是在一定的工作負載之下檢測程序性能的一種方法。

測試函數的函數名必須是Benchmark 開頭


func BenchmarkSplit(b *testing.B)

go test -bench=Split

  • pprof調試工具

Go語言項目中的性能優化主要有以下幾個方面

cpu profile :報告程序的cpu使用情況 按照一定的頻率去采集應用程序在cpu和寄存器上的數據

memory profile : 報告內存使用情況

block profile : 用來分析和查找死鎖等性能瓶頸

goroutine profile : 。。。

  • cpu性能分析

// 開始start
pprof.startcpuprofile(w io.writer)

// 結束stop
pprof.stopcpuprofile()

  • gin使用pprof 性能分析
pprof.Register(router)

go tool pprof http://localhost:9000/debug/pprof/goroutine?second=20
web  // 即可在瀏覽器中看到可視化的性能圖解


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM