詳解Go語言I/O多路復用netpoller模型


轉載請聲明出處哦~,本篇文章發布於luozhiyun的博客:https://www.luozhiyun.com

本文使用的go的源碼15.7

可以從 Go 源碼目錄結構和對應代碼文件了解 Go 在不同平台下的網絡 I/O 模式的實現。比如,在 Linux 系統下基於 epoll,freeBSD 系統下基於 kqueue,以及 Windows 系統下基於 iocp。

因為我們的代碼都是部署在Linux上的,所以本文以epoll封裝實現為例子來講解Go語言中I/O多路復用的源碼實現。

介紹

I/O多路復用

所謂 I/O 多路復用指的就是 select/epoll 這一系列的多路選擇器:支持單一線程同時監聽多個文件描述符(I/O 事件),阻塞等待,並在其中某個文件描述符可讀寫時收到通知。以防很多同學對select或epoll不那么熟悉,所以下面先來講講這兩個選擇器。

首先我們先說一下什么是文件描述符(File descriptor),根據它的英文首字母也簡稱FD,它是一個用於表述指向文件的引用的抽象化概念。它是一個索引值,指向內核為每一個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者創建一個新文件時,內核向進程返回一個文件描述符。

select

int select(int nfds,
            fd_set *restrict readfds,
            fd_set *restrict writefds,
            fd_set *restrict errorfds,
            struct timeval *restrict timeout);

writefds、readfds、和exceptfds是三個文件描述符集合。select會遍歷每個集合的前nfds個描述符,分別找到可以讀取、可以寫入、發生錯誤的描述符,統稱為就緒的描述符。

timeout參數表示調用select時的阻塞時長。如果所有文件描述符都未就緒,就阻塞調用進程,直到某個描述符就緒,或者阻塞超過設置的 timeout 后,返回。如果timeout參數設為 NULL,會無限阻塞直到某個描述符就緒;如果timeout參數設為 0,會立即返回,不阻塞。

當select函數返回后,可以通過遍歷fdset,來找到就緒的描述符。

multiplexing model

select的缺點也列舉一下:

  1. select最大的缺陷就是單個進程所打開的FD是有一定限制的,它由FD_SETSIZE設置,默認值是1024;
  2. 每次調用 select,都需要把 fd 集合從用戶態拷貝到內核態,這個開銷在 fd 很多時會很大;
  3. 每次 kernel 都需要線性掃描整個 fd_set,所以隨着監控的描述符 fd 數量增長,其 I/O 性能會線性下降;

epoll

epoll是selec的增強版本,避免了“性能開銷大”和“文件描述符數量少”兩個缺點。

為方便理解后續的內容,先看一下epoll的用法:

int listenfd = socket(AF_INET, SOCK_STREAM, 0);   
bind(listenfd, ...)
listen(listenfd, ...)

int epfd = epoll_create(...);
epoll_ctl(epfd, ...); //將所有需要監聽的fd添加到epfd中

while(1){
    int n = epoll_wait(...)
    for(接收到數據的socket){
        //處理
    }
}

先用epoll_create創建一個epoll對象實例epfd,同時返回一個引用該實例的文件描述符,返回的文件描述符僅僅指向對應的epoll實例,並不表示真實的磁盤文件節點。

epoll實例內部存儲:

  • 監聽列表:所有要監聽的文件描述符,使用紅黑樹;
  • 就緒列表:所有就緒的文件描述符,使用鏈表;

再通過epoll_ctl將需要監視的fd添加到epfd中,同時為fd設置一個回調函數,並監聽事件event,並添加到監聽列表中。當有事件發生時,會調用回調函數,並將fd添加到epoll實例的就緒隊列上。

最后調用epoll_wait阻塞監聽 epoll 實例上所有的fd的 I/O 事件。當就緒列表中已有數據,那么epoll_wait直接返回,解決了select每次都需要輪詢一遍的問題。

epoll的優點:

epoll的監聽列表使用紅黑樹存儲,epoll_ctl 函數添加進來的 fd 都會被放在紅黑樹的某個節點內,而紅黑樹本身插入和刪除性能比較穩定,時間復雜度 O(logN),並且可以存儲大量的的fd,避免了只能存儲1024個fd的限制;

epoll_ctl 中為每個文件描述符指定了回調函數,並在就緒時將其加入到就緒列表,因此不需要像select一樣遍歷檢測每個文件描述符,只需要判斷就緒列表是否為空即可;

解析

netpoll本質上是對 I/O 多路復用技術的封裝,所以自然也是和epoll一樣脫離不了下面幾步:

  1. netpoll創建及其初始化;
  2. 向netpoll中加入待監控的任務;
  3. 從netpoll獲取觸發的事件;

在go中對epoll提供的三個函數進行了封裝:

func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList

netpollinit函數負責初始化netpoll;

netpollopen負責監聽文件描述符上的事件;

netpoll會阻塞等待返回一組已經准備就緒的 Goroutine;

下面是Go語言中編寫的一個TCP server:

func main() {
    listen, err := net.Listen("tcp", ":8888")
    if err != nil {
        fmt.Println("listen error: ", err)
        return
    } 
    for {
        conn, err := listen.Accept()
        if err != nil {
            fmt.Println("accept error: ", err)
            break
        } 
        // 創建一個goroutine來負責處理讀寫任務
        go HandleConn(conn)
    }
} 

下面我們跟着這個TCP server的源碼一起看看是在哪里使用了netpoll來完成epoll的調用。

net.Listen

這個TCP server中會調用net.Listen創建一個socket同時返回與之對應的fd,該fd用來初始化listener的netFD(go層面封裝的網絡文件描述符),接着調用 netFD的listenStream方法完成對 socket 的 bind&listen和netFD的初始化。

調用過程如下:

listen

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	// 創建一個socket
	s, err := sysSocket(family, sotype, proto)
	if err != nil {
		return nil, err
	}
	...
	// 創建fd
	if fd, err = newFD(s, family, sotype, net); err != nil {
		poll.CloseFunc(s)
		return nil, err
	} 
	if laddr != nil && raddr == nil {
		switch sotype {
		case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
			// 調用 netFD的listenStream方法完成對 socket 的 bind&listen和netFD的初始化
			if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		case syscall.SOCK_DGRAM:
			...
		}
	}
	...
	return fd, nil
}

func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
	ret := &netFD{
		pfd: poll.FD{
			Sysfd:         sysfd,
			IsStream:      sotype == syscall.SOCK_STREAM,
			ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
		},
		family: family,
		sotype: sotype,
		net:    net,
	}
	return ret, nil
}

sysSocket方法會發起一個系統調用創建一個socket,newFD會創建一個netFD,然后調用netFD的listenStream方法進行bind&listen操作,並對netFD進行init。

netFD

netFD是一個文件描述符的封裝,netFD中包含一個FD數據結構,FD中包含了Sysfd 和pollDesc兩個重要的數據結構,Sysfd是sysSocket返回的socket系統文件描述符,pollDesc用於監控文件描述符的可讀或者可寫。

我們繼續看listenStream:

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
	...
	// 完成綁定操作
	if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
		return os.NewSyscallError("bind", err)
	}
	// 進行監聽操作
	if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
		return os.NewSyscallError("listen", err)
	}
	// 初始化fd
	if err = fd.init(); err != nil {
		return err
	}
	lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
	fd.setAddr(fd.addrFunc()(lsa), nil)
	return nil
}

listenStream方法會調用Bind方法完成fd的綁定操作,然后調用listenFunc進行監聽,接着調用fd的init方法,完成FD、pollDesc初始化。

func (pd *pollDesc) init(fd *FD) error {
	// 調用到runtime.poll_runtime_pollServerInit
	serverInit.Do(runtime_pollServerInit)
	// 調用到runtime.poll_runtime_pollOpen
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	...
	return nil
}

runtime_pollServerInit用Once封裝保證只能被調用一次,這個函數在Linux平台上會創建一個epoll文件描述符實例;

poll_runtime_pollOpen調用了netpollopen會將fd注冊到 epoll實例中,並返回一個pollDesc;

netpollinit初始化

func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}

netpollGenericInit會調用平台上特定實現的netpollinit,在Linux中會調用到netpoll_epoll.go的netpollinit方法:

var (
	epfd int32 = -1 // epoll descriptor 
)

func netpollinit() {
	// 創建一個新的 epoll 文件描述符
	epfd = epollcreate1(_EPOLL_CLOEXEC)
	...
	// 創建一個用於通信的管道
	r, w, errno := nonblockingPipe()
	...
	ev := epollevent{
		events: _EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
	// 將讀取數據的文件描述符加入監聽
	errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
	...
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}

調用epollcreate1方法會創建一個epoll文件描述符實例,需要注意的是epfd是一個全局的屬性。然后創建一個用於通信的管道,調用epollctl將讀取數據的文件描述符加入監聽。

netpollopen加入事件監聽

下面再看看poll_runtime_pollOpen方法:

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd
	pd.closing = false
	pd.everr = false
	pd.rseq++
	pd.rg = 0
	pd.rd = 0
	pd.wseq++
	pd.wg = 0
	pd.wd = 0
	pd.self = pd
	unlock(&pd.lock)

	var errno int32
	errno = netpollopen(fd, pd)
	return pd, int(errno)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

poll_runtime_pollOpen方法會通過pollcache.alloc初始化總大小約為 4KB的pollDesc結構體。然后重置pd的屬性,調用netpollopen向epoll實例epfd加入新的輪詢事件監聽文件描述符的可讀和可寫狀態。

下面我們再看看pollCache是如何初始化pollDesc的。

type pollCache struct {
	lock  mutex
	first *pollDesc 
}

const pollBlockSize = 4 * 1024

func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	// 初始化首節點
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		} 
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        // 初始化pollDesc鏈表
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	lockInit(&pd.lock, lockRankPollDesc)
	unlock(&c.lock)
	return pd
}

pollCache的鏈表頭如果為空,那么初始化首節點,首節點是一個pollDesc的鏈表頭,每次調用該結構體都會返回鏈表頭還沒有被使用的pollDesc。

pollCache

到這里就完成了net.Listen的分析,下面我們看看listen.Accept。

Listener.Accept

Listener.Accept方法最終會調用到netFD的accept方法中:

Accept

func (fd *netFD) accept() (netfd *netFD, err error) {
	// 調用netfd.FD的Accept接受新的 socket 連接,返回 socket 的 fd
	d, rsa, errcall, err := fd.pfd.Accept()
	...
	// 構造一個新的netfd
	if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
		poll.CloseFunc(d)
		return nil, err
	}
	// 調用 netFD 的 init 方法完成初始化
	if err = netfd.init(); err != nil {
		netfd.Close()
		return nil, err
	}
	lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
	return netfd, nil
}

這個方法首先會調用到FD的Accept接受新的 socket 連接,並返回新的socket對應的fd,然后調用newFD構造一個新的netfd,並通過init 方法完成初始化。

init方法上面我們已經看過了,下面我們來看看Accept方法:

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
	...
	for {
		// 使用 linux 系統調用 accept 接收新連接,創建對應的 socket
		s, rsa, errcall, err := accept(fd.Sysfd)
		if err == nil {
			return s, rsa, "", err
		}
		switch err {
		case syscall.EINTR:
			continue
		case syscall.EAGAIN:
			if fd.pd.pollable() {
				// 如果當前沒有發生期待的 I/O 事件,那么 waitRead 會通過 park goroutine 讓邏輯 block 在這里
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		case syscall.ECONNABORTED: 
			continue
		}
		return -1, nil, errcall, err
	}
}

FD.Accept方法會使用 linux 系統調用 accept 接收新連接,創建對應的 socket,如果沒有可讀的消息,waitRead會被阻塞。這些被park住的goroutine會在goroutine的調度中調用runtime.netpoll被喚醒。

pollWait事件等待

pollDesc.waitRead實際上是調用了runtime.poll_runtime_pollWait

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	...
    // 進入 netpollblock 並且判斷是否有期待的 I/O 事件發生
	for !netpollblock(pd, int32(mode), false) {
		...
	}
	return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}
	// 這個 for 循環是為了等待 io ready 或者 io wait
	for {
		old := *gpp
		// gpp == pdReady 表示此時已有期待的 I/O 事件發生,
		// 可以直接返回 unblock 當前 goroutine 並執行響應的 I/O 操作
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("runtime: double wait")
		}
		// 如果沒有期待的 I/O 事件發生,則通過原子操作把 gpp 的值置為 pdWait 並退出 for 循環
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}
	if waitio || netpollcheckerr(pd, mode) == 0 {
		// 讓出當前線程,將 Goroutine 轉換到休眠狀態並等待運行時的喚醒
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent pdReady notification
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

poll_runtime_pollWait會用for循環調用netpollblock函數判斷是否有期待的 I/O 事件發生,直到netpollblock返回true表示io ready才會走出循環。

netpollblock方法會判斷當前的狀態是不是處於pdReady,如果是那么直接返回true;如果不是,那么將gpp通過CAS設置為pdWait並退出 for 循環。通過gopark 把當前 goroutine 給 park 住,直到對應的 fd 上發生可讀/可寫或者其他I/O 事件為止。

這些被park住的goroutine會在goroutine的調度中調用runtime.netpoll被喚醒。

netpoll輪詢等待

runtime.netpoll的核心邏輯是: 根據入參 delay設置調用 epoll_wait 的 timeout 值,調用 epoll_wait 從 epoll 的 eventpoll.rdllist雙向列表中獲取IO就緒的fd列表,遍歷epoll_wait 返回的fd列表, 根據調用epoll_ctl注冊fd時封裝的上下文信息組裝可運行的 goroutine 並返回。

執行完 netpoll 之后,會返回一個就緒 fd 列表對應的 goroutine 列表,接下來將就緒的 goroutine 加入到調度隊列中,等待調度運行。

func netpoll(delay int64) gList {
	if epfd == -1 {
		return gList{}
	}
	var waitms int32
    // 因為傳入delay單位是納秒,下面將納秒轉換成毫秒
	if delay < 0 {
		waitms = -1
	} else if delay == 0 {
		waitms = 0
	} else if delay < 1e6 {
		waitms = 1
	} else if delay < 1e15 {
		waitms = int32(delay / 1e6)
	} else {
		// An arbitrary cap on how long to wait for a timer.
		// 1e9 ms == ~11.5 days.
		waitms = 1e9
	}
	var events [128]epollevent
retry:
	// 等待文件描述符轉換成可讀或者可寫
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	// 返回負值,那么重新調用epollwait進行等待
	if n < 0 {
		...
		goto retry
	}
	var toRun gList
	// 意味着被監控的文件描述符出現了待處理的事件
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		} 
		...
		// 判斷發生的事件類型,讀類型或者寫類型
		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			// 取出保存在 epollevent 里的 pollDesc
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			pd.everr = false
			if ev.events == _EPOLLERR {
				pd.everr = true
			}
			// 調用 netpollready,傳入就緒 fd 的 pollDesc
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}

netpoll會調用epollwait獲取就緒的 fd 列表,對應的epoll函數是epoll_wait。toRun是一個 g 的鏈表,存儲要恢復的 goroutines,最后返回給調用方。如果epollwait返回的n大於零,那么表示被監控的文件描述符出現了待處理的事件,那么需要調用for循環進行處理。循環里面會根據時間類型設置mode,然后拿出對應的pollDesc,調用netpollready方法。

下面我們再看一下netpollready:

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	// 獲取對應的g的指針
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	// 將對應的g加入到toRun列表中
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	// 根據傳入的mode判斷事件類型
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		// 取出 gpp 存儲的 g
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		// cas 將讀或者寫信號量轉換成 pdReady
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdWait {
				old = 0
			}
			// 返回對應的 g指針
			return (*g)(unsafe.Pointer(old))
		}
	}
}

講完了runtime.netpoll的源碼有個需要注意的地方,調用runtime.netpoll的地方有兩處:

  • 在調度器中執行runtime.schedule(),該方法中會執行runtime.findrunable(),在runtime.findrunable()中調用了runtime.netpoll獲取待執行的goroutine;
  • Go runtime 在程序啟動的時候會創建一個獨立的sysmon監控線程,sysmon 每 20us~10ms 運行一次,每次運行會檢查距離上一次執行netpoll是否超過10ms,如果是則會調用一次runtime.netpoll

這些入口的調用感興趣的可以自己去看看。

總結

本文從I/O多路復用開始講解select以及epoll,然后再回到go語言中去看它是如何實現多路復用這樣的結構的。通過追蹤源碼可以發現,其實go也是根據epoll來封裝自己的函數:

func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList

通過這三個函數來實現對epoll的創建實例、注冊、事件等待操作。

對於I/O多路復用不是很了解的同學也可以借此機會多多的去學習一下網絡編程方面的知識,擴充一下知識面。

Reference

https://www.infoq.cn/article/boeavgkiqmvcj8qjnbxk

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-netpoller/#66-網絡輪詢器

https://zhuanlan.zhihu.com/p/64138532

https://imageslr.github.io/2020/02/27/select-poll-epoll.html

http://singlecool.com/2020/12/13/golang-netpoll/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM