探索OkHttp系列 (六) 發起請求與獲取響應


前言

接下來我們要分析的是攔截器鏈上的最后一個攔截器CallServerInterceptor,它用於寫入請求與獲取響應,這里不需要再調用攔截器責任鏈的proceed方法,CallServerInterceptorintercept方法中將自己的工作做完后,就直接將響應返回給上一攔截器。

CallServerInterceptor::intercept

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()
	// 寫入請求頭
    exchange.writeRequestHeaders(request)

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    // 如果請求方法允許攜帶請求體(非GET請求,非HEAD請求),並且請求體不為空  
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // 當客戶端需要發送一個體積可能很大的消息體時,就會將Expect: 100-continue寫入請求頭,然后發送
      // 不帶請求體的請求給服務端,然后服務端開始檢查請求消息頭,如果服務端滿足條件就會返回一個狀態碼
      // 100的回復告知客戶端繼續發送消息體,否則返回一個狀態碼417(Expectation Failed)的回復來告知
      // 客戶端不滿足期待條件,別發送請求體。
      // 若客戶端包含Expect: 100-continue請求頭,則進入下面的If語句。  
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        // 構建Response.Builder,當response狀態碼為100時,返回null  
        responseBuilder = exchange.readResponseHeaders(expectContinue = true)
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
      // 客戶端不包含"Expect: 100-continue",或者,客戶端包含"Expect: 100-continue"並且服務端
      // 滿足期待條件  
      if (responseBuilder == null) {
        // 請求體是否為雙工的,默認情況下返回false  
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          // 如果滿足Expect: 100-continue,並且請求體不是雙工的,則寫入請求體  
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        // 服務器未達到"Except: 100-continue"期望,不發送請求體  
        exchange.noRequestBody()
        // 若連接不是多路復用的  
        if (!exchange.connection.isMultiplexed) {
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      // 不發送請求體
      exchange.noRequestBody()
    }

    // 默認下,requestBody.isDuplex()返回false,進入If語句  
    if (requestBody == null || !requestBody.isDuplex()) {
      // 結束請求處理  
      exchange.finishRequest()
    }
      
    if (responseBuilder == null) {
      // 讀取響應頭  
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
      
    // 構建響應,包含:原始請求、握手信息、發送時間、接收時間  
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
      
    var code = response.code
    if (code == 100) {
      // Server sent a 100-continue even though we did not request one. Try again to read the actual
      // response status.
      // 即使我們沒有請求,服務端也返回了狀態碼為100的響應,再次嘗試去讀實際的響應狀態  
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }

    exchange.responseHeadersEnd(response)

    // forWebSocket默認為false,code為101表示切換協議  
    response = if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      // 讀取響應體  
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
      
    // 是否要關閉TCP連接   
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
        "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
      throw ProtocolException(
          "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
    }
    // 將響應返回給上一個攔截器  
    return response
  }

該方法的大致流程如下:

  1. 寫入請求頭(Exchange.writeRequestHeaders
  2. 寫入請求體(Exchange.createRequestBody & requestBody.writeTo
  3. 讀取響應頭(Exchange.readResponseHeaders
  4. 讀取響應體(Exchange.openResponseBody

可以看出,該攔截器主要是利用了Exchange的各種方法,完成請求和響應的寫與讀,在Exchange里面,實際的I/O操作交由ExchangeCodec處理,ExchangeCodec是一個接口,它有Http1ExchangeCodecHttp2ExchangeCodec兩個實現類,分別對應HTTP/1.xHTTP/2協議版本。

ExchangeCodec

ExchangeCodec是一個接口,負責對Http請求進行編碼和對Http響應進行解碼。有如下的變量與方法

image-20211212152409968

Exchange::writeRequestHeaders調用了ExchangeCodecwriteRequestHeaders方法;

  @Throws(IOException::class)
  fun writeRequestHeaders(request: Request) {
    try {
      eventListener.requestHeadersStart(call)
      codec.writeRequestHeaders(request)
      eventListener.requestHeadersEnd(call, request)
    } catch (e: IOException) {
      eventListener.requestFailed(call, e)
      trackFailure(e)
      throw e
    }
  }

Exchange::createRequestBody調用了ExchangeCodeccreateRequestBody方法;

  @Throws(IOException::class)
  fun createRequestBody(request: Request, duplex: Boolean): Sink {
    this.isDuplex = duplex
    val contentLength = request.body!!.contentLength()
    eventListener.requestBodyStart(call)
    val rawRequestBody = codec.createRequestBody(request, contentLength)
    return RequestBodySink(rawRequestBody, contentLength)
  }

Exchange::readResponseHeaders調用了ExchangeCodecreadResponseHeaders方法

  @Throws(IOException::class)
  fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    try {
      val result = codec.readResponseHeaders(expectContinue)
      result?.initExchange(this)
      return result
    } catch (e: IOException) {
      eventListener.responseFailed(call, e)
      trackFailure(e)
      throw e
    }
  }

Exchange::openResponseBody調用了ExchangeCodecopenResponseBodySource方法

  @Throws(IOException::class)
  fun openResponseBody(response: Response): ResponseBody {
    try {
      val contentType = response.header("Content-Type")
      val contentLength = codec.reportedContentLength(response)
      val rawSource = codec.openResponseBodySource(response)
      val source = ResponseBodySource(rawSource, contentLength)
      return RealResponseBody(contentType, contentLength, source.buffer())
    } catch (e: IOException) {
      eventListener.responseFailed(call, e)
      trackFailure(e)
      throw e
    }
  }

Http1ExchangeCodec

writeRequestHeaders

  override fun writeRequestHeaders(request: Request) {
    // 拼接請求行,包含 method、url、HTTP/1.1  
    val requestLine = RequestLine.get(request, connection.route().proxy.type())
    // 寫入請求Header和請求行  
    writeRequest(request.headers, requestLine)
  }

該方法先是拼接請求行,接着寫入請求Header和請求行。

Http1ExchangeCodec::writeRequest如下

  fun writeRequest(headers: Headers, requestLine: String) {
    check(state == STATE_IDLE) { "state: $state" }
    // 寫入請求行  
    sink.writeUtf8(requestLine).writeUtf8("\r\n")
    // 寫入請求Header  
    for (i in 0 until headers.size) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n")
    }
    sink.writeUtf8("\r\n")
    state = STATE_OPEN_REQUEST_BODY
  }

createRequestBody

Http1ExchangeCodec::createRequestBody

  override fun createRequestBody(request: Request, contentLength: Long): Sink {
    return when {
      request.body != null && request.body.isDuplex() -> throw ProtocolException(
          "Duplex connections are not supported for HTTP/1")
      // 分塊傳輸
      request.isChunked -> newChunkedSink() // Stream a request body of unknown length.
      // 請求體的長度已確定
      contentLength != -1L -> newKnownLengthSink() // Stream a request body of a known length.
      else -> // Stream a request body of a known length.
        throw IllegalStateException(
            "Cannot stream a request body without chunked encoding or a known content length!")
    }
  }

若要分塊傳輸,則調用newChunkedSink方法獲取輸出流;

若不使用分塊傳輸,請求體的長度已確定,則調用newKnownLengthSink方法獲取輸出流。

newChunkedSink

Http1ExchangeCodec::newChunkedSink

  private fun newChunkedSink(): Sink {
    // 檢查狀態  
    check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
    state = STATE_WRITING_REQUEST_BODY
    return ChunkedSink()
  }

這里創建了一個ChunkedSink並返回,ChunkedSinkHttp1ExchangeCodec的普通內部類,且實現了Sink接口

  /**
   * An HTTP body with alternating chunk sizes and chunk bodies. It is the caller's 
   * responsibility to buffer chunks; typically by using a buffered sink with this sink.
   */
  // ChunkedSink含有兩個信息:分塊的數據長度 和 分塊的具體數據	
  private inner class ChunkedSink : Sink {
	...
    override fun write(source: Buffer, byteCount: Long) {
      check(!closed) { "closed" }
      if (byteCount == 0L) return
      sink.writeHexadecimalUnsignedLong(byteCount)
      sink.writeUtf8("\r\n") 
      sink.write(source, byteCount)
      sink.writeUtf8("\r\n")
    }
	...
  }

write方法中,首先寫入了十六進制的數據大小,接着寫入了數據。

newKnownLengthSink

Http1ExchangeCodec::newKnownLengthSink

  private fun newKnownLengthSink(): Sink {
    // 檢查狀態  
    check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
    state = STATE_WRITING_REQUEST_BODY
    return KnownLengthSink()
  }

這里創建了一個KnownLengthSink並返回,KnownLengthSinkHttp1ExchangeCodec的普通內部類,且實現了Sink接口

  /** An HTTP request body. */
  private inner class KnownLengthSink : Sink {
    ...
    override fun write(source: Buffer, byteCount: Long) {
      check(!closed) { "closed" }
      checkOffsetAndCount(source.size, 0, byteCount)
      sink.write(source, byteCount)
    }
	...
  }

KnownLengthSinkwrite方法中,完成了對數據的寫入。

readResponseHeaders

Http1ExchangeCodec::readResponseHeaders如下:

  override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    check(state == STATE_OPEN_REQUEST_BODY || state == STATE_READ_RESPONSE_HEADERS) {
      "state: $state"
    }

    try {
      // 讀取並解析響應行內容  
      val statusLine = StatusLine.parse(headersReader.readLine())

      // 根據響應行和響應頭,構建Response.Builder  
      val responseBuilder = Response.Builder()
          .protocol(statusLine.protocol)
          .code(statusLine.code)
          .message(statusLine.message)
          .headers(headersReader.readHeaders())

      return when {
        // 若expectContinue為true且響應碼為100,返回null
        // (若用戶的請求頭包含Expect: 100-continue,則返回的響應碼有可能為100)  
        expectContinue && statusLine.code == HTTP_CONTINUE -> {
          null
        }
        statusLine.code == HTTP_CONTINUE -> {
          state = STATE_READ_RESPONSE_HEADERS
          responseBuilder
        }
        else -> {
          state = STATE_OPEN_RESPONSE_BODY
          responseBuilder
        }
      }
    } catch (e: EOFException) {
      // Provide more context if the server ends the stream before sending a response.
      val address = connection.route().address.url.redact()
      throw IOException("unexpected end of stream on $address", e)
    }
  }

該方法主要做的事情

  1. 調用readLine方法讀取響應行信息,並且調用StatusLine.parse方法將其解析為StatusLine對象
  2. 根據響應行和響應頭,構建Response.Builder

openResponseBodySource

Http1ExchangeCodec::openResponseBodySource如下

  override fun openResponseBodySource(response: Response): Source {
    return when {
      // 根據響應行和響應頭,判斷是否有響應體  
      !response.promisesBody() -> newFixedLengthSource(0)
      // 判斷是否使用分塊傳輸  
      response.isChunked -> newChunkedSource(response.request.url)
      else -> {
        // 響應體長度  
        val contentLength = response.headersContentLength()
        if (contentLength != -1L) {
          // 長度已知  
          newFixedLengthSource(contentLength)
        } else {
          // 長度未知  
          newUnknownLengthSource()
        }
      }
    }
  }

這里根據不同的情況構造不同的輸入流,分別有newFixedLengthSourcenewChunkedSourcenewUnknownLengthSource

newChunkedSource

  private fun newChunkedSource(url: HttpUrl): Source {
    check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
    state = STATE_READING_RESPONSE_BODY
    return ChunkedSource(url)
  }

返回了一個ChunkedSource對象,該對象如下

  /** An HTTP body with alternating chunk sizes and chunk bodies. */
  private inner class ChunkedSource(private val url: HttpUrl) :
      AbstractSource() {
    ...         
    override fun read(sink: Buffer, byteCount: Long): Long {
      ...
      if (bytesRemainingInChunk == 0L || bytesRemainingInChunk == NO_CHUNK_YET) {
        // 讀取分塊數據大小  
        readChunkSize()
        if (!hasMoreChunks) return -1
      }
	
      // 調用父類的read方法,讀取分塊數據內容  
      val read = super.read(sink, minOf(byteCount, bytesRemainingInChunk))
      ...
      return read
    }

    private fun readChunkSize() {
      ...
    }
	...
  }

newFixedLengthSource

  private fun newFixedLengthSource(length: Long): Source {
    check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
    state = STATE_READING_RESPONSE_BODY
    return FixedLengthSource(length)
  }

返回了一個FixedLengthSource對象,該對象如下

  /** An HTTP body with a fixed length specified in advance. */
  private inner class FixedLengthSource(private var bytesRemaining: Long) :
      AbstractSource() {
    ...      
    override fun read(sink: Buffer, byteCount: Long): Long {
      ...
      // 調用父類的read方法讀取數據  
      val read = super.read(sink, minOf(bytesRemaining, byteCount))
      ...
      return read
    }   
    ...      
  }

HTTP/2 流量控制

HTTP/2利用流來實現多路復用,這引入了對TCP連接的使用爭奪,會造成流被阻塞。流量控制方案確保在同一連接上的多個流之間不會造成破壞性的干擾流量控制會用於各個獨立的流,也會用於整個連接

HTTP/2"流"的流量控制的目標是:在不改變協議的情況下允許使用多種流量控制算法

HTTP/2的流量控制具有以下特征:

  1. 流量控制是特定於一個連接的。每種類型的流量控制都是在單獨的一跳的兩個端點之間的,並不是在整個端到端的路徑上的。(這里的一跳指的是HTTP連接的一跳,而不是IP路由的一跳)
  2. 流量控制是基於WINDOW_UPDATE幀的。接收方公布自己打算在每個流以及整個連接上分別接收多少字節。這是一個以信用為基礎的方案。
  3. 流量控制是有方向的,由接收者全面控制接收方可以為每個流和整個連接設置任意的窗口大小。發送方必須尊重接收方設置的流量控制限制。客戶方、服務端和中間代理作為接收方時都獨立地公布各自的流量控制窗口,作為發送方時都遵守對端的流量控制設置。
  4. 無論是新流還是整個連接,流量控制窗口的初始值是65535字節。
  5. 幀的類型決定了流量控制是否適用於幀。目前,只有DATA幀服從流量控制,所有其它類型的幀並不消耗流量控制窗口的空間。這保證了重要的控制幀不會被流量控制阻塞。
  6. 流量控制不能被禁用
  7. HTTP/2只定義了WINDOW_UPDATE幀的格式和語義,並沒有規定接收方如何決定何時發送幀、發送什么樣的值,也沒有規定發送方如何選擇發送包。具體實現可以選擇任何滿足需求的算法。

發送端持有一個流量控制窗口(window),初始值為65536,發送端每發送一個DATA幀,就會把window遞減,遞減量為DATA幀的大小,如果window為0則不能發送任何幀。接收端可以發送WINDOW_UPDATE幀給發送端,發送端以幀內的Window Size Increment作為增量,加到window上。

在TCP中已經有流量控制了,為什么在HTTP/2中還需要進行流量控制?

答:在HTTP/2中利用流來實現多路復用,在一個TCP連接中有多條流用於傳輸數據,在TCP中的流量控制是針對於整個連接的,而HTTP/2中的流量控制既針對各個獨立的流,也針對整個TCP連接。HTTP/2對每條流都進行了流量控制,確保某個流不會阻塞其他流。

Http2ExchangeCodec

writeRequestHeaders

  override fun writeRequestHeaders(request: Request) {
    if (stream != null) return

    val hasRequestBody = request.body != null
    // 獲取存放Header的List  
    val requestHeaders = http2HeadersList(request)
    // 獲取Http2Stream對象  
    stream = http2Connection.newStream(requestHeaders, hasRequestBody)
    // We may have been asked to cancel while creating the new stream and sending the request
    // headers, but there was still no stream to close.
    if (canceled) {
      stream!!.closeLater(ErrorCode.CANCEL)
      throw IOException("Canceled")
    }
    stream!!.readTimeout().timeout(chain.readTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)
    stream!!.writeTimeout().timeout(chain.writeTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)
  }

該方法主要做了兩件事情:

  1. 將請求頭放入List集合中
  2. 通過剛剛創建的請求頭集合初始化並獲取一個Http2Stream對象

http2HeadersList方法如下

    fun http2HeadersList(request: Request): List<Header> {
      val headers = request.headers
      val result = ArrayList<Header>(headers.size + 4)
      result.add(Header(TARGET_METHOD, request.method))
      result.add(Header(TARGET_PATH, RequestLine.requestPath(request.url)))
      val host = request.header("Host")
      if (host != null) {
        result.add(Header(TARGET_AUTHORITY, host)) // Optional.
      }
      result.add(Header(TARGET_SCHEME, request.url.scheme))

      for (i in 0 until headers.size) {
        // header names must be lowercase.
        val name = headers.name(i).toLowerCase(Locale.US)
        if (name !in HTTP_2_SKIPPED_REQUEST_HEADERS ||
            name == TE && headers.value(i) == "trailers") {
          result.add(Header(name, headers.value(i)))
        }
      }
      return result
    }

該方法將request.headers轉化為了ArrayList<Header>,並且添加了methodpathauthorityschemeHeader

Http2Connection::newStream如下

  /**
   * Returns a new locally-initiated stream.
   *
   * @param out true to create an output stream that we can use to send data to the 
   * remote peer.Corresponds to `FLAG_FIN`.
   */
  // 返回一個本地初始化的流
  @Throws(IOException::class)
  fun newStream(
    requestHeaders: List<Header>,
    out: Boolean
  ): Http2Stream {
    return newStream(0, requestHeaders, out)
  }

  @Throws(IOException::class)
  private fun newStream(
    associatedStreamId: Int,
    requestHeaders: List<Header>,
    out: Boolean
  ): Http2Stream {
    val outFinished = !out
    val inFinished = false
    val flushHeaders: Boolean
    val stream: Http2Stream
    val streamId: Int

    synchronized(writer) {
      synchronized(this) {
        ...
        // streamId 以2遞增  
        streamId = nextStreamId
        nextStreamId += 2
        // 初始化一個新的Http2Stream  
        stream = Http2Stream(streamId, this, outFinished, inFinished, null)
        ...
      }
      if (associatedStreamId == 0) {
        // 寫入請求頭  
        writer.headers(outFinished, streamId, requestHeaders)
      } else {
        require(!client) { "client streams shouldn't have associated stream IDs" }
        // HTTP/2 has a PUSH_PROMISE frame.
        // 發送PUSH_PROMISE幀  
        writer.pushPromise(associatedStreamId, streamId, requestHeaders)
      }
    }
	...
    return stream
  }

在該方法中,首先計算當前請求對應的streamId,然后初始化一個Http2Stream對象。因為傳入的associatedStreamId為0,所以會調用writer.header寫入請求頭(否則會調用writer.pushPromise方法發送PUSH_PROMISE幀)

寫入請求頭的Http2Writer::headers方法如下:

  @Synchronized @Throws(IOException::class)
  fun headers(
    outFinished: Boolean,
    streamId: Int,
    headerBlock: List<Header>
  ) {
    if (closed) throw IOException("closed")
    hpackWriter.writeHeaders(headerBlock)

    val byteCount = hpackBuffer.size
    val length = minOf(maxFrameSize.toLong(), byteCount)
    var flags = if (byteCount == length) FLAG_END_HEADERS else 0
    if (outFinished) flags = flags or FLAG_END_STREAM
    frameHeader(
        streamId = streamId,
        length = length.toInt(),
        type = TYPE_HEADERS,
        flags = flags
    )
    sink.write(hpackBuffer, length)

    if (byteCount > length) writeContinuationFrames(streamId, byteCount - length)
  }

這里調用了 hpackWriter.writeHeadersHeader 進行了 HPACK 壓縮編碼,然后調用 frameHeader 方法寫入幀頭,再將壓縮編碼后的 Header 數據寫入輸出流sink 中。(在 HTTP/2 中會對 Header 的信息進行 HPACK 壓縮編碼)

createRequestBody

Http2ExchangeCodec::createRequestBody

  override fun createRequestBody(request: Request, contentLength: Long): Sink {
    return stream!!.getSink()
  }

stream在前面「寫入請求頭writeRequestHeaders」中被創建,這里得到了stream的輸出流sink

Http2Stream::getSink

  fun getSink(): Sink {
    synchronized(this) {
      check(hasResponseHeaders || isLocallyInitiated) {
        "reply before requesting the sink"
      }
    }
    return sink
  }

sink變量:

  internal val sink = FramingSink(
      finished = outFinished
  )

FramingSinkHttp2Stream的一個普通內部類,實現了Sink接口:

  /*A sink that writes outgoing data frames of a stream. This class is not thread safe.*/
  internal inner class FramingSink(
    /** True if either side has cleanly shut down this stream. We shall send no more bytes. */
    var finished: Boolean = false
  ) : Sink {

	// 數據緩沖區
    private val sendBuffer = Buffer()

    /** Trailers to send at the end of the stream. */
    var trailers: Headers? = null

    var closed: Boolean = false

    @Throws(IOException::class)
    override fun write(source: Buffer, byteCount: Long) {
      this@Http2Stream.assertThreadDoesntHoldLock()

      // 將數據寫入緩沖區  
      sendBuffer.write(source, byteCount)
      while (sendBuffer.size >= EMIT_BUFFER_SIZE) {
        // 發出數據幀  
        emitFrame(false)
      }
    }

      
    /**
     * Emit a single data frame to the connection. The frame's size be limited by this stream's
     * write window. This method will block until the write window is nonempty.
     */
    // 向連接發出單個數據幀。幀的大小受此流的寫窗口的限制。此方法將阻塞,直到寫窗口非空為止
    @Throws(IOException::class)
    private fun emitFrame(outFinishedOnLastFrame: Boolean) {
      val toWrite: Long
      val outFinished: Boolean
      synchronized(this@Http2Stream) {
        writeTimeout.enter()
        try {
          // 如果應用程序已發送的字節數 >= 允許發送的字節總數,阻塞,等待WINDOW_UPDATE幀
          // 更新流量窗口大小  
          while (writeBytesTotal >= writeBytesMaximum &&
              !finished &&
              !closed &&
              errorCode == null) {
            waitForIo() // Wait until we receive a WINDOW_UPDATE for this stream.
          }
        } finally {
          writeTimeout.exitAndThrowIfTimedOut()
        }

        checkOutNotClosed() // Kick out if the stream was reset or closed while waiting.
        // 計算能夠寫入的字節數  
        toWrite = minOf(writeBytesMaximum - writeBytesTotal, sendBuffer.size)
        writeBytesTotal += toWrite
        outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size && errorCode == null
      }

      writeTimeout.enter()
      try {
        // 寫入數據  
        connection.writeData(id, outFinished, sendBuffer, toWrite)
      } finally {
        writeTimeout.exitAndThrowIfTimedOut()
      }
    }
    ...  
  }

FramingSinkwrite方法首先將數據寫入緩沖區中,如果緩沖區的數據大小大於等於EMIT_BUFFER_SIZE,就會調用emitFrame方法發出數據幀。

emitFrame方法中,出現了兩個變量:

  • writeBytesTotal : 應用程序已發送的字節數

  • writeBytesMaximumstream流量窗口的大小

emitFrame方法中,如果應用程序已發送的字節數超過了流量窗口的大小,就會阻塞,等待接收方(此時接收方是服務端)發送WINDOW_UPDATE幀,且writeBytesTotal < writeBytesMaximum才會發送數據幀。發送數據幀的時候,會先調用minOf(writeBytesMaximum - writeBytesTotal, sendBuffer.size)計算能夠發送的數據大小,它會取「寫窗口的剩余空間」和「寫緩沖區大小」的最小值,然后調用connection.writeData寫入數據,其中connection是一個Http2Connection對象。

Http2Connection::writeData如下:

 /**
   * ...(省略)
   * Zero [byteCount] writes are not subject to flow control and will not block. 
   * The only use case for zero [byteCount] is closing a flushed output stream.
   */
  // ...(省略)零字節計數寫入不受流控制,也不會阻塞,byteCount為0的唯一用例是關閉一個刷新的輸出流
  @Throws(IOException::class)
  fun writeData(
    streamId: Int,
    outFinished: Boolean,
    buffer: Buffer?,
    byteCount: Long
  ) {
    // Empty data frames are not flow-controlled.
    // 空數據幀不受流控制  
    if (byteCount == 0L) {
      writer.data(outFinished, streamId, buffer, 0)
      return
    }

    var byteCount = byteCount
    while (byteCount > 0L) {
      var toWrite: Int
      synchronized(this@Http2Connection) {
        try {
          // 當前連接不允許發送更多數據,進入阻塞狀態,直到連接的流量控制窗口更新
          while (writeBytesTotal >= writeBytesMaximum) {
            // Before blocking, confirm that the stream we're writing is still open. It's possible
            // that the stream has since been closed (such as if this write timed out.)
            if (!streams.containsKey(streamId)) {
              throw IOException("stream closed")
            }
            this@Http2Connection.wait() // Wait until we receive a WINDOW_UPDATE.
          }
        } catch (e: InterruptedException) {
          Thread.currentThread().interrupt() // Retain interrupted status.
          throw InterruptedIOException()
        }

        // 計算能夠寫入的數據大小  
        toWrite = minOf(byteCount, writeBytesMaximum - writeBytesTotal).toInt()
        toWrite = minOf(toWrite, writer.maxDataLength())
        // 更新寫入的總數據大小  
        writeBytesTotal += toWrite.toLong()
      }
	
      // 減去將要寫入的數據大小  
      byteCount -= toWrite.toLong()
      // 寫入數據  
      writer.data(outFinished && byteCount == 0L, streamId, buffer, toWrite)
    }
  }

writeData方法中,主要工作如下:

  1. 判斷當前連接是否允許發送數據(writeBytesTotal < writeBytesMaximum),若不允許則進入阻塞狀態直到收到WINDOW_UPDATE幀更新連接的流量窗口。
  2. 計算可發送數據大小(不超過剩余窗口大小以及每個幀的限制大小),調用writer.data方法寫入數據。
  3. 若當前還有需要發送的數據,重復1、2步驟。

Http2Writer::data方法如下

  /**
   * ...
   *
   * @param source the buffer to draw bytes from. May be null if byteCount is 0.
   * @param byteCount must be between 0 and the minimum of `source.length` and [maxDataLength].
   */
  // source:緩存的字節,如果byteCount為0,source可能為null
  // byteCount:值必須在[0,min(source.length,maxDataLength)]之間
  @Synchronized @Throws(IOException::class)
  fun data(outFinished: Boolean, streamId: Int, source: Buffer?, byteCount: Int) {
    if (closed) throw IOException("closed")
    var flags = FLAG_NONE
    if (outFinished) flags = flags or FLAG_END_STREAM
    dataFrame(streamId, flags, source, byteCount)
  }

data方法主要是調用了Http2Writer::dataFrame方法:

  @Throws(IOException::class)
  fun dataFrame(streamId: Int, flags: Int, buffer: Buffer?, byteCount: Int) {
    frameHeader(
        streamId = streamId,
        length = byteCount,
        type = TYPE_DATA,
        flags = flags
    )
    if (byteCount > 0) {
      sink.write(buffer!!, byteCount.toLong())
    }
  }

先調用frameHeader方法寫入幀頭,然后再寫入相應的數據。

readResponseHeaders

Http2ExchangeCodec::readResponseHeaders如下

  override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    // 獲取響應的Header  
    val headers = stream!!.takeHeaders()
    // 構建一個Response.Builder對象  
    val responseBuilder = readHttp2HeadersList(headers, protocol)
    // 若expectContinue為true,且響應碼為100,返回null,否則返回responseBuilder  
    return if (expectContinue && responseBuilder.code == HTTP_CONTINUE) {
      null
    } else {
      responseBuilder
    }
  }

該方法首先獲取了響應的Header,然后構建了一個Response.Builder對象,接着根據情況,返回null,或者responseBuilder。我們在CallServerInterceptor::intercept提到過,若用戶的請求頭包含Expect: 100-continue,則返回的響應碼有可能為100

Http2Stream::takeHeaders如下

  @Synchronized @Throws(IOException::class)
  fun takeHeaders(): Headers {
    readTimeout.enter()
    try {
      // 若當前接收響應Header的隊列是空的,則阻塞 
      // headersQueue的類型是ArrayDeque<Headers>
      while (headersQueue.isEmpty() && errorCode == null) {
        waitForIo()
      }
    } finally {
      readTimeout.exitAndThrowIfTimedOut()
    }
    // 取隊列的第一個Headers信息返回  
    if (headersQueue.isNotEmpty()) {
      return headersQueue.removeFirst()
    }
    throw errorException ?: StreamResetException(errorCode!!)
  }

headersQueue的類型是ArrayDeque<Headers>,表示響應Header 。如果headersQueue里面沒有元素,就阻塞,否則取其第一個元素返回。

構建Response.Builder對象的Http2ExchangeCodec::readHttp2HeadersList 方法如下:

    /** Returns headers for a name value block containing an HTTP/2 response. */
    fun readHttp2HeadersList(headerBlock: Headers, protocol: Protocol): Response.Builder {
      var statusLine: StatusLine? = null
      val headersBuilder = Headers.Builder()
      // 遍歷Headers的信息  
      for (i in 0 until headerBlock.size) {
        val name = headerBlock.name(i)
        val value = headerBlock.value(i)
        // 提取狀態行信息  
        if (name == RESPONSE_STATUS_UTF8) {
          statusLine = StatusLine.parse("HTTP/1.1 $value")
        // 過濾響應頭,HTTP_2_SKIPPED_RESPONSE_HEADERS包括:CONNECTION,HOST,KEEP_ALIVE,
        // PROXY_CONNECTION,TE,TRANSFER_ENCODING,ENCODING,UPGRADE    
        } else if (name !in HTTP_2_SKIPPED_RESPONSE_HEADERS) {
          headersBuilder.addLenient(name, value)
        }
      }
      if (statusLine == null) throw ProtocolException("Expected ':status' header not present")

      // 構建Response.Builder對象  
      return Response.Builder()
          .protocol(protocol)
          .code(statusLine.code)
          .message(statusLine.message)
          .headers(headersBuilder.build())
    }

該方法主要是對Headers對象進行遍歷,遍歷的過程中,會提取狀態行信息,並且符合要求的響應頭記錄下來,最后構建一個Response.Builder對象。

openResponseBodySource

Http2ExchangeCodec::openResponseBodySource如下:

  override fun openResponseBodySource(response: Response): Source {
    return stream!!.source
  }

這里返回了Http2Streamsource對象,Http2Streamsource對象定義如下:

  internal val source = FramingSource(
      maxByteCount = connection.okHttpSettings.initialWindowSize.toLong(),
      finished = inFinished
  )

這里返回了一個FramingSource對象,FramingSourceHttp2Stream的普通內部類,實現了okioSource接口:

  // 該Source用於讀取流上即將到來的數據幀
  inner class FramingSource internal constructor(
    /** Maximum number of bytes to buffer before reporting a flow control error. */
    private val maxByteCount: Long,
    internal var finished: Boolean
  ) : Source {
    // 接收數據緩沖區  
    val receiveBuffer = Buffer()
    // 讀取數據緩沖區  
    val readBuffer = Buffer()

    var trailers: Headers? = null

    /** True if the caller has closed this stream. */
    internal var closed: Boolean = false

    @Throws(IOException::class)
    override fun read(sink: Buffer, byteCount: Long): Long {
      require(byteCount >= 0L) { "byteCount < 0: $byteCount" }

      while (true) {
        var tryAgain = false
        var readBytesDelivered = -1L
        var errorExceptionToDeliver: IOException? = null

        // 1. Decide what to do in a synchronized block.

        synchronized(this@Http2Stream) {
          readTimeout.enter()
          try {
            if (errorCode != null) {
              // Prepare to deliver an error.
              errorExceptionToDeliver = errorException ?: StreamResetException(errorCode!!)
            }

            if (closed) {
              throw IOException("stream closed")
            } else if (readBuffer.size > 0L) {
              // Prepare to read bytes. Start by moving them to the caller's buffer.
              // 讀取數據  
              readBytesDelivered = readBuffer.read(sink, minOf(byteCount, readBuffer.size))
              readBytesTotal += readBytesDelivered

              // 未被WINDOW_UPDATE幀確認的字節總數  
              val unacknowledgedBytesRead = readBytesTotal - readBytesAcknowledged
              if (errorExceptionToDeliver == null &&
                  unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2) {
                // Flow control: notify the peer that we're ready for more data! Only 
                // send a WINDOW_UPDATE if the stream isn't in error.
                // 流控制:通知對方我們已經准備好接收更多數據!只在流沒有錯誤的情況下發送
                // WINDOW_UPDATE幀。    
                connection.writeWindowUpdateLater(id, unacknowledgedBytesRead)
                readBytesAcknowledged = readBytesTotal
              }
            } else if (!finished && errorExceptionToDeliver == null) {
              // Nothing to do. Wait until that changes then try again.
              // 沒有數據可讀,阻塞,被喚醒之后再次嘗試讀取數據  
              waitForIo()
              tryAgain = true
            }
          } finally {
            readTimeout.exitAndThrowIfTimedOut()
          }
        }

        // 2. Do it outside of the synchronized block and timeout.

        // 是否要再次嘗試  
        if (tryAgain) {
          continue
        }

        if (readBytesDelivered != -1L) {
          // Update connection.unacknowledgedBytesRead outside the synchronized block.
          updateConnectionFlowControl(readBytesDelivered)
          return readBytesDelivered
        }

        if (errorExceptionToDeliver != null) {.
          throw errorExceptionToDeliver!!
        }

        return -1L // This source is exhausted.
      }
    }

    private fun updateConnectionFlowControl(read: Long) {
      ...
    }

    @Throws(IOException::class)
    internal fun receive(source: BufferedSource, byteCount: Long) {
      ...
    }
	...
  }

FramingSourceread方法中,若有數據可讀,則調用readBuffer.read方法讀取數據,若沒有數據可讀,則阻塞,被喚醒之后再次嘗試讀取數據。

當接收方(此時接收方是客戶端)接收的數據大小超過流量控制窗口大小,就調用connection.writeWindowUpdateLater方法通知對方增加窗口大小,增加的大小為"未被WINDOW_UPDATE幀確認的字節總數"。

Http2Connection::writeWindowUpdateLater如下

  internal fun writeWindowUpdateLater(
    streamId: Int,
    unacknowledgedBytesRead: Long
  ) {
    writerQueue.execute("$connectionName[$streamId] windowUpdate") {
      try {
        writer.windowUpdate(streamId, unacknowledgedBytesRead)
      } catch (e: IOException) {
        failConnection(e)
      }
    }
  }

這里異步發送一個WINDOW_UPDATE幀,Http2Writer::windowUpdate如下:

  /**
   * Inform peer that an additional `windowSizeIncrement` bytes can be sent on 
   * `streamId`, or the connection if `streamId` is zero.
   */
  @Synchronized @Throws(IOException::class)
  fun windowUpdate(streamId: Int, windowSizeIncrement: Long) {
    if (closed) throw IOException("closed")
    require(windowSizeIncrement != 0L && windowSizeIncrement <= 0x7fffffffL) {
      "windowSizeIncrement == 0 || windowSizeIncrement > 0x7fffffffL: $windowSizeIncrement"
    }
    frameHeader(
        streamId = streamId,
        length = 4,
        type = TYPE_WINDOW_UPDATE,
        flags = FLAG_NONE
    )
    sink.writeInt(windowSizeIncrement.toInt())
    sink.flush()
  }

先寫入WINDOW_UPDATE的幀頭,然后再寫入窗口增大數值。

總結

HTTP/1.x 的請求寫入和響應讀取中,主要是將普通請求、響應及 chunked 特性下的請求、響應進行了不同的處理。

HTTP/2 的請求寫入和響應讀取中,對流量控制進行了很好的支持,每個stream以及connection都會記錄當前發送數據大小以及流量窗口大小,通過窗口大小對寫入的數據大小進行了限制,客戶端和服務端在HTTP/2的流量控制中都可以作為接收方,當請求寫入的時候,服務端是接收方,當讀取響應的時候,客戶端是接收方。此外,在傳輸Header的時候,也使用了HTTP/2的特性,利用HPACKHeader 進行壓縮編碼。

參考

  1. 理解HTTP/2流量控制(一)_-CSDN博客


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM