OkHttp in Detail


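Overview

OkHttp is an HTTP client library. It has two main jobs:

  1. Sending requests and receiving responses
  2. Managing multiple requests

This article is based on version 4.9.0.

Sending requests and receiving responses

The process is U-shaped, as the flow chart in "拆輪子系列:拆 OkHttp" shows:

[Figure: okhttp_full_process]

The message is first processed step by step on the way down. At the bottom it is sent to the server. Once the server's reply arrives, it is converted step by step on the way back up into a developer-friendly Response.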

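Request message

A request message has three parts:

  1. Request line
  2. Request headers
  3. Request body

The request line gives a brief description of the request, the request headers supplement the request line and describe constraints on the body, and the request body is the data the client wants to send to the server.

Request line

The request line has three parts, separated by single spaces:

  1. The HTTP method, typically one of GET, POST, HEAD, PUT, DELETE, OPTIONS, TRACE, CONNECT;
  2. The request target, which identifies the resource. The example here uses the full URL, https://www.baidu.com; when a client talks to the origin server directly, the target is usually just the path and query (for example /), with the host carried in the Host header;
  3. The HTTP version, for example HTTP/1.1

Put together:

GET https://www.baidu.com HTTP/1.1

Request headers

Headers have a simple structure: each one is a name: value pair on its own line, and each line ends with a carriage return and line feed (CRLF). The sample below was captured from a response (note Server and Set-Cookie), but request headers use the same form: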

Bdpagetype: 2
Bdqid: 0xed9518d40011b00b
Cache-Control: private
Connection: keep-alive
Content-Encoding: gzip
Content-Type: text/html;charset=utf-8
Date: Sun, 21 Feb 2021 13:35:58 GMT
Expires: Sun, 21 Feb 2021 13:35:58 GMT
Server: BWS/1.1
Set-Cookie: BDSVRTM=42; path=/
Set-Cookie: BD_HOME=1; path=/
Set-Cookie: H_PS_PSSID=33425_33506_33403_33344_31253_26350; path=/; domain=.baidu.com
Strict-Transport-Security: max-age=172800
Traceid: 1613914558067414836217119616857332101131
X-Ua-Compatible: IE=Edge,chrome=1

Request body

The request body is flexible. It has no fixed format and can be defined to suit the service,
but a few formats are common:

  1. JSON
{
	"rate": 0.0867625,
	"displayable": true
}

The server can deserialize this format into a corresponding data class;

  2. Form (application/x-www-form-urlencoded)
name=tom&password=1234&realme=tomson
  3. Multipart form (multipart/form-data)
------WebKitFormBoundary7MA4YWxkTrZu0gW
Content-Disposition: form-data; name="url"

https://www.baidu.com/
------WebKitFormBoundary7MA4YWxkTrZu0gW
Content-Disposition: form-data; name="name"

waffle
------WebKitFormBoundary7MA4YWxkTrZu0gW
Content-Disposition: form-data; name="desk"; filename="桌子.jpg"
Content-Type: image/jpeg

...contents of 桌子.jpg...

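Here ------WebKitFormBoundary7MA4YWxkTrZu0gW is the boundary line that separates the fields; a complete body ends with the same boundary followed by two more hyphens;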
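
To make the three parts of a request message concrete, here is a minimal Kotlin sketch that assembles a raw HTTP/1.1 request with a form-encoded body. The names formEncode and buildRequestMessage are made up for illustration, and this is not how OkHttp writes requests internally; it only shows how the request line, headers, blank line, and body fit together.

```kotlin
import java.net.URLEncoder

// Encodes key/value pairs as application/x-www-form-urlencoded.
fun formEncode(fields: Map<String, String>): String =
    fields.entries.joinToString("&") { (k, v) ->
        URLEncoder.encode(k, "UTF-8") + "=" + URLEncoder.encode(v, "UTF-8")
    }

// Assembles request line, headers, a blank line, and the body, using CRLF line endings.
fun buildRequestMessage(
    method: String,
    target: String,
    headers: Map<String, String>,
    body: String = ""
): String {
    val sb = StringBuilder()
    sb.append("$method $target HTTP/1.1\r\n")
    for ((name, value) in headers) sb.append("$name: $value\r\n")
    if (body.isNotEmpty()) {
        // Content-Length counts bytes, not characters.
        sb.append("Content-Length: ${body.toByteArray(Charsets.UTF_8).size}\r\n")
    }
    sb.append("\r\n") // the empty line separates headers from body
    sb.append(body)
    return sb.toString()
}
```

Calling formEncode with the name, password, and realme fields from the form example reproduces the line name=tom&password=1234&realme=tomson, which can then be passed as the body argument.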

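Response message

A response message is laid out much like a request message; the only difference is the first line, which is a status line instead of a request line;

The status line has three parts: the HTTP version, the status code, and a text description of the status code;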

HTTP/1.1 200 OK
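
As a small illustration, the status line can be split into its three parts like this. ParsedStatusLine and parseStatusLine are hypothetical names for this sketch and are unrelated to OkHttp's own internal parsing.

```kotlin
data class ParsedStatusLine(val version: String, val code: Int, val reason: String)

fun parseStatusLine(line: String): ParsedStatusLine {
    // Split into at most three parts so a reason phrase containing spaces stays intact.
    val parts = line.split(" ", limit = 3)
    require(parts.size >= 2) { "Malformed status line: $line" }
    val code = parts[1].toIntOrNull()
        ?: throw IllegalArgumentException("Bad status code in: $line")
    // The reason phrase may be absent; default to an empty string.
    return ParsedStatusLine(parts[0], code, parts.getOrElse(2) { "" })
}
```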

For more on status codes, see this article

Interceptors

val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
    interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)

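What each of these seven kinds of interceptor does is covered later. First, the mechanism itself: how a request travels through this U-shaped process.

The code below, from RealCall.getResponseWithInterceptorChain, is where a request starts: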

val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }

The code below is the body of RealInterceptorChain::proceed;

override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

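    // index is the position of the interceptor that this chain should invoke,
    // so next is the chain that will invoke the following interceptor
    val next = copy(index = index + 1, request = request)
    // Get the interceptor for the current step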
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

An Interceptor has this basic structure:

override fun intercept(chain: Interceptor.Chain): Response {
    val request = chain.request()
    // Process the Request here
    val response = chain.proceed(request)
    // Process the Response here

    return response
}

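Because every interceptor calls proceed() on the chain it is given, this chain-of-responsibility design guarantees that each interceptor sees the request on the way down and the response on the way back up, in a fixed order.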
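
The U shape is easier to see in a stripped-down model. The sketch below is not OkHttp code: MiniInterceptor, MiniChain, tagging, echoServer, and runChain are hypothetical stand-ins, and requests and responses are plain strings. It keeps only the index-plus-one mechanism from RealInterceptorChain::proceed.

```kotlin
// Simplified stand-ins for Interceptor and RealInterceptorChain.
fun interface MiniInterceptor {
    fun intercept(chain: MiniChain): String
}

class MiniChain(
    private val interceptors: List<MiniInterceptor>,
    private val index: Int,
    val request: String
) {
    fun proceed(request: String): String {
        check(index < interceptors.size) { "no interceptor left to handle the request" }
        // The chain handed to the current interceptor points at the next one.
        val next = MiniChain(interceptors, index + 1, request)
        return interceptors[index].intercept(next)
    }
}

// Marks the request on the way down and the response on the way back up.
fun tagging(name: String) = MiniInterceptor { chain ->
    val response = chain.proceed("${chain.request}>$name")
    "$response<$name"
}

// Terminal interceptor: plays the role of CallServerInterceptor and never calls proceed().
val echoServer = MiniInterceptor { chain -> "[${chain.request}]" }

fun runChain(request: String, interceptors: List<MiniInterceptor>): String =
    MiniChain(interceptors, 0, request).proceed(request)
```

Running runChain("req", listOf(tagging("A"), tagging("B"), echoServer)) yields "[req>A>B]<B<A": A and B touch the request in order on the way down, then the response in reverse order on the way up.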

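client.interceptors

These are the application's own interceptors; the list is empty by default;

RetryAndFollowUpInterceptor

This interceptor does not modify the Request on the way down; all of its work happens on the way back up, on the Response or on the exception that was thrown;

That work has two parts:

  1. Retrying after a failure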
try {
    response = realChain.proceed(request)
    newExchangeFinder = true
} catch (e: RouteException) {
      // The attempt to connect via a route failed. The request will not have been sent.
      if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
        throw e.firstConnectException.withSuppressed(recoveredFailures)
      } else {
        recoveredFailures += e.firstConnectException
      }
      newExchangeFinder = false
      continue
} catch (e: IOException) {
      // An attempt to communicate with a server failed. The request may have been sent.
      if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
        throw e.withSuppressed(recoveredFailures)
      } else {
         recoveredFailures += e
      }
      newExchangeFinder = false
       continue
  }

private fun recover(
    e: IOException,
    call: RealCall,
    userRequest: Request,
    requestSendStarted: Boolean
  ): Boolean {
    // The application layer has forbidden retries.
    if (!client.retryOnConnectionFailure) return false

    // We can't send the request body again.
    if (requestSendStarted && requestIsOneShot(e, userRequest)) return false

    // This exception is fatal.
    if (!isRecoverable(e, requestSendStarted)) return false

    // No more routes to attempt.
    if (!call.retryAfterFailure()) return false

    // For failure recovery, use the same route selector with a new connection.
    return true
  }

If an exception occurs during processing and the retry conditions are not met, the exception is thrown and no retry is attempted;

  2. Redirects
 // Attach the prior response if it exists. Such responses never have a body.
 if (priorResponse != null) {
   response = response.newBuilder()
       .priorResponse(priorResponse.newBuilder()
    .body(null)
    .build())
       .build()
 }

 val exchange = call.interceptorScopedExchange
 // Build a follow-up request based on the status code; null means no follow-up (such as a redirect) is needed
 val followUp = followUpRequest(response, exchange)

 // No follow-up needed: return the response directly
 if (followUp == null) {
   if (exchange != null && exchange.isDuplex) {
     call.timeoutEarlyExit()
   }
   closeActiveExchange = false
   return response
 }

 // A follow-up is needed, but its body is one-shot and cannot be sent again: return the response as-is
 val followUpBody = followUp.body
 if (followUpBody != null && followUpBody.isOneShot()) {
   closeActiveExchange = false
   return response
 }

 response.body?.closeQuietly()

 // More than 20 follow-ups: throw
 if (++followUpCount > MAX_FOLLOW_UPS) {
   throw ProtocolException("Too many follow-up requests: $followUpCount")
 }

 // Switch to the follow-up request and go around the loop again
 request = followUp
 priorResponse = response
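
The two loops can be sketched separately in plain Kotlin. This is a simplified model under stated assumptions, not the interceptor's real logic: retrying, follow, and the redirects map are hypothetical, maxAttempts stands in for the "no more routes" check, and the recoverability test is supplied by the caller instead of reproducing OkHttp's isRecoverable.

```kotlin
import java.io.IOException
import java.net.ProtocolException

// Retries `attempt` while the failure is judged recoverable. Earlier failures are attached
// as suppressed exceptions to the one that is finally thrown.
fun <T> retrying(
    retryEnabled: Boolean = true,
    maxAttempts: Int = 3, // assumption: stands in for running out of routes
    isRecoverable: (IOException) -> Boolean,
    attempt: () -> T
): T {
    val recoveredFailures = mutableListOf<IOException>()
    while (true) {
        try {
            return attempt()
        } catch (e: IOException) {
            val attemptsSoFar = recoveredFailures.size + 1
            if (!retryEnabled || attemptsSoFar >= maxAttempts || !isRecoverable(e)) {
                recoveredFailures.forEach { e.addSuppressed(it) }
                throw e
            }
            recoveredFailures += e
        }
    }
}

val MAX_FOLLOW_UPS = 20

// `redirects` maps a URL to the URL it redirects to; a URL with no entry answers directly.
// Returns the final URL reached, or throws after more than MAX_FOLLOW_UPS hops.
fun follow(startUrl: String, redirects: Map<String, String>): String {
    var url = startUrl
    var followUpCount = 0
    while (true) {
        val next = redirects[url] ?: return url // no follow-up needed
        if (++followUpCount > MAX_FOLLOW_UPS) {
            throw ProtocolException("Too many follow-up requests: $followUpCount")
        }
        url = next
    }
}
```

In the real interceptor both concerns share a single while (true) loop; they are split here only to keep each one readable.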

BridgeInterceptor

  1. Add request headers based on the contents of the Request
val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()

    val body = userRequest.body
    if (body != null) {
      val contentType = body.contentType()
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString())
      }

      val contentLength = body.contentLength()
      if (contentLength != -1L) {
        requestBuilder.header("Content-Length", contentLength.toString())
        requestBuilder.removeHeader("Transfer-Encoding")
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked")
        requestBuilder.removeHeader("Content-Length")
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", userRequest.url.toHostHeader())
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive")
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true
      requestBuilder.header("Accept-Encoding", "gzip")
    }

    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }
  2. After the Response arrives, store its cookies and decode the response body
    val networkResponse = chain.proceed(requestBuilder.build())

    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

    val responseBuilder = networkResponse.newBuilder()
        .request(userRequest)

    // Decompress a gzip-encoded response body
    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
      val responseBody = networkResponse.body
      if (responseBody != null) {
        val gzipSource = GzipSource(responseBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        responseBuilder.headers(strippedHeaders)
        val contentType = networkResponse.header("Content-Type")
        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
      }
    }

    return responseBuilder.build()
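
The header defaults from step 1 can be restated on a plain map. withBridgeDefaults is a hypothetical helper written for this article, not part of OkHttp, and it simplifies in two ways: header names are matched case-sensitively, and cookies and User-Agent are left out.

```kotlin
// `contentLength` is null when there is no body and -1 when the body length is unknown.
fun withBridgeDefaults(
    headers: Map<String, String>,
    host: String,
    contentLength: Long? = null
): Map<String, String> {
    val out = headers.toMutableMap()
    if (contentLength != null) {
        if (contentLength != -1L) {
            out["Content-Length"] = contentLength.toString()
            out.remove("Transfer-Encoding")
        } else {
            // Unknown length: fall back to chunked transfer encoding.
            out["Transfer-Encoding"] = "chunked"
            out.remove("Content-Length")
        }
    }
    if ("Host" !in out) out["Host"] = host
    if ("Connection" !in out) out["Connection"] = "Keep-Alive"
    // Only ask for gzip when the caller set neither Accept-Encoding nor Range.
    if ("Accept-Encoding" !in out && "Range" !in out) out["Accept-Encoding"] = "gzip"
    return out
}
```

The Range check matters because the interceptor only decompresses responses to requests where it added Accept-Encoding itself; a caller who sets Accept-Encoding or Range receives the body exactly as the server sent it.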

CacheInterceptor

  1. Look up a cached Response for the Request. If the cache can satisfy the request, return it directly; otherwise continue down the chain;
    val call = chain.call()
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()

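    // CacheStrategy has two fields, networkRequest and cacheResponse:
    // networkRequest == null means the network must not be used;
    // cacheResponse == null means the cache cannot be used;
    // when both are non-null, the cached response is revalidated with a conditional request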
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    cache?.trackResponse(strategy)
    val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

    if (cacheCandidate != null && cacheResponse == null) {
      // The cache candidate wasn't applicable. Close it.
      cacheCandidate.body?.closeQuietly()
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build().also {
            listener.satisfactionFailure(call, it)
          }
    }

    // The cache is sufficient: return it without touching the network
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build().also {
            listener.cacheHit(call, it)
          }
    }

    // Report a conditional hit or a miss to the event listener
    if (cacheResponse != null) {
      listener.cacheConditionalHit(call, cacheResponse)
    } else if (cache != null) {
      listener.cacheMiss(call)
    }
  2. Send the request and update the cache
    var networkResponse: Response? = null
    try {
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response.also {
          listener.cacheHit(call, it)
        }
      } else {
        cacheResponse.body?.closeQuietly()
      }
    }

    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

    if (cache != null) {
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response).also {
          if (cacheResponse != null) {
            // This will log a conditional cache miss only.
            listener.cacheMiss(call)
          }
        }
      }

      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
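
The branching on networkRequest and cacheResponse in the code above boils down to four cases. The sketch below restates them as a small decision function; CacheAction and decide are hypothetical names, and the two booleans stand for whether each CacheStrategy field is non-null.

```kotlin
enum class CacheAction { FAIL_504, USE_CACHE, CONDITIONAL_REQUEST, NETWORK_ONLY }

fun decide(hasNetworkRequest: Boolean, hasCacheResponse: Boolean): CacheAction = when {
    // Network forbidden and nothing usable in the cache: synthesize a 504 response.
    !hasNetworkRequest && !hasCacheResponse -> CacheAction.FAIL_504
    // Network not needed: serve the cached response directly.
    !hasNetworkRequest -> CacheAction.USE_CACHE
    // Both present: revalidate; a 304 reply means the cached body is still good.
    hasCacheResponse -> CacheAction.CONDITIONAL_REQUEST
    // Nothing cached: a plain network request.
    else -> CacheAction.NETWORK_ONLY
}
```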
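
ConnectInterceptor

This interceptor does just one thing: it obtains a connection from the client to the server, opening a new socket only when no existing connection can be reused;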

val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)

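As the code shows, the key is the initExchange function;

Its main call stack is:

RealCall::initExchange
  -> ExchangeFinder::find
  -> ExchangeFinder::findHealthyConnection
  -> ExchangeFinder::findConnection

  1. RealCall::initExchange obtains an Exchange, which is used to send the request and receive the response;
  2. Before the request can be processed, ExchangeFinder::find has to supply a helper that encodes and decodes messages;
  3. That encoding and decoding in turn depends on a connection to the server;

The code that finds a connection is: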

  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    // Try to reuse the connection the call already holds
    val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

      // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
      // because we already acquired it.
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

      // The call's connection was released.
      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    // We need a new connection. Give it fresh stats.
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    // Try to get a connection from the pool
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }

    // Nothing in the pool. Figure out what route we'll try next.
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      // Use a route from a preceding coalesced connection.
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      // Use a route from an existing route selection.
      routes = null
      route = routeSelection!!.next()
    } else {
      // Compute a new route selection. This is a blocking operation!
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes

      if (call.isCanceled()) throw IOException("Canceled")

      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. We have a better chance of matching thanks to connection coalescing.
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }

    // Connect. Tell the call about the connecting call so async cancels work.
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    // If we raced another call connecting to this host, coalesce the connections. This makes for 3
    // different lookups in the connection pool!
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!!
      nextRouteToTry = route
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    synchronized(newConnection) {
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }

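The lookup proceeds as follows:

  1. Try to reuse call.connection;
  2. Try to find a connection in the connection pool;
  3. Compute a route selection, then look in the connection pool again with those routes;
  4. Create and connect a new RealConnection, then check the pool once more in case another call raced to the same host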
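
The ordering is the important part: cheap lookups come first, and the expensive ones (route selection, which can block, and opening a socket) only run when everything before them came up empty. A generic sketch of that "first source that succeeds wins" pattern follows; findFirst and the step names are hypothetical and do not model RealConnection or the pool themselves.

```kotlin
// Tries each lookup in order and returns the first non-null result together with the name
// of the step that produced it. Later lookups are never invoked once one succeeds.
fun <C : Any> findFirst(steps: List<Pair<String, () -> C?>>): Pair<String, C> {
    for ((name, lookup) in steps) {
        val found = lookup()
        if (found != null) return name to found
    }
    error("no step produced a connection")
}
```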
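
networkInterceptors

These are the application's network interceptors; the list is empty by default, and it is skipped for WebSocket calls;

CallServerInterceptor

This is the last step in the chain, but there is not much to say about it because the code is fairly straightforward;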

response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()

There is also some handling for special status codes, which is not covered here;

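Managing multiple requests

Synchronous and asynchronous calls

  1. Synchronous execution
  override fun execute(): Response {
    // Make sure this call has not already been executed
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    // Start the timeout clock
    timeout.enter()
    // Fire the call-start callback
    callStart()
    try {
      // Add the call to the running list (see Dispatcher below)
      client.dispatcher.executed(this)
      // Actually perform the request, as described above
      return getResponseWithInterceptorChain()
    } finally {
      // Remove the call from the running list
      client.dispatcher.finished(this)
    }
  }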
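  2. Asynchronous execution
  override fun enqueue(responseCallback: Callback) {
    // Make sure this call has not already been executed
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    // Fire the call-start callback. This feels slightly off: it arguably belongs right
    // before the request actually runs, whereas here the call is only being queued
    callStart()
    // Add the call to the ready queue
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }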
  3. Dispatcher

Dispatcher is the manager of Calls;

Some of its important fields:

// Maximum number of concurrent requests
var maxRequests = 64
// Maximum number of concurrent requests per host
var maxRequestsPerHost = 5
// Thread pool used to run asynchronous requests
val executorService: ExecutorService
  get() {
    if (executorServiceOrNull == null) {
      executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
          SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
  }

/** Asynchronous calls waiting to run */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()

/** Asynchronous calls currently running */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()

/** Synchronous calls currently running */
private val runningSyncCalls = ArrayDeque<RealCall>()

Important methods:

// Add an asynchronous call to the ready queue
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Share the per-host call counter of an existing call to the same host
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    // Start processing the queue
    promoteAndExecute()
  }

private fun promoteAndExecute(): Boolean {
    // Assert that the current thread does not hold the dispatcher's lock
    this.assertThreadDoesntHoldLock()

    // The asynchronous calls that will be started in this pass
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        // Global limit reached: stop promoting
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        // Per-host limit reached: skip this call
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        // Remove from the ready queue
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        // Add to the running lists
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    // Hand the calls to the thread pool one by one
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }
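
The effect of the two limits is easiest to see with small numbers. The following is a simplified, single-threaded model of the promotion loop, not the Dispatcher itself: MiniDispatcher is a hypothetical class, a call is represented only by its host name, and the per-host count is derived from the running list instead of a shared AtomicInteger.

```kotlin
class MiniDispatcher(val maxRequests: Int = 64, val maxRequestsPerHost: Int = 5) {
    val ready = mutableListOf<String>()
    val running = mutableListOf<String>()

    // Moves eligible calls from `ready` to `running` and returns the ones promoted.
    fun promote(): List<String> {
        val promoted = mutableListOf<String>()
        val i = ready.iterator()
        while (i.hasNext()) {
            val host = i.next()
            if (running.size >= maxRequests) break                           // global limit
            if (running.count { it == host } >= maxRequestsPerHost) continue // per-host limit
            i.remove()
            promoted += host
            running += host
        }
        return promoted
    }

    // Marks one running call to `host` as finished, freeing a slot.
    fun finished(host: String) {
        running.remove(host)
    }
}
```

With maxRequests = 3 and maxRequestsPerHost = 2, queueing calls to hosts a, a, a, b, c promotes a, a, b: the third a is skipped by the per-host limit and c is stopped by the global limit. After finished("a"), the next promote() picks up the waiting a while c keeps waiting.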

