轉載請聲明出處哦~,本篇文章發布於luozhiyun的博客:https://www.luozhiyun.com/archives/574
我們在上一篇文章中講解了 Go HTTP 標准庫的實現原理,這一次我找到了一個號稱比net/http快十倍的Go框架 fasthttp,這次我們再來看看它有哪些優秀的設計值得我們去挖掘。
一個典型的 HTTP 服務應該如圖所示:
基於HTTP構建的服務標准模型包括兩個端,客戶端(Client
)和服務端(Server
)。HTTP 請求從客戶端發出,服務端接受到請求后進行處理然后將響應返回給客戶端。所以http服務器的工作就在於如何接受來自客戶端的請求,並向客戶端返回響應。
這篇我們來講講 Server 端的實現。
實現原理
net/http 與 fasthttp 實現對比
我們在講 net/http
的時候講過,它的處理流程大概是這樣的:
- 注冊處理器到一個 hash 表中,可以通過鍵值路由匹配;
- 注冊完之后就是開啟循環監聽,每監聽到一個連接就會創建一個 Goroutine;
- 在創建好的 Goroutine 里面會循環的等待接收請求數據,然后根據請求的地址去處理器路由表中匹配對應的處理器,然后將請求交給處理器處理;
這樣做在連接數比較少的時候是沒什么問題的,但是在連接數非常多的時候,每個連接都會創建一個 Goroutine 就會給系統帶來一定的壓力。這也就造成了 net/http
在處理高並發時的瓶頸。
下面我們再看看 fasthttp 是如何做的:
- 啟動監聽;
- 循環監聽端口獲取連接;
- 獲取到連接之后首先會去 ready 隊列里獲取 workerChan,獲取不到就會去對象池獲取;
- 將監聽的連接傳入到 workerChan 的 channel 中;
- workerChan 有一個 Goroutine 一直循環獲取 channel 中的數據,獲取到之后就會對請求進行處理然后返回。
上面有提到 workerChan 其實就是一個連接處理對象,這個對象里面有一個 channel 用來傳遞連接;每個 workerChan 在后台都會有一個 Goroutine 循環獲取 channel 中的連接,然后進行處理。如果沒有設置最大同時連接處理數的話,默認是 256 * 1024
個。這樣可以在並發很高的時候還可以同時保證對外提供服務。
除此之外,在實現上還通過 sync.Pool 來大量的復用對象,減少內存分配,如:
workerChanPool 、ctxPool 、readerPool、writerPool 等等多大30多個 sync.Pool 。
除了復用對象,fasthttp 還會切片,通過 s = s[:0]
和 s = append(s[:0], b…)
來減少切片的再次創建。
fasthttp 由於需要和 string 打交道的地方很多,所以還從很多地方盡量的避免[]byte
到string轉換時帶來的內存分配和拷貝帶來的消耗 。
小結
綜上我們大致介紹了一下 fasthttp 提升性能的點:
- 控制異步 Goroutine 的同時處理數量,最大默認是
256 * 1024
個; - 使用 sync.Pool 來大量的復用對象和切片,減少內存分配;
- 盡量的避免
[]byte
到string轉換時帶來的內存分配和拷貝帶來的消耗 ;
源碼解析
我們以一個簡單的例子作為開始:
func main() {
if err := fasthttp.ListenAndServe(":8088", requestHandler); err != nil {
log.Fatalf("Error in ListenAndServe: %s", err)
}
}
func requestHandler(ctx *fasthttp.RequestCtx) {
fmt.Fprintf(ctx, "Hello, world!\n\n")
}
我們調用 ListenAndServe 函數會啟動服務監聽,等待任務進行處理。ListenAndServe 函數實際上會調用到 Server 的 ListenAndServe 方法,這里我們看一下 Server 結構體的字段:
上圖簡單的列舉了一些 Server 結構體的常見字段,包括:請求處理器、服務名、請求讀取超時時間、請求寫入超時時間、每個連接最大請求數等。除此之外還有很多其他參數,可以在各個維度上控制服務端的一些參數。
Server 的 ListenAndServe 方法會獲取 TCP 監聽,然后調用 Serve 方法執行服務端的邏輯處理。
Server 方法主要做了以下幾件事:
- 初始化並啟動 worker Pool;
- 接收請求 Connection;
- 將 Connection 交給 worker Pool 處理;
func (s *Server) Serve(ln net.Listener) error {
...
s.mu.Unlock()
// 初始化 worker Pool
wp := &workerPool{
WorkerFunc: s.serveConn,
MaxWorkersCount: maxWorkersCount,
LogAllErrors: s.LogAllErrors,
Logger: s.logger(),
connState: s.setState,
}
// 啟動 worker Pool
wp.Start()
// 循環處理 connection
for {
// 獲取 connection
if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
wp.Stop()
if err == io.EOF {
return nil
}
return err
}
s.setState(c, StateNew)
atomic.AddInt32(&s.open, 1)
// 處理 connection
if !wp.Serve(c) {
// 進入if 說明已到並發極限
...
}
c = nil
}
}
worker Pool
worker Pool 是用來處理所有請求 Connection 的,這里稍微看看 workerPool 結構體的字段:
- WorkerFunc: 用來匹配請求對應的 handler 並執行;
- MaxWorkersCount:最大同時處理的請求數;
- ready:空閑的 workerChan;
- workerChanPool:workerChan 的對象池,是一個 sync.Pool 類型的;
- workersCount:目前正在處理的請求數;
下面我們看一下 workerPool 的 Start 方法:
func (wp *workerPool) Start() {
if wp.stopCh != nil {
panic("BUG: workerPool already started")
}
wp.stopCh = make(chan struct{})
stopCh := wp.stopCh
// 設置 worker Pool 的創建函數
wp.workerChanPool.New = func() interface{} {
return &workerChan{
ch: make(chan net.Conn, workerChanCap),
}
}
go func() {
var scratch []*workerChan
for {
// 沒隔一段時間會清理空閑超時的 workerChan
wp.clean(&scratch)
select {
case <-stopCh:
return
default:
// 默認是 10 s
time.Sleep(wp.getMaxIdleWorkerDuration())
}
}
}()
}
Start 方法里面主要是:
- 設置 workerChanPool 的創建函數;
- 啟動一個 Goroutine 定時清理 workerPool 中的 ready 中保存的空閑 workerChan,默認每 10s 啟動一次。
獲取連接
func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net.Conn, error) {
for {
c, err := ln.Accept()
if err != nil {
if c != nil {
panic("BUG: net.Listener returned non-nil conn and non-nil error")
}
...
return nil, io.EOF
}
if c == nil {
panic("BUG: net.Listener returned (nil, nil)")
}
// 校驗每個ip對應的連接數
if s.MaxConnsPerIP > 0 {
pic := wrapPerIPConn(s, c)
if pic == nil {
if time.Since(*lastPerIPErrorTime) > time.Minute {
s.logger().Printf("The number of connections from %s exceeds MaxConnsPerIP=%d",
getConnIP4(c), s.MaxConnsPerIP)
*lastPerIPErrorTime = time.Now()
}
continue
}
c = pic
}
return c, nil
}
}
獲取連接其實沒什么好說的,和 net/http
庫一樣調用的 TCPListener 的 accept 方法獲取 TCP Connection。
處理連接
處理連接部分首先會獲取 workerChan ,workerChan 結構體里面包含了兩個字段:lastUseTime、channel:
type workerChan struct {
lastUseTime time.Time
ch chan net.Conn
}
-
lastUseTime 標識最后一次被使用的時間;
-
ch 是用來傳遞 Connection 用的。
獲取到 Connection 之后會傳入到 workerChan 的 channel 中,每個對應的 workerChan 都有一個異步 Goroutine 在處理 channel 里面的 Connection。
獲取 workerChan
func (wp *workerPool) Serve(c net.Conn) bool {
// 獲取 workerChan
ch := wp.getCh()
if ch == nil {
return false
}
// 將 Connection 放入到 channel 中
ch.ch <- c
return true
}
Serve 方法主要是通過 getCh 方法獲取 workerChan ,然后將當前的 Connection 傳入到 workerChan 的 channel 中。
func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false
wp.lock.Lock()
// 嘗試從空閑隊列里獲取 workerChan
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()
// 獲取不到則從對象池中獲取
if ch == nil {
if !createWorker {
return nil
}
vch := wp.workerChanPool.Get()
ch = vch.(*workerChan)
// 為新的 workerChan 開啟 goroutine
go func() {
// 處理 channel 中的數據
wp.workerFunc(ch)
// 處理完之后重新放回到對象池中
wp.workerChanPool.Put(vch)
}()
}
return ch
}
getCh 方法首先會去 ready 空閑隊列中獲取 workerChan,如果獲取不到則從對象池中獲取,從對象池中獲取的新的 workerChan 會啟動 Goroutine 用來處理 channel 中的數據。
處理連接
func (wp *workerPool) workerFunc(ch *workerChan) {
var c net.Conn
var err error
// 消費 channel 中的數據
for c = range ch.ch {
if c == nil {
break
}
// 讀取請求數據並響應返回
if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
...
}
c = nil
// 將當前的 workerChan 放入的 ready 隊列中
if !wp.release(ch) {
break
}
}
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}
這里會遍歷獲取 workerChan 的 channel 中的 Connection 然后執行 WorkerFunc 函數處理請求,處理完畢之后會將當前的 workerChan 重新放入到 ready 隊列中復用。
需要注意的是,這個循環會在獲取 Connection 為 nil 的時候跳出循環,這個 nil 是 workerPool 在異步調用 clean 方法檢查該 workerChan 空閑時間超長了就會往 channel 中傳入一個 nil。
這里設置的 WorkerFunc 函數是 Server 的 serveConn 方法,里面會獲取到請求的參數,然后根據請求調用到對應的 handler 進行請求處理,然后返回 response,由於 serveConn 方法比較長這里就不解析了,感興趣的同學自己看看。
總結
我們這里分析了 fasthttp 的實現原理,通過原理我們可以知道 fasthttp 和 net/http
在實現上面有什么差異,從而大致得出 fasthttp 快的原因,然后再從它的實現細節知道它在實現上是如何做到減少內存分配從而提高性能的。