接上文我們查看了bind和listen流程,直到了listen操作會在內核初始化一個epoll表,並將listen的描述符加入到epoll表中
如何保證epoll表初始化一次
前文我們看到pollDesc的init函數中調用了runtime的pollOpen函數完成的epoll創建和描述符加入,這里再貼一次代碼
func (pd *pollDesc) init(fd *FD) error { serverInit.Do(runtime_pollServerInit) ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) if errno != 0 { if ctx != 0 { runtime_pollUnblock(ctx) runtime_pollClose(ctx) } return errnoErr(syscall.Errno(errno)) } pd.runtimeCtx = ctx return nil }
runtime_pollServerInit link的是runtime/netpoll.go中的poll_runtime_pollServerInit函數
由於serverInit是sync.Once類型,所以runtime_pollServerInit只被初始化一次,而epoll模型的初始化就是在該函數完成
func poll_runtime_pollServerInit() { netpollGenericInit() } func netpollGenericInit() { if atomic.Load(&netpollInited) == 0 { lock(&netpollInitLock) if netpollInited == 0 { netpollinit() atomic.Store(&netpollInited, 1) } unlock(&netpollInitLock) } }
netpollinit實現了不同模型的初始化,epoll的實現在runtime/netpoll_epoll.go中
func netpollinit() { epfd = epollcreate1(_EPOLL_CLOEXEC) if epfd < 0 { epfd = epollcreate(1024) if epfd < 0 { println("runtime: epollcreate failed with", -epfd) throw("runtime: netpollinit failed") } closeonexec(epfd) } //... }
可以看到上述代碼里實現了epoll模型的初始化,所以對於一個M主線程只會初始化一張epoll表,所有要監聽的文件描述符都會放入這個表中。
跟隨accept看看goroutine掛起邏輯
當我們調用Listener的Accept時,Listener為接口類型,實際調用的為TCPListener的Accept函數
func (l *TCPListener) Accept() (Conn, error) { if !l.ok() { return nil, syscall.EINVAL } c, err := l.accept() if err != nil { return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err} } return c, nil }
Accept內部調用了accept函數,該函數內部實際調用netFD的accept
func (ln *TCPListener) accept() (*TCPConn, error) { fd, err := ln.fd.accept() if err != nil { return nil, err } //... }
在net/fd_unix.go中實現了linux環境下accept的操作
func (fd *netFD) accept() (netfd *netFD, err error) { d, rsa, errcall, err := fd.pfd.Accept() if err != nil { if errcall != "" { err = wrapSyscallError(errcall, err) } return nil, err } if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil { poll.CloseFunc(d) return nil, err } if err = netfd.init(); err != nil { netfd.Close() return nil, err } lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd) netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) return netfd, nil }
上述函數內部調用的是net/fd_unix.go內部實現的Accept函數
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { if err := fd.readLock(); err != nil { return -1, nil, "", err } defer fd.readUnlock() if err := fd.pd.prepareRead(fd.isFile); err != nil { return -1, nil, "", err } for { s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { return s, rsa, "", err } switch err { case syscall.EAGAIN: if fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } case syscall.ECONNABORTED: // This means that a socket on the listen // queue was closed before we Accept()ed it; // it's a silly error, so try again. continue } return -1, nil, errcall, err } }
上述函數就是tcp底層的函數了,accept(fd.Sysfd)監聽fd.Sysfd描述符,等待可讀事件到來,當可讀事件到來后,就可以認為來了一個新的連接,從而創建一個新的描述符給新的連接。
當accept出現錯誤時,需要判斷err類型,如果是EAGAIN說明當前沒有連接到來,就調用waitRead等待連接,ECONNABORTED說明連接還未accept就斷開了,可以忽略。
func (pd *pollDesc) waitRead(isFile bool) error { return pd.wait('r', isFile) }
進而調用pollDesc的wait操作
func (pd *pollDesc) wait(mode int, isFile bool) error { if pd.runtimeCtx == 0 { return errors.New("waiting for unsupported file type") } res := runtime_pollWait(pd.runtimeCtx, mode) return convertErr(res, isFile) }
wait函數中判斷pd的runtime上下文是否正常,然后調用runtime包的poll_runtime_pollWait實現掛起等待
func poll_runtime_pollWait(pd *pollDesc, mode int) int { err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" { netpollarm(pd, mode) } for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } } return 0 }
poll_runtime_pollWait運行在內核M線程中,輪詢調用netpollblock,所以內核M線程一直在輪詢檢測netpollblock返回值,當其返回true時循環就可以退出,從而用戶態協程就可以繼續運行了。
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } // set the gpp semaphore to WAIT for { old := *gpp if old == pdReady { *gpp = 0 return true } if old != 0 { throw("runtime: double wait") } if atomic.Casuintptr(gpp, 0, pdWait) { break } } if waitio || netpollcheckerr(pd, mode) == 0 { gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5) } old := atomic.Xchguintptr(gpp, 0) if old > pdWait { throw("runtime: corrupted polldesc") } return old == pdReady }
netpollblock內部根據讀模式還是寫模式,獲取pollDesc成員變量的讀協程或者寫協程地址,然后判斷其狀態是否為pdReady,這里要詳細說一下,golang阻塞一個用戶態協程是要將其狀態設置為0(正在運行)或者pdWait(阻塞),這里為0,所以邏輯繼續往下走,之后做了一個原子操作將gpp設置為pdWait狀態,接着根據這個狀態,執行gopark函數,阻塞住用戶態協程。當內核想激活用戶協程時gopark會返回,然后該函數判斷gpp是否為pdReady,從而激活用戶態協程。
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { if reason != waitReasonSleep { checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy } mp := acquirem() gp := mp.curg status := readgstatus(gp) if status != _Grunning && status != _Gscanrunning { throw("gopark: bad g status") } mp.waitlock = lock mp.waitunlockf = unlockf gp.waitreason = reason mp.waittraceev = traceEv mp.waittraceskip = traceskip releasem(mp) // can't do anything that might move the G between Ms here. mcall(park_m) }
gopark將用戶態協程放在等待隊列中,然后調用mcall觸發匯編代碼。之后會檢測調用unlockf函數,如果unlockf返回false則說明可以解鎖用戶態協程了。另外官網的注釋說unlockf不要訪問用戶態協程的stack,因為G’s stack可能會在gopark和unlockf之間被移除。到目前為止,我們理解了用戶態協程掛起原理。
epoll就緒后如何激活用戶態協程
想知道如果激活掛起的用戶態協程,就要先看看epoll_wait判斷就緒事件后怎么處理的。runtime/netpoll_epoll.go中實現了epollwait邏輯
func netpoll(delay int64) gList { if epfd == -1 { return gList{} } //... var events [128]epollevent retry: n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { if n != -_EINTR { println("runtime: epollwait on fd", epfd, "failed with", -n) throw("runtime: netpoll failed") } // If a timed sleep was interrupted, just return to // recalculate how long we should sleep now. if waitms > 0 { return gList{} } goto retry } var toRun gList for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } //... var mode int32 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) pd.everr = false if ev.events == _EPOLLERR { pd.everr = true } netpollready(&toRun, pd, mode) } } return toRun }
可以看出netpoll函數調用epollwait返回就緒事件列表,然后遍歷就緒的事件列表,從事件類型中取出pollDesc數據,調用netpollready將曾經掛起的協程放入gList中,然后返回該列表
func netpollready(toRun *gList, pd *pollDesc, mode int32) { var rg, wg *g if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) } if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) } if rg != nil { toRun.push(rg) } if wg != nil { toRun.push(wg) } }
netpollready調用了unblock函數,並且將協程寫入glist中
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { old := *gpp if old == pdReady { return nil } if old == 0 && !ioready { // Only set READY for ioready. runtime_pollWait // will check for timeout/cancel before waiting. return nil } var new uintptr if ioready { new = pdReady } if atomic.Casuintptr(gpp, old, new) { if old == pdReady || old == pdWait { old = 0 } return (*g)(unsafe.Pointer(old)) } } }
netpollunblock函數修改pd所在協程的狀態為0,表示可運行狀態,所以netpoll函數內部做了這樣幾件事,根據就緒事件列表找到對應的協程,將掛起的協程狀態設置為0表示可運行,然后將該協程放入glist中。在runtime/proc.go中findrunnable會判斷是否初始化epoll,如果初始化了則調用netpoll,從而獲取glist,然后traceGoUnpark激活掛起的協程
func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() //... if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } //... }
以上就是golang網絡調度和協程控制的原理,golang通過epoll和用戶態協程調度結合的方式,實現了高並發的網絡處理,這種思路是值得日后我們設計產品借鑒的。
感謝關注我的公眾號