frp源碼剖析-frp中的mux模塊


前言

frp幾乎所有的連接處理都是構建在mux模塊之上的,重要性不必多說,來看一下這是個啥吧

ps: 安裝方法

go get "github.com/fatedier/golib/net/mux"

該模塊很小,不到300行,分為兩個文件:mux.gorule.go
因為rule.go文件相對簡單一些,我們先來看這個。

role.go文件

首先看其中所命名的函數類型MatchFunc

type MatchFunc func(data []byte) (match bool)

該類型的函數用來判斷data屬於什么協議。

那么具體如何判斷呢,這里也實現了三個例子:

var (
	HttpsNeedBytesNum uint32 = 1
	HttpNeedBytesNum  uint32 = 3
	YamuxNeedBytesNum uint32 = 2
)

var HttpsMatchFunc MatchFunc = func(data []byte) bool {
	if len(data) < int(HttpsNeedBytesNum) {
		return false
	}

	if data[0] == 0x16 {
		return true
	} else {
		return false
	}
}

// From https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods
var httpHeadBytes = map[string]struct{}{
	"GET": struct{}{},
	"HEA": struct{}{},
	"POS": struct{}{},
	"PUT": struct{}{},
	"DEL": struct{}{},
	"CON": struct{}{},
	"OPT": struct{}{},
	"TRA": struct{}{},
	"PAT": struct{}{},
}

var HttpMatchFunc MatchFunc = func(data []byte) bool {
	if len(data) < int(HttpNeedBytesNum) {
		return false
	}

	_, ok := httpHeadBytes[string(data[:3])]
	return ok
}

// From https://github.com/hashicorp/yamux/blob/master/spec.md
var YamuxMatchFunc MatchFunc = func(data []byte) bool {
	if len(data) < int(YamuxNeedBytesNum) {
		return false
	}

	if data[0] == 0 && data[1] >= 0x0 && data[1] <= 0x3 {
		return true
	}
	return false
}

這三個函數分別實現了區分HTTPS,HTTP以及go中特有的yamux(實際上這是一個庫,可以參考Go中的I/O多路復用)。

mux.go文件

先來看其中的struct,第一個是Mux第二個是listener,這里先來看一下較為簡單的listener

listener結構體

type listener struct {
	mux *Mux

	priority     int
	needBytesNum uint32
	matchFn      MatchFunc

	c  chan net.Conn
	mu sync.RWMutex
}

// Accept waits for and returns the next connection to the listener.
func (ln *listener) Accept() (net.Conn, error) {
	...
}

// Close removes this listener from the parent mux and closes the channel.
func (ln *listener) Close() error {
	...
}

func (ln *listener) Addr() net.Addr {
	...
}

剛看到這個結構體我們可能很迷惑,不知道都是干啥的,而且網絡編程中一般listener這種東西要綁定在一個套接字上,但很明顯listener沒有,不過其唯一跟套接字相關的可能是其c字段,其是一個由net包中的Conn接口組成的chanel;然后mu字段就是讀寫鎖了,這個很簡單;然后mux字段則是上面提到的兩個結構體中的另一個結構體Mux的指針;接下來到了priority字段上,顧名思義,這個似乎跟優先級有關系,暫且存疑;needBytesNum則更有些蒙了,不過感覺其是跟讀取byte的數量有關系,最后是matchFn

好,初步認識了這個結構體的結構后,我們看看其方法。三個方法的listener實現了net模塊中的Listener接口:

// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {
	// Accept waits for and returns the next connection to the listener.
	Accept() (Conn, error)

	// Close closes the listener.
	// Any blocked Accept operations will be unblocked and return errors.
	Close() error

	// Addr returns the listener's network address.
	Addr() Addr
}

然后先來分析其Accept方法:

func (ln *listener) Accept() (net.Conn, error) {
	conn, ok := <-ln.c
	if !ok {
		return nil, fmt.Errorf("network connection closed")
	}
	return conn, nil
}

該方法很簡單,就是從c這個由Conn組成的channel中,獲取Conn對象,好這里我們就明白了,這個listener和普通的不一樣,他很特別,普通的listener監聽的是套接字,而他監聽的是channel,另外,肯定有某個地方在不停的往c這個channel中放Conn

接下來是Close方法:

func (ln *listener) Close() error {
	if ok := ln.mux.release(ln); ok {
		// Close done to signal to any RLock holders to release their lock.
		close(ln.c)
	}
	return nil
}

我們暫且先把這個ln.mux.release(ln)放到一邊,因為還不知道這個東西干了啥,暫且只需關注close(ln.c),我們知道這個函數是用來關閉channel的,go推薦由發送端調用,但這里似乎listener是一個消費端,可以看一下如何優雅的關閉Go Channel,看來重點在於ln.mux.release(ln)這里,我們暫且存疑[1],留待下面解決。

最后是Addr方法:

func (ln *listener) Addr() net.Addr {
	if ln.mux == nil {
		return nil
	}
	ln.mux.mu.RLock()
	defer ln.mux.mu.RUnlock()
	if ln.mux.ln == nil {
		return nil
	}
	return ln.mux.ln.Addr()
}

在這里,mu字段就用上了,加讀鎖,然后返回mux字段中的ln字段的Addr方法。也就是這句return ln.mux.ln.Addr()

Mux結構體

字段以及相關函數

Mux結構體則相對來說復雜很多,先來看一下他的字段定義:

type Mux struct {
	ln net.Listener

	defaultLn *listener

	// sorted by priority
	lns             []*listener
	maxNeedBytesNum uint32

	mu sync.RWMutex
}

好,第一個字段ln是一個Listener接口;然后defaultLn是一個listener的指針;lns則是由listener的指針組成的切片,根據注釋// sorted by priority,我們終於知道listenerpriority字段是干啥的了;接下來是maxNeedBytesNum字段,好奇怪,比起listenerneedBytesNum多了個“Max”,所以我們推測這個值取得是lns以及defaultLn字段中所有listenerneedBytesNum值最大的;最后的mu字段我們就不說了。

需要注意的是:我們可能會發現Muxlistener存在相互引用,但在Go中我們倒也不用太擔心,因為Go采用“標記-回收”或者其變種的垃圾回收算法,感興趣可以參考Golang 垃圾回收剖析

mux.go文件中定義了Mux的生成函數NewMux:

func NewMux(ln net.Listener) (mux *Mux) {
	mux = &Mux{
		ln:  ln,
		lns: make([]*listener, 0),
	}
	return
}

很簡單,需要注意的是ln字段存儲的一般不是listener這樣的非常規Listener,一般是TCPListener這樣具體的綁定了套接字的監聽器。

Mux方法

接下來看Mux結構體的方法,首先看ListencopyLns

// priority
func (mux *Mux) Listen(priority int, needBytesNum uint32, fn MatchFunc) net.Listener {
    // 1
	ln := &listener{
		c:            make(chan net.Conn),
		mux:          mux,
		priority:     priority,
		needBytesNum: needBytesNum,
		matchFn:      fn,
	}

	mux.mu.Lock()
	defer mux.mu.Unlock()
	// 2
	if needBytesNum > mux.maxNeedBytesNum {
		mux.maxNeedBytesNum = needBytesNum
	}

    // 3
	newlns := append(mux.copyLns(), ln)
	sort.Slice(newlns, func(i, j int) bool {
		if newlns[i].priority == newlns[j].priority {
			return newlns[i].needBytesNum < newlns[j].needBytesNum
		}
		return newlns[i].priority < newlns[j].priority
	})
	mux.lns = newlns
	return ln
}

func (mux *Mux) copyLns() []*listener {
	lns := make([]*listener, 0, len(mux.lns))
	for _, l := range mux.lns {
		lns = append(lns, l)
	}
	return lns
}

copyLns方法很簡單,就是跟名字的含義一樣,生成一個lns字段的副本並返回。

Listen基本做了三步:

  1. 生成一個listener結構體實例,並獲取互斥鎖
  2. 根據情況更新needBytesNum字段
  3. 將新生成的listener實例按照優先級放入lns字段對應的slice中

接下來是ListenHttpListenHttps方法:

func (mux *Mux) ListenHttp(priority int) net.Listener {
	return mux.Listen(priority, HttpNeedBytesNum, HttpMatchFunc)
}

func (mux *Mux) ListenHttps(priority int) net.Listener {
	return mux.Listen(priority, HttpsNeedBytesNum, HttpsMatchFunc)
}

這兩個差不多,所以放到一起說,基本都是專門寫了一個方法讓我們能方便的創建處理Http或者Httpslistener

再來看DefaultListener方法:

func (mux *Mux) DefaultListener() net.Listener {
	mux.mu.Lock()
	defer mux.mu.Unlock()
	if mux.defaultLn == nil {
		mux.defaultLn = &listener{
			c:   make(chan net.Conn),
			mux: mux,
		}
	}
	return mux.defaultLn
}

這個方法很簡單,基本就是有則返回沒有則生成然后返回的套路。不過我們要注意defaultLn字段中的listener是不放入lns字段中的。

接下來是Server方法:

// Serve handles connections from ln and multiplexes then across registered listeners.
func (mux *Mux) Serve() error {
	for {
		// Wait for the next connection.
		// If it returns a temporary error then simply retry.
		// If it returns any other error then exit immediately.
		conn, err := mux.ln.Accept()
		if err, ok := err.(interface {
			Temporary() bool
		}); ok && err.Temporary() {
			continue
		}

		if err != nil {
			return err
		}

		go mux.handleConn(conn)
	}
}

一般來說,當我們調用NewMux函數以后,接下來就會調用Server方法,該方法基本上就是阻塞監聽某個套接字,當有連接建立成功后立即另起一個goroutine調用handleConn方法;當連接建立失敗根據err是否含有Temporary方法,如果有則執行並忽略錯誤,沒有則返回錯誤。

現在我們看看handleConn方法干了些啥:

func (mux *Mux) handleConn(conn net.Conn) {
    // 1
	mux.mu.RLock()
	maxNeedBytesNum := mux.maxNeedBytesNum
	lns := mux.lns
	defaultLn := mux.defaultLn
	mux.mu.RUnlock()
    
    // 2
	sharedConn, rd := gnet.NewSharedConnSize(conn, int(maxNeedBytesNum))
	data := make([]byte, maxNeedBytesNum)

	conn.SetReadDeadline(time.Now().Add(DefaultTimeout))
	_, err := io.ReadFull(rd, data)
	if err != nil {
		conn.Close()
		return
	}
	conn.SetReadDeadline(time.Time{})
    // 3
	for _, ln := range lns {
		if match := ln.matchFn(data); match {
			err = errors.PanicToError(func() {
				ln.c <- sharedConn
			})
			if err != nil {
				conn.Close()
			}
			return
		}
	}

	// No match listeners
	if defaultLn != nil {
		err = errors.PanicToError(func() {
			defaultLn.c <- sharedConn
		})
		if err != nil {
			conn.Close()
		}
		return
	}

	// No listeners for this connection, close it.
	conn.Close()
	return
}

handleConn方法也不算復雜,大體可以分為三步:

  1. 獲取當前狀態
  2. conn中讀取數據,注意:shareConnrd存在單向關系,如果從rd中讀取數據的話,數據也會復制一份放到shareConn中,反過來就不成立了
  3. 讀取到的數據會被遍歷,最終選出與matchFunc匹配的最高優先級的listener,並將shareConn放入該listenerc字段中,如果沒有匹配到則放到defaultLn中的c字段中,如果defaultLnnil的話就不處理,直接關閉conn

最后來到了release方法了:

func (mux *Mux) release(ln *listener) bool {
	result := false
	mux.mu.Lock()
	defer mux.mu.Unlock()
	lns := mux.copyLns()

	for i, l := range lns {
		if l == ln {
			lns = append(lns[:i], lns[i+1:]...)
			result = true
			break
		}
	}
	mux.lns = lns
	return result
}

release方法意思很明確:把對應的listenerlns中移除,並把結果返回,整個過程有互斥鎖,我們回到存疑1,盡管有互斥鎖,但在這種情況下:當某個goroutine運行到handleConn已經執行到了第三階段的開始狀態(也就是還沒有找到匹配的listener)時,且Go運行在多核狀態下,當另一個goroutine運行完listenerClose方法時,這時就可能發生往一個已經關閉的channel中send數據,但請注意handleConn的第三步的這段代碼:

err = errors.PanicToError(func() { // 就是這里了
	ln.c <- sharedConn
})
if err != nil {
	conn.Close()
}

這個PanicToError是這樣的:

func PanicToError(fn func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("Panic error: %v", r)
		}
	}()

	fn()
	return
}

基本上就是執行了recover然后將錯誤打印出來,結合下面的對err的判斷,就會將send失敗的conn關閉。

總結

  1. Mux中包含了一個初始監聽器,基本上所有的事件(比如說新的連接建立,之所以叫事件是因為我實在想不出更精確的詞語了)都起源於此
  2. listener實現了net.Listener接口,可以作為二級監聽器使用(比如傳給net/http.Server結構體的Server方法進行處理)。
  3. Mux包含了一個由listener組成的有序slice,當有事件產生時就會遍歷這個slice找出合適的listener並將事件傳給他。

講到這里基本上是完事了。整個mux模塊還是比較簡單的,起碼是由一個個簡單的東西組合而成。那么一起來意淫一下整體流程吧。

假如我要實現這么一個網絡程序:

  1. 綁定監聽一個基於tcp的套接字
  2. 我們允許其應用層可支持多個(比如說支持http https這兩個吧,盡管http和https可以說是一個協議。。),不同的應用層協議對應不同的處理函數

就這么兩個很簡單的要求,不難吧。

那么我們一起來實現吧:


type HandleFunc func(c net.Conn) (n int, err error) 

type MyServer struct {
    l net.Listener
    hFunc HandleFunc
}

func (h *MyServer) Server() (err error) {
    for {
        conn, err := h.l.Accept()
        if err != nil {
            return
        }
        go h.hFunc(conn)
    }
}

func HandleHttp(c net.Conn)(n int, err error){
    n, err = c.Write([]byte("Get Off! Don't you know that it is not safe?"))
}

func HandleHttps(c net.Conn)(n int, err error){
    n, err = c.Write([]byte("Get Off! Don't you know that this is more complicated than http?"))
}


func main() (err error){
    ln, err := net.Listen("tcp", "0.0.0.0:12345")
	if err != nil {
		err = fmt.Errorf("Create server listener error, %v", err)
		return
	}
	muxer = mux.NewMux(ln)
	
	var lHttp, lHttps net.Listener
	lHttp = muxer.ListenHttp(1)
	httpServer := *MyServer{lHttp, HandleHttp}
	
	lHttps = muxer.ListenHttps(2)
	httpsServer := *MyServer{lHttps, HandleHttps}
	
	go httpServer.Server()
	go httpsServer.Server()

	err = muxer.Serve()
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM