探索OkHttp系列 (一) 請求的發起與響應


前言

OkHttp是個人使用的比較多的網絡請求庫,但是一直沒有探究它的實現原理,這次就對OkHttp的源碼進行分析,探究其實現原理。

分析的okhttp源碼版本:4.9.2

基本使用

GET

同步地發起請求,會阻塞線程,不能直接在主線程當中調用

    private fun getData() {
        thread {
            val client: OkHttpClient = OkHttpClient()
            val request: Request = Request.Builder()
                .get()
                .url("https://www.baidu.com")
                .build()
            val response: Response = client.newCall(request).execute()
            val data: String? = response.body?.string()
            Log.d(TAG, "OkHttp 同步請求:$data")
        }
    }

獲取OkHttpClient實例,有兩種方法,一是像上面代碼一樣,直接new 一個OkHttpClient對象;二是new 一個OkHttpClientBuilder對象,可以對client配置一些參數,如下

val client: OkHttpClient = OkHttpClient.Builder().build()

異步地發起請求,會自動切換線程

    private fun getData() {
        val client: OkHttpClient = OkHttpClient()
        val request: Request = Request.Builder()
            .get()
            .url("https://www.baidu.com")
            .build()
        client.newCall(request).enqueue(object : Callback {
            override fun onFailure(call: Call, e: IOException) {

            }

            override fun onResponse(call: Call, response: Response) {
                val data: String? = response.body?.string()
                Log.d(TAG, "OkHttp 同步請求:$data")
            }
        })
    }

POST

POST請求,相比於GET請求,多了一個構造RequestBody的步驟,並且請求方法要從get改為post

同步請求,會阻塞線程,不能直接在主線程當中調用

    private fun getData() {
        thread{
            val client: OkHttpClient = OkHttpClient()
            val requestBody: RequestBody = FormBody.Builder()
                .add("username", "EDGNB")
                .add("password", "123456")
                .build()
            val request: Request = Request.Builder()
                .url("https://www.wanandroid.com/user/login")
                .post(requestBody)
                .build()
            val response: Response = client.newCall(request).execute()
            val data: String? = response.body?.string()
            Log.d(TAG, "OkHttp 同步請求:$data")
        }
    }

異步請求

    private fun getData() {
        val client: OkHttpClient = OkHttpClient()
        val requestBody: RequestBody = FormBody.Builder()
            .add("username", "EDGNB")
            .add("password", "123456")
            .build()
        val request: Request = Request.Builder()
            .url("https://www.wanandroid.com/user/login")
            .post(requestBody)
            .build()
        client.newCall(request).enqueue(object : Callback {
            override fun onFailure(call: Call, e: IOException) {

            }

            override fun onResponse(call: Call, response: Response) {
                val data: String? = response.body?.string()
                Log.d(TAG, "OkHttp 同步請求:$data")
            }
        })
    }

對象的創建

OkHttpClientRequest的構造均使用到了建造者Builder模式,調用者可直接通過鏈式調用對這些對象進行自定義初始化。Request類和OkHttpClient類的本質都是一個描述對象。

OkHttpClient

前面提到,構造一個OkHttpClient有兩種方式

// 方式一
val client: OkHttpClient = OkHttpClient()
// 方式二
val client: OkHttpClient = OkHttpClient.Builder().build()

我們查看方式一,它調用了下面的構造方法

  constructor() : this(Builder())

構造了一個Builder實例,並調用了主構造函數,OkHttpClient的成員變量會利用構造的Builder,進行初始化

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {

  val dispatcher: Dispatcher = builder.dispatcher

  val connectionPool: ConnectionPool = builder.connectionPool

  val interceptors: List<Interceptor> =
      builder.interceptors.toImmutableList()
	...
}

OkHttpClient.Builder定義的屬性如下

  class Builder constructor() {
    // 調度器  
    internal var dispatcher: Dispatcher = Dispatcher()
    // 連接池
    internal var connectionPool: ConnectionPool = ConnectionPool()
    // 整體流程攔截器
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    // 網絡請求攔截器
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
    // 流程監聽器
    internal var eventListenerFactory: EventListener.Factory = 
      									EventListener.NONE.asFactory()
    // 連接失敗是否自動重試
    internal var retryOnConnectionFailure = true
    // 服務器認證設置
    internal var authenticator: Authenticator = Authenticator.NONE
    // http是否重定向
    internal var followRedirects = true
    // 是否可以從HTTP重定向到HTTPS
    internal var followSslRedirects = true
    // Cookie策略,是否保存Cookie  
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
    // 緩存配置
    internal var cache: Cache? = null
    // Dns配置
    internal var dns: Dns = Dns.SYSTEM
    // 代理
    internal var proxy: Proxy? = null
    // 代理選擇器
    internal var proxySelector: ProxySelector? = null
    // 代理服務器認證設置
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE
    // socket工廠
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()
    // sslSocket工廠 用於https
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
    // 證書信息管理
    internal var x509TrustManagerOrNull: X509TrustManager? = null
    // 傳輸層版本和連接協議 TLS等
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
    // 支持的協議
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
    // 主機名校驗
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
    // 證書鏈
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
    // 證書確認
    internal var certificateChainCleaner: CertificateChainCleaner? = null
    // 調用超時時間( 0的含義? )
    internal var callTimeout = 0
    // 連接超時時間  
    internal var connectTimeout = 10_000
    // 讀取超時時間
    internal var readTimeout = 10_000
    // 寫入超時時間
    internal var writeTimeout = 10_000
    // 針對HTTP2和web socket的ping間隔
    internal var pingInterval = 0
    internal var minWebSocketMessageToCompress = 
      							RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
    // 路由數據庫
    internal var routeDatabase: RouteDatabase? = null
  	...
  }

Request

Request用於描述單次請求的參數信息,使用方法如下

val request: Request = Request.Builder()
    .get()
    .url("https://www.baidu.com")
    .build()

Request.Builderbuild方法中,利用自身保存的屬性,去構造一個Request對象

    open fun build(): Request {
      return Request(
          checkNotNull(url) { "url == null" },
          method,
          headers.build(),
          body,
          tags.toImmutableMap()
      )
    }

Request.Builder有如下屬性

  open class Builder {
    // 請求url  
    internal var url: HttpUrl? = null
    // 請求方法
    internal var method: String
    // 請求頭
    internal var headers: Headers.Builder
    // 請求體
    internal var body: RequestBody? = null

    /** A mutable map of tags, or an immutable empty map if we don't have any. */
    // 請求tag
    internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()
  	...
  }

發起請求

無論是同步請求

val response: Response = client.newCall(request).execute()

還是異步請求

client.newCall(request).enqueue(...)

都需要先調用OkHttpClientnewCall方法,該方法如下

override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = 
                                                        false)

該方法返回一個RealCall對象,定義如下

class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {...}

它是Call接口的唯一實現類,內部保存了

  • client:也就是前面的OkHttpClient
  • originalRequest:我們前面創建的Request,用於描述單次請求的參數信息
  • forWebSocket:是否使用Web Socket,默認值為false

RealCall包裝了RequestOKHttpClient這兩個類的實例,使得后面的方法中可以很方便的使用這兩者。

插敘:ArrayDeque

我們后面會使用到ArrayDeque,這里先簡單介紹ArrayDeque是什么,以及它的作用

ArrayDequeJDK的一個容器,它有隊列的性質的抽象數據類型,繼承結構如下

image-20211109220101712

Java中,我們使用棧一般會使用Stack類,而隊列的話,Java提供了Queue接口,我們可以使用LinkedList來實現隊列的功能(LinkedList實現了Deque接口,Deque繼承了Queue接口)。

ArrayDeque繼承結構可以看出,它實現了Deque接口,因此它也有隊列的功能,在ArrayDeque類中有這樣的一段注釋

 /*
  * This class is likely to be faster than
  * {@link Stack} when used as a stack, and faster than {@link LinkedList}
  * when used as a queue.
  * /

它表達了兩層意思:

  • 當作為「棧」來使用的時候,它的性能比Stack更快
  • 當作為「隊列」來使用的時候,它的性能比LinkedList更快

Stack類的注釋中,我們也可以看到它推薦我們使用ArrayDeque

/*
 * <p>A more complete and consistent set of LIFO stack operations is
 * provided by the {@link Deque} interface and its implementations, which
 * should be used in preference to this class.  For example:
 * <pre>   {@code
 *   Deque<Integer> stack = new ArrayDeque<Integer>();}</pre>
 */

ArrayDeque實現了Deque接口,Dequedouble-ended queue的縮寫,意為「雙端隊列」,什么意思呢?我們查看Deque定義了哪些方法

public interface Deque<E> extends Queue<E> {
    /**
    * 在 首、尾 添加元素
    */
	void addFirst(E e);
    void addLast(E e);
    boolean offerFirst(E e);
    boolean offerLast(E e);
    
    /**
    * 移除並返回 首、尾 的元素
    */
    E removeFirst();
    E removeLast();
    E pollFirst();
    E pollLast();
    
    /**
    * 返回(但不移除) 首、尾 的元素
    */
    E getFirst();
    E getLast();
    E peekFirst();
    E peekLast();
    
    ...
}

我們在Deque的「首、尾」兩端都可以做元素的「插入、刪除」操作,比如

  • 我們在雙端隊列的隊頭插入,隊頭彈出,那么它就可以作為一個來使用
  • 我們在雙端隊列的隊頭插入,隊尾彈出,那么它就可以作為一個隊列來使用

或者在雙端隊列的隊尾插入,隊頭彈出,也是可以作為一個隊列來使用

ArrayDeque類的注釋中,我們還可以得到下面有關ArrayDeque的信息

  • 沒有容量限制,根據實際需要自動擴容
  • 線程不安全
  • 不允許放入null的元素

另外,ArrayDeque內部是使用循環數組來實現的。

同步請求

同步請求要調用RealCallexecute方法,定義如下

  override fun execute(): Response {
    // 使用CAS,保證這個RealCall對象只發起一次請求  
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      // 將請求添加到任務隊列當中  
      client.dispatcher.executed(this)
      // 通過一系列攔截器的請求處理和響應處理,獲取返回結果,並return  
      return getResponseWithInterceptorChain()
    } finally {
      // 不論是否成功,通知Dispatcher請求完成  
      client.dispatcher.finished(this)
    }
  }

我們首先調用了OkHttpClient的調度器dispatcherexecuted方法,來將請求添加到任務隊列當中,Dispatcher::executed如下

  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }

runningSyncCalls是一個ArrayDeque,定義如下

private val runningSyncCalls = ArrayDeque<RealCall>()

前面我們已經介紹過ArrayDeque了,executed方法調用了runningSyncCallsadd方法,我們注意到executed方法使用了同步鎖,使用同步鎖的原因是因為ArrayDeque線程不安全的。

為什么在OkHttp中選擇ArrayDeque這種容器作為任務隊列?

答:因為ArrayDeque效率高,在「插敘:ArrayDeque」中,我們提到ArrayDeque可以作為棧和隊列,並且性能優於StackLinkedList

client.dispatcher.executed(this)所做的事情,就是將這個請求添加到一個雙端隊列中。

回到RealCallbackexecute方法中,又調用了getResponseWithInterceptorChain方法,該方法如下

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    // 按順序添加一系列的攔截器
    val interceptors = mutableListOf<Interceptor>()
    // 添加用戶自定義攔截器  
    interceptors += client.interceptors
    // 添加重定向攔截器
    interceptors += RetryAndFollowUpInterceptor(client)
    // 橋攔截器  
    interceptors += BridgeInterceptor(client.cookieJar)
    // 添加緩存攔截器,從緩存中拿數據
    interceptors += CacheInterceptor(client.cache)
    // 添加網絡連接攔截器,負責建立網絡連接
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    // 服務器請求攔截器,負責向服務器發送請求數據、從服務器讀取響應數據
    interceptors += CallServerInterceptor(forWebSocket)

    // 構建一條責任鏈,注意前面構建的攔截器列表interceptors作為參數傳入
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      // 執行責任鏈,得到請求的響應
      val response = chain.proceed(originalRequest)
      // 判斷請求是否取消,若沒有取消則返回響應
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

getResponseWithInterceptorChain方法會按一定的順序構建攔截器列表interceptors,接着利用interceptors創建RealInterceptorChain對象,該對象是一條責任鏈,使用了責任鏈模式,攔截器會按順序依次調用,每當一個攔截器執行完畢之后會調用下一個攔截器或者不調用並返回結果,我們最終拿到的響應就是這個鏈條執行之后返回的結果。當我們自定義一個攔截器的時候,也會被加入到這個攔截器鏈條里。

最后在RealCallbackexecute方法中調用client.dispatcher.finished(this),通知dispatcher請求完成。

這就是一個同步請求發起與響應的過程,關於Dispatcher和攔截器責任鏈是如何工作的,稍后分析。

小結

發起一個同步請求的語句是

client.newCall(request).execute()
  1. 首先調用OkHttpClientnewCall方法,傳入Request,返回一個RealCall對象
  2. 調用RealCall對象的execute方法。在該方法中,先調用OkHttpClientdispatcherexecuted方法,將請求加入dispatcher中的雙端隊列中;再調用getResponseWithInterceptorChain(),通過攔截器責任鏈獲取響應,最后通知dispatcher請求完成。

異步請求

發起異步請求的方法

client.newCall(request).enqueue(object : Callback {...})

這里也是構造一個RealCall對象,然后調用RealCall對象的enqueue方法,傳入Callback

我們查看RealCallenqueue方法

  override fun enqueue(responseCallback: Callback) {
    // 使用CAS,保證這個RealCall對象只發起一次請求    
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    // 創建AsyncCall對象,然后通過dispatcher分發任務  
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

這里創建了一個AsyncCall對象,構造參數是用戶傳入的Callback,然后調用OkHttpClientdispatcherenqueue方法,傳入我們創建的AsyncCall對象。

Dispatcherenqueue方法如下

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      // 將AsyncCall添加到請求隊列中
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running 
      // call to the same host.
      if (!call.call.forWebSocket) {
        // 尋找同一host的Call  
        val existingCall = findExistingCallWithHost(call.host)
        // 復用Call的callsPerHost
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    // 嘗試執行等待隊列中的任務  
    promoteAndExecute()
  }

readyAsyncCalls同樣是一個雙端隊列,其類型為ArrayDeque<AsyncCall>(),注意,同步任務是添加到runningSyncCalls當中,異步任務是添加到readyAsyncCalls當中。

Dispatcher::promoteAndExecute如下

  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    // 創建一個待執行任務列表  
    val executableCalls = mutableListOf<AsyncCall>()
    // 標識是否有任務在執行
    val isRunning: Boolean
    synchronized(this) {
      // 前面就是將異步任務AsyncCall添加到readyAsyncCalls當中 
      val i = readyAsyncCalls.iterator()
      // 遍歷異步任務等待隊列
      while (i.hasNext()) {
        val asyncCall = i.next()

        // 當前總請求數沒有達到上限
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        // 當前同一主機名請求數沒有達到上限  
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
		// 從等待隊列中移除AsyncCall
        i.remove()
        // 同一主機名請求數+1
        asyncCall.callsPerHost.incrementAndGet()
        // 將該請求添加到待執行任務列表
        executableCalls.add(asyncCall)
        // 將該請求添加到執行任務隊列
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    // 遍歷待執行任務列表,傳入線程池執行任務
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

到現在為止,我們接觸到了三個隊列,在這里區分一下

  • runningSyncCalls:類型是ArrayDeque<RealCall> ,「同步任務」的執行隊列
  • readyAsyncCalls:類型是ArrayDeque<AsyncCall>,「異步任務」的等待隊列
  • runningAsyncCalls:類型是ArrayDeque<AsyncCall>,「異步任務」的執行隊列

它們都是ArrayDeque,雙端隊列。

AsyncCall表示一個異步任務,它是RealCall的一個內部類,並且實現了Runnable接口,定義如下

  internal inner class AsyncCall(
    // responseCallback就是用戶傳入的獲取處理結果的回調  
    private val responseCallback: Callback
  ) : Runnable {
	
     ...

    /**
     * Attempt to enqueue this async call on [executorService]. This will attempt to 
     * clean up if the executor has been shut down by reporting the call as failed.
     */
    // 對外暴露的接口,通過傳入一個線程池來執行該任務  
    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        // 嘗試在線程池中執行該任務  
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        // 線程池拒絕執行該任務,拋出異常  
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        // 回調onFailure方法,通知用戶執行失敗  
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          // 通知dispatcher請求完成  
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          // 通過攔截器責任鏈獲取請求響應  
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          // 通過回調,返回響應結果給用戶  
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            // 出現異常,回調onFailure方法,通知用戶執行失敗    
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            // 出現異常,回調onFailure方法,通知用戶執行失敗      
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          // 通知dispatcher請求完成  
          client.dispatcher.finished(this)
        }
      }
    }
  }

異步線程池

異步請求在線程池中執行,我們向AsyncCallexecuteOn方法傳入的參數就是「異步任務執行的線程池」,傳入的參數是DispatcherexecutorServiceexecutorService對應的線程池可以在構造Dispatcher的時候指定,若用戶沒有指定線程池,那么默認是使用下面的線程池

  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

構造線程池的幾個參數如下

  • 核心線程數corePoolSize:表示線程池中空閑線程也不會被回收的數量,這里設置為0說明線程池中所有空閑線程都會被回收。
  • 最大線程數maximumPoolSize:線程池最大支持創建的線程數,這里設置為Int.MAX_VALUE
  • 線程存活時間keepAliveTime:非核心線程空閑后超過該時間就會被回收,這里設置為60個時間單位(60s)。
  • 時間單位TimeUnit:空閑線程存活的時間單位,這里設置為秒TimeUnit.SECONDS
  • 線程池中的任務隊列workQueue:該隊列主要用來存儲已經被提交但是尚未執行的任務,這里設置為SynchronousQueue()
  • 線程工廠ThreadFactory:線程的創建工廠,這個線程工廠指定了創建的線程的「名字」,以及創建的線程「非守護線程」。

對於上面指定的參數,我們思考下面的幾個問題:

為什么要選用SynchronousQueue作為任務隊列?

首先我們要了解一下SynchronousQueue是什么,它是一個隊列,但它內部不存在任何的容器,它是一種經典的生產者-消費者模型,它有多個生產者和消費者,當一個生產線程進行生產操作(put)時,若沒有消費者線程進行消費(take),那么該線程會阻塞,直到有消費者進行消費。也就是說,它僅僅實現了一個傳遞的操作,這種傳遞功能由於沒有了中間的放入容器,再從容器中取出的過程,因此是一種快速傳遞元素的方式,這對於我們網絡請求這種高頻請求來說,是十分合適的。關於 SynchronousQueue 可以看這篇文章:java並發之SynchronousQueue實現原理_-CSDN博客.

線程池的最大線程數不受限制,可以無限制的進行並發網絡請求嗎?

OkHttp的設計中,線程個數的維護工作不再交給線程池,而是由Dispatcher實現,通過外部設置的maxRequests以及maxRequestsPerHost來調整異步請求的等待隊列以及執行隊列,從而實現對線程最大數量的控制。Dispatcher具體實現將在后面分析。

小結

發起一個異步請求的語句是

client.newCall(request).enqueue(object : Callback {...})
  1. 首先調用OkHttpClientnewCall方法,傳入Request,返回一個RealCall對象
  2. 調用RealCall對象的enqueue方法,並傳入結果回調Callback
  3. ReallCall::enqueue中,利用Callback構造了一個AsyncCall對象,並調用了OkHttpClientdispatcherenqueue方法,將AsyncCall作為參數傳入
  4. Dispatcher::enqueue中,先將AsyncCall添加到請求隊列當中,然后調用自己的promoteAndExecute方法,嘗試執行等待隊列中的任務
  5. Dispatcher::promoteAndExecute方法會遍歷異步任務的等待隊列,然后嘗試在線程池中去執行這些異步任務

異步任務的執行,也是調用getResponseWithInterceptorChain方法,通過攔截器責任鏈獲取請求的響應,這一點和「同步請求」是一樣的。

Dispatcher

前面多次出現了調度器Dispatcher的身影,這里對它做一個總結。

維護的隊列

Dispatcher中維護了三個隊列:

  • runningSyncCalls:類型是ArrayDeque<RealCall> ,「同步任務」的執行隊列
  • readyAsyncCalls:類型是ArrayDeque<AsyncCall>,「異步任務」的等待隊列
  • runningAsyncCalls:類型是ArrayDeque<AsyncCall>,「異步任務」的執行隊列

請求相關

同步請求中,RealCallexecute方法會調用到Dispatcherexecuted方法,將「同步任務」保存到runningSyncCalls

  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }

異步請求中,RealCallenqueue方法會調用Dispatcherenqueue方法

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      // 將AsyncCall添加到請求隊列中
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running 
      // call to the same host.
      if (!call.call.forWebSocket) {
        // 尋找同一host的Call  
        val existingCall = findExistingCallWithHost(call.host)
        // 復用Call的callsPerHost
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    // 嘗試執行等待隊列中的任務  
    promoteAndExecute()
  }

Dispatcherenqueue方法中,首先將「異步任務」添加到readyAsyncCalls中,然后調用promoteAndExecute()方法,該方法會遍歷readyAsyncCalls,尋找能夠執行的AsyncCall,能夠執行的條件是「當前總請求數」以及「同一主機名的請求數」都沒有達到上限,最后調用能夠執行的AsyncCallexecuteOn方法,在Dispatcher提供的線程池中執行異步任務。

通知完成

前面我們已經知道,不管是同步請求,還是異步請求,在請求完成后,都會調用Dispatcherfinished方法,通知Dispatcher請求已執行完畢。我們前面沒有展開分析Dispatcher被通知后會做些什么,這里就對它進行分析。

「同步請求」完成后會調用Dispatcher下面的方法

  /** Used by [Call.execute] to signal completion. */
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

「異步請求」完成后會調用Dispatcher下面的方法

  /** Used by [AsyncCall.run] to signal completion. */
  internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

「異步請求」調用的finished比「同步請求」調用的finished多了一個callsPerHost減1的操作,也就是「主機名請求數」減1的操作。接着,它們都會調用到Dispatcher下面的這個重載方法

  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      // 將「同步任務/異步任務」從「執行隊列」當中移除  
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    // 嘗試執行「等待隊列」中的「異步任務」   
    val isRunning = promoteAndExecute()

    // isRunning:在「執行隊列」中有「同步/異步任務」待執行
    // idleCallback:Dispatcher空閑時的回調
    // 若「執行隊列」中沒有待執行的任務,表明當前的Dispatcher是空閑的,這時候,如果idleCallback
    // 不為null,就執行idleCallback的run方法  
    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

小結

Dispatcher 在 網絡請求發起過程中的總結:

  1. Dispatcher中維護三個任務隊列,實現方式都為雙端隊列,分別為同步請求執行隊列,異步請求等待隊列,異步請求執行隊列。
  2. 當發起一個同步/異步請求,就會將該請求添加到同步請求執行隊列/異步請求等待隊列中,然后嘗試執行這些請求。
  3. 請求執行完畢就會將這些請求從隊列中移除,若后面沒有需要執行的任務后,調用idleCallback.run()執行一些空閑任務。
  4. 異步請求執行有兩個入口:一是添加了一個異步請求,二是一個請求執行完畢。

請求時序圖

同步請求的時序圖:

image-20211113011234986

異步請求的時序圖:

image-20211113011738025

請求響應

無論是同步請求,還是異步請求,最終都會調用到RealCallgetResponseWithInterceptorChain方法,通過該方法可以拿到請求的響應。關於getResponseWithInterceptorChain方法,我們需要探究兩個關鍵的點

  • 核心機制:攔截器機制
  • 設計模式:在攔截器中如何運用責任鏈模式

什么是責任鏈模式?在典型的責任鏈設計模式里,很多對象由每一個對象對其下級的引用而連接起來形成一條鏈。請求在這個鏈上傳遞,直到鏈上的某一個對象決定處理此請求。發出這個請求的客戶端並不知道鏈上的哪一個對象最終處理這個請求,這使得系統可以在不影響客戶端的情況下動態地重新組織和分配責任。

我們回顧下getResponseWithInterceptorChain方法

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    // 按順序添加一系列的攔截器
    val interceptors = mutableListOf<Interceptor>()
    // 添加用戶自定義攔截器  
    interceptors += client.interceptors
    // 添加重定向攔截器
    interceptors += RetryAndFollowUpInterceptor(client)
    // 橋攔截器  
    interceptors += BridgeInterceptor(client.cookieJar)
    // 添加緩存攔截器,從緩存中拿數據
    interceptors += CacheInterceptor(client.cache)
    // 添加網絡連接攔截器,負責建立網絡連接
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    // 服務器請求攔截器,負責向服務器發送請求數據、從服務器讀取響應數據
    interceptors += CallServerInterceptor(forWebSocket)

    // 構建一條責任鏈,注意前面構建的攔截器列表interceptors作為參數傳入
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      // 執行責任鏈,得到請求的響應
      val response = chain.proceed(originalRequest)
      // 判斷請求是否取消,若沒有取消則返回響應
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

該方法會按一定的順序構建攔截器列表interceptors,接着利用interceptors創建RealInterceptorChain對象,該對象是一條責任鏈,使用了責任鏈模式,攔截器會按順序依次調用,每當一個攔截器執行完畢之后會調用下一個攔截器或者不調用並返回結果,我們最終拿到的響應就是這個鏈條執行之后返回的結果。當我們自定義一個攔截器的時候,也會被加入到這個攔截器鏈條里。

Interceptor

我們先看下攔截器Interceptor的定義

fun interface Interceptor {
  @Throws(IOException::class)
  fun intercept(chain: Chain): Response

  companion object {
	// 利用lambda,創建一個攔截器
    inline operator fun invoke(crossinline block: (chain: Chain) -> Response): 
      Interceptor = Interceptor { block(it) }
  }

  interface Chain {
    fun request(): Request
    @Throws(IOException::class)
    fun proceed(request: Request): Response
    fun connection(): Connection?
    fun call(): Call
    fun connectTimeoutMillis(): Int
    fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
    fun readTimeoutMillis(): Int
    fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
    fun writeTimeoutMillis(): Int
    fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
  }
}

該接口里面定義了intercept方法,還有一個Chain接口,攔截器鏈RealInterceptorChain就是實現了Chain接口。

雖然不同的攔截器實現不同,但是它們往往具有相同的結構:

  @Throws(IOException::class)
  override fun intercept(chain: Chain): Response {
    // 獲取Request  
    val request = chain.request
    // 處理:Request階段,攔截器對Request進行處理

    // 調用RealInterceptorChain的proceed方法,在該方法里面,會遞歸調用下一個攔截器的intercept
    // 方法,拿到下一個攔截器的response  
    val response = (chain as RealInterceptorChain).proceed(request)

    // 處理:Response階段,完成了該攔截器獲取Response的過程,將Response返回到上一個攔截器中
    return response
}

攔截器鏈的整體執行流程

我們先來看一個生活中的例子,以便更好地理解「攔截器鏈」是如何工作的。下面是一個流水線生產的例子

image-20211114011825133

對應到OkHttp的響應過程就是:包裝了RequestChain遞歸的從每個Interceptor手中走過去,最后請求網絡得到的Response又會逆序的從每個Interceptor走回來,把Response返回到開發者手中。

我們來看看攔截器鏈RealInterceptorChain的工作流程

class RealInterceptorChain(
  internal val call: RealCall,
  private val interceptors: List<Interceptor>,
  private val index: Int,
  internal val exchange: Exchange?,
  internal val request: Request,
  internal val connectTimeoutMillis: Int,
  internal val readTimeoutMillis: Int,
  internal val writeTimeoutMillis: Int
) : Interceptor.Chain {

  ...

  internal fun copy(
    index: Int = this.index,
    exchange: Exchange? = this.exchange,
    request: Request = this.request,
    connectTimeoutMillis: Int = this.connectTimeoutMillis,
    readTimeoutMillis: Int = this.readTimeoutMillis,
    writeTimeoutMillis: Int = this.writeTimeoutMillis
  ) = RealInterceptorChain(call, interceptors, index, exchange, request, connectTimeoutMillis,
      readTimeoutMillis, writeTimeoutMillis)

  ...
    
  @Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++
    ...
    // Call the next interceptor in the chain.
    // copy出一個攔截器鏈,並且將index設置為(index+1),便於之后「調用責任鏈的下一個攔截器」  
    val next = copy(index = index + 1, request = request)
    // 獲取當前的index對應的攔截器  
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    // 執行當前攔截器的intercept方法
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")
    ...
    return response
  }
}

要很好地理解攔截器鏈的工作流程,需要將RealInterceptorChainproceed方法與前面攔截器的intercept方法結合起來看。我們理一下它們的工作流程,其中RealInterceptorChain簡稱為Chain

  • RealCallgetResponseWithInterceptorChain方法中,先調用index為0的Chainproceed方法,並傳入originalRequest
  • proceed方法里,copy出一個Chain(index+1(1),request),然后調用當前index(0)Interceptorintercept方法,並且傳入新copy出來的Chain
  • Interceptorintercept方法,從Chain當中獲取request,然后對request添加一些自己的處理,接着調用Chainproceed方法,傳入處理后的request,等待返回response
  • proceed方法里,copy出一個Chain(index+1(2),request),然后調用當前index(1)Interceptorintercept方法,並且傳入新copy出來的Chain
  • Interceptorintercept方法,從Chain當中獲取request,然后對request添加一些自己的處理,接着調用Chainproceed方法,傳入處理后的request,等待返回response
  • proceed方法里,copy出一個Chain(index+1(3),request),然后調用當前index(2)Interceptorintercept方法,並且傳入新copy出來的Chain
  • ......
  • 最后一個Interceptorintercept方法處理完后,向上返回處理后的Response
  • 倒數第二個Interceptorintercept方法中,對返回的Response進行一些加工后向上返回
  • 倒數第三個Interceptorintercept方法中,對返回的Response進行一些加工后向上返回
  • ......
  • 最終在RealCallgetResponseWithInterceptorChain方法中,獲取到最終Interceptor處理完后的Response,然后將該Response傳給用戶,通知用戶請求成功完成。

copy攔截器鏈的時候,下標index設置為index+1就是為了讓攔截器責任鏈能准備地從攔截器容器中取出下一個攔截器進行處理。

整個工作過程就是,當前攔截器對Request處理完畢后,通過調用傳入的責任鏈的 proceed 方法,將請求交給下一個攔截器處理,然后獲得下一個攔截器返回的Response,再對Response處理后交給上一個攔截器。

顯然在這里使用了一種遞歸設計,請求從上往下交付,每個攔截器拿到請求都可以對請求做出相應處理,然后交給下一個攔截器,最后一個攔截器處理請求拿到響應后,將響應從下往上交付,每個攔截器拿到響應后再對響應做出響應處理,然后交給上一個攔截器。若中間的過程中若出現了錯誤,則通過拋出異常來通知上層。

「中間的某個攔截器」也可以選擇不將請求Request交給下一級攔截器處理,這時候「該攔截器」直接返回Response給上一級的攔截器即可。

Interceptor順序

RealCallgetResponseWithInterceptorChain方法中,我們可以看出OkHttp預置了哪些Interceptor、用戶可以自定義哪些Interceptor,以及這些Interceptor之間的順序

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )
	...
  }

OkHttp中預置了下面幾種Interceptor

  • RetryAndFollowUpInterceptor:負責實現重定向功能
  • BridgeInterceptor:將用戶構造的請求轉換為向服務器發送的請求,將服務器返回的響應轉換為對用戶友好的響應
  • CacheInterceptor:讀取緩存、更新緩存
  • ConnectInterceptor:建立與服務器的連接
  • CallServerInterceptor:從服務器讀取響應

可以看出,整個網絡請求的過程由各個攔截器互相配合從而實現,通過這種攔截器的機制,可以很方便地調節網絡請求的過程及先后順序,同時也能夠很方便地使用戶對其進行擴展。

其中用戶可以在兩個時機插入 Interceptor

  • 網絡請求前后:通過 OkHttpClient.Builder.addInterceptor 方法添加
  • 讀取響應前后:通過 OkHttpClient.Builder.addNetworkInterceptor 方法添加

其整體流程如圖所示:

image-20211121121326686

總結

OkHttp采用了攔截器機制責任鏈模式,它使用多個負責不同功能的攔截器,並將這些攔截器通過責任鏈連接在一起,采用遞歸的方式進行調用,每一層的攔截器在請求前和響應后都可以對 請求/響應 做出不同的處理,通過各個攔截器的協調合作,完成了網絡請求的響應過程。

參考

  1. OkHttp源碼分析系列(一)請求的發起與響應 - 養生達人不熬夜.
  2. java並發之SynchronousQueue實現原理_-CSDN博客.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM