Root-cause analysis of a memory surge in a Go gRPC client


Creating a gRPC client connection spawns several goroutines, including:

1) transport.loopyWriter.run — sends data to the server; it blocks under flow control, so queued data piles up and memory grows

2) transport.http2Client.reader — receives data from the server and calls t.controlBuf.throttle() for flow control

Symptom:

With a single connection from the client to the server, memory grows rapidly under load until the process is OOM-killed. If the load test is stopped before the OOM, memory gradually falls back. With two connections from the client to the server, the same load does not cause rapid memory growth.
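
For reference, a load generator along the following lines reproduces the behaviour (a minimal sketch only: it assumes the HelloService client generated from the .proto shown later in this article; the endpoint and the goroutine count are arbitrary placeholders):

// Minimal reproduction sketch (assumptions: generated HelloServiceClient/HelloReq
// from the .proto later in this article; placeholder endpoint; arbitrary concurrency).
package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
)

func main() {
	// A single connection, as in the problematic setup.
	conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	client := NewHelloServiceClient(conn) // generated by protoc

	// Unbounded producers: every call enqueues items into the connection's
	// controlBuffer, and nothing limits how fast this happens.
	for i := 0; i < 10000; i++ {
		go func() {
			for {
				_, _ = client.Hello(context.Background(), &HelloReq{Text: "hi"})
			}
		}()
	}
	select {} // keep running; memory climbs while loopyWriter.run lags behind
}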

Root cause:

Every gRPC connection has its own queue, shared by all streams on that connection. Issuing a request enqueues items (production); sending them to the server dequeues them (consumption). When production outpaces consumption, memory keeps rising, and because the queue has no length limit the growth is unbounded. The rapid growth happens when the transport.(*loopyWriter).run goroutine is not scheduled: consumption stops, so the queue only grows. Once the load test stops, the transport.(*loopyWriter).run goroutine resumes and drains the queue.

When consumption stops, a large number of goroutines like the following can be observed:

grpc/internal/transport.(*Stream).waitOnHeader (0x90c8d5)
runtime/netpoll.go:220 internal/poll.runtime_pollWait (0x46bdd5)

The netstat command shows a large backlog in the socket send queue (the Send-Q column).
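
The goroutine stacks quoted throughout this article can be collected with delve, roughly as follows (a sketch; the process id is a placeholder):

# Attach to the running client and inspect its goroutines.
dlv attach 12345
(dlv) goroutines     # list all goroutines with their current frame
(dlv) goroutine 42   # switch to one of the interesting goroutines
(dlv) bt             # print its full stack, as in the traces below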

Solution:

Limit the production rate, i.e., cap the number of requests sent concurrently over a single gRPC client connection. In addition, client-side keepalive can be enabled so that a stalled connection gets closed.
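
A minimal sketch of both mitigations, assuming the generated HelloService client used elsewhere in this article (the semaphore size and keepalive values are illustrative only, not recommendations):

// Sketch of the two mitigations (illustrative values only).
import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// 1) Cap in-flight requests per connection with a counting semaphore, so
//    production can never outrun the loopyWriter.run consumer for long.
var inflight = make(chan struct{}, 128) // 128 is an arbitrary cap

func callHello(ctx context.Context, client HelloServiceClient, req *HelloReq) (*HelloRes, error) {
	inflight <- struct{}{}        // acquire a slot
	defer func() { <-inflight }() // release it when the call finishes
	return client.Hello(ctx, req)
}

// 2) Enable client-side keepalive so a stalled connection is probed and closed
//    instead of silently accumulating queued frames.
func dialWithKeepalive(endpoint string) (*grpc.ClientConn, error) {
	return grpc.Dial(endpoint,
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                30 * time.Second, // ping after this much idle time
			Timeout:             10 * time.Second, // close if the ping is not acked in time
			PermitWithoutStream: true,
		}),
	)
}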

Afterword

It would be simpler and friendlier if go gRPC exposed an API for querying the size of the controlBuffer's list queue.
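
Until such an API exists, a rough application-level proxy can be maintained with a unary client interceptor that counts in-flight calls (a sketch; this counts RPCs in flight, not the actual controlBuffer list length):

// Sketch: approximate the backlog by counting in-flight unary calls.
import (
	"context"
	"sync/atomic"

	"google.golang.org/grpc"
)

var inflightCalls int64 // export this through your metrics system of choice

func countingInterceptor(ctx context.Context, method string, req, reply interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	atomic.AddInt64(&inflightCalls, 1)
	defer atomic.AddInt64(&inflightCalls, -1)
	return invoker(ctx, method, req, reply, opts...)
}

// Usage:
//   grpc.Dial(endpoint, grpc.WithInsecure(), grpc.WithUnaryInterceptor(countingInterceptor))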

Related source code:

  • http2Client
// Source file: google.golang.org/grpc/internal/transport/http2_client.go
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
	conn  net.Conn     // underlying communication channel
	loopy *loopyWriter // holds the queue shared by producers and the consumer; defined in controlbuf.go

	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer // defined in controlbuf.go

	maxConcurrentStreams  uint32
	streamQuota           int64
	streamsQuotaAvailable chan struct{}
	waitingStreams        uint32

	initialWindowSize int32
}

type controlBuffer struct {
	list *itemList // the queue
}

type loopyWriter struct {
	// cbuf links back to the controlBuffer:
	// loopy consumes the controlBuffer's list queue,
	// while http2Client produces into it through the controlBuffer.
	cbuf *controlBuffer
}
  • When a gRPC client connection is created, a run goroutine is started; it is the consumer of the queue
// Source file: internal/transport/http2_client.go
// Package: transport
// Breakpoint: (dlv) b transport.newHTTP2Client
// Called from: the grpc.addrConn.resetTransport goroutine
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
	// Establish the connection. Note this is different from grpc.Dial:
	// grpc.Dial itself does not connect; even a blocking Dial only waits for the state to become Ready.
	// transport.dial calls net.Dialer.DialContext,
	// which belongs to Go's standard library, not to gRPC.
	// net.Dialer.DialContext supports TCP, UDP, Unix sockets, etc.
	conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
	t.controlBuf = newControlBuffer(t.ctxDone) // initializes the send queue

	if t.keepaliveEnabled {
		t.kpDormancyCond = sync.NewCond(&t.mu)
		go t.keepalive() // keepalive goroutine
	}

	// Start the reader goroutine for incoming message. Each transport has
	// a dedicated goroutine which reads HTTP2 frame from network. Then it
	// dispatches the frame to the corresponding stream entity.
	go t.reader()

	// Send connection preface to server.
	n, err := t.conn.Write(clientPreface)

	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
		err := t.loopy.run()
	}()
}

0  0x00000000008f305b in google.golang.org/grpc/internal/transport.newHTTP2Client
   at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/internal/transport/http2_client.go:166
1  0x00000000009285a8 in google.golang.org/grpc/internal/transport.NewClientTransport
   at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/internal/transport/transport.go:577
2  0x00000000009285a8 in google.golang.org/grpc.(*addrConn).createTransport
   at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/clientconn.go:1297
3  0x0000000000927e48 in google.golang.org/grpc.(*addrConn).tryAllAddrs
   at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/clientconn.go:1227

   // grpc.addrConn.resetTransport below runs as its own goroutine
4  0x000000000092737f in google.golang.org/grpc.(*addrConn).resetTransport
   at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/clientconn.go:1142
5  0x0000000000471821 in runtime.goexit
   at /usr/local/go/src/runtime/asm_amd64.s:1374
   
// Source file: grpc/clientconn.go
// Package: grpc
// Called from: grpc.addrConn.getReadyTransport
func (ac *addrConn) connect() error {
  // Start a goroutine connecting to the server asynchronously.
	go ac.resetTransport()
}

// A traditional (unary) RPC call starts at grpc.ClientConn.Invoke:
//    XXX.pb.go // generated from the .proto file
// -> main.helloServiceClient.Hello
// -> grpc.ClientConn.Invoke // in call.go; a stream RPC instead starts at grpc.ClientConn.NewStream
// -> grpc.invoke // in call.go
// -> grpc.newClientStream // in stream.go
// -> grpc.clientStream.newAttemptLocked // in stream.go
// -> grpc.ClientConn.getTransport // in clientconn.go
// -> grpc.pickerWrapper.pick // in picker_wrapper.go
// -> grpc.addrConn.getReadyTransport
// -> grpc.addrConn.connect // starts the resetTransport goroutine
// -> grpc.addrConn.resetTransport // ***runs as a goroutine***
// -> grpc.addrConn.tryAllAddrs
// -> grpc.addrConn.createTransport // in clientconn.go
// -> transport.NewClientTransport // in transport.go
// -> transport.newHTTP2Client
// -> transport.dial
// -> net.Dialer.DialContext // net is Go's standard library, not part of gRPC
// -> net.sysDialer.dialSerial
// -> net.sysDialer.dialSingle
// -> net.sysDialer.dialTCP/dialUDP/dialUnix/dialIP
// -> net.sysDialer.doDialTCP // taking dialTCP as the example
// -> net.internetSocket // from here on it looks much like the C socket API, wrapped per platform
// -> net.socket
// -> net.sysSocket
//
// A stream RPC starts at NewStream:
// besides being called by grpc.invoke, grpc.newClientStream is also called directly by grpc.ClientConn.NewStream in stream.go
//    XXX.pb.go // generated from the .proto file
// -> grpc.ClientConn.NewStream // in stream.go
// -> grpc.newClientStream // in stream.go
// -> from here on, same as the chain above

// Source file: grpc/clientconn.go
// Package: grpc
// Called from: grpc.ClientConn.NewStream, which actually goes through grpc.newClientStream.
// getReadyTransport returns the transport if ac's state is READY.
// Otherwise it returns nil, false.
// If ac's state is IDLE, it will trigger ac to connect.
func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
  // Trigger idle ac to connect.
	if idle {
		ac.connect()
	}
}
  • Source excerpts for the consumer goroutine run
// Source file: internal/transport/controlbuf.go
// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// it gets added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resemebling to a round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
  // cbuf holds the queue (list *itemList);
  // without any limit on it, memory can grow sharply.
  cbuf      *controlBuffer
  sendQuota uint32
  
  // estdStreams is map of all established streams that are not cleaned-up yet.
	// On client-side, this is all streams whose headers were sent out.
	// On server-side, this is all streams whose headers were received.
  estdStreams map[uint32]*outStream // Established streams.

  // activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally have a list of data items(and perhaps trailers
	// on the server-side) to be sent out.
  activeStreams *outStreamList
}

// Source file: internal/transport/controlbuf.go
func (l *loopyWriter) run() (err error) {
  // get indirectly calls dequeue and dequeueAll
  for {
    it, err := l.cbuf.get(true)
    if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
  }
}

func (c *controlBuffer) get(block bool) (interface{}, error) {
	for {
		c.mu.Lock() // queue operations are protected by the mutex
		......
		// consume (dequeue)
		h := c.list.dequeue().(cbItem)
		......
		if !block {
			c.mu.Unlock()
			return nil, nil
		}
		// block until woken up
		c.consumerWaiting = true
		c.mu.Unlock()
		select {
		case <-c.ch: // woken up by executeAndPut: c.ch <- struct{}{}
		case <-c.done:
			c.finish() // drain the queue
			return nil, ErrConnClosing // indicates that the transport is closing
		}
	}
}

func (c *controlBuffer) finish() {
	......
	// drain the queue
	for head := c.list.dequeueAll(); head != nil; head = head.next {
	......
}
  • Note

Every gRPC call creates a new Stream on the client; that is what lets a single gRPC connection serve many calls concurrently. Requests are not sent synchronously; they are sent asynchronously through a queue. Each gRPC client connection has its own queue, and gRPC does not bound its size, so without any external limit memory balloons until an OOM occurs.

  • Producer: the client issuing the calls
message HelloReq { // request
    string text = 1;
}
message HelloRes { // response
    string text = 1;
}
service HelloService {
    rpc Hello(HelloReq) returns (HelloRes) {}
}

grpcClient, err := grpc.Dial(endpoint, opts...)
helloClient := NewHelloServiceClient(grpcClient)
// the Hello call is the source of production
res, err := helloClient.Hello(ctx, &req)

// The implementation of Hello, generated by protoc from the .proto file
func (c *helloServiceClient) Hello(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloRes, error) {
	out := new(HelloRes)
	err := c.cc.Invoke(ctx, "/main.HelloService/Hello", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// Source file: google.golang.org/grpc/call.go
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
	// allow interceptor to see all applicable call options, which means those
	// configured as defaults from dial option as well as per-call options
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.unaryInt != nil {
		return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
	}
	// fall through to the unexported invoke function
	return invoke(ctx, method, args, reply, cc, opts...)
}

// Source file: google.golang.org/grpc/call.go
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
  // newClientStream indirectly produces into the queue.
  // pprof shows that withRetry, called from newClientStream, accounts for most of the memory.
	cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
	if err := cs.SendMsg(req); err != nil {
		return err
	}
	return cs.RecvMsg(reply)
}
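
The pprof evidence mentioned in the comments above and below can be captured by exposing net/http/pprof in the load-test client (a sketch; the listen address is an arbitrary choice):

// Sketch: expose net/http/pprof so the heap profile that points at
// newClientStream / withRetry can be taken while the test is running.
import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* on http.DefaultServeMux
)

func startPprof() {
	go func() {
		log.Println(http.ListenAndServe("127.0.0.1:6060", nil))
	}()
}

// While the pressure test runs:
//   go tool pprof http://127.0.0.1:6060/debug/pprof/heap
// then run `top` inside pprof to see where the allocations concentrate.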

// Source file: google.golang.org/grpc/stream.go
// Breakpoint: (dlv) b clientStream.SendMsg
func (cs *clientStream) SendMsg(m interface{}) (err error) {
}

// Source file: google.golang.org/grpc/stream.go
// pprof shows newClientStream consuming a lot of memory, and the allocations happen inside the withRetry call it makes
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
  // The memory allocated under newStream keeps growing.
  // It is not strictly a leak: it accumulates under load and is slowly released once the load stops.
  // newStream is implemented by calling NewStream:
  //   cs.callHdr.PreviousAttempts = cs.numRetries
  //   s, err := a.t.NewStream(cs.ctx, cs.callHdr)
  //   cs.attempt.s = s
  // Here a is of type csAttempt:
  // "implements a single transport stream attempt within a clientStream"
  op := func(a *csAttempt) error { return a.newStream() }
  // The memory growth shows up under withRetry, and further down in the indirectly called NewStream
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
	}
}

// Source file: google.golang.org/grpc/stream.go
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
  for { // written as a loop, but the memory growth is not caused by looping here (normally a single pass)
    // op calls newStream,
    // which indirectly produces into the queue.
    err := op(a)
  }
}

// Source file: google.golang.org/grpc/stream.go
func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries
  // t below is of type transport.ClientTransport,
  // which is an interface, not a struct;
  // http2Client is an implementation of the ClientTransport interface.
	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
}

// Source file: google.golang.org/grpc/internal/transport/http2_client.go
// NewStream creates a stream and registers it into the transport as "active"
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  // Memory grows because large numbers of headerFields and s pile up in the queue.
  headerFields, err := t.createHeaderFields(ctx, callHdr)
  s := t.newStream(ctx, callHdr)
  // hdr bundles headerFields and s together
  hdr := &headerFrame{
    hf: headerFields,
    initStream: func(id uint32) error {
      t.activeStreams[id] = s
    },
    wq: s.wq,
  }
  for {
    // executeAndPut enqueues (produces);
    // memory grows because large numbers of hdr items pile up in the queue.
    success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
    }, hdr)
  }
}

// Source file: google.golang.org/grpc/internal/transport/controlbuf.go
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
  // Enqueue (produce).
  // When enqueueing outruns dequeueing (consumption), memory grows.
  c.list.enqueue(it)
  if it.isTransportResponseFrame() { // method declared by the cbItem interface
    // counts the number of queued items that represent the response of an action initiated by the peer
    // Note: transportResponseFrames counts only such response frames (see isTransportResponseFrame below),
    // not the total queue length; the headerFrame/dataFrame items produced by client calls are not counted,
    // so the throttling driven by this counter does not limit them.
    c.transportResponseFrames++
  }
}

// 源碼所在文件:google.golang.org/grpc/internal/transport/controlbuf.go
func (c *controlBuffer) put(it cbItem) error {
  // 入隊操作(生產)
	_, err := c.executeAndPut(nil, it)
	return err
}

type cbItem interface {
	isTransportResponseFrame() bool
}
func (*registerStream) isTransportResponseFrame() bool { return false }
func (h *headerFrame) isTransportResponseFrame() bool {
	return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
}
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
func (*dataFrame) isTransportResponseFrame() bool { return false }
func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
	return false // window updates are throttled by thresholds
}
func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
func (*outgoingSettings) isTransportResponseFrame() bool { return false }
func (*incomingGoAway) isTransportResponseFrame() bool { return false }
func (*goAway) isTransportResponseFrame() bool { return false }
func (*ping) isTransportResponseFrame() bool { return true }
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
  • Goroutine transport.loopyWriter.run
0  0x000000000043bfa5 in runtime.gopark
   at /usr/local/go/src/runtime/proc.go:307
1  0x000000000044c10f in runtime.selectgo
   at /usr/local/go/src/runtime/select.go:338
   A connection has only one run goroutine
2  0x00000000008eca2f in google.golang.org/grpc/internal/transport.(*controlBuffer).get
   at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/internal/transport/controlbuf.go:417
3  0x00000000008ed76e in google.golang.org/grpc/internal/transport.(*loopyWriter).run
   at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/internal/transport/controlbuf.go:544
4  0x000000000090f13b in google.golang.org/grpc/internal/transport.newHTTP2Client.func3
   at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/internal/transport/http2_client.go:356
5  0x0000000000471821 in runtime.goexit
   at /usr/local/go/src/runtime/asm_amd64.s:1374
  • Goroutine transport.http2Client.reader (blocked in transport.controlBuffer.throttle)
0  0x000000000043bfa5 in runtime.gopark
   at /usr/local/go/src/runtime/proc.go:307
1  0x000000000044c10f in runtime.selectgo
   at /usr/local/go/src/runtime/select.go:338
2  0x00000000008ec335 in google.golang.org/grpc/internal/transport.(*controlBuffer).throttle
   at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/internal/transport/controlbuf.go:319
   
   The reader goroutine is created by transport.newHTTP2Client in http2_client.go
3  0x00000000008fdadd in google.golang.org/grpc/internal/transport.(*http2Client).reader
   at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/internal/transport/http2_client.go:1293
4  0x0000000000471821 in runtime.goexit
   at /usr/local/go/src/runtime/asm_amd64.s:1374
  • Goroutine transport.Stream.waitOnHeader
// Large numbers of goroutines blocked in waitOnHeader
func (s *Stream) waitOnHeader() {
	if s.headerChan == nil {
		// On the server headerChan is always nil since a stream originates
		// only after having received headers.
		return
	}
	select {
	case <-s.ctx.Done():
		// Close the stream to prevent headers/trailers from changing after
		// this function returns.
		s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
		// headerChan could possibly not be closed yet if closeStream raced
		// with operateHeaders; wait until it is closed explicitly here.
		<-s.headerChan
	case <-s.headerChan:
	}
}

// This is actually the goroutine issuing the RPC call, not a dedicated goroutine
0  0x000000000043bfa5 in runtime.gopark
    at /usr/local/go/src/runtime/proc.go:307
 1  0x000000000044c10f in runtime.selectgo
    at /usr/local/go/src/runtime/select.go:338

 2  0x000000000090c8d5 in google.golang.org/grpc/internal/transport.(*Stream).waitOnHeader
    at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/internal/transport/transport.go:321
 3  0x0000000000942805 in google.golang.org/grpc/internal/transport.(*Stream).RecvCompress
    at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/internal/transport/transport.go:336
 4  0x0000000000942805 in google.golang.org/grpc.(*csAttempt).recvMsg
    at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/stream.go:894
 5  0x000000000094ad06 in google.golang.org/grpc.(*clientStream).RecvMsg.func1
    at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/stream.go:759
 6  0x000000000094057c in google.golang.org/grpc.(*clientStream).withRetry
    at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/stream.go:617
 7  0x0000000000941505 in google.golang.org/grpc.(*clientStream).RecvMsg
    at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/stream.go:758
 8  0x0000000000921d3b in google.golang.org/grpc.invoke
    at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/call.go:73
 9  0x0000000000921ad3 in google.golang.org/grpc.(*ClientConn).Invoke
    at /root/go/pkg/mod/google.golang.org/grpc@v1.33.2/call.go:37
10  0x0000000000b185f4 in /root/hello/grpc/proto.(*HelloClient).Call
    at /root/hello/hello.pb.go:70
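
As the select in waitOnHeader above shows, a blocked caller only returns when either the headers arrive or s.ctx is done, so giving every call a deadline bounds how long these goroutines can pile up. A sketch, assuming the generated HelloService client from the producer example (the 3-second value is arbitrary):

// Sketch: a per-call deadline makes s.ctx.Done() fire inside waitOnHeader,
// releasing a blocked caller after a bounded time instead of waiting forever.
import (
	"context"
	"time"
)

func callWithDeadline(client HelloServiceClient, req *HelloReq) (*HelloRes, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	return client.Hello(ctx, req)
}
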
  • Goroutine blocked in poll_runtime_pollWait
// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
        errcode := netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
                return errcode
        }
        // As for now only Solaris, illumos, and AIX use level-triggered IO.
        if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
                netpollarm(pd, mode)
        }
        for !netpollblock(pd, int32(mode), false) {
                errcode = netpollcheckerr(pd, int32(mode))
                if errcode != pollNoError {
                        return errcode
                }
                // Can happen if timeout has fired and unblocked us,
                // but before we had a chance to run, timeout has been reset.
                // Pretend it has not happened and retry.
        }
        return pollNoError
}

The runtime.gopark function is what parks a goroutine when it is switched out
0  0x000000000043bfa5 in runtime.gopark
   at /usr/local/go/src/runtime/proc.go:307
1  0x000000000043447b in runtime.netpollblock
   at /usr/local/go/src/runtime/netpoll.go:436

2  0x000000000046bdd5 in internal/poll.runtime_pollWait
   at /usr/local/go/src/runtime/netpoll.go:220
3  0x00000000004d9685 in internal/poll.(*pollDesc).wait
   at /usr/local/go/src/internal/poll/fd_poll_runtime.go:87
4  0x00000000004da6c5 in internal/poll.(*pollDesc).waitRead
   at /usr/local/go/src/internal/poll/fd_poll_runtime.go:92
5  0x00000000004da6c5 in internal/poll.(*FD).Read
   at /usr/local/go/src/internal/poll/fd_unix.go:159
6  0x00000000005327af in net.(*netFD).Read
   at /usr/local/go/src/net/fd_posix.go:55
7  0x000000000054688e in net.(*conn).Read
   at /usr/local/go/src/net/net.go:182
8  0x00000000006e14b8 in net/http.(*connReader).backgroundRead
   at /usr/local/go/src/net/http/server.go:690
9  0x0000000000471821 in runtime.goexit
   at /usr/local/go/src/runtime/asm_amd64.s:1374
  • The queue
type itemList struct {
	head *itemNode
	tail *itemNode
}

type itemNode struct {
	it   interface{}
	next *itemNode
}

// Enqueue (produce).
//
// As this shows,
// the queue has no size limit and enqueueing (production) is unconstrained,
// so once production outpaces consumption, items pile up and memory grows.
func (il *itemList) enqueue(i interface{}) {
	n := &itemNode{it: i}
	if il.tail == nil {
		il.head, il.tail = n, n
		return
	}
	il.tail.next = n
	il.tail = n
}

// Dequeue (consume)
func (il *itemList) dequeue() interface{} {
	if il.head == nil {
		return nil
	}
	i := il.head.it
	il.head = il.head.next
	if il.head == nil {
		il.tail = nil
	}
	return i
}

// Drain the queue (consume); the remaining items are simply dropped
func (il *itemList) dequeueAll() *itemNode {
	h := il.head
	il.head, il.tail = nil, nil
	return h
}

func (il *itemList) isEmpty() bool {
	return il.head == nil
}
// Source file: internal/transport/controlbuf.go
//    transport.loopyWriter.run
// -> transport.loopyWriter.handle
// -> transport.loopyWriter.headerHandler
// -> transport.loopyWriter.writeHeader // this is where it blocks
func (l *loopyWriter) handle(i interface{}) error {
	switch i := i.(type) {
	case *incomingWindowUpdate:
		return l.incomingWindowUpdateHandler(i)
	case *outgoingWindowUpdate:
		return l.outgoingWindowUpdateHandler(i)
	case *incomingSettings:
		return l.incomingSettingsHandler(i)
	case *outgoingSettings:
		return l.outgoingSettingsHandler(i)
	case *headerFrame:
		return l.headerHandler(i) // this is where it blocks
  ......
}

// Source file: internal/transport/controlbuf.go
func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
  // Blocked here:
  // the framer struct is defined in internal/transport/http_util.go,
  // and its fr member is of type *http2.Framer, defined in x/net/http2/frame.go.
  // The write cannot make progress, which matches the Send-Q backlog seen with netstat.
  err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
  })
}

