探索OkHttp系列 (五) 連接建立與復用


前言

上一篇文章我們介紹了CacheInterceptor攔截器,這篇文章我們要介紹的攔截器是ConnectInterceptor,該攔截器的作用是獲得一個健康可用的與目標服務器的連接,然后就將請求交給下一個攔截器處理。

該攔截器的內部實現非常的復雜,涉及到OkHttp許多的機制,例如路由選擇機制、連接的建立與復用機制,我們在下面的分析中,先對其大體流程進行分析,然后再一個一個點地深入分析,逐個突破。

ConnectInterceptor::intercept

ConnectInterceptor::intercept

/**
 * Opens a connection to the target server and proceeds to the next interceptor. The network might
 * be used for the returned response, or to validate a cached response with a conditional GET.
 */
object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}

從該方法的注釋可以看出,該攔截器的主要作用是:打開一個到目標服務器的連接,然后將請求交給下一個攔截器處理。這個連接其實就是TCP連接,用於Http的請求和響應

獲取連接的大體流程

intercept方法的關鍵代碼是RealCall::initExchange,該方法會返回一個Exchange對象,Exchange是什么呢?我們后面會提到。

RealCall::initExchange

  /** Finds a new or pooled connection to carry a forthcoming request and response. */
  internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
      check(expectMoreExchanges) { "released" }
      check(!responseBodyOpen)
      check(!requestBodyOpen)
    }

    val exchangeFinder = this.exchangeFinder!!
    // 創建ExchangeCodec對象  
    val codec = exchangeFinder.find(client, chain)
    // 利用ExchangeCodec實例,創建了一個Exchange對象  
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    synchronized(this) {
      this.requestBodyOpen = true
      this.responseBodyOpen = true
    }

    if (canceled) throw IOException("Canceled")
    // 返回創建的Exchange對象
    return result
  }

從該方法的注釋可以看出:該方法用於找到一個新的、或者連接池里一個健康可用的連接,來承載一個即將到來的請求和響應。上面出現了ExchangeCodecExchange兩個類,分別查看它們的注釋。

ExchangeCodec

ExchangeCodec

/** Encodes HTTP requests and decodes HTTP responses. */
interface ExchangeCodec {
	...
}

它是一個接口,負責對Http請求進行編碼和對Http響應進行解碼,它有兩個實現類,分別是Http1ExchangeCodecHttp2ExchangeCodec。以Http1ExchangeCodec為例,查看其中的writeRequestHeaders方法

  override fun writeRequestHeaders(request: Request) {
    val requestLine = RequestLine.get(request, connection.route().proxy.type())
    writeRequest(request.headers, requestLine)
  }
  ...	
  fun writeRequest(headers: Headers, requestLine: String) {
    check(state == STATE_IDLE) { "state: $state" }
    sink.writeUtf8(requestLine).writeUtf8("\r\n")
    for (i in 0 until headers.size) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n")
    }
    sink.writeUtf8("\r\n")
    state = STATE_OPEN_REQUEST_BODY
  }  	

這就是其中的一個編碼操作,負責將Request轉化為報文的格式,以便發送給服務端。另外,ExchangeCodec包含了一個RealConnection對象。

Exchange

Exchange

/**
 * Transmits a single HTTP request and a response pair. This layers connection management and events
 * on [ExchangeCodec], which handles the actual I/O.
 */
class Exchange(...){
    ...
}

該類傳輸單個的Http請求和響應對,負責連接管理的工作。Exchange類的內部包含了一個ExchangeCodec對象,我們查看Exchange類內部的writeRequestHeaders方法

  fun writeRequestHeaders(request: Request) {
    try {
      eventListener.requestHeadersStart(call)
      // 調用了ExchangeCodec對象的方法  
      codec.writeRequestHeaders(request)
      eventListener.requestHeadersEnd(call, request)
    } catch (e: IOException) {
      eventListener.requestFailed(call, e)
      trackFailure(e)
      throw e
    }
  }

可以看到,它內部的I/O操作,實際上是調用了ExchangeCodec對象的方法來實現的,我們可以認為Exchange是封裝ExchangeCodec的一個工具類,Exchange負責連接管理,而ExchangeCodec負責處理實際的I/O

ExchangeFinder::find

RealCall::initExchange調用了ExchangeFinder::find,獲取一個ExchangeCodec對象,我們查看該方法

  fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      // 獲取一個健康可用的連接  
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      // 利用獲取的連接,創建了一個ExchangeCodec對象  
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

上面的方法中,獲取了一個健康可用的連接,並且利用該連接,創建了一個編碼解碼的ExchangeCodec對象,有了與服務器的連接處理I/O的ExchangeCodec對象,我們其實就可以和服務器進行通信了。

ExchangeFinder::findHealthyConnection

上面調用了ExchangeFinder::findHealthyConnection方法,獲取了一個健康可用的連接,該方法如下

  /**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  @Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      // 獲取一個可用的連接,RealConnection對象代表一個連接  
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // Confirm that the connection is good.
      // 檢查連接是否健康,該連接是否已准備好去承載新的流  
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }

      // If it isn't, take it out of the pool.
      // 如果該連接不健康,就給該連接做一個標記,不再使用該連接  
      candidate.noNewExchanges()

      // Make sure we have some routes left to try. One example where we may exhaust all the routes
      // would happen if we made a new connection and it immediately is detected as unhealthy.
      // 確保我們還有可嘗試的路由  
      if (nextRouteToTry != null) continue

      val routesLeft = routeSelection?.hasNext() ?: true
      if (routesLeft) continue

      val routesSelectionLeft = routeSelector?.hasNext() ?: true
      if (routesSelectionLeft) continue

      // 已耗盡所有的路由  
      throw IOException("exhausted all routes")
    }
  }

從該方法的注釋可以看出:該方法會查找到一個可用且健康的連接並將其返回,如果找到的可用連接是不健康的,那么會一直重復查找可用連接的這個過程,直到一個可用且健康的連接被找到

該方法內部有一個while(true)的循環,在循環代碼里面,會不斷地去獲取一個可用連接,並檢查該連接是否健康,如果該連接健康,就將其返回,如果連接不健康,在有可嘗試的路由的前提下,會重復前面查找可用連接的過程。注意,這里說的可用和健康是兩個不同的指標

判斷連接是否健康使用了RealConnection::isHealthy方法,如下

  // 如果該連接准備好去承載新的流,就返回true
  fun isHealthy(doExtensiveChecks: Boolean): Boolean {
    assertThreadDoesntHoldLock()

    val nowNs = System.nanoTime()

    val rawSocket = this.rawSocket!!
    val socket = this.socket!!
    val source = this.source!!
    // 判斷socket是否可用  
    if (rawSocket.isClosed || socket.isClosed || socket.isInputShutdown ||
            socket.isOutputShutdown) {
      return false
    }

    // 如果是HTTP/2連接,檢測該HTTP/2連接是否是健康的  
    val http2Connection = this.http2Connection
    if (http2Connection != null) {
      return http2Connection.isHealthy(nowNs)
    }
	  
    val idleDurationNs = synchronized(this) { nowNs - idleAtNs }
    // 若空閑時間達到某個值,則檢測socket是否是健康的
    if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) {
      return socket.isHealthy(source)
    }

    return true
  }

ExchangeFinder::findConnection

上面調用了findConnection方法,該方法用於獲取一個可用連接。該方法的實現邏輯較為復雜,我們先介紹它的大體執行流程,后面再對它的各個點進行詳細地分析

  /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   *
   * This checks for cancellation before each blocking operation.
   */
  // 返回一個連接去承載新的流,優先使用現有連接,接着是連接池中的連接,最后是創建一個新的連接
  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    // 檢查取消事件  
    if (call.isCanceled()) throw IOException("Canceled")

    // 1.嘗試去重用call的連接  
    val callConnection = call.connection 
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        // 檢查這個連接是否可用和可復用  
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          // 連接不可用,在call中移除該連接,並返回該連接對應的Socket,隨后要關閉它  
          toClose = call.releaseConnectionNoEvents()
        }
      }

      // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
      // because we already acquired it.
      // 如果連接可以使用,那么就返回該連接  
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

      // The call's connection was released.
      // 關閉Socket  
      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    // We need a new connection. Give it fresh stats.
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    // 2.嘗試從連接池中獲取連接(第一次)
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      // 返回連接
      return result
    }

    // Nothing in the pool. Figure out what route we'll try next.
    // 連接池里沒有東西,計算下一條要嘗試的路由  
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      // Use a route from a preceding coalesced connection.
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      // 從現有的routeSelection中獲取一個路由  
      routes = null
      route = routeSelection!!.next()
    } else {
      // 計算一個新的routeSelector,這是一個阻塞操作  
      var localRouteSelector = routeSelector
      // 如果routeSelector為null,那么就先創建一個RouteSelector  
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      // 從routeSelector中獲取一個新的routeSelection  
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      // 獲取routeSelection中的路由列表  
      routes = localRouteSelection.routes

      if (call.isCanceled()) throw IOException("Canceled")

      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. We have a better chance of matching thanks to connection coalescing.
      // 3.現在我們有了一組IP地址,再次嘗試從連接池中獲取連接,由於連接合並,這次我們有更大的希望
      // 從連接池里獲取一個連接(第二次)  
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        // 返回連接
        return result
      }

      // 從routeSelection中獲取一個路由,用於新連接的創建  
      route = localRouteSelection.next()
    }

    // Connect. Tell the call about the connecting call so async cancels work.
    // 4.創建新連接  
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      // 執行TCP+TLS握手(Https請求才會做TLS握手),這是一個阻塞的操作    
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    // If we raced another call connecting to this host, coalesce the connections. This makes for 3
    // different lookups in the connection pool!
    // 5.如果有另一個調用也是連接到相同的主機,並且該調用已經創建了新連接,將連接放到了連接池里,
    // 那么就使用連接池里的連接(第三次)  
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!!
      // 保存路由  
      nextRouteToTry = route
      // 將前面新創建的連接的Socket關閉  
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      // 返回連接池中的連接
      return result
    }

    /* 第三次在連接池中獲取連接,依然沒找到,意味着要使用新創建的連接 */  
        
    synchronized(newConnection) {
      // 6.將先創建的連接放進連接池里面
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    // 返回新創建的連接  
    return newConnection
  }

獲取一個可用的連接,分為了5步

  1. 重用call當中的連接
  2. 第一次嘗試從連接池獲取連接
  3. 第二次嘗試從連接池獲取連接
  4. 自己新創建一個連接
  5. 第三次嘗試從連接池獲取連接

重用call的連接的邏輯:在獲取了call的連接之后,對該連接做了兩個判斷,分別是

  • 判斷是否不再接受新的連接
  • 判斷和當前請求是否有相同的主機名和端口號

關於第二點,既然要復用連接,那么就需要「該請求需要連接到的地方」和「已有連接指向的地方」是同一個地方,同一個地方怎么判斷?通過主機名和端口號判斷。

如果call中的連接無法復用,就會調用callreleaseConnectionNoEvents方法,釋放該連接,如下

  # RealCall
  internal fun releaseConnectionNoEvents(): Socket? {
    val connection = this.connection!!
	...
    // 將RealCall的connection屬性置空  
    this.connection = null
	...
  }

還有一個問題,為什么有可能在call中就已經存在了一個連接呢,我們才剛開始尋找連接呢?還記得我們前面提到的RetryAndFollowUpInterceptor攔截器嗎?當請求失敗需要重試或者重定向的時候,這時候連接還在呢,是可以直接進行復用的

在上面獲取連接的時候,還存在的問題是

  • 為什么要三次從連接池當中獲取連接?它們之間有什么區別?
  • 出現了routerouteSelectionrouteSelector的概念,它們分別是什么?

要解答上面的問題,我們需要一些前置知識:OkHttp中的代理與路由、Http2的合並連接機制。

代理與路由

代理即代理服務器(Proxy Server),代理服務器是介於客戶端和服務器之間的一台服務器,客戶端發送給服務器的請求都由代理服務器進行轉發,如果沒有代理,則客戶端直接與服務器進行交互。通過代理服務器,客戶端可以隱藏身份,防止受到外來攻擊。

OkHttp中出現兩種代理類型:

  • HTTP代理:能夠代理客戶端進行HTTP訪問,主要是代理瀏覽器訪問網頁,它的端口號一般為808080
  • SOCKS代理:SOCKS代理與其他類型的代理不同,它只是簡單地傳遞數據包,並不關心是何種應用協議,因此SOCKS代理服務器比其他類型的代理服務器速度要快得多。

其中SOCKS4只支持TCP協議,而SOCKS5既支持TCP協議又支持UDP協議。

OkHttp中,對於SOCKS代理,代理服務器完成TCP數據包的轉發工作,而HTTP代理,除了轉發數據之外,還會解析HTTP的請求及響應,並根據請求及響應的內容做一些處理。

代理

Proxy

Java中,通過Java.net.Proxy類來描述一個代理服務器:

public class Proxy {
    public enum Type {
        // 不使用代理    
        DIRECT,
        // HTTP代理    
        HTTP,
        // SOCKS代理    
        SOCKS
    };
	
    // 代理類型
    private Type type;
    // Socket地址
    private SocketAddress sa;
    ...
}

該類主要包含了代理類型以及代理服務器對應的SocketAddress,其中代理類型有三種:

  • DIRECT:不使用代理
  • HTTP:使用HTTP代理
  • SOCKS:使用SOCKS代理

ProxySelector

ProxySelector可以根據用戶傳入的URI,返回該URI對應的代理服務器列表,即List<Proxy>

public abstract class ProxySelector {
    private static ProxySelector theProxySelector;

    static {
        try {
            Class<?> c = Class.forName("sun.net.spi.DefaultProxySelector");
            if (c != null && ProxySelector.class.isAssignableFrom(c)) {
                // 默認的代理選擇器
                theProxySelector = (ProxySelector) c.newInstance();
            }
        } catch (Exception e) {
            theProxySelector = null;
        }
    }

    // getDefault方法用於獲取當前已注冊的代理選擇器,若用戶沒有注冊代理選擇器,那么該方法返回一個
    // 一個系統提供的代理選擇器
    public static ProxySelector getDefault() {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkPermission(SecurityConstants.GET_PROXYSELECTOR_PERMISSION);
        }
        return theProxySelector;
    }

    // setDefault方法用於注冊一個代理選擇器
    public static void setDefault(ProxySelector ps) {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkPermission(SecurityConstants.SET_PROXYSELECTOR_PERMISSION);
        }
        theProxySelector = ps;
    }

    // 子類實現該方法,外界通過調用select方法,可以獲取該URI所有適用的代理
    public abstract List<Proxy> select(URI uri);

    // 子類實現該方法,外界調用該方法,通知代理服務器不可用
    public abstract void connectFailed(URI uri, SocketAddress sa, IOException ioe);
}

ProxySelector是一個抽象類,在OkHttp中只有一個實現類NullProxySelector

object NullProxySelector : ProxySelector() {
  override fun select(uri: URI?): List<Proxy> {
    requireNotNull(uri) { "uri must not be null" }
    return listOf(Proxy.NO_PROXY)
  }

  override fun connectFailed(uri: URI?, sa: SocketAddress?, ioe: IOException?) {
  }
}

無論外界傳入什么URI,該代理選擇器都會返回一個包含Proxy.NO_PROXY的列表,Proxy.NO_PROXYProxy內部已經預定義好的,如下

public class Proxy {
    ...
    public final static Proxy NO_PROXY = new Proxy();
    
    private Proxy() {
        type = Type.DIRECT;
        sa = null;
    }
    ...    
}

可以看出,Proxy.NO_PROXY表示的代理類型為Type.DIRECT,也就是不使用代理。

所以,對於NullProxySelector,外界調用它的select方法,無論傳入的URI是什么,它都會返回一個不使用代理的Proxy NO_PROXY

用戶可以在初始化OkHttpClient的時候,對proxyproxySelector進行配置

  @get:JvmName("proxy") val proxy: Proxy? = builder.proxy

  @get:JvmName("proxySelector") val proxySelector: ProxySelector =
      when {
        // Defer calls to ProxySelector.getDefault() because it can throw a SecurityException.
        builder.proxy != null -> NullProxySelector
        else -> builder.proxySelector ?: ProxySelector.getDefault() ?: NullProxySelector
      }
用戶設置的Proxy 用戶設置的ProxySelector OkHttpClient中的Proxy OkHttpClient中的ProxySelector
null null null 系統提供的ProxySelectorNullProxySelector
null ProxySelector null 用戶設置的ProxySelector
Proxy null Proxy NullProxySelector
Proxy ProxySelector Proxy NullProxySelector

路由

Route

OkHttp中抽象出Route來描述網絡數據包的傳輸路徑,最主要還是描述直接與其建立TCP連接的目標端點,它表示一個路由信息

class Route(
  // 記錄請求url相關信息,包括請求的源服務器主機名、端口等信息  
  @get:JvmName("address") val address: Address,
  // 此路由的代理服務器信息  
  @get:JvmName("proxy") val proxy: Proxy,
  // 連接目標地址  
  @get:JvmName("socketAddress") val socketAddress: InetSocketAddress
) {
	...
}

Route中主要記錄了這條路由通過的代理服務器信息Proxy、連接目標地址InetSocketAddress,根據代理協議的不同,這里的InetSocketAddress會有不同的含義:

  • 不使用代理:它包含的信息是HTTP服務器經過了DNS解析的IP地址以及協議的端口號。
  • HTTP代理:它包含的信息是代理服務器經過DNS解析的IP地址以及端口號。
  • SOCKS代理:它包含的信息是HTTP服務器的域名和協議端口號。

RouteSelector

OkHttp中通過RouteSelector類來管理所有的路由信息,並選擇可用路由。RouteSelector的主要工作:

  1. 收集所有可用的路由
  2. 選擇可用的路由
  3. 維護連接失敗的路由信息

收集所有可用的路由

收集路由分為兩個步驟:

  1. 收集所有的代理

  2. 收集特定代理服務器選擇情況下的所有路由

    a. 收集該代理對應的連接目標地址

    b. 利用address、代理、連接目標地址等信息,構建路由

先看第一步如何收集所有的代理

class RouteSelector(
  private val address: Address,
  private val routeDatabase: RouteDatabase,
  private val call: Call,
  private val eventListener: EventListener
) {
  /* State for negotiating the next proxy to use. */
  private var proxies = emptyList<Proxy>()
  private var nextProxyIndex: Int = 0

  /* State for negotiating the next socket address to use. */
  private var inetSocketAddresses = emptyList<InetSocketAddress>()

  /* State for negotiating failed routes */
  private val postponedRoutes = mutableListOf<Route>()

  init {
    resetNextProxy(address.url, address.proxy)
  }
  ...  
}

RouteSelector的構造方法中,調用到了resetNextProxy,該方法就是用於收集所有代理,該方法傳入了兩個參數,分別是address.urladdress.proxy,其實就是用戶要請求的url和用戶手動設置的代理,后面會提到這個address的構造時機

  private fun resetNextProxy(url: HttpUrl, proxy: Proxy?) {
    fun selectProxies(): List<Proxy> {
      // 如果用戶指定了代理,則使用用戶指定的代理
      if (proxy != null) return listOf(proxy)

      // 如果URI缺少了host,那么就不使用代理  
      val uri = url.toUri()
      if (uri.host == null) return immutableListOf(Proxy.NO_PROXY)

      // address的proxySelector其實就是OkHttpClient的proxySelector,它的類型在上面已經
      // 提過了,這里使用proxySelector尋找URI可用的代理  
      val proxiesOrNull = address.proxySelector.select(uri)
      // 代理為空,則不使用代理  
      if (proxiesOrNull.isNullOrEmpty()) return immutableListOf(Proxy.NO_PROXY)
	  // 返回代理列表
      return proxiesOrNull.toImmutableList()
    }

    eventListener.proxySelectStart(call, url)
    // 記錄代理  
    proxies = selectProxies()
    // 代理下標  
    nextProxyIndex = 0
    eventListener.proxySelectEnd(call, url, proxies)
  }

在這個方法中,就完成了代理的收集

當我們調用RouteSelectornext方法的時候,就可以獲取一個Selection,這個方法其實就是收集一個特定代理服務器選擇情況下的所有路由

  @Throws(IOException::class)
  operator fun next(): Selection {
    // 沒有下一個代理,並且沒有被延遲的路由,則拋出異常  
    if (!hasNext()) throw NoSuchElementException()

    // Compute the next set of routes to attempt.
    // 計算下一組要嘗試的路由  
    val routes = mutableListOf<Route>()
    // 存在需要嘗試的代理  
    while (hasNextProxy()) {
      // 延遲路由總是最后被嘗試  
      val proxy = nextProxy()
      // 遍歷某個代理下的連接目標地址  
      for (inetSocketAddress in inetSocketAddresses) {
        // 根據連接目標地址創建路由 
        val route = Route(address, proxy, inetSocketAddress)
        // 如果該路由最近連接失敗,則延遲它  
        if (routeDatabase.shouldPostpone(route)) {
          postponedRoutes += route
        } else {
          routes += route
        }
      }

      // 如果沒有可用路由,繼續嘗試下一個代理服務器  
      if (routes.isNotEmpty()) {
        break
      }
    }

    // 遍歷完所有代理服務器,依然沒有找到可用路由,使用之前記錄的延遲路由(最近連接失敗的路由)  
    if (routes.isEmpty()) {
      // We've exhausted all Proxies so fallback to the postponed routes.
      routes += postponedRoutes
      postponedRoutes.clear()
    }

    // 返回可用的路由集合
    return Selection(routes)
  }

我們查看nextProxy方法,該方法用於獲取下一個代理,並且計算該代理對應的連接目標地址

  // 返回下一個嘗試的代理,可能是PROXY.NO_PROXY但不可能為null
  @Throws(IOException::class)
  private fun nextProxy(): Proxy {
    if (!hasNextProxy()) {
      throw SocketException(
          "No route to ${address.url.host}; exhausted proxy configurations: $proxies")
    }
    // 獲取下一個proxy  
    val result = proxies[nextProxyIndex++]
    // 計算該proxy對應的連接目標地址
    resetNextInetSocketAddress(result)
    // 返回proxy
    return result
  }

我們查看resetNextInetSocketAddress方法,該方法用於計算某個代理對應的連接目標地址

  // 為當前的proxy收集socket addresses
  @Throws(IOException::class)
  private fun resetNextInetSocketAddress(proxy: Proxy) {
    // 通過新建一個列表,清空上一個代理服務器的連接地址  
    val mutableInetSocketAddresses = mutableListOf<InetSocketAddress>()
    inetSocketAddresses = mutableInetSocketAddresses

    // 記錄主機名和端口號  
    val socketHost: String
    val socketPort: Int
    // 不使用代理或者使用SOCKS代理,記錄HTTP服務器主機名以及協議端口號  
    if (proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.SOCKS) {
      socketHost = address.url.host
      socketPort = address.url.port
    } else {
      // 使用HTTP代理,記錄代理服務器的主機名以及端口號  
      val proxyAddress = proxy.address()
      require(proxyAddress is InetSocketAddress) {
        "Proxy.address() is not an InetSocketAddress: ${proxyAddress.javaClass}"
      }
      socketHost = proxyAddress.socketHost
      socketPort = proxyAddress.port
    }

    // 端口號不合法  
    if (socketPort !in 1..65535) {
      throw SocketException("No route to $socketHost:$socketPort; port is out of range")
    }

    // 使用SOCKS代理  
    if (proxy.type() == Proxy.Type.SOCKS) {
      // 根據HTTP服務器的主機名和協議端口號,創建連接目標地址,此時創建的InetSocketAddress
      // 實例是未經解析的  
      mutableInetSocketAddresses += InetSocketAddress.createUnresolved(socketHost, socketPort)
    } else {
      // 不使用代理或者使用HTTP代理,則會進行DNS解析  
      eventListener.dnsStart(call, socketHost)

      // Try each address for best behavior in mixed IPv4/IPv6 environments.
      // 對Http服務器的主機名進行DNS解析  
      val addresses = address.dns.lookup(socketHost)
      if (addresses.isEmpty()) {
        throw UnknownHostException("${address.dns} returned no addresses for $socketHost")
      }

      eventListener.dnsEnd(call, socketHost, addresses)
      // 為每個解析到的IP地址創建連接目標地址  
      for (inetAddress in addresses) {
        mutableInetSocketAddresses += InetSocketAddress(inetAddress, socketPort)
      }
    }
  }

從上面的代碼可以看出,一個特定代理服務器選擇下的連接目標地址因代理類型不同而不同

  • 直接連接:對HTTP服務器域名進行解析,並為每個解析到的IP地址創建連接目標地址
  • 使用SOCKS代理:直接以HTTP服務器域名以及協議端口號創建連接目標地址
  • 使用HTTP代理:對代理服務器的域名進行解析,並為每個解析到的IP地址創建連接目標地址

通過以上對RouteSelector的分析,我們可以知道

  1. 在創建RouteSelector的時候,它會先在構造函數中收集所有的代理
  2. 當調用RouteSelectornext方法的時候,它會返回一個Selection對象,Selection對象代表的是某個代理服務器下可用的路由集合,代理服務器有三種類型:不使用代理、使用HTTP代理、使用SOCKS代理
  3. 在獲取Selection對象的時候,會先獲取該代理對應的連接目標地址列表,再根據連接目標地址列表去構造該代理對應的路由Route列表
  4. RouteSelector會先嘗試某個代理對應的Selection,如果所有代理下都找不到可用的路由,那么就使用「延遲路由列表」去構建一個Selection對象,也就是說,延遲路由總是最后被嘗試

SelectionRouteSelector的一個內部類,它的實現很簡單

  class Selection(val routes: List<Route>) {
    private var nextRouteIndex = 0

    operator fun hasNext(): Boolean = nextRouteIndex < routes.size

    operator fun next(): Route {
      if (!hasNext()) throw NoSuchElementException()
      return routes[nextRouteIndex++]
    }
  }

調用Selectionnext方法,即可獲取該Selection的下一個路由Route

另外,一個域名是可以對應多個IP地址的,可以參考 一個域名最多能對應幾個IP地址?。結合上面的分析,我們可以畫出下面的這個圖

image-20211130094719937

當然,前面提到過,Proxy有三種類型,每種類型的連接目標地址InetSocketAddress會有不同,也就是上面的IPPort會有不同,根據Proxy類型的不同,InetSocketAddress會有不同的含義:

  • 不使用代理:它包含的信息是HTTP服務器經過了DNS解析的IP地址以及協議的端口號。
  • HTTP代理:它包含的信息是代理服務器經過DNS解析的IP地址以及端口號。
  • SOCKS代理:它包含的信息是HTTP服務器的域名和協議端口號。

現在還有一個問題,ExchangeFinderAddressRouteSelector類分別是什么時候創建的呢?ExchangeFinder是在RetryAndFollowUpInterceptor當中被創建的

class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
	@Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
    	...
        var newExchangeFinder = true
        ...
        while (true) {
        	call.enterNetworkInterceptorExchange(request, newExchangeFinder)
            ...
        }
    }
    ...    
}

這里調用了RealCallenterNetworkInterceptorExchange方法,該方法會根據newExchangeFinder的值,判斷是否要創建一個ExchangeFindernewExchangeFinder表示是否要創建一個ExchangeFinder

  fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
	...
    if (newExchangeFinder) {
      this.exchangeFinder = ExchangeFinder(
          connectionPool,
          createAddress(request.url),
          this,
          eventListener
      )
    }
  }

這里創建了一個ExchangeFinder,並將其引用存儲在RealCall當中,另外createAddress方法會去創建一個Address對象,如下

  private fun createAddress(url: HttpUrl): Address {
    var sslSocketFactory: SSLSocketFactory? = null
    var hostnameVerifier: HostnameVerifier? = null
    var certificatePinner: CertificatePinner? = null
    if (url.isHttps) {
      sslSocketFactory = client.sslSocketFactory
      hostnameVerifier = client.hostnameVerifier
      certificatePinner = client.certificatePinner
    }

    return Address(
        uriHost = url.host,
        uriPort = url.port,
        dns = client.dns,
        socketFactory = client.socketFactory,
        sslSocketFactory = sslSocketFactory,
        hostnameVerifier = hostnameVerifier,
        certificatePinner = certificatePinner,
        proxyAuthenticator = client.proxyAuthenticator,
        proxy = client.proxy,
        protocols = client.protocols,
        connectionSpecs = client.connectionSpecs,
        proxySelector = client.proxySelector
    )
  }

其中url參數就是請求的url。從上面可以看出,我們創建的Address對象會存儲urlhostport信息,另外Address對象還有一些其它的信息,這些信息均取自OkHttpClient

另外,一個RouteSelector對象,可能會在ExchangeFinderfindConnection方法中被創建

  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
  	...
    if (nextRouteToTry != null) {
    	...
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
    	...
    } else {
        // Compute a new route selection. This is a blocking operation!
        var localRouteSelector = routeSelector
        if (localRouteSelector == null) {
            localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
            this.routeSelector = localRouteSelector
        }
        val localRouteSelection = localRouteSelector.next()
        routeSelection = localRouteSelection
        routes = localRouteSelection.routes
        ...  
    }
  }

插敘:邏輯梳理

在介紹了上面的知識后,我們將目前邏輯梳理一下。

首先,在RetryAndFollowUpInterceptorintercept方法里面,會調用RealCallenterNetworkInterceptorExchange方法,在該方法里面,會根據newExchangeFinder的值,去判斷是否要創建一個新的ExchangeFinder對象,若創建,則將ExchangeFinder對象的引用存於RealCall

class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
	@Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        ...
		call.enterNetworkInterceptorExchange(request, newExchangeFinder)
    	...
    }
    ...    
}
class RealCall(
  val client: OkHttpClient,
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
	...
    fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
    	...
        if (newExchangeFinder) {
          this.exchangeFinder = ExchangeFinder(
              connectionPool,
              createAddress(request.url),
              this,
              eventListener
          )
        }
    }
    ...
}

創建ExchangeFinder對象的同時,也會調用createAddress方法,去創建一個Address對象,Address會記錄請求urlhostport

接着在獲取一個健康可用連接的時候,調用鏈為 RealCall::initExchange -> ExchangeFinder::find -> ExchangeFinder::findHealthyConnection -> ExchangeFinder::findConnection,在ExchangeFinderfindConnection方法里面,若RouteSelector還沒創建,就會去創建RouteSelector,邏輯如下

  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
  	...
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      ...
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      routes = null
      route = routeSelection!!.next()
    } else {
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes
	  
      ...
      route = localRouteSelection.next()
    }  
    ...  
  }

我們重點關注If的這大段語句,nextRouteToTry先不去管它,暫時認為它為null。上面routeSelectorrouteSelection的類型分別是RouteSelectorRouteSelector.Selection,它們的含義在上面已經介紹過了。

ExchangeFinderfindHealthyConnection方法我們可以知道,findConnection方法是有可能多次被調用的,在第一次被調用的時候,routeSelectorrouteSelection均為null,因此會在else分支下創建RouteSelector對象,並且會調用routeSelectornext方法,獲取一個Selection並將其賦值給routeSelection,然后再調用routeSelectionnext方法,獲取一個路由route。在第二次進入findConnection方法的時候,會先在else if語句判斷routeSelection是否還有可用的路由,如果有,則直接獲取routeSelection的下一個路由,如果當前routeSelection的路由已經耗盡,則會在else分支中,使用routeSelectornext方法獲取下一個routeSelection,然后再從routeSelection中獲取一個路由route

選擇可用路由

RouteSelector選擇可用路由的邏輯主要在next()方法中,上面已經對next()方法進行了分析:

  1. 如果當前有可被嘗試的代理服務器,獲取該代理服務器對應的所有連接目標地址。
  2. 根據連接目標地址創建對應路由,如果該路由最近連接失敗,將該路由加入延遲路由集合,否則加入正常路由集合。
  3. 如果當前有正常路由,返回正常路由集合,否則繼續1、2步驟。
  4. 遍歷完所有可嘗試代理服務器仍然沒有正常路由,則返回延遲路由集合(最近連接失敗的路由)。

維護路由連接失敗信息

RouteSelector借助RouteDatabase維護失敗的路由信息,避免浪費時間去連接一些不可用的路由:

class RouteDatabase {
  private val failedRoutes = mutableSetOf<Route>()

  /** Records a failure connecting to [failedRoute]. */
  @Synchronized fun failed(failedRoute: Route) {
    failedRoutes.add(failedRoute)
  }

  /** Records success connecting to [route]. */
  @Synchronized fun connected(route: Route) {
    failedRoutes.remove(route)
  }

  /** Returns true if [route] has failed recently and should be avoided. */
  @Synchronized fun shouldPostpone(route: Route): Boolean = route in failedRoutes
}

該類實現非常簡單,里面維護了一個連接失敗路由容器,當路由連接失敗,調用failed方法將該路由加入容器中,當路由連接成功,調用connected方法將該路由從容器中刪除,通過調用shouldPostpone方法判斷該路由是否在連接失敗路由容器中,從而知道該路由最近是否連接失敗。

HTTP中的連接復用機制

OkHttp中出現了「連接合並」的概念,它和HTTP/2的多路復用機制有關。我們先介紹HTTP中的連接復用機制,然后再介紹OkHttp中的連接復用機制。

雖然說 HTTP 是一個無連接、無狀態的協議,但由於它基於 TCP,因此也存在着管理 TCP 連接的管理問題。

HTTP/1.0 及之前

使用TCP協議往往存在粘包問題,出現粘包問題的核心原因是:TCP 協議是面向字節流的,它的數據並沒有邊界。

由於 TCP 連接的數據之間沒有數據邊界,從而導致 HTTP 數據包粘連,無法處理,因此 HTTP/1.0 及之前,采用了一種非常暴力的方式進行解決:

每進行一次 HTTP 通信,就要先建立一條 TCP 連接,在通信結束后,就需要斷開一次 TCP 連接,而每次TCP 連接的建立需要經過三次握手,而 TCP 連接的關閉需要四次揮手,這顯然非常浪費資源。

image-20211201093318325

其實 HTTP/1.0 中還引入了 Connection:Keep-Alive 這個 Header,它就是用來支持 TCP 連接的維持的,但在 大部分基於 HTTP/1.0 的服務器中並沒有實現。

HTTP/1.1

HTTP/1.1真正引入Keep-Alive機制,默認開啟,可以通過Connection:close關閉。在 HTTP 通信結束時,若啟用了 Keep-Alive 機制,則該連接並不會立即關閉,此時如果有新的請求到來,且 host 和 port 相同,則會復用這條 TCP 連接進行請求,減少了 TCP 連接的頻繁建立與關閉的資源消耗。

image-20211201093637597

這樣就避免了每次HTTP通信都要建立和關閉 TCP 連接,從而也就避免了頻繁的握手和揮手(三次握手和四次揮手),使得 HTTP 請求的效率大大提高

image-20211201093810241

粘包問題

現在多個HTTP請求可以復用同一條TCP連接,而HTTP數據包也沒有通過分隔符確定數據的頭和尾,那么它是如何解決粘包的問題的呢?

實際上它是通過 Content-Length 這個 Header 解決的,它標明了數據部分所占用的大小,從而可以通過它來確定這個數據包的邊界,避免粘包。這也是 HTTP/1.1 引入 Content-Length 的原因。

同時,HTTP/1.1 中還有一個 Keep-Alive 請求頭,可以對 timeoutmax 進行設置,用來指定空閑連接保持打開的時間以及連接關閉前這條連接可以發送請求數的最大值。

HTTP/2.0

HTTP1.1 存在的問題

HTTP/1.1 中,雖然實現了 TCP 連接的復用,但仍有如下幾個缺陷:

  1. 如果客戶端想要發起並行的請求,則必須建立多個 TCP 連接,這對網絡資源的消耗也是十分嚴重的。
  2. 不會讀對請求及響應的 Header 進行壓縮,造成了網絡流量的浪費。

關於第一點:例如,打開一個網頁,瀏覽器會先向服務器請求一份HTML文件,在拿到HTML文件后,就需要向服務器請求HTML中的圖片等資源。若是使用HTTP 1.1協議,雖然TCP連接是可以復用的,但是TCP連接上無法並行請求,需要等到上一個請求結束后,才能復用該 TCP 連接進行下一個請求,如果想並行請求,只能建立多個TCP連接,在每個TCP連接上都進行一個HTTP請求;若是使用HTTP2.0協議,則可以在同一個TCP連接上並行地發送多個HTTP請求,同時去請求HTML文件中的多份資源,然后等待服務端的響應,這就是HTTP2.0的多路復用機制

多路復用

HTTP/2.0 引入了一種多路復用機制,同時引入了幾個新的概念:

  • 數據流:基於 TCP 連接上的一個雙向的字節流,每發起一個請求,就會建立一個數據流,后續的請求過程的數據傳遞都通過該流進行
  • 數據幀:HTTP/2 中的數據最小切片單位,其中又分為了 Header FrameData Frame 等等。
  • 消息:一個請求或響應對應的一系列數據幀。

引入了這些概念之后,在 HTTP 請求的過程中,服務端/客戶端首先會將我們的請求/響應切分為不同的數據幀,當另一方接收到后再將其組裝從而形成完整的請求/響應,如下所示

image-20211201095807774

這樣,就實現了對 TCP 連接的多路復用,將一個請求或響應分為了一個個的數據幀,使得多個請求可以並行地進行。

多路復用與 Keep-Alive 的區別

  1. Keep-Alive 機制雖然解決了復用 TCP 連接問題,但沒有解決請求阻塞的問題,需要等到上一個請求結束后,才能復用該 TCP 連接進行下一個請求。
  2. HTTP/1.x 對數據的傳遞仍然是以一個整體進行傳遞,而在 HTTP/2 中引入了數據幀的概念,使得多個請求可以同時在流中進行傳遞。
  3. HTTP/2 采用了 HPACK 壓縮算法對 Header 進行壓縮,降低了請求的流量消耗。
image-20211201100007939

連接合並

英文名為:connection coalescing。

「連接合並」出現的背景:在 HTTP/1.1 中,瀏覽器為了更快地獲取數據,對於每個「主機名+端口號」會使用 6 個連接。有些網站站點還會啟用更多的主機名,這些主機名往往被解析為相同的 IP 地址或者 IP 地址集合,網站站點這么做的目的是為了觸發瀏覽器使用更多的連接。在 HTTP/2 引入了多路復用后,它允許多個流在一個連接上同時進行傳輸,那么原先為了讓瀏覽器使用更多連接而使用多個主機名的網站站點,與現在 HTTP/2 瀏覽器使用一個 TCP 連接進行多路復用的意思,就背道而馳了。那些網站站點不希望切換回單一的主機名,主要是基於下面的考慮

  • 這會是一個重大的架構變化。
  • 還有一些瀏覽器仍然是在使用 HTTP/1.1。

「連接合並」:對於連接合並,不同的瀏覽器有不同的做法,一些瀏覽器甚至不去處理它。下面是個例子:假設某個站點 "example.com" 在 DNS 中有兩個主機名,分別是 "A.example.com" 和 "B.example.com",每個主機名都對應着一個 IP 列表。另外,HTTP/2 是基於 HTTPS 的,使用 HTTP/2 的瀏覽器,在 TLS 握手過程中可以獲取服務端的兩個主機名 "A.example.com"、"B.example.com"。假設兩個主機名 DNS 解析后的 IP 列表為:

# A.example.com
192.168.0.1 and 192.168.0.2
# B.example.com
192.168.0.2 and 192.168.0.3

下面是 Chrome 瀏覽器的連接合並的做法

  • 假設一開始瀏覽器連接到了A站點的 192.168.0.1,現在有個要連接 "B.example.com" 的請求,瀏覽器會先去 DNS 解析獲取它的 IP 列表,發現其中並沒有 192.168.0.1,於是會為 "B.example.com" 創建一個新的連接。
  • 假設一開始瀏覽器連接到了A站點的 192.168.0.2,現在有個要連接 "B.example.com" 的請求,瀏覽器會先去 DNS 解析獲取它的 IP 列表,發現B對應的 IP 地址也有 192.168.0.2,於是瀏覽器會重用到A站點的 TCP 連接,去請求到B站點的數據。

OkHttp的連接復用機制

OkHttp使用連接池實現了連接復用機制,我們看下連接池ConnectionPool的實現。

ConnectionPool

ConnectionPool只是連接池的入口,真正的實現其實是RealConnectionPool

初始化

OkHttpClient.Builder中默認創建一個ConnectionPool對象,用戶也可以自己傳入一個ConnectionPool對象

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
	...
    val connectionPool: ConnectionPool = builder.connectionPool
    ...
    class Builder constructor() {
        ...
        internal var connectionPool: ConnectionPool = ConnectionPool()
        ...
        fun connectionPool(connectionPool: ConnectionPool) = apply {
      		this.connectionPool = connectionPool
    	}
        ...
    }
}

ConnectionPool的實現如下

class ConnectionPool internal constructor(
  internal val delegate: RealConnectionPool
) {
  constructor(
    maxIdleConnections: Int,
    keepAliveDuration: Long,
    timeUnit: TimeUnit
  ) : this(RealConnectionPool(
      taskRunner = TaskRunner.INSTANCE,
      maxIdleConnections = maxIdleConnections,
      keepAliveDuration = keepAliveDuration,
      timeUnit = timeUnit
  ))

  constructor() : this(5, 5, TimeUnit.MINUTES)

  /** Returns the number of idle connections in the pool. */
  fun idleConnectionCount(): Int = delegate.idleConnectionCount()

  /** Returns total number of connections in the pool. */
  fun connectionCount(): Int = delegate.connectionCount()

  /** Close and remove all idle connections in the pool. */
  fun evictAll() {
    delegate.evictAll()
  }
}

可以看出,其內部依靠了RealConnectionPool實現。

RealConnectionPool

我們查看RealConnectionPool的構造參數和成員變量

class RealConnectionPool(
  taskRunner: TaskRunner,
  /** The maximum number of idle connections for each address. */
  private val maxIdleConnections: Int,
  keepAliveDuration: Long,
  timeUnit: TimeUnit
) {
  private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)

  private val cleanupQueue: TaskQueue = taskRunner.newQueue()
  private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
    override fun runOnce() = cleanup(System.nanoTime())
  }

  private val connections = ConcurrentLinkedQueue<RealConnection>()
  ...
}

ConnectionPoolRealConnectionPool對象的構造,我們可以知道默認情況下,OkHttp中的連接池最大空閑連接的數量為5,並且最大的空閑時間為5分鍾,這里的最大空閑連接數量是相對於一個address而言。

keepAliveDurationNs也就是將空閑時間使用納秒來表示。

connections用於存放連接。

上面還出現了Task cleanupTaskTaskQueue cleanupQueue這兩個對象。

Task類:從Task類的注釋可以得知,它代表一個可以執行一次或多次的任務。

  • 復發性:Task是一個抽象類,它有一個抽象方法runOnce,子類需要去實現該方法,在該方法中定義要執行的任務,runOnce返回一個Long值,該值表示該Task下一次被調度執行的延時,特別地,若該值為-1,則表示該Task不再被調度執行。
  • 可取消:Task存放於TaskQueue當中,當Task在隊列中等待被執行或者正在執行時,是可以被取消的,取消一個在等待被執行的Task,那么該Task就不會被執行了,取消一個正在執行的Task,不會影響該Task的本次執行,但之后該Task就不會再被調度執行了。
  • TaskQueue關系:一個Task會被綁定到一個TaskQueue,一個TaskQueue中可以有多個Task,但一個Task只可以對應一個TaskQueueTaskQueue中的Task是有順序的,隊列中的Task是不會並發執行的。

TaskQueue類:里面存放着一組任務,這組任務按順序執行,不是並發執行。

Task類的runOnce方法中,調用了cleanup方法,該方法執行「清理空閑連接」的任務,后面會提到。

RealConnectionPool中幾個比較重要的操作方法為:putcleanupcallAcquirePooledConnectionconnectionBecameIdle,分別對應放入連接清理空閑連接獲取可用連接通知連接空閑操作。

放入連接

放入連接對應put方法,RealConnectionPool::put如下

  fun put(connection: RealConnection) {
    connection.assertThreadHoldsLock()
	// 存入隊列
    connections.add(connection)
    // 啟動清理空閑連接的任務  
    cleanupQueue.schedule(cleanupTask)
  }

該方法會將connection存入連接緩存池中,並且啟動清理空閑連接的任務。

清理空閑連接

cleanupTask表示一個清理空閑連接的任務,該任務會執行cleanup方法

  private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
    override fun runOnce() = cleanup(System.nanoTime())
  }

cleanup方法用於清理空閑連接,如下

  // 該方法用於維護連接池,如果有「空閑時間」超過「允許的最長空閑時間」的連接,或者「空閑連接的數量」
  // 超過「允許的最大空閑連接數量」,那么就從連接緩存池中移除「空閑時間最長的連接」
  fun cleanup(now: Long): Long {
    // 正在使用的連接數量  
    var inUseConnectionCount = 0
    // 空閑的連接數量  
    var idleConnectionCount = 0
    // 空閑時間最長的連接  
    var longestIdleConnection: RealConnection? = null
    // 空閑時間最長的連接 的 空閑時間
    var longestIdleDurationNs = Long.MIN_VALUE

    // Find either a connection to evict, or the time that the next eviction is due.
    // 找到需要移除的連接,或者計算下一次需要清理的時間  
    for (connection in connections) {
      synchronized(connection) {
        // If the connection is in use, keep searching.
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          // 如果當前連接正在使用  
          inUseConnectionCount++
        } else {
          // 如果當前連接是空閑的  
          idleConnectionCount++

          // If the connection is ready to be evicted, we're done.
          // 計算連接的空閑時間  
          val idleDurationNs = now - connection.idleAtNs
          // 與之前的空閑時間進行對比,記錄空閑時間最長的連接、以及其空閑時間  
          if (idleDurationNs > longestIdleDurationNs) {
            longestIdleDurationNs = idleDurationNs
            longestIdleConnection = connection
          } else {
            Unit
          }
        }
      }
    }

    when {
      // 最長空閑時間 超過 允許的最長空閑時間,或者,空閑連接的數量 超過 允許的最大空閑數量  
      // 表示要清理連接了,這時候會選擇「空閑時間最久」的連接進行清理  
      longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections -> {
       
        // 空閑時間最長的連接
        val connection = longestIdleConnection!!
        synchronized(connection) {
          // 不再是空閑連接,立即執行下次清理任務  
          if (connection.calls.isNotEmpty()) return 0L
          // 不再是空閑時間最長的,立即執行下次清理任務  
          if (connection.idleAtNs + longestIdleDurationNs != now) return 0L
          // 標記該連接不可用  
          connection.noNewExchanges = true
          // 從連接緩存池中移除該連接  
          connections.remove(longestIdleConnection)
        }
	    
        connection.socket().closeQuietly()
        // 若當前的連接池為空,那么就沒有必要再清理連接了,所以調用TaskQueue的cancelAll方法,
        // 取消所有Task,也就是取消「所有清理連接的」任務      
        if (connections.isEmpty()) cleanupQueue.cancelAll()

        // 立即執行下次清理任務 
        return 0L
      }

      // 有空閑的連接,但是還沒有達到清理的標准(空閑連接 || 空閑連接數)  
      idleConnectionCount > 0 -> {
        // 返回下次執行清理任務的時間  
        return keepAliveDurationNs - longestIdleDurationNs
      }

      // 沒有連接是空閑的,所有連接都在使用  
      inUseConnectionCount > 0 -> {
        // 待「允許的最長空閑時間」之后,再次執行清理連接的任務
        return keepAliveDurationNs
      }

      // 沒有任何的連接  
      else -> {          
        return -1
      }
    }
  }

由於該方法在TaskrunOnce方法中調用,並且其返回值作為runOnce方法的返回值,前面我們提到過,runOnce方法的返回值表示該Task下一次被調度執行的延時,或者返回-1表示該Task不再被調度執行,因此這里cleanup方法的返回值意味着下一次執行「清理空閑連接」任務的延時,或者返回-1,表示不再執行該「清理空閑連接」的任務。

我們再總結一下cleanup方法所做的事情:

  1. 遍歷連接緩存池中的連接,記錄當前空閑連接數量、正在使用連接數量、當前空閑時間最長的連接及其空閑時間。

  2. 若最長空閑時間 超過 允許的最長空閑時間,或者,空閑連接的數量 超過 允許的最大空閑數量 ,這時候會從連接緩存池中移除 空閑時間最久 的連接,同時檢測當前連接緩存池是否為空,如果為空則取消所有「清理空閑連接」的任務。立即執行下次「清理空閑連接」的任務。 否則往下走。

  3. 若 有空閑的連接,但是還沒有達到清理的標准(空閑連接 || 空閑連接數) ,返回下次執行「清理空閑連接」任務的延時。否則往下走。

  4. 若 沒有連接是空閑的,所有連接都在使用 ,那么就返回keepAliveDurationNs,表示下次執行「清理空閑連接」任務的延時。否則往下走。

  5. 當前連接緩存池中沒有任何連接,該「清理空閑連接」任務不需要再執行。

獲取可用連接

從連接池獲取可用連接對應着callAcquirePooledConnection方法,RealConnectionPoolcallAcquirePooledConnection如下

  fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?, // 請求方路由選擇后得到的路由列表
    requireMultiplexed: Boolean // 是否要求要「Http/2.0多路復用的連接」
  ): Boolean { 
    // 遍歷連接緩存池中的連接  
    for (connection in connections) {
      synchronized(connection) {
        // 請求方要求要多路復用連接,但該連接不是多路復用的連接,則跳過該連接  
        if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
        // 判斷該連接是否符合條件,若不符合則跳過該連接  
        if (!connection.isEligible(address, routes)) return@synchronized
        // RealCall使用該連接  
        call.acquireConnectionNoEvents(connection)
        return true
      }
    }
    return false
  }

該方法會遍歷連接緩存池中的連接,為請求者尋找一個滿足條件的連接,找到了則返回true,未找到則返回false。如果該連接滿足請求者的條件,則會調用RealCallacquireConnectionNoEvents方法,表示RealCall獲取該連接

  fun acquireConnectionNoEvents(connection: RealConnection) {
    connection.assertThreadHoldsLock()

    check(this.connection == null)
    // RealCall中記錄該連接  
    this.connection = connection
    connection.calls.add(CallReference(this, callStackTrace))
  }

我們還要關注RealConnectionisEligible方法,該方法用於判斷連接是否符合請求者的條件

  internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
    assertThreadHoldsLock()

    // 該connection承載的流數量已經到達最大值,或者該連接不可用,那么返回false  
    if (calls.size >= allocationLimit || noNewExchanges) return false

    // 比較Address的非host字段,如果有不相同的,那么返回false  
    if (!this.route.address.equalsNonHost(address)) return false

    // 如果主機名host也匹配,那么意味着該連接可以復用,返回true
    if (address.url.host == this.route().address.url.host) {
      return true // This connection is a perfect match.
    }

    // At this point we don't have a hostname match. But we still be able to carry the request if
    // our connection coalescing requirements are met. See also:
    // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
    // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
	
    // 此時 該連接對應的地址 和 請求者的目標地址,雖然它們的host沒有匹配上,但是如果它們滿足
    // 「連接合並」的要求,該連接仍然可以被復用,承載請求  
      
    // 若該連接不是Http/2(多路復用)連接,則返回false  
    if (http2Connection == null) return false

    // 2. The routes must share an IP address.
    // 用戶需要傳入routes,並且傳入的routes至少要有一個與該連接的路由相匹配  
    if (routes == null || !routeMatchesAny(routes)) return false

    // 3. This connection's server certificate's must cover the new host.
    // 此連接的服務器證書必須包含新主機  
    if (address.hostnameVerifier !== OkHostnameVerifier) return false
    // 是否支持該url  
    if (!supportsUrl(address.url)) return false

    // 4. Certificate pinning must match the host.
    try {
      address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
    } catch (_: SSLPeerUnverifiedException) {
      return false
    }

    // 滿足「連接合並」的要求,可以復用該連接 
    return true // The caller's address can be carried by this connection.
  }

上面的routeMatchesAny方法如下:

  private fun routeMatchesAny(candidates: List<Route>): Boolean {
    return candidates.any {
      it.proxy.type() == Proxy.Type.DIRECT &&
          route.proxy.type() == Proxy.Type.DIRECT &&
          route.socketAddress == it.socketAddress
    }
  }

可以看出,它返回true的條件:該coneection對應的路由是直連,並且要求用戶傳入的routes中至少要有一條路由為直連,並且該路由和coneection的路由的目標地址要相同。

supportsUrl方法:

  private fun supportsUrl(url: HttpUrl): Boolean {
    assertThreadHoldsLock()

    val routeUrl = route.address.url

    if (url.port != routeUrl.port) {
      return false // Port mismatch.
    }

    if (url.host == routeUrl.host) {
      return true // Host match. The URL is supported.
    }

    // We have a host mismatch. But if the certificate matches, we're still good.
    return !noCoalescedConnections && handshake != null && certificateSupportHost(url, handshake!!)
  }

我們再總結一下獲取可用連接的callAcquirePooledConnection方法:

  1. 遍歷緩存連接池中的連接,對於每個連接,處理如下
  2. 若用戶要求"多路復用"的連接,但該連接不是"多路復用"的連接,則跳過該連接
  3. 若該連接不滿足條件,則跳過該連接
    1. 用戶要求的addressconnectionroute,它們hostport等信息都匹配,則滿足條件
    2. 若滿足Http/2.0的「連接合並」的要求,也認為該連接滿足條件
    3. 否則該連接不滿足條件
  4. 該連接滿足用戶的要求,則RealCall獲取該連接,並返回true,表示連接池獲取連接成功
  5. 若所有連接都不符合用戶要求,則返回false,表示連接池獲取連接失敗

三次連接池的獲取的區別

回到ExchangeFinder::findConnection方法中,它三次從連接池獲取連接,它們之間有什么區別呢?

// 第一次從連接池獲取連接
connectionPool.callAcquirePooledConnection(address, call, null, false)
// 第二次從連接池獲取連接
connectionPool.callAcquirePooledConnection(address, call, routes, false)
// 第三次從連接池獲取連接
connectionPool.callAcquirePooledConnection(address, call, routes, true)

它們的區別在於第三個參數和第四個參數,剛剛我們提到了RealConnectionPoolcallAcquirePooledConnection方法,它用於獲取可用的連接

  fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
    for (connection in connections) {
      synchronized(connection) {
        if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
        if (!connection.isEligible(address, routes)) return@synchronized
        call.acquireConnectionNoEvents(connection)
        return true
      }
    }
    return false
  }
  • 第三個參數routes:客戶端到目的地(有可能是源服務器,也有可能是代理服務器)的路由列表,它由我們前面介紹的RouteSelector生成的

  • 第四個參數requireMultiplexed:表示客戶端是否需要「多路復用」的連接,多路復用連接是HTTP/2.0的特性

三次從連接池獲取連接的區別:

  • 第一次:requireMultiplexed參數為false,說明先不去管連接池中的連接是不是HTTP/2的連接(多路復用),只要在isEligible方法中,判斷目標請求地址和connection對應路由的主機名、端口等信息都匹配,就使用該連接,若不匹配,則不使用該連接(由於routes參數為null,因此不會去使用「連接合並」的特性)
  • 第二次:requireMultiplexed參數為false,說明先不去管連接池中的連接是不是HTTP/2的連接(多路復用),只要在isEligible方法中,判斷目標請求地址和connection對應路由的主機名、端口等信息都匹配,就使用該連接,若只有主機名不匹配,由於routes的參數不為null,也就是請求方傳入經過了路由選擇的路由列表,這時候會去判斷是否滿足「連接合並」的特性,滿足則使用該連接,否則不使用
  • 第三次:requireMultiplexed參數為true,表明只需要多路復用的連接,在callAcquirePooledConnection方法中會篩去不是多路復用的連接,對於連接池的每一個多路復用的連接,會先去看主機名、端口等信息是否匹配,匹配則直接使用,若只有主機名不匹配,則判斷是否滿足「連接合並」的特性,滿足則使用該連接,否則不使用

簡單總結:

  • 第一次:不使用「連接合並」的特性,只有目標請求地址和connection對應路由的主機名、端口等信息都匹配,才使用該連接
  • 第二次:使用「連接合並」的特性,不管是不是多路復用連接,優先判斷主機名和端口等信息是否匹配,匹配則直接使用,若只有主機名不匹配,判斷是否符合「連接合並」的要求,滿足則使用,否則不使用
  • 第三次:使用「連接合並」的特性,這一次只使用多路復用的連接。也是優先判斷主機名和端口等信息是否匹配,匹配則直接使用,不匹配則判斷是否符合「連接合並」的要求,滿足則使用,否則不使用

通知連接空閑

這對應RealConnectionPoolconnectionBecameIdle方法

  /**
   * Notify this pool that [connection] has become idle. Returns true if the connection has been
   * removed from the pool and should be closed.
   */
  // 通知連接池該connection已空閑,如果該connection從連接池中移除則返回true	
  fun connectionBecameIdle(connection: RealConnection): Boolean {
    connection.assertThreadHoldsLock()

    // 如果該連接無法承載新的stream,或者連接池允許的最大空閑連接數為0,則從連接池中移除該連接  
    return if (connection.noNewExchanges || maxIdleConnections == 0) {
      connection.noNewExchanges = true
      connections.remove(connection)
      if (connections.isEmpty()) cleanupQueue.cancelAll()
      true
    } else {
      // 否則執行清理空閑連接的任務  
      cleanupQueue.schedule(cleanupTask)
      false
    }
  }

PS:當調用RealConnectionPoolput方法或者connectionBecameIdle方法時都立即執行空閑連接清理任務。

建立新連接

ExchangeFinder::findConnection中,如果call中沒有可復用的連接,並且前兩次從連接池獲取連接的時候,也沒能成功地獲取到連接,那么這時候就會創建一個新的連接

    ...
	val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } 
	...

我們查看RealConnectionconnect方法

  fun connect(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    call: Call,
    eventListener: EventListener
  ) {
    check(protocol == null) { "already connected" }

    var routeException: RouteException? = null
    val connectionSpecs = route.address.connectionSpecs
    val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)

    // 一些異常處理  
 	...

    // 開始連接  
    while (true) {
      try {
        // 是否需要使用隧道技術  
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            // 我們無法連接隧道,但妥善關閉了我們的資源。  
            break
          }
        } else {
          // 不需要使用隧道技術  
          connectSocket(connectTimeout, readTimeout, call, eventListener)
        }
        // 建立協議  
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
        eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
        break
      } catch (e: IOException) {
        // 出現異常,釋放一些資源
        ...
      }
    }
      
	// 默認嘗試建立隧道的次數為21,超過該最大次數則拋異常
    if (route.requiresTunnel() && rawSocket == null) {
      throw RouteException(ProtocolException(
          "Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS"))
    }

    idleAtNs = System.nanoTime()
  }

這里使用了一個while(true)循環,直到成功建立連接,主要步驟:

  1. 判斷是否使用隧道技術

    • 是 : 調用connectTunnel方法

    • 否 : 調用connectSocket方法

  2. 調用establishProtocol方法建立協議

不使用隧道(直接連接)

直接連接,調用的是RealConnectionconnectSocket方法

  /** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
  // 在原始套接字上,構建一個完整的HTTP或HTTPS連接
  @Throws(IOException::class)
  private fun connectSocket(
    connectTimeout: Int,
    readTimeout: Int,
    call: Call,
    eventListener: EventListener
  ) {
    val proxy = route.proxy
    val address = route.address

    // 根據代理類型,初始化rawSocket  
    val rawSocket = when (proxy.type()) {
      Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
      else -> Socket(proxy)
    }
    this.rawSocket = rawSocket

    eventListener.connectStart(call, route.socketAddress, proxy)
    rawSocket.soTimeout = readTimeout
    try {
      // 連接socket,之所以這樣寫是因為支持不同的平台 
      // 里面的實現調用了 socket.connect(address, connectTimeout) 方法  
      Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
    } catch (e: ConnectException) {
      throw ConnectException("Failed to connect to ${route.socketAddress}").apply {
        initCause(e)
      }
    }

    // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
    // More details:
    // https://github.com/square/okhttp/issues/3245
    // https://android-review.googlesource.com/#/c/271775/
    try {
      // 獲取source和sink,分別用於輸入和輸出  
      source = rawSocket.source().buffer()
      sink = rawSocket.sink().buffer()
    } catch (npe: NullPointerException) {
      if (npe.message == NPE_THROW_WITH_NULL) {
        throw IOException(npe)
      }
    }
  }

該方法的大致流程如下:

  1. 根據代理類型創建Socket對象。
  2. 調用socketconnect方法進行socket連接,這是 Java 原生的方法,在這里進行了TCP三次握手
  3. 調用Okio庫提供的方法獲取輸入輸出流source以及sink

通過隧道連接

沒有看懂 okhttp 在這一部分的操作,這里就簡單記錄下隧道的概念。

在從IPv4向IPv6遷移的過程中,我們就有提到過建隧道的方法,我們回顧下當時的應用。首先,要明確的一點是IPv6是向后兼容(backward)的,也就是可以發送、路由和接收IPv4數據報,但已部署的具有IPv4能力的系統卻不能夠處理IPv6數據報。

我們看下面這個圖

image-20211206130622802

假定圖中的B和E結點要使用IPv6數據報進行交互,但它們是經由中間IPv4路由器互聯的,我們將兩台IPv6路由器之間的中間IPv4路由器的集合稱為一個隧道,借助於隧道,在隧道發送端的B結點可將整個IPv6數據報放到一個IPv4數據報的有效載荷中,該IPv4數據報的地址設為指向隧道接收端的E結點,再發送給隧道中的第一個節點(在此例中為C)。隧道中的中間IPv4路由器在它們之間為該數據報提供路由,就像對待其他數據報一樣,完全不知道該IPv4數據報自身就含有一個完整的IPv6數據報。隧道接收端的E節點最終收到該IPv4數據報(它是該IPv4數據報的目的地),並確定該IPv4數據報含有一個IPv6數據報(通過觀察在IPv4數據報中的協議號字段是41,指示該IPv4有效載荷是IPv6數據報),從中取出IPv6數據報,然后再為該IPv6數據報提供路由,就好像它是從一個直接相連的IPv6鄰居那里接收到該IPv6數據報一樣。

說明:IPv4向IPv6遷移只是建隧道其中的一個應用場景,在其它的一些地方也有建隧道的概念與應用

Http中如何打開隧道呢

HTTP 提供了一個特殊的 method—— CONNECT,它是 HTTP/1.1 協議中預留的方法,客戶端發送一個 CONNECT 請求給隧道網關請求打開一條 TCP 連接,當隧道打通之后,客戶端通過 HTTP 隧道發送的所有數據會轉發給 TCP 連接,服務器響應的所有數據會通過隧道發給客戶端。

建立協議

無論是直接連接,還是通過隧道連接,在建立了Socket連接之后,都需要調用establishProtocol方法,建立協議。這一步發生在TCP連接建立之后,在TCP連接上發送數據之前,這一步會做TLS握手(使得數據可以加密傳輸)、HTTP/2的協議協商等。

RealConnection::establishProtocol

  @Throws(IOException::class)
  private fun establishProtocol(
    connectionSpecSelector: ConnectionSpecSelector,
    pingIntervalMillis: Int,
    call: Call,
    eventListener: EventListener
  ) {
    // 普通的HTTP請求,而非HTTPS請求,則協議版本定義為HTTP/1.1  
    if (route.address.sslSocketFactory == null) {
      // 協議中包含h2_prior_knowledge  
      if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {
        socket = rawSocket
        protocol = Protocol.H2_PRIOR_KNOWLEDGE
        startHttp2(pingIntervalMillis)
        return
      }

      socket = rawSocket
      protocol = Protocol.HTTP_1_1
      return
    }

    eventListener.secureConnectStart(call)
    // TLS握手  
    connectTls(connectionSpecSelector)
    eventListener.secureConnectEnd(call, handshake)
    // 若同時為HTTPS請求和HTTP/2
    if (protocol === Protocol.HTTP_2) {
      startHttp2(pingIntervalMillis)
    }
  }

該方法主要進行以下工作:

  1. 如果是HTTP請求,就將協議protocol定義為HTTP/1.1,如果協議中包含了h2_prior_knowledge,則采用HTTP/2進行請求,調用startHttp2方法。
  2. 如果是HTTPS請求,就調用connectTls方法進行TLS握手,再判斷是不是HTTP/2協議,從而決定是否調用startHttp2方法開啟HTTP2連接

開啟HTTP2連接

查看RealConnection::startHttp2

  @Throws(IOException::class)
  private fun startHttp2(pingIntervalMillis: Int) {
    val socket = this.socket!!
    val source = this.source!!
    val sink = this.sink!!
    socket.soTimeout = 0 // HTTP/2 connection timeouts are set per-stream.
    // 初始化一個Http2連接  
    val http2Connection = Http2Connection.Builder(client = true, taskRunner = TaskRunner.INSTANCE)
        .socket(socket, route.address.url.host, source, sink)
        .listener(this)
        .pingIntervalMillis(pingIntervalMillis)
        .build()
    this.http2Connection = http2Connection
    // 最大並發流數  
    this.allocationLimit = Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams()
    // 開啟Http2連接
    http2Connection.start()
  }

我們在RealConnectionisEligible方法中曾見過allocationLimit的身影,

  internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
	...
    if (calls.size >= allocationLimit || noNewExchanges) return false
    ...
  }

當時主要用於判斷connection承載的流數量是否已經到達最大值,從而決定該connection是否可用,allocationLimit的初始值是1,也就是說,在HTTP/1中,一個connection同時最多承載一個流,但是在HTTP/2中,由於有多路復用的機制,它的allocationLimit值就不再是1了。

startHttp2的主要工作就是初始化一個Http2Connection,將Http2Connectionlistener設置為RealConnection本身,接着設置allocationLimit的值,最后調用Http2Connection.start方法開啟Http2連接。我們查看Http2Connection::start方法

  // Sends any initial frames and starts reading frames from the remote peer.
  // This should be called after Builder.build for all new connections.
  // Params:sendConnectionPreface - true to send connection preface frames.
  // 該方法必須在Builder.build方法之后調用
  @Throws(IOException::class) @JvmOverloads
  fun start(sendConnectionPreface: Boolean = true, taskRunner: TaskRunner = TaskRunner.INSTANCE) {
    if (sendConnectionPreface) {
      // 發送初始幀  
      writer.connectionPreface()
      // 將OkHttp的設置以設置幀的形式發送給對等方  
      writer.settings(okHttpSettings)
      // 窗口更新  
      val windowSize = okHttpSettings.initialWindowSize
      if (windowSize != DEFAULT_INITIAL_WINDOW_SIZE) {
        writer.windowUpdate(0, (windowSize - DEFAULT_INITIAL_WINDOW_SIZE).toLong())
      }
    }
   
    // 開啟一個線程,用於讀取對等方發送的初始幀和設置幀  
    taskRunner.newQueue().execute(name = connectionName, block = readerRunnable)
  }

該方法主要內容如下:

  1. 發送初始幀給服務器
  2. OkHttp的設置以設置幀的形式發送給服務器
  3. 開啟一個線程,讀取服務器發送的初始幀和設置幀

HTTP/2中,每個終端在使用HTTP/2協議之前都要發送一個初始幀作為最終的確認,同時初始幀后面還需跟着一個設置幀,作為HTTP/2連接建立的初始化設置。

HTTP/2協議中的基本單位是幀,每個幀都有不同的類型和用途, 例如,報頭(HEADERS)和數據(DATA)幀組成了基本的HTTP 請求和響應,其他幀例如 設置(SETTINGS)、窗口更新(WINDOW_UPDATE)和推送承諾(PUSH_PROMISE) 是用來實現HTTP/2的其他功能。

TLS握手

如果是HTTPS請求,則需要調用connectTls方法進行TLS握手,RealConnection::connectTls

  @Throws(IOException::class)
  private fun connectTls(connectionSpecSelector: ConnectionSpecSelector) {
    val address = route.address
    val sslSocketFactory = address.sslSocketFactory
    var success = false
    var sslSocket: SSLSocket? = null
    try {
      // Create the wrapper over the connected socket.
      // 基於前面TCP連接的Socket包裝一個SSLSocket對象  
      sslSocket = sslSocketFactory!!.createSocket(
          rawSocket, address.url.host, address.url.port, true /* autoClose */) as SSLSocket

      // Configure the socket's ciphers, TLS versions, and extensions.
      // 配置TLS相關信息  
      val connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket)
      if (connectionSpec.supportsTlsExtensions) {
        Platform.get().configureTlsExtensions(sslSocket, address.url.host, address.protocols)
      }

      // Force handshake. This can throw!
      // 進行TLS握手  
      sslSocket.startHandshake()
      // block for session establishment
      // 獲取SSLSession  
      val sslSocketSession = sslSocket.session
      val unverifiedHandshake = sslSocketSession.handshake()

      // Verify that the socket's certificates are acceptable for the target host.
      // 驗證socket證書對該主機是否有效  
      if (!address.hostnameVerifier!!.verify(address.url.host, sslSocketSession)) {
        ...
      }

      ...

      // Success! Save the handshake and the ALPN protocol.
      val maybeProtocol = if (connectionSpec.supportsTlsExtensions) {
        Platform.get().getSelectedProtocol(sslSocket)
      } else {
        null
      }
      socket = sslSocket
      // 獲取輸入輸出流  
      source = sslSocket.source().buffer()
      sink = sslSocket.sink().buffer()
      protocol = if (maybeProtocol != null) Protocol.get(maybeProtocol) else Protocol.HTTP_1_1
      success = true
    } finally {
      if (sslSocket != null) {
        Platform.get().afterHandshake(sslSocket)
      }
      if (!success) {
        sslSocket?.closeQuietly()
      }
    }
  }

connectTls方法主要進行以下工作:

  1. 基於前面TCP連接創建的Socket包裝為一個SSLSocket對象。
  2. 配置TLS相關信息。
  3. 進行TLS握手。
  4. 驗證證書對該主機是否有效。
  5. 獲取輸入輸出流source以及sink

連接獲取流程梳理

我們再梳理一下ExchangeFinder::findConnection獲取連接的流程,它一共有五步:

  1. 重用call當中的連接
  2. 第一次嘗試從連接池獲取連接
  3. 第二次嘗試從連接池獲取連接
  4. 自己新創建一個連接
  5. 第三次嘗試從連接池獲取連接

重用call當中的連接:存在RetryAndFollowUpInterceptor攔截器,當請求失敗需要重試或者重定向的時候,這時候call中的連接還在,如果該連接滿足條件,則可以進行復用

第一次嘗試從連接池獲取連接:不使用「連接合並」的特性,只有目標請求地址和connection對應路由的主機名、端口等信息都匹配,才使用該連接;

第二次嘗試從連接池獲取連接:使用「連接合並」的特性,不管是不是多路復用連接,優先判斷主機名和端口等信息是否匹配,匹配則直接使用,若只有主機名不匹配,判斷是否符合「連接合並」的要求,滿足則使用,否則不使用;

自己新創建一個連接:首先創建一個RealConnection對象,然后調用該對象的connect方法,在該方法中,會進行TCP連接(三次握手),並建立協議(根據情況,判斷是否要進行TLS握手、開啟HTTP2連接);

第三次嘗試從連接池獲取連接:使用「連接合並」的特性,這一次只使用多路復用的連接。也是優先判斷主機名和端口等信息是否匹配,匹配則直接使用,不匹配則判斷是否符合「連接合並」的要求,滿足則使用,否則不使用

為什么在新創建了一個連接后,還要第三次嘗試從連接池獲取連接呢

ExchangeFinder::findConnection方法中對於第三次從連接池獲取連接的注釋如下:

    // If we raced another call connecting to this host, coalesce the connections. This 
    // makes for 3 different lookups in the connection pool!

有可能另外一個call也創建了一個連接到hostconnection,並且那個call先於當前的call將該連接放到了連接池當中,這時候我們這個call就嘗試從連接池中去獲取連接,如果滿足「連接合並」的特性,則復用另外一個call的連接,當前call新建的連接則將其關閉,節省資源。

另外,如果在新建了連接后,第三次從連接池又成功獲取了連接,那么會將成功新建的連接的路由 route 記錄在 nextRouteToTry 變量里面,如果連接池獲取的連接被判定為不健康的話,那么重新調用 ExchangeFinder::findConnection 方法獲取連接的時候,可以直接使用 nextRouteToTry 記錄的路由去新建一個 TCP 連接,而不用再去路由列表里面找一個路由。

整個流程梳理

ConnectInterceptor攔截器的作用是獲得一個健康可用的與目標服務器的連接,然后就交給下一個攔截器處理。獲取健康可用的連接的調用鏈如下:

ConnectInterceptor::intercept
->RealCall::initExchange
->ExchangeFinder::find
->ExchangeFinder::findHealthyConnection
->ExchangeFinder::findConnection
  • ExchangeFinder::findConnection中,會去獲取一個可用的連接,里面又分為了5步。

  • ExchangeFinder::findHealthyConnection中,會判斷獲取到的連接是否健康,若健康則返回,若不健康,在有可以嘗試的路由的前提下,繼續調用findConnection方法獲取一個可用的連接。

  • ExchangeFinder::find方法中,會通過已獲取的健康可用的連接,去創建一個ExchangeCodec對象,該對象負責對Http請求進行編碼和對Http響應進行解碼,此時,有了與服務器的連接處理I/O的ExchangeCodec對象,我們其實就可以和服務器進行通信了。

  • RealCall::initExchange方法中,會利用ExchangeCodec實例,創建了一個Exchange對象,我們可以認為Exchange是封裝ExchangeCodec的一個工具類,Exchange負責連接管理,而ExchangeCodec負責處理實際的I/O

  • 接着在ConnectInterceptor::intercept方法中,將獲取到的Exchange對象放到攔截器鏈RealInterceptorChain里面,此時已經獲得了一個健康可用的與目標服務器的連接,然后將請求交給下一個攔截器處理。

大致流程如下:

image-20211209161223068

參考

  1. OkHttp 源碼剖析系列(六)——連接復用機制及連接的建立 - 掘金.
  2. 一條鏈上七個娃—從網絡請求過程看OkHttp攔截器.
  3. HTTP/2 connection coalescing | daniel.haxx.se


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM