golang:協程安全


多路復用

Go語言中提供了一個關鍵字select,通過select可以監聽channel上的數據流動。select的用法與switch語法類似,由select開始一個新的選擇塊,每個選擇條件由case語句來描述。只不過,select的case有比較多的限制,其中最大的一條限制就是每個case語句里必須是一個IO操作。

select 語法如下:

   select {
    case <-chan1:
        // 如果chan1成功讀到數據,則進行該case處理語句
    case chan2 <- 1:
        // 如果成功向chan2寫入數據,則進行該case處理語句
    default:
        // 如果上面都沒有成功,則進入default處理流程
    }

在一個select語句中,會按順序從頭至尾評估每一個發送和接收的語句;如果其中的任意一語句可以繼續執行(即沒有被阻塞),那么就從那些可以執行的語句中任意選擇一條來使用。如果沒有任意一條語句可以執行(即所有的通道都被阻塞),那么有兩種可能的情況:⑴ 如果給出了default語句,那么就會執行default語句,同時程序的執行會從select語句后的語句中恢復。⑵ 如果沒有default語句,那么select語句將被阻塞,直到至少有一個channel可以進行下去。

在一般的業務場景下,select不會用default,當監聽的流中再沒有數據,IO操作就 會阻塞現象,如果使用了default,此時可以出讓CPU時間片。如果使用了default 就形成了非阻塞狀態,形成了忙輪訓,會占用CPU、系統資源。

阻塞與非阻塞使用場景

  • 阻塞: 如:在監聽超時退出時,如果100秒內無操作,擇退出,此時添加了default會形成忙輪訓,超時監聽變成了無效。
  • 非阻塞: 如,在一個只有一個業務邏輯處理時,主進程控制進程的退出。此時可以使用default。

定時器

Go語言中定時器的使用有三個方法

  • time.Sleep()
  • time.NewTimer() 返回一個時間的管道, time.C 讀取管道的內容
  • time.After(5 * time.Second) 封裝了time.NewTimer(),反回了一個 time.C的管道

示例

select {
    case <-time.After(time.Second * 10):
}

鎖和條件變量

Go語言中為了解決協程間同步問題,提供了標准庫代碼,包syncsync/atomic中。

互斥鎖

互斥鎖是傳統並發編程對共享資源進行訪問控制的主要手段,它由標准庫sync中的Mutex結構體類型表示。sync.Mutex類型只有兩個公開的指針方法,Lock和Unlock。Lock鎖定當前的共享資源,Unlock進行解鎖。

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

var mutex sync.Mutex

func print(str string) {
	mutex.Lock()         // 添加互斥鎖
	defer mutex.Unlock() // 使用結束時解鎖

	for _, data := range str { // 迭代器
		fmt.Printf("%c", data)
		time.Sleep(time.Second) // 放大協程競爭效果
	}
	fmt.Println()
}

func main() {
	go print("hello") // main 中傳參
	go print("world")
	for {
		runtime.GC()
	}
}

讀寫鎖

讀寫鎖的使用場景一般為讀多寫少,可以讓多個讀操作並發,同時讀取,但是對於寫操作是完全互斥的。也就是說,當一個goroutine進行寫操作的時候,其他goroutine不能進行讀寫操作;當一個goroutine獲取讀鎖之后,其他的goroutine獲取寫鎖都會等待

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var count int           // 全局變量count
var rwlock sync.RWMutex // 全局讀寫鎖 rwlock

func read(n int) {
	for {
		rwlock.RLock()
		fmt.Printf("reading goroutine %d ...\n", n)
		num := count
		fmt.Printf("read goroutine %d finished,get number %d\n", n, num)
		rwlock.RUnlock()
	}

}
func write(n int) {
	for {
		rwlock.Lock()
		fmt.Printf("writing goroutine %d ...\n", n)
		num := rand.Intn(1000)
		count = num
		fmt.Printf("write goroutine %d finished,write number %d\n", n, num)
		rwlock.Unlock()
	}
}

func main() {
	for i := 0; i < 5; i++ {
		go read(i + 1)
		time.Sleep(time.Microsecond * 100)
	}
	for i := 0; i < 5; i++ {
		go write(i + 1)
		time.Sleep(time.Microsecond * 100)
	}
	for {

	}
}

可以看出,讀寫鎖控制下的多個寫操作之間都是互斥的,並且寫操作與讀操作之間也都是互斥的。但是,多個讀操作之間不存在互斥關系。

Go語言中的死鎖

死鎖 deadlock 是指兩個或兩個以上的進程在執行過程中,由於競爭資源或者由於彼此通信而造成的一種阻塞的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖。

單gorutine同時讀寫,寫死鎖

在一個gorutine中,當channel無緩沖,寫阻塞,等待讀取導致死鎖

解決,應該至少在2個gorutine進行channle通訊,或者使用緩沖區。

package main

func main() {
	channel := make(chan int)
	channel <- 1
	<-channel
}

多gorutine使用一個channel通信,寫先於讀

代碼順序執行時,寫操作阻塞,導致后面協程無法啟動進行讀操作,導致死鎖

package main

func main() {
	channel := make(chan int)
	channel <- 1
	go func() {
		<-channel
	}()
}

多channel交叉死鎖

在goroutine中,多個goroutine使用多個channel互相等待對方寫入,導致死鎖

package main

func main() {

	channel1 := make(chan int)
	channel2 := make(chan int)

	go func() {

		select {
		case <-channel1:
			channel2 <- 1
		}
	}()

	select {
	case <-channel2:
		channel1 <- 1
	}
}

隱性死鎖

盡量不要將 互斥鎖、讀寫鎖 與 channel 混用情況下,讓讀先進行讀時,因為沒寫入被阻塞,無法解除。寫入時,因為沒有讀出被阻塞,鎖無法解除,導致無數據輸出,形成隱形死鎖。此時編譯器是不報錯的。

package main

import (
	"fmt"
	"sync"
)

func main() {
	channel := make(chan int)
	var rwlock sync.RWMutex
	go func() {
		for {
			rwlock.Lock()
			channel <- 1
			fmt.Println("write", 1)
			rwlock.Unlock()
		}
	}()

	go func() {
		for {
			rwlock.RLock()
			n := <-channel
			fmt.Println(n)
			rwlock.Unlock()
		}

	}()

	for {

	}
}

Context 上下文

context定義了上下文類型,該類型在API邊界之間以及進程之間傳遞截止時間,取消信號和其他請求范圍的值。當在對請求傳入一個上下文,可以選擇將其替換為使用WithCancelWithDeadlineWithTimeout。在取消后,從該context處派生的所有子請求也會被取消。

Context的結構體

  • Deadline() 返回context的截止時間。
  • Done() 返回一個channle,當timeout或cancelfuc將會close(chan)
  • Err() 返回錯誤,未關閉Done()返回nil,取消,返回 "context canceled", Deadline返回超時
  • Value 返回值。
type Context interface {
	// Deadline returns the time when work done on behalf of this context
	// should be canceled. Deadline returns ok==false when no deadline is
	// set. Successive calls to Deadline return the same results.
	Deadline() (deadline time.Time, ok bool)

	// Done returns a channel that's closed when work done on behalf of this
	// context should be canceled. Done may return nil if this context can
	// never be canceled. Successive calls to Done return the same value.
	// The close of the Done channel may happen asynchronously,
	// after the cancel function returns.
	//
	// WithCancel arranges for Done to be closed when cancel is called;
	// WithDeadline arranges for Done to be closed when the deadline
	// expires; WithTimeout arranges for Done to be closed when the timeout
	// elapses.
	//
	// Done is provided for use in select statements:
	//
	//  // Stream generates values with DoSomething and sends them to out
	//  // until DoSomething returns an error or ctx.Done is closed.
	//  func Stream(ctx context.Context, out chan<- Value) error {
	//  	for {
	//  		v, err := DoSomething(ctx)
	//  		if err != nil {
	//  			return err
	//  		}
	//  		select {
	//  		case <-ctx.Done():
	//  			return ctx.Err()
	//  		case out <- v:
	//  		}
	//  	}
	//  }
	//
	// See https://blog.golang.org/pipelines for more examples of how to use
	// a Done channel for cancellation.
	Done() <-chan struct{}

	// If Done is not yet closed, Err returns nil.
	// If Done is closed, Err returns a non-nil error explaining why:
	// Canceled if the context was canceled
	// or DeadlineExceeded if the context's deadline passed.
	// After Err returns a non-nil error, successive calls to Err return the same error.
	Err() error

	// Value returns the value associated with this context for key, or nil
	// if no value is associated with key. Successive calls to Value with
	// the same key returns the same result.
	//
	// Use context values only for request-scoped data that transits
	// processes and API boundaries, not for passing optional parameters to
	// functions.
	//
	// A key identifies a specific value in a Context. Functions that wish
	// to store values in Context typically allocate a key in a global
	// variable then use that key as the argument to context.WithValue and
	// Context.Value. A key can be any type that supports equality;
	// packages should define keys as an unexported type to avoid
	// collisions.
	//
	// Packages that define a Context key should provide type-safe accessors
	// for the values stored using that key:
	//
	// 	// Package user defines a User type that's stored in Contexts.
	// 	package user
	//
	// 	import "context"
	//
	// 	// User is the type of value stored in the Contexts.
	// 	type User struct {...}
	//
	// 	// key is an unexported type for keys defined in this package.
	// 	// This prevents collisions with keys defined in other packages.
	// 	type key int
	//
	// 	// userKey is the key for user.User values in Contexts. It is
	// 	// unexported; clients use user.NewContext and user.FromContext
	// 	// instead of using this key directly.
	// 	var userKey key
	//
	// 	// NewContext returns a new Context that carries value u.
	// 	func NewContext(ctx context.Context, u *User) context.Context {
	// 		return context.WithValue(ctx, userKey, u)
	// 	}
	//
	// 	// FromContext returns the User value stored in ctx, if any.
	// 	func FromContext(ctx context.Context) (*User, bool) {
	// 		u, ok := ctx.Value(userKey).(*User)
	// 		return u, ok
	// 	}
	Value(key interface{}) interface{}
}

演示使用可取消的上下文。可在函數結束時defer cancel() 防止goroutine的泄露。

package main

import (
	"context"
	"fmt"
	"time"
)

func worker(ctx context.Context, name string) {
	n := 0
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "去划水了", n)
			return
		default:
			fmt.Println(name, "干活中", n)
			time.Sleep(time.Second)
		}
		n++
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	for n := 0; n < 5; n++ {
		go worker(ctx, fmt.Sprintf("worker%d", n))
	}

	<-time.After(time.Second * 5)
	cancel()
	for {

	}
}

超時處理,WithTimeout 當時間到達設置的時間后退出,也可以使用cancelFunc()退出處理

package main

import (
	"context"
	"fmt"
	"time"
)

func worker(ctx context.Context, name string) {
	n := 0
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "去划水了", n)
			return
		default:
			fmt.Println(name, "干活中", n)
			time.Sleep(time.Second)
		}
		n++
	}
}

func main() {
	//ctx, cancel := context.WithCancel(context.Background())
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)

	for n := 0; n < 2; n++ {
		go worker(ctx, fmt.Sprintf("worker%d", n))
	}

	<-time.After(time.Second * 5)
	fmt.Println("取消了")
	cancel()
}

WithDeadline,在標准庫中可以看出,實際上WithTimeout是封裝了WithDeadline。其功能也是超時退出。

package main

import (
	"context"
	"fmt"
	"time"
)

func worker(ctx context.Context, name string) {
	n := 0
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "去划水了", n)
			fmt.Println(ctx.Err())
			return
		default:
			fmt.Println(name, "干活中", n)
			time.Sleep(time.Second)
		}
		n++
	}
}

func main() {
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*3))
	for n := 0; n < 2; n++ {
		go worker(ctx, fmt.Sprintf("worker%d", n))
	}

	<-time.After(time.Second * 5)
	fmt.Println("取消了")
	cancel()
}

Context總結

  • Context是Go語言在1.7中加入標准庫的,是作為Goroutine線程安全,防止線程泄露的上下文管理的操作。
  • context包的核心是Context結構體。
  • Context的常用方法為 WithTimeout()WithCancel()
  • Context在使用時,不要放在結構體內使用,要以函數的參數進行傳遞。
  • Context是線程安全的,可以在多個Goroutine傳遞,當對其取消操作時,所有Goroutine都執行取消操作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM