這篇博客側重於了解OkHttp的網絡部分,包括Socket的創建、連接,連接池等要點。OkHttp對Socket的流操作使用了Okio進行了封裝,本篇博客不做介紹,想了解的朋友可以參考拆輪子系列:拆Okio。
OkHttp中關於網絡的幾個概念
下面的主要翻譯自OkHttp的官方文檔,查看原文.
URL
URLs(比如https://github.com/square/okhttp)是HTTP和網絡的基礎,不止指定了Web上的資源,還指定了如何獲取該資源。
Address
Address(比如github.com)指定了一個webserver和所有連接到該服務器的必需的靜態配置:端口、HTTPS設置和首選網絡協議(HTTP/2或SPDY)。
URLs屬於同一個address的可以共享同一個底層的Socket連接。共享一個連接具有顯著的性能優勢:低延遲、高吞吐量(由於TCP慢啟動)和省電。OkHttp使用連接池自動再利用HTTP/1.x的連接,復用HTTP/2和SPDY的連接。
在OkHttp中,address的一些字段來自URL(模式、主機名、端口),剩下的部分來自OkHttpClient。
Routes
Routes提供真正連接到一個網絡服務器所需的動態信息。這指定了嘗試的IP地址(或者進過DNS查詢得到的地址)、使用的代理服務器(如果使用了ProxySelector)和使用哪個版本的TLS進行談判。(對於HTTPS連接)
對於一個地址,可能有多個路由。舉個例子,一個網路服務器托管在多個數據中心,那么在DNS中可能會產生多個IP地址。
Connections
當請求一個URL時,OkHttp會做以下幾件事情:
1. 使用URL和配置好的OkHttpClient創建一個address。這個地址指明了我們將如何連接網絡服務器。
2. 嘗試從連接池中得到該地址的一條連接
3. 如果在連接池中沒有找到一條連接,那么選擇一個route進行嘗試。通常這意味着做一個DNS請求得到服務器IP的地址,必要時會選擇一個TLS版本和一個代理服務器。
4. 如果是一條新的路由,那么建立一條直接的socket連接或TLS通道(HTTPS使用HTTP代理)或一個直接的TLS連接。
5. 發送HTTP請求,讀取響應。
如果連接出現了問題,OkHttp會選擇另外一條路由進行再次嘗試。這使得OkHttp在一個服務器的一些地址不可到達時仍然可用。
一旦讀取到響應后,連接將會退還到連接池中以便可以復用。連接在池中閑置一段時間后將會被釋放。
結合源碼進行分析
Address的創建
Address的創建在RetryAndFollowupInterceptor中的createAddress方法中,代碼如下:
private Address createAddress(HttpUrl url) { SSLSocketFactory sslSocketFactory = null; HostnameVerifier hostnameVerifier = null; CertificatePinner certificatePinner = null; //如果是HTTPS協議 if (url.isHttps()) { sslSocketFactory = client.sslSocketFactory(); hostnameVerifier = client.hostnameVerifier(); certificatePinner = client.certificatePinner(); } //可以看到Address的構造方法中的一部分參數由URL提供,一部分由OkHttpClient提供 return new Address(url.host(), url.port(), client.dns(), client.socketFactory(), sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(), client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector()); }
從代碼中可以看出,Address的信息一部分由URL提供,主要包括主機名和端口;另一部分由OkHttpClient提供,如dns、socketFactory等等。
根據HttpUrl是否是HTTPS,創建sslSocketFactory等字段,而在Address的構造方法中,則根據sslSocketFactory是否為null判斷是HTTP模式還是HTTPS模式。
StreamAllocation的創建
StreamAllocation類負責管理連接、流和請求三者之間的關系。其創建在RetryAndFollowupInterceptor的intercept方法中,使用OkHttpClient的連接池以及上面創建的Address進行初始化,代碼如下:
streamAllocation = new StreamAllocation( client.connectionPool(), createAddress(request.url()))
其中client的連接池是在OkHttpClient.Builder中設置的,而其設置在Builder的構造方法中,調用的是ConnectionPool的默認構造方法,代碼如下:
public Builder() {
... //默認連接池 connectionPool = new ConnectionPool(); dns = Dns.SYSTEM; followSslRedirects = true; followRedirects = true; retryOnConnectionFailure = true; connectTimeout = 10_000; readTimeout = 10_000; writeTimeout = 10_000; }
- 1
- 2
- 3
下面是ConnectionPool的構造方法:
/** * Create a new connection pool with tuning parameters appropriate for a single-user application. * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity. */ public ConnectionPool() { this(5, 5, TimeUnit.MINUTES); } public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) { this.maxIdleConnections = maxIdleConnections; this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration); // Put a floor on the keep alive duration, otherwise cleanup will spin loop. if (keepAliveDuration <= 0) { throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration); } }
從上面可以看到,默認的連接池的最大空閑連接數為5,最長存活時間為5min。
HttpStream和Connection的創建
在深入理解OkHttp源碼(二)——獲取響應中,我們知道了HttpStream以及Connection的創建都是在ConnectInterceptor攔截器中,代碼如下:
@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); StreamAllocation streamAllocation = realChain.streamAllocation(); // We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); HttpStream httpStream = streamAllocation.newStream(client, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpStream, connection); }
從上面的代碼可以看到,首先調用StreamAllocation的newStream方法就可以得到HttpStream對象,同時也就得到了Connection對象。下面首選從StreamAllocation的newStream()方法看起:
public HttpStream newStream(OkHttpClient client, boolean doExtensiveHealthChecks) { //得到連接時長、讀超時以及寫超時參數 int connectTimeout = client.connectTimeoutMillis(); int readTimeout = client.readTimeoutMillis(); int writeTimeout = client.writeTimeoutMillis(); boolean connectionRetryEnabled = client.retryOnConnectionFailure(); try { //得到一個健康的連接 RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks); HttpStream resultStream; //如果協議是HTTP 2.x協議 if (resultConnection.framedConnection != null) { resultStream = new Http2xStream(client, this, resultConnection.framedConnection); } //協議是HTTP 1.x,設置連接底層的Socket屬性 else { resultConnection.socket().setSoTimeout(readTimeout); resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS); resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS); resultStream = new Http1xStream( client, this, resultConnection.source, resultConnection.sink); } synchronized (connectionPool) { stream = resultStream; return resultStream; } } catch (IOException e) { throw new RouteException(e); } }
從上面的代碼可以看出,首先從OkHttpClient中獲取連接超時、讀取超時、寫超時和是否連接失敗重試參數,然后試圖找到一條健康的連接,接下來是根據連接的framedConnection字段是否為null,得到Http2xStream或Http1xStram,前者是HTTP/2的實現,后者是HTTP/1.x的實現。
可以看到主要的邏輯肯定都在findHealthyConnection方法中,下面是findHeadlthyConnection方法的實現:
/** * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated * until a healthy connection is found. */ private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException { //死循環 while (true) { //得到一個候選的連接 RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled); // 如果是一個全新的連接,跳過額外的健康檢查 synchronized (connectionPool) { if (candidate.successCount == 0) { return candidate; } } //如果候選連接通不過額外的健康檢查,那么繼續尋找一個新的候選連接 if (!candidate.isHealthy(doExtensiveHealthChecks)) { noNewStreams(); continue; } return candidate; } }
從注釋中可以看到,該方法用於查找一條健康的連接並返回,如果連接不健康,那么會重復查找,直到查找到健康的連接。可以看到方法內是一個死循環,首先調用findConnection方法得到候選的連接,如果該連接是一個全新的連接,那么就直接返回不需要驗證是否健康,如果不是則需要驗證是否健康,如果不健康調用noNewStreams()方法后繼續下一次循環,否則返回。對於候選連接,總結一下就是下面幾種情況:
1. 候選連接是一個全新的連接,那么直接返回;
2. 候選連接不是一個全新的連接,但是是健康的,那么直接返回;
3. 候選連接不是一個全新的連接,並且不健康,那么繼續下一輪循環
經過上面的分析,我們查看findConnection()方法:
/** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. */ private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) throws IOException { Route selectedRoute; //對連接池加鎖,因為可能會有別的線程加入連接或移除連接 synchronized (connectionPool) { if (released) throw new IllegalStateException("released"); if (stream != null) throw new IllegalStateException("stream != null"); if (canceled) throw new IOException("Canceled"); //首先嘗試使用本實例的連接 RealConnection allocatedConnection = this.connection; if (allocatedConnection != null && !allocatedConnection.noNewStreams) { return allocatedConnection; } //其次,嘗試從連接池中得到連接 RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this); if (pooledConnection != null) { this.connection = pooledConnection; return pooledConnection; } selectedRoute = route; } if (selectedRoute == null) { selectedRoute = routeSelector.next(); synchronized (connectionPool) { route = selectedRoute; refusedStreamCount = 0; } } //根據路由創建新的連接 RealConnection newConnection = new RealConnection(selectedRoute); acquire(newConnection); //將得到的新連接加入連接池中並設置本實例的連接 synchronized (connectionPool) { Internal.instance.put(connectionPool, newConnection); this.connection = newConnection; if (canceled) throw new IOException("Canceled"); } //底層Socket連接 newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(), connectionRetryEnabled); routeDatabase().connected(newConnection.route()); return newConnection; }
從注釋中可以看出,該方法返回一個擁有新流的連接。首先檢查已存在的連接,其次連接池,最后建立一個新的連接。
從代碼中可以看出,首先對連接池加鎖,這兒的連接池是在創建StreamAllocation中傳入的,而那個參數是在創建OkHttpClient時就創建的,我們一般使用OkHttpClient時,都會將其做成單例,那么連接池就是唯一的,由於可能存在別的線程從連接池中執行插入以及連接池自身連接的清除工作,所以需要對其進行加鎖。首先獲取本對象的connection,如果不為null並且noNewStreams為false,那么直接使用本連接;如果不能使用本連接,那么嘗試從連接池中獲取連接,如果可以得到,那么直接返回,否則將進行下一步創建新連接;首先根據路由創建一個新的連接,然后調用acquire方法使連接持有該StreamAllocation對象,接下來將新的連接添加就連接池,最后調用connect方法進行連接。
這里面有一個Internal.instance的實例,Internal是一個抽象類,其具體實現instance初始化是在OkHttpClient的靜態初始化塊中,如下:
static { Internal.instance = new Internal() { @Override public void addLenient(Headers.Builder builder, String line) { builder.addLenient(line); } @Override public void addLenient(Headers.Builder builder, String name, String value) { builder.addLenient(name, value); } @Override public void setCache(OkHttpClient.Builder builder, InternalCache internalCache) { builder.setInternalCache(internalCache); } @Override public boolean connectionBecameIdle( ConnectionPool pool, RealConnection connection) { return pool.connectionBecameIdle(connection); } @Override public RealConnection get( ConnectionPool pool, Address address, StreamAllocation streamAllocation) { return pool.get(address, streamAllocation); } @Override public void put(ConnectionPool pool, RealConnection connection) { pool.put(connection); } @Override public RouteDatabase routeDatabase(ConnectionPool connectionPool) { return connectionPool.routeDatabase; } @Override public StreamAllocation callEngineGetStreamAllocation(Call call) { return ((RealCall) call).streamAllocation(); } @Override public void apply(ConnectionSpec tlsConfiguration, SSLSocket sslSocket, boolean isFallback) { tlsConfiguration.apply(sslSocket, isFallback); } @Override public HttpUrl getHttpUrlChecked(String url) throws MalformedURLException, UnknownHostException { return HttpUrl.getChecked(url); } @Override public void setCallWebSocket(Call call) { ((RealCall) call).setForWebSocket(); } };
首先看put方法,因為一開始時連接池中肯定是沒有連接的,Internal.instance的put方法調用了連接池的put方法,下面是ConnectionPool的put方法:
void put(RealConnection connection) { assert (Thread.holdsLock(this)); //如果清理線程沒有開啟,則開啟 if (!cleanupRunning) { cleanupRunning = true; executor.execute(cleanupRunnable); } connections.add(connection); }
從代碼中可以看出,當第一個連接被添加就線程池時,開啟清除線程,主要清除那些連接池中過期的連接,然后將連接添加就connections對象中。下面看一下cleanupRunnable和connections的定義,其中connections是一個阻塞隊列。
private final Runnable cleanupRunnable = new Runnable() { @Override public void run() { while (true) { //得到下一次清除的等待時長 long waitNanos = cleanup(System.nanoTime()); //沒有連接了,清除任務終結 if (waitNanos == -1) return; //需要等待一定時長 if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); synchronized (ConnectionPool.this) { try { ConnectionPool.this.wait(waitMillis, (int) waitNanos); } catch (InterruptedException ignored) { } } } } } }; private final Deque<RealConnection> connections = new ArrayDeque<>();
可以看到cleadupRunnbale是一個死循環,調用cleanup方法進行清理工作並返回一個等待時長,如果有等待時長,那么讓連接池進行休眠。其中清理工作在cleanup方法中,代碼如下:
/** * Performs maintenance on this pool, evicting the connection that has been idle the longest if * either it has exceeded the keep alive limit or the idle connections limit. * * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns * -1 if no further cleanups are required. */ long cleanup(long now) { int inUseConnectionCount = 0; int idleConnectionCount = 0; RealConnection longestIdleConnection = null; long longestIdleDurationNs = Long.MIN_VALUE; // Find either a connection to evict, or the time that the next eviction is due. synchronized (this) { //檢查每個連接 for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) { RealConnection connection = i.next(); //如果該連接正在運行,則跳過 if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++; continue; } idleConnectionCount++; //查找出空閑時間最長的連接 long idleDurationNs = now - connection.idleAtNanos; if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; } } //如果時間超出規定的空閑時間或者數量達到最大空閑樹,那么移除。關閉操作在后面 if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { connections.remove(longestIdleConnection); } //如果時間和數量都沒到達上限,那么得到存活時間 else if (idleConnectionCount > 0) { return keepAliveDurationNs - longestIdleDurationNs; } //如果所有連接都在使用中,返回最大存活時間 else if (inUseConnectionCount > 0) { return keepAliveDurationNs; } //沒有連接,關閉清除線程 else { cleanupRunning = false; return -1; } } //關閉連接底層的Socket closeQuietly(longestIdleConnection.socket()); // 再次執行清除 return 0; }
從代碼中可以看出,對當前連接池中保存的所有連接進行遍歷,然后調用pruneAndGetAllocationCount()方法獲取連接上可用的StreamAllocation的數量以及刪除不可用的StreamAllocation,如果數量大於0,則表示該連接還在使用,那么繼續下一次遍歷;否則空閑連接數+1,需要查找出所有不可用的連接中最大的空閑時間。遍歷做完后,根據不同情況不同的值返回不同的結果,一旦找到了最大的空閑連接,那么在同步塊外部調用closeQuietly關閉連接。
pruneAndGetAllocationCount()方法用於刪除連接上不可用的StreamAllocation以及可用的StreamAllocation的數量,下面是其具體實現:
/** * Prunes any leaked allocations and then returns the number of remaining live allocations on * {@code connection}. Allocations are leaked if the connection is tracking them but the * application code has abandoned them. Leak detection is imprecise and relies on garbage * collection. */ private int pruneAndGetAllocationCount(RealConnection connection, long now) { //得到關聯在連接上StramAllocation對象列表 List<Reference<StreamAllocation>> references = connection.allocations; for (int i = 0; i < references.size(); ) { Reference<StreamAllocation> reference = references.get(i); //可用 if (reference.get() != null) { i++; continue; } // We've discovered a leaked allocation. This is an application bug. Platform.get().log(WARN, "A connection to " + connection.route().address().url() + " was leaked. Did you forget to close a response body?", null); references.remove(i); connection.noNewStreams = true; // If this was the last allocation, the connection is eligible for immediate eviction. if (references.isEmpty()) { connection.idleAtNanos = now - keepAliveDurationNs; return 0; } } return references.size(); }
需要注意的是for循環,i的控制在循環內部,如果StreamAllocation為null,那么直接刪除,如果連接沒有一個可用的StreamAllocation,那么設置連接的idleAtNanos為now-keepAliveDurationNs,即5分鍾之前。
至此,我們分析完了當創建了一個新連接,是如何被添加到線程池中的以及線程池的自動清除線程是如何工作的。下面看連接是如何建立連接的,在findConnection方法中,當創建了一個新的Connection后,調用了其connect方法,connect負責將客戶端Socket連接到服務端Socket,代碼如下:
public void connect(int connectTimeout, int readTimeout, int writeTimeout, List<ConnectionSpec> connectionSpecs, boolean connectionRetryEnabled) { if (protocol != null) throw new IllegalStateException("already connected"); RouteException routeException = null; ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs); //不是HTTPS協議 if (route.address().sslSocketFactory() == null) { if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) { throw new RouteException(new UnknownServiceException( "CLEARTEXT communication not enabled for client")); } String host = route.address().url().host(); if (!Platform.get().isCleartextTrafficPermitted(host)) { throw new RouteException(new UnknownServiceException( "CLEARTEXT communication to " + host + " not permitted by network security policy")); } } while (protocol == null) { try { if (route.requiresTunnel()) { buildTunneledConnection(connectTimeout, readTimeout, writeTimeout, connectionSpecSelector); } else { buildConnection(connectTimeout, readTimeout, writeTimeout, connectionSpecSelector); } } catch (IOException e) { closeQuietly(socket); closeQuietly(rawSocket); socket = null; rawSocket = null; source = null; sink = null; handshake = null; protocol = null; if (routeException == null) { routeException = new RouteException(e); } else { routeException.addConnectException(e); } if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) { throw routeException; } } } }
主要看while循環處,如果HTTPS通道使用HTTP代理,那么調用buildTunneledConnection方法,否則調用buildConnection方法,如果出現異常,那么就在catch中做了一些清理工作,然后會繼續進入循環,因為將protocol置為了null。一般的請求都是直接調用buildConnection方法的,下面我們看buildConnection方法:
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */ private void buildConnection(int connectTimeout, int readTimeout, int writeTimeout, ConnectionSpecSelector connectionSpecSelector) throws IOException { connectSocket(connectTimeout, readTimeout); establishProtocol(readTimeout, writeTimeout, connectionSpecSelector); }
該方法做在raw socket上連接HTTP或HTTPS連接的准備工作,方法內部又是調用了另外兩個方法,下面分別介紹。
connectSocket為創建Socket以及連接Socket,代碼如下:
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy(); Address address = route.address(); //創建Socket rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP ? address.socketFactory().createSocket() : new Socket(proxy); rawSocket.setSoTimeout(readTimeout); try { //連接Socket Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout); } catch (ConnectException e) { throw new ConnectException("Failed to connect to " + route.socketAddress()); } //使用Okio封裝Socket的輸入輸出流 source = Okio.buffer(Okio.source(rawSocket)); sink = Okio.buffer(Okio.sink(rawSocket)); }
從代碼可以看出,首先獲取代理和地址,然后根據代理的類型是使用SocketFactory工廠創建無參的rawSocket還是使用帶代理參數的Socket構造方法,得到了rawSocket對象后,設置讀超時,然后調用connectSocket進行Socket的連接,服務器的信息在route的socketAddress中,最后,得到rawSocket的輸入流和輸出流,這里使用了Okio進行了封裝,就不做過多設計了。
其中Plateform.get()方法返回不同平台的信息,因為OkHttp是可以用於Android和Java平台的,而Java又有多個版本,所以進行了平台判斷。get()是一個單例,其初始化在findPlatform方法中,如下:
private static Platform findPlatform() { Platform android = AndroidPlatform.buildIfSupported(); if (android != null) { return android; } Platform jdk9 = Jdk9Platform.buildIfSupported(); if (jdk9 != null) { return jdk9; } Platform jdkWithJettyBoot = JdkWithJettyBootPlatform.buildIfSupported(); if (jdkWithJettyBoot != null) { return jdkWithJettyBoot; } // Probably an Oracle JDK like OpenJDK. return new Platform(); }
可以看到findPlatform分為了android平台、jdk9、有JettyBoot的jdk還有默認的平台幾類。這邊看默認的Platform就可以了。下面看其socket方法:
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout) throws IOException { socket.connect(address, connectTimeout); }
可以看到就是調用socket的connect方法,至此,本地Socket與后台Socket建立了連接,並得到了輸入輸出流。
buildConnection方法中還有一個establishProtocol方法,該方法用於建立協議,設置protocol的值,這樣上面的循環就可以跳出了。代碼如下:
private void establishProtocol(int readTimeout, int writeTimeout, ConnectionSpecSelector connectionSpecSelector) throws IOException { //如果是HTTPS協議 if (route.address().sslSocketFactory() != null) { connectTls(readTimeout, writeTimeout, connectionSpecSelector); } //默認HTTP 1.1協議 else { protocol = Protocol.HTTP_1_1; socket = rawSocket; } if (protocol == Protocol.SPDY_3 || protocol == Protocol.HTTP_2) { socket.setSoTimeout(0); // Framed connection timeouts are set per-stream. FramedConnection framedConnection = new FramedConnection.Builder(true) .socket(socket, route.address().url().host(), source, sink) .protocol(protocol) .listener(this) .build(); framedConnection.start(); // Only assign the framed connection once the preface has been sent successfully. this.allocationLimit = framedConnection.maxConcurrentStreams(); this.framedConnection = framedConnection; } else { this.allocationLimit = 1; } }
可以看到該方法主要就是給protocol賦值,另外對於SPDY或HTTP/2協議有別的處理,這兒就不多介紹了。(==因為我自己目前也不懂,不過分析到這兒就已經足夠了==)。
至此,我們分析完了是如何新建一個連接,然后將其放入連接池以及真正地與后台建立連接的,這一切都是發生在ConnectInterceptor中,所以也就可以理解為什么這個攔截器要命名為連接攔截器了。
上面的代碼主要分析了新建連接,從上面的分析我們知道,還可以直接使用StreamAllocation的連接或從連接池中獲取連接。我們知道當提交請求后,每個請求被封裝成RealCall對象,而每個RealCall對象都只能被執行一次,RealCall對象持有RetryAndFollowupInterceptor,Connection又是RetryAndFollowupInterceptor持有的,那么如果發生重定向時,但是主機名相同,只是路徑不同時,那么將會是重用之前創建的Connection;而如果是兩個相同主機的不同請求,那么在第一個連接被創建放進線程池后,第二個請求的連接就可以從連接池中得到了。
findConnection方法中通過調用Internal.instance的get方法從連接池中獲取連接,而其get方法又是通過調用連接池的get方法,具體代碼如下:
/** Returns a recycled connection to {@code address}, or null if no such connection exists. */ RealConnection get(Address address, StreamAllocation streamAllocation) { assert (Thread.holdsLock(this)); for (RealConnection connection : connections) { if (connection.allocations.size() < connection.allocationLimit && address.equals(connection.route().address) && !connection.noNewStreams) { streamAllocation.acquire(connection); return connection; } } return null; }
從上面代碼中可以看出,get方法對連接池隊列遍歷,如果連接的StreamAllocation小於allocationLimit參數並且地址相等且連接的noNewStreams為false,那么將streamAllocation賦給連接。其中allocationLimit在協議為HTTP/1.x時為1,這也就意味着同一個Connection只能與一個StreamAllocation綁定,這就解釋了為什么官方文檔文檔說連接池重用HTTP/1.x連接,復用HTTP/2或SPDY連接。
發送請求和獲取響應
經過ConnectInterceptor后,為請求創建了Connection對象以及HttpStream對象,下面進入到CallServerInterceptor中發送請求和獲取響應,首先看CallServerInterceptor的intercept方法:
@Override public Response intercept(Chain chain) throws IOException {
HttpStream httpStream = ((RealInterceptorChain) chain).httpStream(); StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation(); Request request = chain.request(); long sentRequestMillis = System.currentTimeMillis(); //發送HTTP首部信息 httpStream.writeRequestHeaders(request); //如果HTTP方法允許有請求主體並且請求不為null,發送HTTP請求主體信息 if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { //Okio進行封裝發送數據 Sink requestBodyOut = httpStream.createRequestBody(request, request.body().contentLength()); BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } httpStream.finishRequest(); //讀響應首部構建Response對象 Response response = httpStream.readResponseHeaders() .request(request) .handshake(streamAllocation.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); if (!forWebSocket || response.code() != 101) { response = response.newBuilder() .body(httpStream.openResponseBody(response)) .build(); } //服務端不支持HTTP持久連接,那么需要關閉該連接 if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { streamAllocation.noNewStreams(); } int code = response.code(); if ((code == 204 || code == 205) && response.body().contentLength() > 0) { throw new ProtocolException( "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength()); } return response; }
可以看到寫請求和讀響應都是通過HttpStream對象,在前面的分析中知道了HttpStream的具體實現是Http1xStream或Http2xStream。我們主要看Http1xStream的各個實現,首先看寫頭部信息的writeRequestHeaders方法,下面是Http1xStream的具體實現:
@Override public void writeRequestHeaders(Request request) throws IOException { //得到請求行 String requestLine = RequestLine.get( request, streamAllocation.connection().route().proxy().type()); writeRequest(request.headers(), requestLine); }
該方法用戶將頭信息發送給服務端,首先獲取HTTP請求行(類似於“GET / HTTP/1.1”),然后調用writeRequest方法進行具體的寫操作,下面是writeRequest的實現:
public void writeRequest(Headers headers, String requestLine) throws IOException { if (state != STATE_IDLE) throw new IllegalStateException("state: " + state); sink.writeUtf8(requestLine).writeUtf8("\r\n"); for (int i = 0, size = headers.size(); i < size; i++) { sink.writeUtf8(headers.name(i)) .writeUtf8(": ") .writeUtf8(headers.value(i)) .writeUtf8("\r\n"); } sink.writeUtf8("\r\n"); state = STATE_OPEN_REQUEST_BODY; }
從代碼中可以看出,首先判斷狀態,狀態初始值為STATE_IDLE,表明如果在寫頭部信息之前做了別的操作,那么將會報錯,也就意味着必須首先進行寫頭部信息的操作;然后寫入請求行以及換行符,接下來就是對頭部信息做遍歷,逐個寫入,最后將狀態置為STATE_OPEN_REQUEST_BODY。
在寫完頭部信息之后,如果需要寫請求的主體部分,還會進行寫主體部分操作,當請求發送完成后,調用finishRequest方法就行刷新輸出流。
@Override public void finishRequest() throws IOException { sink.flush(); }
發送完請求之后,首先調用readResponseHeaders()獲取響應的頭部信息,然后構造Response對象,readResponseHeaders代碼如下:
@Override public Response.Builder readResponseHeaders() throws IOException { return readResponse(); } ** Parses bytes of a response header from an HTTP transport. */ public Response.Builder readResponse() throws IOException { if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) { throw new IllegalStateException("state: " + state); } try { while (true) { StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict()); Response.Builder responseBuilder = new Response.Builder() .protocol(statusLine.protocol) .code(statusLine.code) .message(statusLine.message) .headers(readHeaders()); if (statusLine.code != HTTP_CONTINUE) { state = STATE_OPEN_RESPONSE_BODY; return responseBuilder; } } } catch (EOFException e) { // Provide more context if the server ends the stream before sending a response. IOException exception = new IOException("unexpected end of stream on " + streamAllocation); exception.initCause(e); throw exception; } }
可以看到readResponseHeaders方法又調用了readResponse方法,而readResponse方法中首先對狀態進行判斷,然后進入一個死循環。首先獲取響應的狀態行(比如“H T T P / 1 . 1 2 0 0 T e m p o r a r y R e d i r e c t”)得到協議類型、狀態碼和消息,然后再調用readHeaders()方法讀取頭部信息,最后比較狀態碼不是100,那么說明請求發送完整了,那么將狀態置為STATE_OPEN_RESPONSE_BODY,然后返回響應,這時的響應中只有協議類型、狀態碼、消息和頭部信息。下面看一下readHeaders()方法是如何獲取頭部信息的:
/** Reads headers or trailers. */ public Headers readHeaders() throws IOException { Headers.Builder headers = new Headers.Builder(); // parse the result headers until the first blank line for (String line; (line = source.readUtf8LineStrict()).length() != 0; ) { Internal.instance.addLenient(headers, line); } return headers.build(); }
- 1
可以看到每行遍歷直到第一個空行,然后調用Internal.instance的addLenient方法將這一行的信息解析並添加到頭部中,下面是addLenient方法的實現:
@Override public void addLenient(Headers.Builder builder, String line) { builder.addLenient(line); }
可以看到只是簡單的調用Builder的addLenient方法,那么繼續看Builder的addLenient方法:
Builder addLenient(String line) {
int index = line.indexOf(":", 1); if (index != -1) { return addLenient(line.substring(0, index), line.substring(index + 1)); } else if (line.startsWith(":")) { // Work around empty header names and header names that start with a // colon (created by old broken SPDY versions of the response cache). return addLenient("", line.substring(1)); // Empty header name. } else { return addLenient("", line); // No header name. } }
- 1
從上面的代碼可以看到,首先獲取“:”的位置,如果存在“:”,那么調用addLenient將名和值添加進列表中,如果以”:”開宇,則頭信息的名稱為空,有值;如果都沒有,那么沒有頭部信息名。三種情況都是調用addLenient方法,如下:
/** * Add a field with the specified value without any validation. Only appropriate for headers * from the remote peer or cache. */ Builder addLenient(String name, String value) { namesAndValues.add(name); namesAndValues.add(value.trim()); return this; }
其中,nameAndValues是一個字符串的列表。
到上面為此,讀取響應的頭部信息已經完成,接下來在CallServerInterceptor中做的是調用openResponseBody方法讀取響應的主體部分,方法如下:
@Override public ResponseBody openResponseBody(Response response) throws IOException { Source source = getTransferStream(response); return new RealResponseBody(response.headers(), Okio.buffer(source)); }
從代碼中可以看出,首先調用getTransferStream方法就行流轉換,因為傳入的Response中有頭部信息,而頭部信息中可能會有編碼的信息,所以需要就行轉換,然后再創建RealResponseBody對象返回。先看getTransferStream()方法的實現:
private Source getTransferStream(Response response) throws IOException { if (!HttpHeaders.hasBody(response)) { return newFixedLengthSource(0); } if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) { return newChunkedSource(response.request().url()); } long contentLength = HttpHeaders.contentLength(response); if (contentLength != -1) { return newFixedLengthSource(contentLength); } // Wrap the input stream from the connection (rather than just returning // "socketIn" directly here), so that we can control its use after the // reference escapes. return newUnknownLengthSource(); }
從代碼中可以看到一共可能有四種返回值,分別是以下四種情況:
1. 如果響應主體部分不應有內容,那么返回newFixedLengthSource(0)
2. 如果響應頭部中Transfer-Encoding為chunked,即分塊了,那么返回newChunkedSource
3. 如果響應中有個具體長度,那么返回newFixedLengthSource,並且指定長度
4. 以上情況均不滿足,返回newUnknownLengthSource
總結
至此,OkHttp的網絡部分講解結束。OkHttp中涉及到了幾個重要的類,StreamAllocation負責根據請求創建連接,可能是新建一個連接,可能是重用自己內部的連接,也有可能是從連接池中獲取連接;而連接的建立就涉及到了Socket的創建以及連接;當連接創建好后,就創建了HttpStream對象,負責操作底層Socket的輸出輸入流。
在整個OkHttp的工作流程中,在RetryAndFollowupInterceptor中創建StreamAllocation,在ConnectInterceptor中創建連接以及HttpStream對象,在CallServerInterceptor中操作HttpStream進行發送請求和讀取響應。