前言
frp幾乎所有的連接處理都是構建在mux模塊之上的,重要性不必多說,來看一下這是個啥吧
ps: 安裝方法
go get "github.com/fatedier/golib/net/mux"
該模塊很小,不到300行,分為兩個文件:mux.go和rule.go。
因為rule.go文件相對簡單一些,我們先來看這個。
role.go文件
首先看其中所命名的函數類型MatchFunc:
type MatchFunc func(data []byte) (match bool)
該類型的函數用來判斷data屬於什么協議。
那么具體如何判斷呢,這里也實現了三個例子:
var (
HttpsNeedBytesNum uint32 = 1
HttpNeedBytesNum uint32 = 3
YamuxNeedBytesNum uint32 = 2
)
var HttpsMatchFunc MatchFunc = func(data []byte) bool {
if len(data) < int(HttpsNeedBytesNum) {
return false
}
if data[0] == 0x16 {
return true
} else {
return false
}
}
// From https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods
var httpHeadBytes = map[string]struct{}{
"GET": struct{}{},
"HEA": struct{}{},
"POS": struct{}{},
"PUT": struct{}{},
"DEL": struct{}{},
"CON": struct{}{},
"OPT": struct{}{},
"TRA": struct{}{},
"PAT": struct{}{},
}
var HttpMatchFunc MatchFunc = func(data []byte) bool {
if len(data) < int(HttpNeedBytesNum) {
return false
}
_, ok := httpHeadBytes[string(data[:3])]
return ok
}
// From https://github.com/hashicorp/yamux/blob/master/spec.md
var YamuxMatchFunc MatchFunc = func(data []byte) bool {
if len(data) < int(YamuxNeedBytesNum) {
return false
}
if data[0] == 0 && data[1] >= 0x0 && data[1] <= 0x3 {
return true
}
return false
}
這三個函數分別實現了區分HTTPS,HTTP以及go中特有的yamux(實際上這是一個庫,可以參考Go中的I/O多路復用)。
mux.go文件
先來看其中的struct,第一個是Mux第二個是listener,這里先來看一下較為簡單的listener。
listener結構體
type listener struct {
mux *Mux
priority int
needBytesNum uint32
matchFn MatchFunc
c chan net.Conn
mu sync.RWMutex
}
// Accept waits for and returns the next connection to the listener.
func (ln *listener) Accept() (net.Conn, error) {
...
}
// Close removes this listener from the parent mux and closes the channel.
func (ln *listener) Close() error {
...
}
func (ln *listener) Addr() net.Addr {
...
}
剛看到這個結構體我們可能很迷惑,不知道都是干啥的,而且網絡編程中一般listener這種東西要綁定在一個套接字上,但很明顯listener沒有,不過其唯一跟套接字相關的可能是其c字段,其是一個由net包中的Conn接口組成的chanel;然后mu字段就是讀寫鎖了,這個很簡單;然后mux字段則是上面提到的兩個結構體中的另一個結構體Mux的指針;接下來到了priority字段上,顧名思義,這個似乎跟優先級有關系,暫且存疑;needBytesNum則更有些蒙了,不過感覺其是跟讀取byte的數量有關系,最后是matchFn。
好,初步認識了這個結構體的結構后,我們看看其方法。三個方法的listener實現了net模塊中的Listener接口:
// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (Conn, error)
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error
// Addr returns the listener's network address.
Addr() Addr
}
然后先來分析其Accept方法:
func (ln *listener) Accept() (net.Conn, error) {
conn, ok := <-ln.c
if !ok {
return nil, fmt.Errorf("network connection closed")
}
return conn, nil
}
該方法很簡單,就是從c這個由Conn組成的channel中,獲取Conn對象,好這里我們就明白了,這個listener和普通的不一樣,他很特別,普通的listener監聽的是套接字,而他監聽的是channel,另外,肯定有某個地方在不停的往c這個channel中放Conn。
接下來是Close方法:
func (ln *listener) Close() error {
if ok := ln.mux.release(ln); ok {
// Close done to signal to any RLock holders to release their lock.
close(ln.c)
}
return nil
}
我們暫且先把這個ln.mux.release(ln)放到一邊,因為還不知道這個東西干了啥,暫且只需關注close(ln.c),我們知道這個函數是用來關閉channel的,go推薦由發送端調用,但這里似乎listener是一個消費端,可以看一下如何優雅的關閉Go Channel,看來重點在於ln.mux.release(ln)這里,我們暫且存疑[1],留待下面解決。
最后是Addr方法:
func (ln *listener) Addr() net.Addr {
if ln.mux == nil {
return nil
}
ln.mux.mu.RLock()
defer ln.mux.mu.RUnlock()
if ln.mux.ln == nil {
return nil
}
return ln.mux.ln.Addr()
}
在這里,mu字段就用上了,加讀鎖,然后返回mux字段中的ln字段的Addr方法。也就是這句return ln.mux.ln.Addr()。
Mux結構體
字段以及相關函數
Mux結構體則相對來說復雜很多,先來看一下他的字段定義:
type Mux struct {
ln net.Listener
defaultLn *listener
// sorted by priority
lns []*listener
maxNeedBytesNum uint32
mu sync.RWMutex
}
好,第一個字段ln是一個Listener接口;然后defaultLn是一個listener的指針;lns則是由listener的指針組成的切片,根據注釋// sorted by priority,我們終於知道listener的priority字段是干啥的了;接下來是maxNeedBytesNum字段,好奇怪,比起listener的needBytesNum多了個“Max”,所以我們推測這個值取得是lns以及defaultLn字段中所有listener中needBytesNum值最大的;最后的mu字段我們就不說了。
需要注意的是:我們可能會發現Mux和listener存在相互引用,但在Go中我們倒也不用太擔心,因為Go采用“標記-回收”或者其變種的垃圾回收算法,感興趣可以參考Golang 垃圾回收剖析
在mux.go文件中定義了Mux的生成函數NewMux:
func NewMux(ln net.Listener) (mux *Mux) {
mux = &Mux{
ln: ln,
lns: make([]*listener, 0),
}
return
}
很簡單,需要注意的是ln字段存儲的一般不是listener這樣的非常規Listener,一般是TCPListener這樣具體的綁定了套接字的監聽器。
Mux方法
接下來看Mux結構體的方法,首先看Listen和copyLns
// priority
func (mux *Mux) Listen(priority int, needBytesNum uint32, fn MatchFunc) net.Listener {
// 1
ln := &listener{
c: make(chan net.Conn),
mux: mux,
priority: priority,
needBytesNum: needBytesNum,
matchFn: fn,
}
mux.mu.Lock()
defer mux.mu.Unlock()
// 2
if needBytesNum > mux.maxNeedBytesNum {
mux.maxNeedBytesNum = needBytesNum
}
// 3
newlns := append(mux.copyLns(), ln)
sort.Slice(newlns, func(i, j int) bool {
if newlns[i].priority == newlns[j].priority {
return newlns[i].needBytesNum < newlns[j].needBytesNum
}
return newlns[i].priority < newlns[j].priority
})
mux.lns = newlns
return ln
}
func (mux *Mux) copyLns() []*listener {
lns := make([]*listener, 0, len(mux.lns))
for _, l := range mux.lns {
lns = append(lns, l)
}
return lns
}
copyLns方法很簡單,就是跟名字的含義一樣,生成一個lns字段的副本並返回。
Listen基本做了三步:
- 生成一個
listener結構體實例,並獲取互斥鎖 - 根據情況更新
needBytesNum字段 - 將新生成的
listener實例按照優先級放入lns字段對應的slice中
接下來是ListenHttp和ListenHttps方法:
func (mux *Mux) ListenHttp(priority int) net.Listener {
return mux.Listen(priority, HttpNeedBytesNum, HttpMatchFunc)
}
func (mux *Mux) ListenHttps(priority int) net.Listener {
return mux.Listen(priority, HttpsNeedBytesNum, HttpsMatchFunc)
}
這兩個差不多,所以放到一起說,基本都是專門寫了一個方法讓我們能方便的創建處理Http或者Https的listener。
再來看DefaultListener方法:
func (mux *Mux) DefaultListener() net.Listener {
mux.mu.Lock()
defer mux.mu.Unlock()
if mux.defaultLn == nil {
mux.defaultLn = &listener{
c: make(chan net.Conn),
mux: mux,
}
}
return mux.defaultLn
}
這個方法很簡單,基本就是有則返回沒有則生成然后返回的套路。不過我們要注意defaultLn字段中的listener是不放入lns字段中的。
接下來是Server方法:
// Serve handles connections from ln and multiplexes then across registered listeners.
func (mux *Mux) Serve() error {
for {
// Wait for the next connection.
// If it returns a temporary error then simply retry.
// If it returns any other error then exit immediately.
conn, err := mux.ln.Accept()
if err, ok := err.(interface {
Temporary() bool
}); ok && err.Temporary() {
continue
}
if err != nil {
return err
}
go mux.handleConn(conn)
}
}
一般來說,當我們調用NewMux函數以后,接下來就會調用Server方法,該方法基本上就是阻塞監聽某個套接字,當有連接建立成功后立即另起一個goroutine調用handleConn方法;當連接建立失敗根據err是否含有Temporary方法,如果有則執行並忽略錯誤,沒有則返回錯誤。
現在我們看看handleConn方法干了些啥:
func (mux *Mux) handleConn(conn net.Conn) {
// 1
mux.mu.RLock()
maxNeedBytesNum := mux.maxNeedBytesNum
lns := mux.lns
defaultLn := mux.defaultLn
mux.mu.RUnlock()
// 2
sharedConn, rd := gnet.NewSharedConnSize(conn, int(maxNeedBytesNum))
data := make([]byte, maxNeedBytesNum)
conn.SetReadDeadline(time.Now().Add(DefaultTimeout))
_, err := io.ReadFull(rd, data)
if err != nil {
conn.Close()
return
}
conn.SetReadDeadline(time.Time{})
// 3
for _, ln := range lns {
if match := ln.matchFn(data); match {
err = errors.PanicToError(func() {
ln.c <- sharedConn
})
if err != nil {
conn.Close()
}
return
}
}
// No match listeners
if defaultLn != nil {
err = errors.PanicToError(func() {
defaultLn.c <- sharedConn
})
if err != nil {
conn.Close()
}
return
}
// No listeners for this connection, close it.
conn.Close()
return
}
handleConn方法也不算復雜,大體可以分為三步:
- 獲取當前狀態
- 從
conn中讀取數據,注意:shareConn和rd存在單向關系,如果從rd中讀取數據的話,數據也會復制一份放到shareConn中,反過來就不成立了 - 讀取到的數據會被遍歷,最終選出
與matchFunc匹配的最高優先級的listener,並將shareConn放入該listener的c字段中,如果沒有匹配到則放到defaultLn中的c字段中,如果defaultLn是nil的話就不處理,直接關閉conn。
最后來到了release方法了:
func (mux *Mux) release(ln *listener) bool {
result := false
mux.mu.Lock()
defer mux.mu.Unlock()
lns := mux.copyLns()
for i, l := range lns {
if l == ln {
lns = append(lns[:i], lns[i+1:]...)
result = true
break
}
}
mux.lns = lns
return result
}
release方法意思很明確:把對應的listener從lns中移除,並把結果返回,整個過程有互斥鎖,我們回到存疑1,盡管有互斥鎖,但在這種情況下:當某個goroutine運行到handleConn已經執行到了第三階段的開始狀態(也就是還沒有找到匹配的listener)時,且Go運行在多核狀態下,當另一個goroutine運行完listener的Close方法時,這時就可能發生往一個已經關閉的channel中send數據,但請注意handleConn的第三步的這段代碼:
err = errors.PanicToError(func() { // 就是這里了
ln.c <- sharedConn
})
if err != nil {
conn.Close()
}
這個PanicToError是這樣的:
func PanicToError(fn func()) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panic error: %v", r)
}
}()
fn()
return
}
基本上就是執行了recover然后將錯誤打印出來,結合下面的對err的判斷,就會將send失敗的conn關閉。
總結
Mux中包含了一個初始監聽器,基本上所有的事件(比如說新的連接建立,之所以叫事件是因為我實在想不出更精確的詞語了)都起源於此listener實現了net.Listener接口,可以作為二級監聽器使用(比如傳給net/http.Server結構體的Server方法進行處理)。Mux包含了一個由listener組成的有序slice,當有事件產生時就會遍歷這個slice找出合適的listener並將事件傳給他。
講到這里基本上是完事了。整個mux模塊還是比較簡單的,起碼是由一個個簡單的東西組合而成。那么一起來意淫一下整體流程吧。
假如我要實現這么一個網絡程序:
- 綁定監聽一個基於tcp的套接字
- 我們允許其應用層可支持多個(比如說支持http https這兩個吧,盡管http和https可以說是一個協議。。),不同的應用層協議對應不同的處理函數
就這么兩個很簡單的要求,不難吧。
那么我們一起來實現吧:
type HandleFunc func(c net.Conn) (n int, err error)
type MyServer struct {
l net.Listener
hFunc HandleFunc
}
func (h *MyServer) Server() (err error) {
for {
conn, err := h.l.Accept()
if err != nil {
return
}
go h.hFunc(conn)
}
}
func HandleHttp(c net.Conn)(n int, err error){
n, err = c.Write([]byte("Get Off! Don't you know that it is not safe?"))
}
func HandleHttps(c net.Conn)(n int, err error){
n, err = c.Write([]byte("Get Off! Don't you know that this is more complicated than http?"))
}
func main() (err error){
ln, err := net.Listen("tcp", "0.0.0.0:12345")
if err != nil {
err = fmt.Errorf("Create server listener error, %v", err)
return
}
muxer = mux.NewMux(ln)
var lHttp, lHttps net.Listener
lHttp = muxer.ListenHttp(1)
httpServer := *MyServer{lHttp, HandleHttp}
lHttps = muxer.ListenHttps(2)
httpsServer := *MyServer{lHttps, HandleHttps}
go httpServer.Server()
go httpsServer.Server()
err = muxer.Serve()
}
