golang版本1.12.9;操作系統:readhat 7.4
golang的底層使用epoll來實現IO復用。netPoll通過pollDesc結構體將文件描述符與底層進行了綁定。netpoll實現了用戶層面的與底層網絡IO相關的goroutine的阻塞/非阻塞管理。
對netpoll的介紹按照這篇文章的思路按照tcp建鏈中的listen/accept/read/write/close動作詳解過程。
下面以TCP為例完整解析TCP的建鏈/斷鏈以及讀寫過程
listen流程:
ListenTCP --> listenTCP --> internetSocket --> socket --> listenStream
unix的listen函數用於將一個socket轉換為監聽socket。golang中同時結合了創建socket的步驟。
// src/net/tcpsock.go
func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) { switch network {
//支持tcp協議為”tcp4“和“tcp6”,當使用"tcp"時可以通過地址格式進行判斷 case "tcp", "tcp4", "tcp6": default: return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: laddr.opAddr(), Err: UnknownNetworkError(network)} }
//對laddr進行初始化(非nil),用於在socket函數中進入監聽處理流程(見下文) if laddr == nil { laddr = &TCPAddr{} } sl := &sysListener{network: network, address: laddr.String()} ln, err := sl.listenTCP(context.Background(), laddr) if err != nil { return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: laddr.opAddr(), Err: err} } return ln, nil }
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) { //此處mode為"listen",可以表示一個tcp服務端;此外還有一個mode為"dial",表示連接的發起端,可以表示一個tcp客戶端
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control) if err != nil { return nil, err } return &TCPListener{fd}, nil }
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) { //此處判斷mode為"dial"的場景。如果"dial" mode下的遠端IP為通配符,則將遠端IP轉換為本地IP(127.0.0.1或::1),即默認連接本地server。
if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd" || runtime.GOOS == "nacl") && mode == "dial" && raddr.isWildcard() { raddr = raddr.toLocal(net) }
//favoriteAddrFamily函數用於判斷地址類型為IPv4還是IPv6,主要用於net為"tcp"時的場景。 family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode) return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn) }
//看下這個用於判斷地址類型的函數
func favoriteAddrFamily(network string, laddr, raddr sockaddr, mode string) (family int, ipv6only bool) { //可以看到,如果直接寫明"tcp4"或"tcp6"時會直接返回對應的地址類型
switch network[len(network)-1] { case '4': return syscall.AF_INET, false case '6': return syscall.AF_INET6, true }
//下面用於處理network為"tcp的場景",可以看出直接指定"tcp4"或"tcp6"時可以提高一些執行效率,但可能影響擴展性
//如果使用監聽模式,且本地沒有指定監聽地址,需要通過對系統本地地址進行測試來判定使用的IP類型 if mode == "listen" && (laddr == nil || laddr.isWildcard()) {
//supportsIPv4map函數用於測試系統是否支持ipv4MappedIPv6功能。如果系統支持該功能,或者不支持IPv4,則使用IPv6 if supportsIPv4map() || !supportsIPv4() { return syscall.AF_INET6, false }
//如果沒有指定監聽地址,同時不支持ipv4MappedIPv6功能,且支持IPv4,則使用IPv4 if laddr == nil { return syscall.AF_INET, false }
//通過判斷IP地址(長度和格式)來判斷所使用的IP類型。實現函數定義在src/net/tcpsock_posix.go中 return laddr.family(), false }
//如果未指定本端和遠端地址或明確指定了本端和遠端需要的地址類型為IPv4,則使用IPv4。可用於處理"listen"和"dial" mode。
//"listen" mode下已經處理了laddr == nil的情況,如果laddr非nil,調用family()函數判斷IP類型即可獲得IP類型 if (laddr == nil || laddr.family() == syscall.AF_INET) && (raddr == nil || raddr.family() == syscall.AF_INET) { return syscall.AF_INET, false }
//其他情況使用IPv6,如僅支持IPv6的場景 return syscall.AF_INET6, false }
從下面注釋中可以看出socket用於返回一個網絡描述符。listen場景下,需要使用socket->setsocketopt->bind->listen這幾個系統調用來創建一個監聽socket
1 // socket returns a network file descriptor that is ready for 2 // asynchronous I/O using the network poller. --此處應該是同步IO吧 3 func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) { 4 //創建一個socket 5 s, err := sysSocket(family, sotype, proto) 6 if err != nil { 7 return nil, err 8 } 9 //設置socket選項,處理IPv6並設置允許廣播 10 if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil { 11 poll.CloseFunc(s) 12 return nil, err 13 } 14 //初始化一個newFD,下面講解 15 if fd, err = newFD(s, family, sotype, net); err != nil { 16 poll.CloseFunc(s) 17 return nil, err 18 } 19 20 // 用於處理TCP或UDP服務端。服務端需要確保本地監聽地址非nil(但可以為""),否則會被認為是一個客戶端socket。ListenTCP中已經對laddr賦初值 21 if laddr != nil && raddr == nil { 22 switch sotype { 23 //處理流協議,TCP 24 case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET: 25 if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil { 26 fd.Close() 27 return nil, err 28 } 29 return fd, nil 30 //處理數據報協議,UDP 31 case syscall.SOCK_DGRAM: 32 if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
33 fd.Close() 34 return nil, err 35 } 36 return fd, nil 37 } 38 } 39 //處理非監聽socket場景,即客戶端發起連接 40 if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil { 41 fd.Close() 42 return nil, err 43 } 44 return fd, nil 45 }
-----------------------------------------------------------------------------------------------------------------------------------------
golang的系統調用
以上述第5行代碼中創建一個socket為例,看golang如何通過系統調用產生一個socket。sysSocket中過的調用鏈如下:
sysSocket->socketFunc->syscall.Socket->socket。socket函數內容如下,實際通過調用RawSyscall來運行系統調用,其系統調用ID為SYS_SOCKET,值為41
func socket(domain int, typ int, proto int) (fd int, err error) { r0, _, e1 := RawSyscall(SYS_SOCKET, uintptr(domain), uintptr(typ), uintptr(proto)) fd = int(r0) if e1 != 0 { err = errnoErr(e1) } return }
golang中的系統調用定義在src/syscall目錄下,redhat系統對應的文件為zsysnum_linux_amd64.go。
操作系統中的系統調用ID定義在/usr/include/asm/unistd.h中(內容如下),其中32位系統使用<asm/unistd_32.h>,64位系統使用<asm/unistd_64.h>。
如golang中的SYS_SOCKET系統調用ID為41,對應redhat系統的定義為#define __NR_socket 41
//asm/unistd.h
#ifndef _ASM_X86_UNISTD_H #define _ASM_X86_UNISTD_H /* x32 syscall flag bit */ #define __X32_SYSCALL_BIT 0x40000000 # ifdef __i386__ # include <asm/unistd_32.h> # elif defined(__ILP32__) # include <asm/unistd_x32.h> # else # include <asm/unistd_64.h> # endif #endif /* _ASM_X86_UNISTD_H */
golang可以通過如下4個函數來執行系統調用,后綴帶"6"的表示有6個入參,不帶"6"的表示有4個入參。(redhat系統的實現定義在src/syscall/asm_linux_amd64.s中)
golang的系統調用可以簡單分為:阻塞系統調用,非阻塞系統調用和wrapped系統調用。wrapped系統調用就是自己封裝的,如下文的epoll相關的系統調用就屬於這一類。
func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
func Syscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno)
func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
func RawSyscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno)
Syscall與RawSyscall的有如下區別(以下內容來自 golang syscall原理),通常系統調用使用Syscall,防止阻塞同一個P中的其他goroutine的執行
- Syscall在進入系統調用的時候,調用了runtime·entersyscall(SB)函數,在結束系統調用的時候調用了runtime·exitsyscall(SB)。做到進入和退出syscall的時候通知runtime。
- 這兩個函數runtime·entersyscall和runtime·exitsyscall的實現在proc.go文件里面。其實在runtime·entersyscall函數里面,通知系統調用時候,是會將g的M的P解綁,P可以去繼續獲取M執行其余的g,這樣提升效率。 所以如果用戶代碼使用了 RawSyscall 來做一些阻塞的系統調用,是有可能阻塞其它的 g 的。RawSyscall 只是為了在執行那些一定不會阻塞的系統調用時,能節省兩次對 runtime 的函數調用消耗
RawSyscall的源碼如下,第一個參數表示系統調用ID,其余為該系統調用所需的參數。整體上比較簡單,3~8行將參數傳到寄存器,10行執行系統調用,生成socket,后續就是用於處理錯誤碼和返回值
1 // func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2, err uintptr) 2 TEXT ·RawSyscall(SB),NOSPLIT,$0-56 3 MOVQ a1+8(FP), DI 4 MOVQ a2+16(FP), SI 5 MOVQ a3+24(FP), DX 6 MOVQ $0, R10 7 MOVQ $0, R8 8 MOVQ $0, R9 9 MOVQ trap+0(FP), AX // syscall entry 10 SYSCALL 11 CMPQ AX, $0xfffffffffffff001 12 JLS ok1 13 MOVQ $-1, r1+32(FP) 14 MOVQ $0, r2+40(FP) 15 NEGQ AX 16 MOVQ AX, err+48(FP) 17 RET 18 ok1: 19 MOVQ AX, r1+32(FP) 20 MOVQ DX, r2+40(FP) 21 MOVQ $0, err+48(FP) 22 RET
socket代碼就是調用了RawSyscall函數來執行SYS_SOCKET類型的系統調用,由於socket系統調用不會阻塞,因此可以使用RawSyscall。
-----------------------------------------------------------------------------------------------------------------------
繼續listen流程,在創建完socket並設置socket選項后,進入流處理環節。需要注意的是,此處用到了newFD函數初始化的變量fd。
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error { var err error
//此處設置了監聽socket所使用的選項,允許地址socket重用,實際中並不推薦使用socket地址重用。 if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil { return err } var lsa syscall.Sockaddr
//構建一個實現了syscall.Sockaddr結構的結構體,如syscall.SockaddrInet4/syscall.SockaddrInet4/syscall.SockaddrUnix if lsa, err = laddr.sockaddr(fd.family); err != nil { return err }
//ctrlFn在bind前調用,可以用於設置socket選項。此處為nil。用法可以參考net/listen_test.go中的TestListenConfigControl函數 if ctrlFn != nil { c, err := newRawConn(fd) if err != nil { return err } if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil { return err } }
// 為socket綁定地址 if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) }
//使用系統調用SYS_LISTEN,第二個參數表示監聽隊列大小,來自"/proc/sys/net/core/somaxconn"或syscall.SOMAXCONN(參見src/net/sock_linux.go)
//該函數等同於系統調用:
// #include<sys/socket.h> // int listen(int sockfd, int backlog) if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil { return os.NewSyscallError("listen", err) }
//fd.init中會初始化epoll,並注冊文件描述符。用於在accept時有連接建立時上報連接事件通知。用法參見epoll+socket實現 socket並發 linux服務器 if err = fd.init(); err != nil { return err }
//獲取socket信息,此處會使用系統調用getsockname來獲得配置的socket信息,並設置到fd.laddr中。監聽socket的fd.raddr為nil lsa, _ = syscall.Getsockname(fd.pfd.Sysfd) fd.setAddr(fd.addrFunc()(lsa), nil) return nil }
listenStream失敗后需要關閉fd.pd和fd.Sysfd
func (fd *netFD) Close() error {
//與GC相關的設置 runtime.SetFinalizer(fd, nil) return fd.pfd.Close() }
func (fd *FD) Close() error {
//將鎖狀態設置為mutexClosed,即mu.state的第一個bit位,后續將不能使用該鎖進行lock操作,僅能unlock減少鎖的引用計數。
//讀寫對應底層全雙工鏈路,讀寫操作不互斥。具體參見這篇文章 if !fd.fdmu.increfAndClose() { return errClosing(fd.isFile) } // Unblock any I/O. Once it all unblocks and returns, // so that it cannot be referring to fd.sysfd anymore, // the final decref will close fd.sysfd. This should happen // fairly quickly, since all the I/O is non-blocking, and any // attempts to block in the pollDesc will return errClosing(fd.isFile).
//調用runtime_pollUnblock函數unblock pd中的讀寫goroutine。 fd.pd.evict() //當文件鎖的引用計數為0時才能做fd.destroy清理動作 err := fd.decref() // Wait until the descriptor is closed. If this was the only // reference, it is already closed. Only wait if the file has // not been set to blocking mode, as otherwise any current I/O // may be blocking, and that would block the Close. // No need for an atomic read of isBlocking, increfAndClose means // we have exclusive access to fd.
//此處用於在epoll場景下,阻塞等待釋放文件鎖,最終退出Close函數。destroy函數會釋放鎖。注意此處的鎖和fd.fdum不是一個鎖 if fd.isBlocking == 0 { runtime_Semacquire(&fd.csema) } return err }
func (fd *FD) decref() error {
//此處減小文件鎖的引用計數並判斷引用計數是否為0,只有引用計數為0且文件鎖關閉時才能進行destory工作 if fd.fdmu.decref() {
//除poll.decref外,在poll.readUnlock和poll.writeUnlock也會調用poll.desory()函數,主要用於退出讀或寫操作時。
return fd.destroy() } return nil }
func (fd *FD) destroy() error { //去注冊epoll事件 fd.pd.close()
//關閉底層socket err := CloseFunc(fd.Sysfd) fd.Sysfd = -1
//runtime_Semacquire和runtime_Semrelease對應P/V操作,。可以參見golang中的鎖源碼實現:Mutex
//此處表示該文件鎖的引用計數為0且已經關閉,釋放文件鎖,退出Close函數(其他地方可以進行如readUnlock/writeUnlock操作) runtime_Semrelease(&fd.csema) return err }
accept會增加監聽socket的引用計數
-----------------------------------------------------------------------------------------------------------------------
listen中netpoll的處理
FD的結構體如下,最主要的成員為fdmu和pd,前者為socket創建的文件描述符,后者為與epoll相關的底層結構體。golang中每一個連接都被抽象為一個FD。
//src/internal/poll/fd_unix.go
type FD struct { // Lock sysfd and serialize access to Read and Write methods. fdmu fdMutex // System file descriptor. Immutable until Close. Sysfd int // I/O poller. pd pollDesc // Writev cache. iovecs *[]syscall.Iovec // Semaphore signaled when file is closed. csema uint32 // Non-zero if this file has been set to blocking mode. isBlocking uint32 // Whether this is a streaming descriptor, as opposed to a // packet-based descriptor like a UDP socket. Immutable. IsStream bool // Whether a zero byte read indicates EOF. This is false for a // message based socket connection. ZeroReadIsEOF bool // Whether this is a file rather than a network socket. isFile bool }
//src/runtime/netpoll.go
type pollDesc struct { link *pollDesc // 可以看作是pollcache鏈表的鏈表指針 // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations. // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime. // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification) // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated // in a lock-free way by all operations. // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg), // that will blow up when GC starts moving objects. lock mutex // protects the following fields fd uintptr closing bool user uint32 // user settable cookie rseq uintptr // protects from stale read timers rg uintptr // pdReady, pdWait, G waiting for read or nil rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline wseq uintptr // protects from stale write timers wg uintptr // pdReady, pdWait, G waiting for write or nil wt timer // write deadline timer wd int64 // write deadline }
初始化FD
func (fd *netFD) init() error {
//當文件描述符為網絡時,需要將第二個參數置為true,表示使用poll機制 return fd.pfd.Init(fd.net, true) }
// Init initializes the FD. The Sysfd field should already be set. // This can be called multiple times on a single FD. // The net argument is a network name from the net package (e.g., "tcp"), // or "file". // Set pollable to true if fd should be managed by runtime netpoll. func (fd *FD) Init(net string, pollable bool) error { // We don't actually care about the various network types. if net == "file" { fd.isFile = true }
//如果不使用poll機制,則fd置為blocking mode並返回。當net=="file"時不會使用poll if !pollable { fd.isBlocking = 1 return nil }
//初始化pollDesc。入參校驗后執行poll相關的操作,實際執行如下兩步系統調用,即創建一個epoll句柄,並注冊fd到epoll句柄epfd中
//int epoll_create(int size);
//int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); err := fd.pd.init(fd) if err != nil { // If we could not initialize the runtime poller, // assume we are using blocking mode. fd.isBlocking = 1 } return err }
//src/internal/poll/fd_poll_runtime.go文件中定義了poll相關的操作,如下:
// func runtime_pollServerInit()
// func runtime_pollOpen(fd uintptr) (uintptr, int)
// func runtime_pollClose(ctx uintptr)
// func runtime_pollWait(ctx uintptr, mode int) int
// func runtime_pollWaitCanceled(ctx uintptr, mode int) int
// func runtime_pollReset(ctx uintptr, mode int) int
// func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
// func runtime_pollUnblock(ctx uintptr)
// func runtime_isPollServerDescriptor(fd uintptr) bool
//以上函數的具體實現在src/runtime/netpoll.go中
func (pd *pollDesc) init(fd *FD) error {
//調用runtime_pollServerInit創建一個epoll句柄,實際執行的就是epoll_create。此處使用sync.once來確保只創建一個epoll句柄 serverInit.Do(runtime_pollServerInit)
//調用runtime_pollOpen將文件描述符fd.Sysfd注冊到epoll句柄中,實際執行的就是epoll_ctl。此處返回的ctx為pollDesc結構體(src/runtime/netpoll.go) ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) if errno != 0 {
//處理失敗需要做如下動作:
// 1:unblock阻塞的goroutine
// 2:回收pd
// 3:close fd,文件描述符在init函數返回失敗后close if ctx != 0 {
//在注冊epoll事件失敗后需要unblock goroutine,繼續執行清理工作 runtime_pollUnblock(ctx)
//此處刪除注冊的事件並回收pd節點 runtime_pollClose(ctx) } return syscall.Errno(errno) }
//保存pollDesc pd.runtimeCtx = ctx return nil }
runtime_pollServerInit的實現如下。從注釋上可以看到該函數使用編譯器鏈接到internal/poll.runtime_pollServerInit,即上述使用的函數。
它調用netpollinit創建epoll句柄,並調用atomic.Store將netpollInited設置為1,表示已經初始化epoll文件句柄。
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit func poll_runtime_pollServerInit() { netpollinit() atomic.Store(&netpollInited, 1) }
netpollinit函數內容如下,首先調用epollcreate1創建epfd,如果失敗則調用epollcreate創建。前者對應系統調用int epoll_create(int size);后者對應系統調用int epoll_create1(int flag);具體可以參見epoll函數深入講解
func netpollinit() {
//此處創建的epfd為全局變量 epfd = epollcreate1(_EPOLL_CLOEXEC) if epfd >= 0 { return } epfd = epollcreate(1024) if epfd >= 0 {
//調用fcntl給epfd設置FD_CLOEXEC,用於防止文件描述符泄露。參見使用FD_CLOEXEC實現close-on-exec,關閉子進程無用文件描述符 closeonexec(epfd) return } println("runtime: epollcreate failed with", -epfd) throw("runtime: netpollinit failed") }
epoll相關的實現位於src/runtime/sys_linux_amd64.s中(注意系統類型),可以看到其運行的系統調用為$SYS_epoll_create1,其值定義在同文件中。可以看到其實際值為291,在系統的/usr/include/asm/unistd_64.h中可以看到291的系統調用對應的內容為 #define __NR_epoll_create1 291
#define SYS_epoll_create 213 #define SYS_epoll_ctl 233 #define SYS_epoll_pwait 281 #define SYS_epoll_create1 291
TEXT runtime·epollcreate1(SB),NOSPLIT,$0 MOVL flags+0(FP), DI MOVL $SYS_epoll_create1, AX SYSCALL MOVL AX, ret+8(FP) RET
在創建完epoll之后,注冊epoll事件
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
//獲取一個pollDesc。pollcache為全局pd鏈表,可以為listen和accept過程提供pd。 pd := pollcache.alloc() lock(&pd.lock)
//pd全局鏈表中的節點都應該是可用或初始狀態,0為初始化狀態 if pd.wg != 0 && pd.wg != pdReady { throw("runtime: blocked write on free polldesc") } if pd.rg != 0 && pd.rg != pdReady { throw("runtime: blocked read on free polldesc") }
//pollDesc中保存了文件描述符 pd.fd = fd
//此處將pd的狀態初始化為false,表示該pd可用 pd.closing = false pd.rseq++ pd.rg = 0 pd.rd = 0 pd.wseq++ pd.wg = 0 pd.wd = 0 unlock(&pd.lock) var errno int32
//注冊文件描述符到epoll句柄中 errno = netpollopen(fd, pd) return pd, int(errno) }
poll節點的申請流程如下
//poll其實存儲在一個鏈表中,c.first指向下一個未使用的節點。在c.first非空時,可以直接返回該節點
// +-----+ +-----+ +-----+ +-----+ +-----+
// | pd3 +-->+ pd2 +--->+ pd1 +-->+ pd0 +--->+ nil |
// +-----+ +--^--+ +-----+ +-----+ +-----+
// |
// +---------+ |
// | c.first +--+
// +---------+
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock) if c.first == nil { const pdSize = unsafe.Sizeof(pollDesc{})
//首次會創建1024個節點 n := pollBlockSize / pdSize if n == 0 { n = 1 } // Must be in non-GC memory because can be referenced // only from epoll/kqueue internals.
//調用malloc申請內存 mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
//通過指針移動來獲取節點並將節點串成鏈表。需要注意的是,末節點的link指向nil for i := uintptr(0); i < n; i++ { pd := (*pollDesc)(add(mem, i*pdSize)) pd.link = c.first c.first = pd } }
//此處有2個作用:如果c.first非空,則返回c.first指向的節點,並將c.first指向下一個可用節點;初始化鏈表后,c.first指向的是首節點,將其指向首節點之后
//的節點並返回首節點使用。如果鏈表節點全部被使用,會重新創建1024個節點
//當刪除注冊的event事件時,會回收該節點. pd := c.first c.first = pd.link unlock(&c.lock) return pd }
netpollopen將文件描述符注冊到epoll中,此處可以看到pollDesc的作用是作為epollctl的參數成員,即用戶數據部分。runtime/netpoll_epoll.go的netpoll函數中會處理此處注冊的事件和數據
func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent
//設置觸發事件 對應描述符上有可讀數據|對應描述符上有可寫數據|描述符被掛起|設置為邊緣觸發模式(僅在狀態變更時上報一次事件)
//epoll的事件可以參考epoll文檔
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
//構造epoll的用戶數據 *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
//傳給epollctl的最后一個參數ev有2個數據,events和data,對應系統調用函數epoll_ctl的最后一個入參
// struct epoll_event {
// __uint32_t events; /* Epoll events */
// epoll_data_t data; /* User data variable */
// }; return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
runtime_pollUnblock的實現函數為poll_runtime_pollUnblock。獲取pd讀/寫阻塞的goroutine並將其狀態切換為runnable,poll_runtime_pollUnblock函數一般在關閉連接時使用。
pollDesc結構體中的rg和wg比較難理解,它們與netpoll相關,將底層緩存區的讀寫情況反映為當前讀寫對應的goroutine的狀態。當讀緩存區沒有數據時,會導致rg阻塞(非pdReady),此時調用netpollunblock返回的為讀操作所在的goroutine;而當執行write操作時,如果緩存區沒有空間,此時會導致wg阻塞,此時調用netpollunblock返回的為寫操作所在的goroutine。在非阻塞時,rg/wg表示當前讀/寫goroutine狀態,pdReady表示可以進行讀/寫操作,pdWait表示當前goroutine將會被park(調用gopark)住。
注意:用戶讀寫操作可以使用同一個goroutine。
func poll_runtime_pollUnblock(pd *pollDesc) { lock(&pd.lock) if pd.closing { throw("runtime: unblock on closing polldesc") }
//將pd狀態置為closing,poll_runtime_pollClose會停止並回收pd pd.closing = true pd.rseq++ pd.wseq++ var rg, wg *g atomic.StorepNoWB(noescape(unsafe.Pointer(&rg)), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock
//獲取讀寫對應的goroutine。因此此處ioready設置為false,表示非底層IO的操作,底層epoll上報事件后,會通過runtime.netpollready調用
//netpollunblock函數,此時netpollunblock的第三個參數會變為true,用於將對應事件的goroutine變為非阻塞,處理epoll的讀寫事件 rg = netpollunblock(pd, 'r', false) wg = netpollunblock(pd, 'w', false)
//pd.rt.f和pd.wt.f都是定時器超時后執行的函數,如果這些函數非空,則清除定時器並置為初始值nil(后續pd需要回收)。
//定時器用於設置讀寫連接的deadline時間點,參見本文末的內容 if pd.rt.f != nil { deltimer(&pd.rt) pd.rt.f = nil } if pd.wt.f != nil { deltimer(&pd.wt) pd.wt.f = nil } unlock(&pd.lock)
//此處才是真正unblock阻塞的goroutine,netpollgoready會調用goready函數將阻塞的goroutine變為runnable狀態,繼續執行。
//此處的unblock操作對應調用netpollblock函數gopark的goroutine,如等待Accept,等待Read等。當Accept有連接到達或Read有數據讀取
//時,這時需要unblock對應的goroutine繼續處理(當然也包括處理錯誤場景) if rg != nil { netpollgoready(rg, 3) } if wg != nil { netpollgoready(wg, 3) } }
netpollunblock等待返回讀/寫操作block的goroutine,如果讀/寫狀態為pdReady(即非阻塞)或初始化狀態則返回一個空指針,表示該goroutine沒有被阻塞,可直接使用;反之返回一個阻塞的goroutine,golang調用netpollready函數可將其變為runnable。ioready表示是否由底層發起的調用,如果是則需要置為true。只有通過底層epoll事件通知的場景下才會置為true。
ps:這個函數的名字有點奇怪,它並不能主動unblock goroutine
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg }
//使用for循環用於執行atomic.Casuintptr原子操作。一個pd對於一條連接,而一條連接可能被多個goroutine操作。 for { old := *gpp
//如果gpp為pdReady,則對應的goroutine為unblock狀態,返回即可 if old == pdReady { return nil }
//此處用於初始狀態時的場景,此時並沒有調用netpollblock阻塞goroutine,直接返回即可 if old == 0 && !ioready { // Only set READY for ioready. runtime_pollWait // will check for timeout/cancel before waiting. return nil } var new uintptr if ioready { new = pdReady }
//atomic.Casuintptr的實現在runtime/internal/atomic/asm_${plantform}.s中,如下處理邏輯為
// if(*val == *old){
// *val = new;
// return 1;
// } else {
// return 0;
// }
//該函數的實現了自旋鎖,for循環執行該原子操作。這里的原子循環應該是多goroutine操作同一個pd場景下等待一個相對穩定的狀態,因為按照本
//函數代碼邏輯來看,*gpp一定等於*old
if atomic.Casuintptr(gpp, old, new) {
//old == pdReady的判斷應該不會被執行,如果old為pdReady,上面代碼已經直接返回了
if old == pdReady || old == pdWait { old = 0 }
//返回阻塞在pd.rg/pd.wg上的goroutine地址 return (*g)(unsafe.Pointer(old)) } } }
runtime_pollClose實際調用的函數為poll_runtime_pollClose,用於刪除注冊的事件並回收pd節點
func poll_runtime_pollClose(pd *pollDesc) { if !pd.closing { throw("runtime: close polldesc w/o unblock") }
//執行本函數前需要調用netpollunblock將goroutine變為非阻塞狀態,以便回收pd節點 if pd.wg != 0 && pd.wg != pdReady { throw("runtime: blocked write on closing polldesc") } if pd.rg != 0 && pd.rg != pdReady { throw("runtime: blocked read on closing polldesc") }
//調用_EPOLL_CTL_DEL刪除注冊的epoll事件 netpollclose(pd.fd)
//回收fd節點 pollcache.free(pd) }
golang中使用epoll的方式比較巧妙,也比較奇怪。從上面流程可以看出,創建epoll和注冊epoll事件時,通過對API層層調用可以看到其運行了系統調用runtime.netpollinit和runtime.netpollopen,但沒有直接用到runtime.epollwait,對Accept,Read等的阻塞是通過poll_runtime_pollWait->netpollblock->gopark阻塞goroutine來實現的,即通過gopark阻塞對應的協程。
runtime.epollwait是在runtime.netpoll中調用的,而runtime.netpoll的是在單獨的線程中運行的。
func netpoll(block bool) gList { if epfd == -1 { return gList{} } waitms := int32(-1) if !block { waitms = 0 } var events [128]epollevent retry:
//這里運行系統調用阻塞等待epfd上發生的事件 n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { if n != -_EINTR { println("runtime: epollwait on fd", epfd, "failed with", -n) throw("runtime: netpoll failed") } goto retry } var toRun gList
//epoll可能一次性上報多個事件 for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } var mode int32
//底層可讀事件,其他事件(如_EPOLLHUP)同時涉及到讀寫,因此讀寫的goroutine都需要通知 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' }
//底層可讀事件 if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) //這里才是底層通知讀寫事件來unblock(Accept/Read等)協程的地方。netpollready會返回pd對應的讀寫goroutine鏈表(runtime.gList),
//最終在函數退出后返回給runtime.findrunnable函數調度。此處golang runtime會將podllDesc.rg/wg設置為pdready netpollready(&toRun, pd, mode) } } if block && toRun.empty() { goto retry } return toRun }
這也是為什么使用用戶API執行的協程需要使用pdReady和pdWait來表示協程狀態的原因,因為其無法直接獲得epoll_wait的事件信息。
-----------------------------------------------------------------------------------------------------------------------
accept流程
上面基本已經分析了epoll的所有底層流程,后續的就比較簡單了。
可以看到Accept使用了listen階段生成的netFD(TCPListener),接收監聽socket的TCP連接。accept返回一個TCP連接,就可以在該TCP上進行讀寫操作,連接類型為TCPConn。每accept一個連接會創建一個新的goroutine,並調用internal/poll.runtime_pollWait來等待讀事件
type TCPConn struct { conn }
type TCPListener struct { fd *netFD }
func (l *TCPListener) Accept() (Conn, error) {
//如果監聽socket已經釋放,則無法繼續執行accept,返回錯誤 if !l.ok() { return nil, syscall.EINVAL } c, err := l.accept() if err != nil { return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err} } return c, nil }
ln.fd.accept中會返回建立的TCP連接,后續可以在該連接上執行讀寫操作
func (ln *TCPListener) accept() (*TCPConn, error) { fd, err := ln.fd.accept() if err != nil { return nil, err }
//封裝並返回建立好的tcp連接。newTCPConn同時會設置TCP_NODELAY選項來禁止Nagle算法 return newTCPConn(fd), nil }
func (fd *netFD) accept() (netfd *netFD, err error) {
//核心處理函數就是fd.pfd.Accept,返回tcp連接的文件描述符。具體見下文 d, rsa, errcall, err := fd.pfd.Accept() if err != nil { if errcall != "" { err = wrapSyscallError(errcall, err) } return nil, err } //封裝返回的tcp連接參數,netfd后續會被newTCPConn封裝。ps:此處的返回值判斷應該無用,newFD不會返回非nil的錯誤碼 if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil { poll.CloseFunc(d) return nil, err }
//初始化一個pollDesc並注冊epoll事件通知,與listen不同,此處用於注冊用戶連接上的IO讀寫事件 if err = netfd.init(); err != nil {
//ps:這個地方應該close accept的連接,提了個issue,官方已經更新 fd.Close() return nil, err }
//設置netfd中表示本/對端變量的地址 lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd) netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) return netfd, nil }
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
//注意此處的fd.readLock(),該函數會增加fd.fdmu的引用計數。此處的fd即監聽socket,如果監聽socket關閉將無法進行readLock操作,
//會直接返回錯誤,即無法創建新的連接。
//在accept的連接上進行讀操作會增加該連接的引用計數 if err := fd.readLock(); err != nil { return -1, nil, "", err }
//此處減少fd.fdmu的引用計數。accept建立后的連接並不受監聽socket控制,即使close監聽socket,已有的連接會不會被關閉。
//此處的readUnlock還有一個作用,在accept失敗后,會在引用計數為0且fd.fdmu狀態為mutexClosed時會調用poll.destory()函數close該連接。
//從代碼層面看,只能通過poll.Close(net/fd_unix.go)將網絡描述符的fd.fdmu設置為mutexClose,該函數為對外API.
//如果fd.fdmu未關閉,則此處僅僅減少引用計數。 defer fd.readUnlock() //初始化監聽socket的pd,因為監聽socket只有讀操作,僅初始化pd.rg。
//初始化的原因是當前goroutine沒有被阻塞,清除pd上的標記,進入poll_runtime_pollWait等待epoll事件喚醒goroutine,
//防止因為殘留標記導致虛假喚醒。
//需要注意,prepare類函數,如prepareRead/prepareWrite會在其prepare->runtime_pollReset->poll_runtime_pollReset->netpollcheckerr中
//對deadline(參見下文)時間進行檢驗,如果deadline的時間早於當前時間會導致read/write/accept等操作無法阻塞,直接返回timeout error
// func netpollcheckerr(pd *pollDesc, mode int32) int {
// if pd.closing {
// return 1 // errClosing
// }
// if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
// return 2 // errTimeout
// }
// return 0
// } if err := fd.pd.prepareRead(fd.isFile); err != nil { return -1, nil, "", err } for {
//調用系統函數返回accept的連接 s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { return s, rsa, "", err } switch err {
//如果沒有連接到來且使用了epoll,則調用runtime_pollWaitd等待新的連接。 case syscall.EAGAIN:
if fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } }
//此處用於處理連接中客戶端斷開情況,建鏈過程中客戶端發送RST報文 case syscall.ECONNABORTED: // This means that a socket on the listen // queue was closed before we Accept()ed it; // it's a silly error, so try again. continue } return -1, nil, errcall, err } }
如上所述,runtime_pollWaitd並沒有運行epollwait系統調用,它通過判斷並循環等待goroutine變為pdReady。
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
//判斷連接是否已經超時或關閉 err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // As for now only Solaris and AIX use level-triggered IO. if GOOS == "solaris" || GOOS == "aix" { netpollarm(pd, mode) } for !netpollblock(pd, int32(mode), false) {
//在超時和關閉情況下無需等待,返回錯誤 err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // Can happen if timeout has fired and unblocked us, // but before we had a chance to run, timeout has been reset. // Pretend it has not happened and retry. } return 0 }
netpollblock與netpollunblock對應,前者調用gopark函數阻塞goroutine,后者結合goread函數unpark goroutine。netpollblock的返回值用於判斷處理的goroutine是否為pdReady。從代碼實現來看,netpollblock的目的是park一個非pdReady的goroutine,而非直接pack一個goroutine。park一個pdReady的goroutine是不合理的,有可能該goroutine正在進行讀寫操作。
netpollblock首先將pd中對應mode(讀/寫)的goroutine狀態設置為pdWait,然后park該goroutine,用法與pdWait的定義一致
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } //設置gpp狀態為pdWait for { old := *gpp if old == pdReady { *gpp = 0 return true } if old != 0 { throw("runtime: double wait") } if atomic.Casuintptr(gpp, 0, pdWait) { break } } // need to recheck error states after setting gpp to WAIT // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
//waitio為true可用於等待ioReady if waitio || netpollcheckerr(pd, mode) == 0 {
//此處調用gopark阻塞goroutine。gopark返回可能是goroutine變為非阻塞,也可能由於其他原因(如close,timeout等)返回 gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5) } // be careful to not lose concurrent READY notification old := atomic.Xchguintptr(gpp, 0) if old > pdWait { throw("runtime: corrupted polldesc") } return old == pdReady }
read流程
有了上面的基礎,read和write就非常簡單了。
func (c *conn) Read(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } n, err := c.fd.Read(b) if err != nil && err != io.EOF { err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err} }
//用戶需要處理返回錯誤的情況,如關閉連接 return n, err }
func (fd *netFD) Read(p []byte) (n int, err error) { n, err = fd.pfd.Read(p)
//與GC相關 runtime.KeepAlive(fd) return n, wrapSyscallError("read", err) }
可以看到read和accept的代碼邏輯基本一致
func (fd *FD) Read(p []byte) (int, error) {
//增加fd的引用計數 if err := fd.readLock(); err != nil { return 0, err }
//與accept類似,此處通常僅用於減少fd的引用計數,在read失敗后需要手動close連接 defer fd.readUnlock() if len(p) == 0 { // If the caller wanted a zero byte read, return immediately // without trying (but after acquiring the readLock). // Otherwise syscall.Read returns 0, nil which looks like // io.EOF. // TODO(bradfitz): make it wait for readability? (Issue 15735) return 0, nil } if err := fd.pd.prepareRead(fd.isFile); err != nil { return 0, err } if fd.IsStream && len(p) > maxRW { p = p[:maxRW] } for {
//如果讀取到數據,則直接返回 n, err := syscall.Read(fd.Sysfd, p) if err != nil { n = 0
//如果暫時沒有數據,且使用epoll,則阻塞等待epoll的讀事件通知 if err == syscall.EAGAIN && fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } // On MacOS we can see EINTR here if the user // pressed ^Z. See issue #22838. if runtime.GOOS == "darwin" && err == syscall.EINTR { continue } } err = fd.eofError(n, err) return n, err } }
write流程
write與read類似,在遇到IO阻塞時都需要調用runtime_pollWait等待epoll事件。不同點在於,read在有數據時會一次性讀完,而write則需要判斷底層是否有足夠的空間來寫入數據
func (c *conn) Write(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } n, err := c.fd.Write(b) if err != nil { err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err} } return n, err }
func (fd *netFD) Write(p []byte) (nn int, err error) { nn, err = fd.pfd.Write(p) runtime.KeepAlive(fd) return nn, wrapSyscallError("write", err) }
func (fd *FD) Write(p []byte) (int, error) { if err := fd.writeLock(); err != nil { return 0, err } defer fd.writeUnlock() if err := fd.pd.prepareWrite(fd.isFile); err != nil { return 0, err } var nn int
//循環寫入數據,發送數據的大小受發送緩存區限制,如果發送數據過大,則需要分多次發送 for { max := len(p)
//從maxRW的注釋中可以看到Darwin和FreeBSD不允許一次性讀寫超過2G的數據。此處表示當寫數據超過2G時,僅寫入前2G數據
//read函數中沒有此限制,原因是在網絡上讀取數據時,socket讀緩存區大小遠遠小於2G。 if fd.IsStream && max-nn > maxRW { max = nn + maxRW }
//返回發送成功的字節數 n, err := syscall.Write(fd.Sysfd, p[nn:max]) if n > 0 { nn += n }
//只有發送失敗或所有數據發送成功才算write結束 if nn == len(p) { return nn, err }
//遇到寫緩存區滿且使用epoll時,等待epoll上報緩存區有空間事件 if err == syscall.EAGAIN && fd.pd.pollable() { if err = fd.pd.waitWrite(fd.isFile); err == nil { continue } } if err != nil { return nn, err } if n == 0 { return nn, io.ErrUnexpectedEOF } } }
設置連接的deadline
可以通過如下參數設置不同的連接終止時間,底層都調用了poll.setDeadlineImpl函數。設置t為0表示永不超時。具體用法可以參見net.Conn(net/net.go)接口注釋。
func (fd *FD) SetDeadline(t time.Time) error func (fd *FD) SetReadDeadline(t time.Time) error func (fd *FD) SetWriteDeadline(t time.Time) error
以上函數調用setDeadlineImpl實現
func setDeadlineImpl(fd *FD, t time.Time, mode int) error { var d int64
//如果設置了連接deadline時間,計算到deadline的時間差值。此處主要做一個預處理 if !t.IsZero() { d = int64(time.Until(t))
//這里表示deadline時間點為當前時間,則設置為-1 if d == 0 { d = -1 // don't confuse deadline right now with no deadline } } if err := fd.incref(); err != nil { return err } defer fd.decref() if fd.pd.runtimeCtx == 0 { return ErrNoDeadline }
//此處調用函數對定時器進行處理 runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode) return nil }
poll_runtime_pollSetDeadline中當到達deadline后執行如下函數,netpolldeadlineimpl實際執行的就是在deadline到期后運行netpollunblock+netpollgoready將阻塞的goroutine變為非阻塞,這會導致返回timeout io錯誤
func netpollDeadline(arg interface{}, seq uintptr) {
netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}
func netpollReadDeadline(arg interface{}, seq uintptr) {
netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}
func netpollWriteDeadline(arg interface{}, seq uintptr) {
netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) { lock(&pd.lock) if pd.closing { unlock(&pd.lock) return } rd0, wd0 := pd.rd, pd.wd combo0 := rd0 > 0 && rd0 == wd0 if d > 0 {
//獲取deadline時間 d += nanotime()
//從注釋看,這種情況表示deadline時間小於等於當前時間。將deadline時間設置為64bit的最大值 if d <= 0 { // If the user has a deadline in the future, but the delay calculation // overflows, then set the deadline to the maximum possible value.
d = 1<<63 - 1 } }
//按照不同mode設置讀寫對應的deadline時間 if mode == 'r' || mode == 'r'+'w' { pd.rd = d } if mode == 'w' || mode == 'r'+'w' { pd.wd = d }
//combo用於表示僅設置了讀deadline還是同時設置了讀寫deadline combo := pd.rd > 0 && pd.rd == pd.wd
//讀場景下deadline時間點執行的函數 rtf := netpollReadDeadline
//讀場景下deadline時間點執行的函數 if combo { rtf = netpollDeadline }
//如果沒有設置讀deadline時間點運行的函數,則使用默認的函數 if pd.rt.f == nil { if pd.rd > 0 {
//設置deadline時間點,以及到時后運行的函數,函數參數等 pd.rt.f = rtf pd.rt.when = pd.rd // Copy current seq into the timer arg. // Timer func will check the seq against current descriptor seq, // if they differ the descriptor was reused or timers were reset. pd.rt.arg = pd pd.rt.seq = pd.rseq
//添加時間並啟動定時任務 addtimer(&pd.rt) }
//此處用於處理僅讀deadline的情況 } else if pd.rd != rd0 || combo != combo0 { pd.rseq++ // invalidate current timers if pd.rd > 0 {
//由於使用了自定義的deadline處理函數,此處調用modtimer重新賦值並啟動定時任務 modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq) } else {
//此處表示入參t為0的情況,即永不超時,刪除定時器和定時器函數 deltimer(&pd.rt) pd.rt.f = nil } }
//此處處理寫deadline相關的情況,與讀類似 if pd.wt.f == nil { if pd.wd > 0 && !combo { pd.wt.f = netpollWriteDeadline pd.wt.when = pd.wd pd.wt.arg = pd pd.wt.seq = pd.wseq addtimer(&pd.wt) } } else if pd.wd != wd0 || combo != combo0 { pd.wseq++ // invalidate current timers if pd.wd > 0 && !combo { modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd, pd.wseq) } else { deltimer(&pd.wt) pd.wt.f = nil } } // If we set the new deadline in the past, unblock currently pending IO if any. var rg, wg *g
//如果deadline時間點早於當前時間,則unblock pd的所有IO,返回timeout IO錯誤 if pd.rd < 0 || pd.wd < 0 { atomic.StorepNoWB(noescape(unsafe.Pointer(&wg)), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock if pd.rd < 0 { rg = netpollunblock(pd, 'r', false) } if pd.wd < 0 { wg = netpollunblock(pd, 'w', false) } } unlock(&pd.lock) if rg != nil { netpollgoready(rg, 3) } if wg != nil { netpollgoready(wg, 3) } }
setDeadLine主要是防止服務端阻塞等待導致的大量冗余連接(長連接),參見Go net/http 超時機制完全手冊
TIPS:
- epoll的用法可以參見這里
參考: