前言
frp幾乎所有的連接處理都是構建在mux模塊之上的,重要性不必多說,來看一下這是個啥吧
ps: 安裝方法
go get "github.com/fatedier/golib/net/mux"
該模塊很小,不到300行,分為兩個文件:mux.go
和rule.go
。
因為rule.go
文件相對簡單一些,我們先來看這個。
role.go文件
首先看其中所命名的函數類型MatchFunc
:
type MatchFunc func(data []byte) (match bool)
該類型的函數用來判斷data
屬於什么協議。
那么具體如何判斷呢,這里也實現了三個例子:
var (
HttpsNeedBytesNum uint32 = 1
HttpNeedBytesNum uint32 = 3
YamuxNeedBytesNum uint32 = 2
)
var HttpsMatchFunc MatchFunc = func(data []byte) bool {
if len(data) < int(HttpsNeedBytesNum) {
return false
}
if data[0] == 0x16 {
return true
} else {
return false
}
}
// From https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods
var httpHeadBytes = map[string]struct{}{
"GET": struct{}{},
"HEA": struct{}{},
"POS": struct{}{},
"PUT": struct{}{},
"DEL": struct{}{},
"CON": struct{}{},
"OPT": struct{}{},
"TRA": struct{}{},
"PAT": struct{}{},
}
var HttpMatchFunc MatchFunc = func(data []byte) bool {
if len(data) < int(HttpNeedBytesNum) {
return false
}
_, ok := httpHeadBytes[string(data[:3])]
return ok
}
// From https://github.com/hashicorp/yamux/blob/master/spec.md
var YamuxMatchFunc MatchFunc = func(data []byte) bool {
if len(data) < int(YamuxNeedBytesNum) {
return false
}
if data[0] == 0 && data[1] >= 0x0 && data[1] <= 0x3 {
return true
}
return false
}
這三個函數分別實現了區分HTTPS
,HTTP
以及go中特有的yamux
(實際上這是一個庫,可以參考Go中的I/O多路復用)。
mux.go文件
先來看其中的struct
,第一個是Mux
第二個是listener
,這里先來看一下較為簡單的listener
。
listener結構體
type listener struct {
mux *Mux
priority int
needBytesNum uint32
matchFn MatchFunc
c chan net.Conn
mu sync.RWMutex
}
// Accept waits for and returns the next connection to the listener.
func (ln *listener) Accept() (net.Conn, error) {
...
}
// Close removes this listener from the parent mux and closes the channel.
func (ln *listener) Close() error {
...
}
func (ln *listener) Addr() net.Addr {
...
}
剛看到這個結構體我們可能很迷惑,不知道都是干啥的,而且網絡編程中一般listener這種東西要綁定在一個套接字上,但很明顯listener
沒有,不過其唯一跟套接字相關的可能是其c
字段,其是一個由net
包中的Conn
接口組成的chanel
;然后mu
字段就是讀寫鎖了,這個很簡單;然后mux
字段則是上面提到的兩個結構體中的另一個結構體Mux
的指針;接下來到了priority
字段上,顧名思義,這個似乎跟優先級有關系,暫且存疑;needBytesNum
則更有些蒙了,不過感覺其是跟讀取byte的數量有關系,最后是matchFn
。
好,初步認識了這個結構體的結構后,我們看看其方法。三個方法的listener
實現了net
模塊中的Listener
接口:
// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (Conn, error)
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error
// Addr returns the listener's network address.
Addr() Addr
}
然后先來分析其Accept
方法:
func (ln *listener) Accept() (net.Conn, error) {
conn, ok := <-ln.c
if !ok {
return nil, fmt.Errorf("network connection closed")
}
return conn, nil
}
該方法很簡單,就是從c
這個由Conn
組成的channel
中,獲取Conn
對象,好這里我們就明白了,這個listener
和普通的不一樣,他很特別,普通的listener
監聽的是套接字,而他監聽的是channel
,另外,肯定有某個地方在不停的往c
這個channel
中放Conn
。
接下來是Close
方法:
func (ln *listener) Close() error {
if ok := ln.mux.release(ln); ok {
// Close done to signal to any RLock holders to release their lock.
close(ln.c)
}
return nil
}
我們暫且先把這個ln.mux.release(ln)
放到一邊,因為還不知道這個東西干了啥,暫且只需關注close(ln.c)
,我們知道這個函數是用來關閉channel
的,go推薦由發送端調用,但這里似乎listener
是一個消費端,可以看一下如何優雅的關閉Go Channel,看來重點在於ln.mux.release(ln)
這里,我們暫且存疑[1],留待下面解決。
最后是Addr
方法:
func (ln *listener) Addr() net.Addr {
if ln.mux == nil {
return nil
}
ln.mux.mu.RLock()
defer ln.mux.mu.RUnlock()
if ln.mux.ln == nil {
return nil
}
return ln.mux.ln.Addr()
}
在這里,mu
字段就用上了,加讀鎖,然后返回mux
字段中的ln
字段的Addr
方法。也就是這句return ln.mux.ln.Addr()
。
Mux結構體
字段以及相關函數
Mux結構體則相對來說復雜很多,先來看一下他的字段定義:
type Mux struct {
ln net.Listener
defaultLn *listener
// sorted by priority
lns []*listener
maxNeedBytesNum uint32
mu sync.RWMutex
}
好,第一個字段ln
是一個Listener
接口;然后defaultLn
是一個listener
的指針;lns
則是由listener
的指針組成的切片,根據注釋// sorted by priority
,我們終於知道listener
的priority
字段是干啥的了;接下來是maxNeedBytesNum
字段,好奇怪,比起listener
的needBytesNum
多了個“Max”,所以我們推測這個值取得是lns
以及defaultLn
字段中所有listener
中needBytesNum
值最大的;最后的mu
字段我們就不說了。
需要注意的是:我們可能會發現Mux
和listener
存在相互引用,但在Go
中我們倒也不用太擔心,因為Go
采用“標記-回收”或者其變種的垃圾回收算法,感興趣可以參考Golang 垃圾回收剖析
在mux.go
文件中定義了Mux
的生成函數NewMux
:
func NewMux(ln net.Listener) (mux *Mux) {
mux = &Mux{
ln: ln,
lns: make([]*listener, 0),
}
return
}
很簡單,需要注意的是ln
字段存儲的一般不是listener
這樣的非常規Listener,一般是TCPListener
這樣具體的綁定了套接字的監聽器。
Mux方法
接下來看Mux
結構體的方法,首先看Listen
和copyLns
// priority
func (mux *Mux) Listen(priority int, needBytesNum uint32, fn MatchFunc) net.Listener {
// 1
ln := &listener{
c: make(chan net.Conn),
mux: mux,
priority: priority,
needBytesNum: needBytesNum,
matchFn: fn,
}
mux.mu.Lock()
defer mux.mu.Unlock()
// 2
if needBytesNum > mux.maxNeedBytesNum {
mux.maxNeedBytesNum = needBytesNum
}
// 3
newlns := append(mux.copyLns(), ln)
sort.Slice(newlns, func(i, j int) bool {
if newlns[i].priority == newlns[j].priority {
return newlns[i].needBytesNum < newlns[j].needBytesNum
}
return newlns[i].priority < newlns[j].priority
})
mux.lns = newlns
return ln
}
func (mux *Mux) copyLns() []*listener {
lns := make([]*listener, 0, len(mux.lns))
for _, l := range mux.lns {
lns = append(lns, l)
}
return lns
}
copyLns
方法很簡單,就是跟名字的含義一樣,生成一個lns
字段的副本並返回。
Listen
基本做了三步:
- 生成一個
listener
結構體實例,並獲取互斥鎖 - 根據情況更新
needBytesNum
字段 - 將新生成的
listener
實例按照優先級放入lns
字段對應的slice中
接下來是ListenHttp
和ListenHttps
方法:
func (mux *Mux) ListenHttp(priority int) net.Listener {
return mux.Listen(priority, HttpNeedBytesNum, HttpMatchFunc)
}
func (mux *Mux) ListenHttps(priority int) net.Listener {
return mux.Listen(priority, HttpsNeedBytesNum, HttpsMatchFunc)
}
這兩個差不多,所以放到一起說,基本都是專門寫了一個方法讓我們能方便的創建處理Http
或者Https
的listener
。
再來看DefaultListener
方法:
func (mux *Mux) DefaultListener() net.Listener {
mux.mu.Lock()
defer mux.mu.Unlock()
if mux.defaultLn == nil {
mux.defaultLn = &listener{
c: make(chan net.Conn),
mux: mux,
}
}
return mux.defaultLn
}
這個方法很簡單,基本就是有則返回沒有則生成然后返回的套路。不過我們要注意defaultLn
字段中的listener
是不放入lns
字段中的。
接下來是Server
方法:
// Serve handles connections from ln and multiplexes then across registered listeners.
func (mux *Mux) Serve() error {
for {
// Wait for the next connection.
// If it returns a temporary error then simply retry.
// If it returns any other error then exit immediately.
conn, err := mux.ln.Accept()
if err, ok := err.(interface {
Temporary() bool
}); ok && err.Temporary() {
continue
}
if err != nil {
return err
}
go mux.handleConn(conn)
}
}
一般來說,當我們調用NewMux
函數以后,接下來就會調用Server
方法,該方法基本上就是阻塞監聽某個套接字,當有連接建立成功后立即另起一個goroutine調用handleConn
方法;當連接建立失敗根據err
是否含有Temporary
方法,如果有則執行並忽略錯誤,沒有則返回錯誤。
現在我們看看handleConn
方法干了些啥:
func (mux *Mux) handleConn(conn net.Conn) {
// 1
mux.mu.RLock()
maxNeedBytesNum := mux.maxNeedBytesNum
lns := mux.lns
defaultLn := mux.defaultLn
mux.mu.RUnlock()
// 2
sharedConn, rd := gnet.NewSharedConnSize(conn, int(maxNeedBytesNum))
data := make([]byte, maxNeedBytesNum)
conn.SetReadDeadline(time.Now().Add(DefaultTimeout))
_, err := io.ReadFull(rd, data)
if err != nil {
conn.Close()
return
}
conn.SetReadDeadline(time.Time{})
// 3
for _, ln := range lns {
if match := ln.matchFn(data); match {
err = errors.PanicToError(func() {
ln.c <- sharedConn
})
if err != nil {
conn.Close()
}
return
}
}
// No match listeners
if defaultLn != nil {
err = errors.PanicToError(func() {
defaultLn.c <- sharedConn
})
if err != nil {
conn.Close()
}
return
}
// No listeners for this connection, close it.
conn.Close()
return
}
handleConn
方法也不算復雜,大體可以分為三步:
- 獲取當前狀態
- 從
conn
中讀取數據,注意:shareConn
和rd
存在單向關系,如果從rd
中讀取數據的話,數據也會復制一份放到shareConn
中,反過來就不成立了 - 讀取到的數據會被遍歷,最終選出
與matchFunc
匹配的最高優先級的listener
,並將shareConn
放入該listener
的c
字段中,如果沒有匹配到則放到defaultLn
中的c
字段中,如果defaultLn
是nil
的話就不處理,直接關閉conn
。
最后來到了release
方法了:
func (mux *Mux) release(ln *listener) bool {
result := false
mux.mu.Lock()
defer mux.mu.Unlock()
lns := mux.copyLns()
for i, l := range lns {
if l == ln {
lns = append(lns[:i], lns[i+1:]...)
result = true
break
}
}
mux.lns = lns
return result
}
release方法意思很明確:把對應的listener
從lns
中移除,並把結果返回,整個過程有互斥鎖,我們回到存疑1,盡管有互斥鎖,但在這種情況下:當某個goroutine運行到handleConn
已經執行到了第三階段的開始狀態(也就是還沒有找到匹配的listener
)時,且Go
運行在多核狀態下,當另一個goroutine運行完listener
的Close
方法時,這時就可能發生往一個已經關閉的channel
中send數據,但請注意handleConn
的第三步的這段代碼:
err = errors.PanicToError(func() { // 就是這里了
ln.c <- sharedConn
})
if err != nil {
conn.Close()
}
這個PanicToError
是這樣的:
func PanicToError(fn func()) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panic error: %v", r)
}
}()
fn()
return
}
基本上就是執行了recover
然后將錯誤打印出來,結合下面的對err的判斷,就會將send失敗的conn關閉。
總結
Mux
中包含了一個初始監聽器,基本上所有的事件(比如說新的連接建立,之所以叫事件是因為我實在想不出更精確的詞語了)都起源於此listener
實現了net.Listener
接口,可以作為二級監聽器使用(比如傳給net/http.Server
結構體的Server
方法進行處理)。Mux
包含了一個由listener
組成的有序slice,當有事件產生時就會遍歷這個slice找出合適的listener
並將事件傳給他。
講到這里基本上是完事了。整個mux
模塊還是比較簡單的,起碼是由一個個簡單的東西組合而成。那么一起來意淫一下整體流程吧。
假如我要實現這么一個網絡程序:
- 綁定監聽一個基於tcp的套接字
- 我們允許其應用層可支持多個(比如說支持http https這兩個吧,盡管http和https可以說是一個協議。。),不同的應用層協議對應不同的處理函數
就這么兩個很簡單的要求,不難吧。
那么我們一起來實現吧:
type HandleFunc func(c net.Conn) (n int, err error)
type MyServer struct {
l net.Listener
hFunc HandleFunc
}
func (h *MyServer) Server() (err error) {
for {
conn, err := h.l.Accept()
if err != nil {
return
}
go h.hFunc(conn)
}
}
func HandleHttp(c net.Conn)(n int, err error){
n, err = c.Write([]byte("Get Off! Don't you know that it is not safe?"))
}
func HandleHttps(c net.Conn)(n int, err error){
n, err = c.Write([]byte("Get Off! Don't you know that this is more complicated than http?"))
}
func main() (err error){
ln, err := net.Listen("tcp", "0.0.0.0:12345")
if err != nil {
err = fmt.Errorf("Create server listener error, %v", err)
return
}
muxer = mux.NewMux(ln)
var lHttp, lHttps net.Listener
lHttp = muxer.ListenHttp(1)
httpServer := *MyServer{lHttp, HandleHttp}
lHttps = muxer.ListenHttps(2)
httpsServer := *MyServer{lHttps, HandleHttps}
go httpServer.Server()
go httpsServer.Server()
err = muxer.Serve()
}