談談golang的netpoll原理(一)


今天談談golang源碼netpoll部分實現的細節和協程阻塞調度原理

epoll原理

epoll是linux環境下i/o多路復用的模型,結合下圖簡單說明epoll工作原理

1.png
上圖說明了epoll生成描epoll表的基本流程,生成socket用來綁定和監聽新的連接,將該socket放入epoll內核表,然后調用wait等待就緒事件。
2.png
當epoll wait返回就緒事件時,判斷是否是新的連接,如果是新的連接則將描述符加入epoll表,監聽讀寫事件。如果不是新的連接,說明已建立的連接上有讀或寫就緒事件,這樣我們根據EPOLLOUT或者EPOLLIN進行寫或者讀操作,上圖是echo server的基本原理,實際生產中監聽EPOLLIN還是EPOLLOUT根據實際情況而定。以上是單線程下epoll工作原理。

golang 網絡層如何封裝的epoll

golang 網絡層封裝epoll核心文件在系統文件src/runtime/netpoll.go, 這個文件中調用了不同平台封裝的多路復用api,linux環境下epoll封裝的文件在src/runtime/netpoll_epoll.go中,windows環境下多路復用模型實現在src/runtime/netpoll_windows.go。golang的思想意在將epoll操作放在runtime包里,而runtime是負責協程調度的功能模塊,程序啟動后runtime運行時是在單獨的線程里,個人認為是MPG模型中M模型,epoll模型管理放在這個單獨M中調度,M其實是運行在內核態的,在這個內核態線程不斷輪詢檢測就緒事件,將讀寫就緒事件拋出,從而觸發用戶態協程讀寫調度。而我們常用的read,write,accept等操作其實是在用戶態操作的,也就是MPG模型中的G,舉個例子當read阻塞時,將該協程掛起,當epoll讀就緒事件觸發后查找阻塞的協程列表,將該協程激活,用戶態G激活后繼續讀,這樣在用戶態操作是阻塞的,在內核態其實一直是輪詢的,這就是golang將epoll和協程調度結合的原理。

golang 如何實現協程和描述符綁定

golang 在internal/poll/fd_windows.go和internal/poll/fd_unix.go中實現了基本的描述符結構

type netFD struct {
	pfd poll.FD

	// immutable until Close
	family      int
	sotype      int
	isConnected bool // handshake completed or use of association with peer
	net         string
	laddr       Addr
	raddr       Addr
}

  netFD中pfd結構如下

type FD struct {
	// Lock sysfd and serialize access to Read and Write methods.
	fdmu fdMutex

	// System file descriptor. Immutable until Close.
	Sysfd syscall.Handle

	// Read operation.
	rop operation
	// Write operation.
	wop operation

	// I/O poller.
	pd pollDesc

	// Used to implement pread/pwrite.
	l sync.Mutex

	// For console I/O.
	lastbits       []byte   // first few bytes of the last incomplete rune in last write
	readuint16     []uint16 // buffer to hold uint16s obtained with ReadConsole
	readbyte       []byte   // buffer to hold decoding of readuint16 from utf16 to utf8
	readbyteOffset int      // readbyte[readOffset:] is yet to be consumed with file.Read

	// Semaphore signaled when file is closed.
	csema uint32

	skipSyncNotif bool

	// Whether this is a streaming descriptor, as opposed to a
	// packet-based descriptor like a UDP socket.
	IsStream bool

	// Whether a zero byte read indicates EOF. This is false for a
	// message based socket connection.
	ZeroReadIsEOF bool

	// Whether this is a file rather than a network socket.
	isFile bool

	// The kind of this file.
	kind fileKind
}

  FD是用戶態基本的描述符結構,內部幾個變量通過注釋可以讀懂,挑幾個難理解的
fdmu 控制讀寫互斥訪問的鎖,因為可能幾個協程並發讀寫
Sysfd 系統返回的描述符,不會更改除非系統關閉回收
rop 為讀操作,這個其實是根據不同系統網絡模型封裝的統一類型,比如epoll,iocp等都封裝為統一的operation,根據不同的系統調用不同的模型
wop 為寫操作封裝的類型
pd 這個是最重要的結構,內部封裝了協程等基本信息,這個變量會和內核epoll線程通信,從而實現epoll通知和控制用戶態協程的效果。
下面我們着重看看pollDesc結構

type pollDesc struct {
	runtimeCtx uintptr
}

  pollDesc內部存儲了一個unintptr的變量,uintptr為四字節大小的變量,可以存儲指針。runtimeCtx顧名思義,為運行時上下文,其初始化代碼如下

func (pd *pollDesc) init(fd *FD) error {
	serverInit.Do(runtime_pollServerInit)
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	if errno != 0 {
		if ctx != 0 {
			runtime_pollUnblock(ctx)
			runtime_pollClose(ctx)
		}
		return errnoErr(syscall.Errno(errno))
	}
	pd.runtimeCtx = ctx
	return nil
}

  runtime_pollOpen實際link的是runtime包下的poll_runtime_pollOpen函數,具體實現在runtime/netpoll.go

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd
	pd.closing = false
	pd.everr = false
	pd.rseq++
	pd.rg = 0
	pd.rd = 0
	pd.wseq++
	pd.wg = 0
	pd.wd = 0
	unlock(&pd.lock)

	var errno int32
	errno = netpollopen(fd, pd)
	return pd, int(errno)
}

  可以看出通過pollcache.alloc返回*pollDesc類型的變量pd,並且用pd初始化了netpollopen,這里我們稍作停留,談談pollcache

func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because can be referenced
		// only from epoll/kqueue internals.
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	unlock(&c.lock)
	return pd
}

  alloc函數做了這樣的操作,如果鏈表頭為空則初始化pdSize個pollDesc節點,並pop出頭部,如果不為空則直接pop出頭部節點,每個節點的類型就是*pollDesc類型,具體實現在runtime/netpoll.go中

type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
	// that will blow up when GC starts moving objects.
	lock    mutex // protects the following fields
	fd      uintptr
	closing bool
	everr   bool    // marks event scanning error happened
	user    uint32  // user settable cookie
	rseq    uintptr // protects from stale read timers
	rg      uintptr // pdReady, pdWait, G waiting for read or nil
	rt      timer   // read deadline timer (set if rt.f != nil)
	rd      int64   // read deadline
	wseq    uintptr // protects from stale write timers
	wg      uintptr // pdReady, pdWait, G waiting for write or nil
	wt      timer   // write deadline timer
	wd      int64   // write deadline
}

  其中rt和wt分別是讀寫定時器,用來防止讀寫超時。
fd為描述符指針,lock負責保護pollDesc內部成員變量讀寫防止多線程操作導致並發問題。
除此之外最重要的是rg和wg兩個變量,rg保存了用戶態操作pollDesc的讀協程地址,wg保存了用戶態操作pollDesc寫協程地址。
舉個例子,當我們在在用戶態協程調用read阻塞時rg就被設置為該讀協程,當內核態epoll_wait檢測read就緒后就會通過rg找到這個協程讓后恢復運行。
rg,wg默認是0,rg為pdReady表示讀就緒,可以將協程恢復,為pdWait表示讀阻塞,協程將要被掛起。wg也是如此。
所以golang其實是通過pollDesc實現用戶態和內核態信息的共享的。
回到之前poll_runtime_pollOpen函數,我們就理解了其內部生成*pollDesc,並且傳入netpollopen函數,netpollopen對應實現了epoll的init和wait,從而達到了用戶態信息和內核態的關聯。

netpollopen函數不同模型的實現不相同,epoll的實現在runtime/netpoll_epoll.go中

func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

  

從而實現了epoll將fd添加至內核epoll表里,同樣pd作為event的data傳入內核表,從而實現內核態和用戶態協程的關聯。
runtime/netpoll_epoll.go實現了epoll模型的基本操作,詳見源碼。

golang如何將一個描述符加入epoll表中

傳統的流程為:
生成socket–> bind socket–> listen–> accept
在golang中生成socket,bind,以及listen統一封裝好了
Listen–> lc.Listen –> sl.listenTCP –> internetSocket
internetSocket –> socket –> newFD && listenStream
在newFD中完成了描述符創建,在listenStream完成了bind和listen。newFD只初始化了基本的結構,未完成pollDesc類型變量pd的初始化。
我們跟隨源碼查看listen的綁定流程

unc (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
	addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
	if err != nil {
		return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
	}
	sl := &sysListener{
		ListenConfig: *lc,
		network:      network,
		address:      address,
	}
	var l Listener
	la := addrs.first(isIPv4)
	switch la := la.(type) {
	case *TCPAddr:
		l, err = sl.listenTCP(ctx, la)
	case *UnixAddr:
		l, err = sl.listenUnix(ctx, la)
	default:
		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
	}
	if err != nil {
		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
	}
	return l, nil
}

 可以看出Listen函數返回的類型為Listener接口類型,其內部根據la類型調用不同的listen函數,這里查看listenTCP 

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
	fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
	if err != nil {
		return nil, err
	}
	return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

  internetSocket內部調用socket生成描述符返回

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	s, err := sysSocket(family, sotype, proto)
	if err != nil {
		return nil, err
	}
	if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
		poll.CloseFunc(s)
		return nil, err
	}
	if fd, err = newFD(s, family, sotype, net); err != nil {
		poll.CloseFunc(s)
		return nil, err
	}
	if laddr != nil && raddr == nil {
		switch sotype {
		case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
			if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		case syscall.SOCK_DGRAM:
			if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		}
	}
	if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
		fd.Close()
		return nil, err
	}
	return fd, nil
} 

socket函數做了這樣幾件事
1 調用sysSocket生成描述符
2 調用newFD封裝描述符,構造netFD類型變量
3 調用netFD的listenDatagram方法,實現bind和listen

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
	var err error
	if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
		return err
	}
	var lsa syscall.Sockaddr
	if lsa, err = laddr.sockaddr(fd.family); err != nil {
		return err
	}
	if ctrlFn != nil {
		c, err := newRawConn(fd)
		if err != nil {
			return err
		}
		if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
			return err
		}
	}
	if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
		return os.NewSyscallError("bind", err)
	}
	if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
		return os.NewSyscallError("listen", err)
	}
	if err = fd.init(); err != nil {
		return err
	}
	lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
	fd.setAddr(fd.addrFunc()(lsa), nil)
	return nil
}

  listenStream除了bind和listen操作之外,還執行了netFD的init操作,這個init操作就是將netFD和epoll關聯,將描述符和協程信息寫入epoll表

func (fd *netFD) init() error {
	errcall, err := fd.pfd.Init(fd.net, true)
	if errcall != "" {
		err = wrapSyscallError(errcall, err)
	}
	return err
}

  

前文講過fd.pfd為FD類型,是和epoll通信的核心結構,FD的Init方法內完成了pollDesc類型成員變量pd和epoll的關聯。
其內部調用了fd.pd.init(fd),pd就是fd的pollDesc類型成員變量,其init函數上面已經解釋過了調用了runtime_pollOpen,runtime_pollOpen是link到
runtime/netpoll.go中poll_runtime_pollOpen函數,這個函數將用戶態協程的pollDesc信息寫入到epoll所在的單獨線程,從而實現用戶態和內核態的關聯。
總結下bind和listen后續的消息流程就是:
listenStream –> bind&listen&init –> pollDesc.Init –> runtime_pollOpen
–> poll_runtime_pollOpen –> epollctl(EPOLL_CTL_ADD)

到此為止golang網絡描述符從生成到綁定和監聽,以及寫入epoll表的流程分析完畢,下一篇分析accept流程以及用戶態協程如何掛起,epoll就緒后如何喚醒協程。
感謝關注我的公眾號
wxgzh.jpg

 




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM