org.apache.kafka.common.client.Selector實現了Selectable接口,用於提供符合Kafka網絡通訊特點的異步的、非阻塞的、面向多個連接的網絡I/O.
這些網絡IO包括了連接的創建、斷開,請求的發送和接收,以及一些網絡相關的metrics統計等功能。
所以,它實際上應該至少具體以下功能

使用
首先得談一下Selector這東西是准備怎么讓人用的。這個注釋里說了一部分:
A nioSelector interface for doing non-blocking multi-connection network I/O.
This class works with NetworkSend and NetworkReceive to transmit size-delimited network requests and responses.A connection can be added to the nioSelector associated with an integer id by doing
nioSelector.connect("42", new InetSocketAddress("google.com", server.port), 64000, 64000);
The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating the connection. The successful invocation of this method does not mean a valid connection has been established. Sending requests, receiving responses, processing connection completions, and disconnections on the existing connections are all done using the poll() call.
nioSelector.send(new NetworkSend(myDestination, myBytes));
nioSelector.send(new NetworkSend(myOtherDestination, myOtherBytes));
nioSelector.poll(TIMEOUT_MS);
The nioSelector maintains several lists that are reset by each call to poll() which are available via various getters. These are reset by each call to poll(). This class is not thread safe!
首先,Selector的API都是非阻塞或者帶有阻塞超時時間的,這個特點直接源於Java NIO的Selector和SocketChannel的特性。這種異步非阻塞的IO帶來的問題就是,必須時不時地調用某個方法,來檢測IO完成的進度情況,對於NIO的selector,這個方法就是select,對於Kafka的Selector,這個方法就是poll.
為此,注釋里舉了一個典型的例子,這是一個發送數據的例子:
nioSelector.send(new NetworkSend(myDestination, myBytes)); nioSelector.send(new NetworkSend(myOtherDestination, myOtherBytes)); nioSelector.poll(TIMEOUT_MS);
但是Kafka Selector的poll不僅檢測IO的進度,它還執行IO操作,比如當發現有channel可讀了,它就從中讀數據出來。那么,是否可以說Kafka的Selector執行的是異步IO呢?下面來談下這個問題。
異步IO vs 同步非阻塞IO
異步IO是說實際的IO動作是由操作系統調用另外的線程或者其它的計算資源來做的。那么,想要確定Selector執行的是否是異步IO,得先看下它所構建的Channel是哪一種,畢竟不是所有的channel都支持異步IO。
Selector創建channel的動作是在#connect(String, InetSocketAddress, int, int)方法中。
SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false);
它是建了一個SocketChannel.而SocketChannel並不能進行異步IO,當它被設為no-blocking模式時,進行的是非阻塞的IO。在Java7中,引入了AsynchronizedSocketChannel,它進行的才是真正的異步IO。
參見
兩種高性能I/O設計模式(Reactor/Proactor)的比較
Java Non-Blocking and Asynchronous IO with NIO & NIO.2 (JSR203) - Reactor/Proactor Implementations
An NIO.2 primer, Part 1: The asynchronous channel APIs
內部狀態
由於Selector的各個方法是非阻塞的,因此需要保存每個操作當前的完成進度。比如,正在寫,寫完成,讀完成,連接建立成功,等。這樣在調用者調用了poll方法以后,調用者可以檢查各個操作完成的情況。
Selector內部的確有一些集合來保存這些信息:
private final Map<String, KafkaChannel> channels; //有正在連接以及連接成功的channel,注意它的類型是KafkaChannel private final List<Send> completedSends; //已發送完的請求 private final List<NetworkReceive> completedReceives; //已接收完成的響應。注意,這個集合並沒有包括所有已接收完成的響應,stagedReceives集合也包括了一些接收完成的響應 private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives; //已接收完成,但還沒有暴露給用戶的響應 private final Set<SelectionKey> immediatelyConnectedKeys; //在調用SocketChannel#connect方法時立即完成的SelectionKey.為什么保存的是SelectionKey呢? private final List<String> disconnected; //已斷開連接的節點 private final List<String> connected; //新連接成功的節點 private final List<String> failedSends; //發送失敗的節點,但並不是由於IO異常導致的失敗,而是由於SelectionKey被cancel引起的失敗,比如對一個已關閉的channel設置interestOps
但是這里的集合有些並不是按照channel來組織的。比如:completedSend, completedReceives, disconnected, connected和failedSends。因為這些集合是在一個poll之后,Selector的使用者應該處理的,它們是按照類型組織。在poll執行的最開始,它會調用clear方法,清空這些集合,因為它們是上次poll的結果。所以,在一次poll之后查看這些結果的話,看到的就是這次poll的結果。
/** * Clear the results from the prior poll */ private void clear() { this.completedSends.clear(); this.completedReceives.clear(); this.connected.clear(); this.disconnected.clear(); this.disconnected.addAll(this.failedSends); this.failedSends.clear(); }
這里之所以把failedSends加到disconnected之中,是因為failedSends里保存的失敗的send,並不是上次poll留下來的,而是上次poll之后,此次poll之前,調用send方法時添加到failedSends集合中的。當有failedSends時,selector就會關閉這個channel,因此在clear過程中,需要把failedSends里保存的節點加到disconnected之中。
需要注意的是,這些集合里並沒有包括正在發送以及正在接收的請求。原因是KafkaChannel對象本身持有正在處理的請求和響應。
public class KafkaChannel { private final String id; private final TransportLayer transportLayer; private final Authenticator authenticator; private final int maxReceiveSize; private NetworkReceive receive; private Send send; ... }
這里需要注意是是它的setSend和read方法
public void setSend(Send send) { if (this.send != null) throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); }
public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) { receive = new NetworkReceive(maxReceiveSize, id); } receive(receive); if (receive.complete()) { receive.payload().rewind(); result = receive; receive = null; } return result; }
當一個send正在發送的過程中,send != null, 此時調用setSend會拋出IllegalStateException。那么,Selector在可以在一個poll之前可以往一個channel發送多個請求嗎?
canSendMore
這個需要需要追溯到哪些方法會調用KafkaChannel#setSend。結果只有NetworkClient的send(ClientRequest, long)方法會最終調到它。
而NetworkClient的send方法是這樣的
public void send(ClientRequest request, long now) { String nodeId = request.request().destination(); if (!canSendRequest(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); doSend(request, now); } private boolean canSendRequest(String node) { return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node); }
這里connectionStates.isConnected用來檢測節點是否已經連接上。selector.isChannelReady()用來檢測channel是否准備完成。由於Kafka security的一些要求,當socket channel連接建立完成后,可能還需要跟server交換一些認證數據,才能認為channel准備完畢。那么,重點就在於inFlightRequest.canSendMore這個方法了。因為如果它不檢測一個channel是否有正在發送的send,就可能會在調用NetworkClient#send時,再試圖給這個channel添加一個send,就會引發異常。
InFlightRequest保存了所有已發送,但還沒收到響應的請求。
InFlightRequests的canSendMore是這樣的:
public boolean canSendMore(String node) { Deque<ClientRequest> queue = requests.get(node); return queue == null || queue.isEmpty() || (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection); }
重點在於queue.peekFirst().request().completed, 即如果發給這個節點的最早的請求還沒有發送完成,是不能再往這個節點發送請求的。
但是,從canSendMore方法中也可以看出,只要沒有超過maxInFlightRequestsPerConnection,一個node可以有多個in-flight request的。這點,實際上影響到了另一個集合的數據結構的選擇——stagedReceives
stagedReceives
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
stagedRecieves用來保存已經接收完成,但是還沒有暴露給用戶(即沒有放在completedReceive列表中)的NetworkReceive(即響應).
這里有兩個問題:
- stagedRecieves使用時完全是按照FIFO隊列來用的,因此為什么用Deque,而不用Queue?
- 為什么一個KafkaChannel會有多個NetworkRecieves
第二個問題的答案就是NetworkClient的canSendMore方法並沒有限制一個node只有在所有已發送請求都收到響應的情況下才能發送新請求。因此,一個node可以有多個in-flight request,也可以有多個已發送的請求。因此,Selector也就可能會收到來自於同一個node的多個響應。因此,selector在每次poll的時候,讀取請求的操作是這樣的:
/* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); }
也就是說,只要有可以完整讀出的響應,都會把這些響應放到stagedReceives列表中。這個while循環使得在一次poll中,可能會添加多個NetworkReceive到stagedReceives里。
但是,每次poll,只會把最早的一個NetworkReceive放在completedReceives里。
* checks if there are any staged receives and adds to completedReceives */ private void addToCompletedReceives() { if (!this.stagedReceives.isEmpty()) { Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator(); while (iter.hasNext()) { Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next(); KafkaChannel channel = entry.getKey(); if (!channel.isMute()) { Deque<NetworkReceive> deque = entry.getValue(); NetworkReceive networkReceive = deque.poll(); //從這個channel的stagedReceives隊列中取最早的一個 this.completedReceives.add(networkReceive);//把它添加到completedRecievs列表中 this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit()); if (deque.isEmpty()) iter.remove(); } } } }
這個行為比較奇怪。可能的解釋是這會簡化NetworkClient的實現,造成一種"對每個channel,poll一次只發送一個請求,只接收一個響應“的假像,使得NetworkClient的用戶更容易處理請求和響應之間的對應關系。既然poll是一個非阻塞操作,用戶就可以在未收到某個請求的響應時,多次調用poll,這個也沒什么問題。因為poll一次並不保證就能收到剛才發出的請求對應的響應。
至於第一個問題,則是由於性能的考慮。
addToStagedReceives方法用於把一個NetworkReceive加到某個channel的stagedReceivs隊列中。
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) { if (!stagedReceives.containsKey(channel)) stagedReceives.put(channel, new ArrayDeque<NetworkReceive>()); Deque<NetworkReceive> deque = stagedReceives.get(channel); deque.add(receive); }
如果這個channel沒有stagedReceives隊列,會給它建一個,此時new的是ArrayDeque對象。這個ArrayDeque是JDK中性能最高的FIFO隊列的實現,優於ArrayList和linkedList.
詳見What is the fastest Java collection with the basic functionality of a Queue?
immediatelyConnectedKeys
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { if (this.channels.containsKey(id)) throw new IllegalStateException("There is already a connection for id " + id); SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); Socket socket = socketChannel.socket(); socket.setKeepAlive(true); if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setSendBufferSize(sendBufferSize); if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setReceiveBufferSize(receiveBufferSize); socket.setTcpNoDelay(true); boolean connected; try { connected = socketChannel.connect(address); } catch (UnresolvedAddressException e) { socketChannel.close(); throw new IOException("Can't resolve address: " + address, e); } catch (IOException e) { socketChannel.close(); throw e; } SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); key.attach(channel); this.channels.put(id, channel); if (connected) { // OP_CONNECT won't trigger for immediately connected channels log.debug("Immediately connected to node {}", channel.id()); immediatelyConnectedKeys.add(key); key.interestOps(0); } }
雖然在connect方法中,SocketChannel被設為non-blocking, 然后調用socketChannel.connect(address),雖然是非阻塞模式,但是connect方法仍然有可能會直接返回ture,代表連接成功。connect方法的doc是這么說的:
If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation. If the connection is established immediately, as can happen with a local connection, then this method returns true. Otherwise this method returns false and the connection operation must later be completed by invoking the finishConnect method.
比如,如果是一個本地的連接,就可能在非阻模式下也會立即返回連接成功。也是挺神奇的,想一想,如果認為”執行指令“是一種阻塞的話,絕對意義上的非阻塞方法是不存在的,不存在執行時間為零的方法。也就是說,如果進行一個本地連接,OS加上JVM是可以在有限的指令數量和時間段內確定連接成功,這也可以被認為是在非阻塞狀態下進行的。
lruConnection
在前邊的connect方法中,socket被配置了keepAlive,可以檢測出來連接斷開的情況。但是,還有一種情況需要考慮,就是一個連接太久沒有用來執行讀寫操作,為了降低服務器端的壓力,需要釋放這些的連接。所以Selector有LRU機制,來淘汰這樣的連接。
在Java里,實現LRU機制最簡單的就是使用LinkedHashMap, Selector也的確是這么做的。
private final Map<String, Long> lruConnections; this.lruConnections = new LinkedHashMap<>(16, .75F, true);
lruConnection的key是node的id, value是上次訪問的時間。它的“順序”被設為access順序。Selector會用map的put操作來access這個map,當NIO的selector poll出來一批SelectionKey之后,這些key對應的node被重新put進map,以刷新它們的最近訪問順序,同時也把具體的“最近使用時間”作為entry的value放在這個map中。
這發生在會被每次poll調用的pollSelectionKeys方法中
lruConnections.put(channel.id(), currentTimeNanos);
之所以要在value中保存最近使用時間,是因為這個時間會被用於計算空閑時間,當空閑時間超過了connectionMaxIdleMs時,就會關閉這個連接。
在poll的最后,會執行maybeCloseOldestConnection方法,來檢測並關閉需要關閉的連接。
private void maybeCloseOldestConnection() { if (currentTimeNanos > nextIdleCloseCheckTime) { if (lruConnections.isEmpty()) { nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; } else { Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next(); Long connectionLastActiveTime = oldestConnectionEntry.getValue(); nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; if (currentTimeNanos > nextIdleCloseCheckTime) { String connectionId = oldestConnectionEntry.getKey(); if (log.isTraceEnabled()) log.trace("About to close the idle connection from " + connectionId + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis"); disconnected.add(connectionId); close(connectionId); } } } }
這里有幾點要注意:
- 並不是每次poll都需要執行實際的檢測。假如在某一時刻,我們得知了此時的least recently used node的access時間,那么以后最先過期的肯定是這個node,因此下一次檢測的時間應至少是這個 access time of LRU node + maxIdleTime. 所以在代碼中,使用這段代碼來重置nextIdelCloseCheckTime
Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next(); Long connectionLastActiveTime = oldestConnectionEntry.getValue(); nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
- maybeCloseOldestConnection每調用一次,最多只關閉一個連接。但是,在關閉連接時,它並沒有根據移除node后的新的LRU node來重置 nextIdelCloseCheckTime。所以下一次調用maybeCloseOldestConnection時,if的判斷條件肯定為true,因此會繼續檢測並關閉連接。
這種做法有些不妥,因為這樣做的話一個poll並不能關閉所有應該關閉的空閑連接,不能指望用戶接下來會主動地多poll幾次。
總結
Kafka使用這個抽象出來的Selector的確比直接使用NIO在編程上要好一些,主要是代碼會不那么臃腫,因為Selector配合KafkaChannel、Send, NetworkReceive, 處理了NIO網絡編程的一些細節。Selector的這些代碼寫的也的確不錯。 不過,poll這個操作被搞得有些教條,被賦予了太多的責任,看起來是為了迎合Kafka的新consumer的特點搞出來的東西。這個東西讓人想起了回合制的游戲,設置好下一回合想干啥,點確定,然后就喝茶等了。
