1. Kafka connection timeout exception
The Kafka version is 2.7.1. Part of the error reported by the Kafka client:
    Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1636366763102, tries=1, nextAllowedTryMs=1636366763208) timed out at 1636366763108 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at com.xxx.demo.kafka.KafkaApi.main(KafkaApi.java:21)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1636366763102, tries=1, nextAllowedTryMs=1636366763208) timed out at 1636366763108 after 1 attempt(s)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: listNodes
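The error message is clear: it is a timeout exception.
2. Ruling out the common causes
My Kafka service runs on an IaaS platform. When a connection times out, the first suspect is usually the firewall, i.e. the port has not been opened. I therefore used telnet locally to test the server's IP and port. Telnet connected, and I also checked which ports were open, which ruled out the firewall. The deployment on the server itself was fine as well: producing and consuming messages there worked.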
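The telnet check above can also be done from code. The following is a minimal sketch, assuming plain TCP reachability is all we want to test; the class name PortCheck and the default host and port in main are hypothetical. It only shows that a TCP connection to the given address can be opened, nothing about the Kafka protocol.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {
    /** Returns true if a TCP connection to host:port completes within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, unreachable, unresolvable, or timed out: all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical arguments: <host> <port>, e.g. the server's IP and 9092.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable = " + isReachable(host, port, 3000));
    }
}
```

Like telnet, this only tests the address you pass in, not any address the broker later hands back to the client.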
3. Investigating from the client source code
First, check that the Kafka server and client versions match: both are 2.7.1.
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.7.1</version>
    </dependency>
When the Kafka client is created, it instantiates a Selector, which internally opens a Java NIO Selector.
    static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) {
        ...
        selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
            metrics, time, metricGrpPrefix, channelBuilder, logContext);
        ...
    }

    public Selector(int maxReceiveSize,
            long connectionMaxIdleMs,
            int failedAuthenticationDelayMs,
            Metrics metrics,
            Time time,
            String metricGrpPrefix,
            Map<String, String> metricTags,
            boolean metricsPerConnection,
            boolean recordTimePerConnection,
            ChannelBuilder channelBuilder,
            MemoryPool memoryPool,
            LogContext logContext) {
        try {
            this.nioSelector = java.nio.channels.Selector.open();
        } catch (IOException e) {
            throw new KafkaException(e);
        }
        ...
    }
Once the Kafka client has been instantiated, the client thread starts and processes requests in a while loop.
    @Override
    public void run() {
        log.trace("Thread starting");
        try {
            processRequests();
        } finally {
            ...
        }
    }
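Next, the main program calls the client's describeCluster method to fetch information about the nodes of the Kafka cluster. This creates a new Call, which selects the node with the fewest outstanding requests and returns it: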
    @Override
    public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
        final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>();
        final KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
        final KafkaFutureImpl<String> clusterIdFuture = new KafkaFutureImpl<>();
        final KafkaFutureImpl<Set<AclOperation>> authorizedOperationsFuture = new KafkaFutureImpl<>();

        final long now = time.milliseconds();
        runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()),
            new LeastLoadedNodeProvider()) {

            @Override
            MetadataRequest.Builder createRequest(int timeoutMs) {
                // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
                // simplifies communication with older brokers)
                return new MetadataRequest.Builder(new MetadataRequestData()
                    .setTopics(Collections.emptyList())
                    .setAllowAutoTopicCreation(true)
                    .setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations()));
            }
            ...
    }
The client thread moves this new Call into pendingCalls and chooses a node to handle it.
    // Choose nodes for our pending calls.
    pollTimeout = Math.min(pollTimeout, maybeDrainPendingCalls(now));
    long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);
    if (metadataFetchDelayMs == 0) {
        metadataManager.transitionToUpdatePending(now);
        Call metadataCall = makeMetadataCall(now);
        // Create a new metadata fetch call and add it to the end of pendingCalls.
        // Assign a node for just the new call (we handled the other pending nodes above).

        if (!maybeDrainPendingCall(metadataCall, now))
            pendingCalls.add(metadataCall);
    }
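The client chooses the node with the fewest outstanding requests that is at least eligible for connection. For each candidate it first checks whether a request can be sent or a connection can be made.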
    @Override
    public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadataUpdater.fetchNodes();
        if (nodes.isEmpty())
            throw new IllegalStateException("There are no nodes in the Kafka cluster");
        int inflight = Integer.MAX_VALUE;

        Node foundConnecting = null;
        Node foundCanConnect = null;
        Node foundReady = null;

        int offset = this.randOffset.nextInt(nodes.size());
        for (int i = 0; i < nodes.size(); i++) {
            int idx = (offset + i) % nodes.size();
            Node node = nodes.get(idx);
            if (canSendRequest(node.idString(), now)) {
                int currInflight = this.inFlightRequests.count(node.idString());
                if (currInflight == 0) {
                    // if we find an established connection with no in-flight requests we can stop right away
                    log.trace("Found least loaded node {} connected with no in-flight requests", node);
                    return node;
                } else if (currInflight < inflight) {
                    // otherwise if this is the best we have found so far, record that
                    inflight = currInflight;
                    foundReady = node;
                }
            } else if (connectionStates.isPreparingConnection(node.idString())) {
                foundConnecting = node;
            } else if (canConnect(node, now)) {
                if (foundCanConnect == null ||
                        this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
                                this.connectionStates.lastConnectAttemptMs(node.idString())) {
                    foundCanConnect = node;
                }
            } else {
                log.trace("Removing node {} from least loaded node selection since it is neither ready " +
                        "for sending or connecting", node);
            }
        }
        ...
    }
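The selection logic can be modelled without any Kafka classes. The sketch below is a simplified illustration, not the client's implementation: the LeastLoadedSketch class, its State enum and NodeInfo type are hypothetical, the random starting offset is left out, and the fallback order after the loop (ready, then connecting, then connectable) is an assumption, since the excerpt stops before that part.

```java
import java.util.Arrays;
import java.util.List;

final class LeastLoadedSketch {
    // Hypothetical, simplified view of a node's connection state.
    enum State { READY, CONNECTING, CAN_CONNECT, UNAVAILABLE }

    static final class NodeInfo {
        final String id;
        final State state;
        final int inFlight;

        NodeInfo(String id, State state, int inFlight) {
            this.id = id;
            this.state = state;
            this.inFlight = inFlight;
        }
    }

    /** Returns the id of the preferred node, or null if no node is usable. */
    static String pick(List<NodeInfo> nodes) {
        NodeInfo foundReady = null;
        NodeInfo foundConnecting = null;
        NodeInfo foundCanConnect = null;
        int inflight = Integer.MAX_VALUE;
        for (NodeInfo node : nodes) {
            if (node.state == State.READY) {
                if (node.inFlight == 0) {
                    return node.id; // idle, established connection: stop right away
                }
                if (node.inFlight < inflight) {
                    inflight = node.inFlight;
                    foundReady = node;
                }
            } else if (node.state == State.CONNECTING) {
                foundConnecting = node;
            } else if (node.state == State.CAN_CONNECT && foundCanConnect == null) {
                foundCanConnect = node;
            }
        }
        // Assumed preference order: ready, then connecting, then connectable.
        if (foundReady != null) return foundReady.id;
        if (foundConnecting != null) return foundConnecting.id;
        if (foundCanConnect != null) return foundCanConnect.id;
        return null;
    }

    public static void main(String[] args) {
        List<NodeInfo> nodes = Arrays.asList(
            new NodeInfo("0", State.READY, 3),
            new NodeInfo("1", State.READY, 1),
            new NodeInfo("2", State.CAN_CONNECT, 0));
        System.out.println("picked node " + pick(nodes));
    }
}
```

In a single-broker setup like the one in this article there is only one candidate, so whatever its state, it is the node every call is assigned to.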
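The Kafka client thread handles the request issued by the main program. Stepping through the code, leastLoadedNode returns a node whose address is exactly the server's IP and port, so this node is chosen to handle the Call. The next step is to "phone" this node.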
    pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now));
This is the calling flow: the client goes through the list of nodes to call, one by one.
    private long sendEligibleCalls(long now) {
        long pollTimeout = Long.MAX_VALUE;
        for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) {
            Map.Entry<Node, List<Call>> entry = iter.next();
            List<Call> calls = entry.getValue();
            if (calls.isEmpty()) {
                iter.remove();
                continue;
            }
            Node node = entry.getKey();
            if (!client.ready(node, now)) {
                long nodeTimeout = client.pollDelayMs(node, now);
                pollTimeout = Math.min(pollTimeout, nodeTimeout);
                log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout);
                continue;
            }
            Call call = calls.remove(0);
            int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs,
                    calcTimeoutMsRemainingAsInt(now, call.deadlineMs));
            AbstractRequest.Builder<?> requestBuilder;
            try {
                requestBuilder = call.createRequest(requestTimeoutMs);
            } catch (Throwable throwable) {
                call.fail(now, new KafkaException(String.format(
                    "Internal error sending %s to %s.", call.callName, node)));
                continue;
            }
            ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now,
                    true, requestTimeoutMs, null);
            log.debug("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId());
            client.send(clientRequest, now);
            getOrCreateListValue(callsInFlight, node.idString()).add(call);
            correlationIdToCalls.put(clientRequest.correlationId(), call);
        }
        return pollTimeout;
    }
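client.ready checks the state of the node. If the node has no connection state yet, it starts initializing a connection. If it has one, it checks whether the KafkaChannel opened through the Selector is ready, and finally whether more requests can be sent to this node. Only when all of these conditions hold does it return true, and the client goes on to call the node.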
    public boolean ready(Node node, long now) {
        if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node " + node);

        if (isReady(node, now))
            return true;

        if (connectionStates.canConnect(node.idString(), now))
            // if we are interested in sending to a node and we don't have a connection to it, initiate one
            initiateConnect(node, now);

        return false;
    }
isReady(node, now) uses canSendRequest to check whether the node with the given id is ready to be sent more requests.
    private boolean canSendRequest(String node, long now) {
        return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
            inFlightRequests.canSendMore(node);
    }
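If the node is not ready, a connection is initialized. connectionStates.canConnect returns true when there is currently no connection and no connection attempt has been made for at least the minimum reconnect backoff period.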
    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
            InetAddress address = connectionStates.currentAddress(nodeConnectionId);
            log.debug("Initiating connection to node {} using address {}", node, address);
            selector.connect(nodeConnectionId,
                    new InetSocketAddress(address, node.port()),
                    this.socketSendBuffer,
                    this.socketReceiveBuffer);
        } catch (IOException e) {
            log.warn("Error connecting to node {}", node, e);
            // Attempt failed, we'll try again after the backoff
            connectionStates.disconnected(nodeConnectionId, now);
            // Notify metadata updater of the connection failure
            metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
        }
    }
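The canConnect gate in front of initiateConnect can be illustrated with a small stand-alone model. This is a sketch under simplifying assumptions: the class ConnectionStateSketch and its fields are hypothetical, and the backoff is a fixed number of milliseconds, whereas the real client may vary the backoff between attempts.

```java
import java.util.HashMap;
import java.util.Map;

final class ConnectionStateSketch {
    // Hypothetical per-node bookkeeping: are we disconnected, and when did we last try?
    private static final class NodeState {
        boolean disconnected;
        long lastConnectAttemptMs;
    }

    private final Map<String, NodeState> states = new HashMap<>();
    private final long reconnectBackoffMs; // assumed fixed backoff

    ConnectionStateSketch(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    /** True if we have never tried this node, or it is disconnected and the backoff has elapsed. */
    boolean canConnect(String id, long now) {
        NodeState state = states.get(id);
        if (state == null) {
            return true;
        }
        return state.disconnected && now - state.lastConnectAttemptMs >= reconnectBackoffMs;
    }

    /** Records that a connection attempt to the node started at time now. */
    void connecting(String id, long now) {
        NodeState state = states.computeIfAbsent(id, k -> new NodeState());
        state.disconnected = false;
        state.lastConnectAttemptMs = now;
    }

    /** Records that the attempt (or connection) to the node ended at time now. */
    void disconnected(String id, long now) {
        NodeState state = states.computeIfAbsent(id, k -> new NodeState());
        state.disconnected = true;
        state.lastConnectAttemptMs = now;
    }
}
```

With a node that never becomes reachable, this cycle of connecting, disconnected, and waiting out the backoff simply repeats until the caller gives up.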
The connection is made through Kafka's Selector (which uses an NIO Selector underneath). The new connection is registered with the nioSelector under the given id.
    @Override
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        ensureNotRegistered(id);
        SocketChannel socketChannel = SocketChannel.open();
        SelectionKey key = null;
        try {
            configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
            boolean connected = doConnect(socketChannel, address);
            key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);

            if (connected) {
                // OP_CONNECT won't trigger for immediately connected channels
                log.debug("Immediately connected to node {}", id);
                immediatelyConnectedKeys.add(key);
                key.interestOps(0);
            }
        } catch (IOException | RuntimeException e) {
            if (key != null)
                immediatelyConnectedKeys.remove(key);
            channels.remove(id);
            socketChannel.close();
            throw e;
        }
    }
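Here, however, the connect call returns false and the connection never becomes established, so client.ready keeps returning false. Back in the main calling flow, this node is skipped on every pass, and it is the only node in the call list.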
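It helps to remember what a false return from a non-blocking connect means in Java NIO: the connection is still in progress and has to be completed later with finishConnect. The sketch below shows that pattern with JDK classes only; the class NonBlockingConnectSketch and its timeout handling are assumptions for illustration and are not taken from the Kafka client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class NonBlockingConnectSketch {
    /** Returns true only once the connection is established; false if timeoutMs elapses first. */
    static boolean connect(InetSocketAddress address, long timeoutMs) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            // In non-blocking mode, false means "in progress", not "failed".
            if (channel.connect(address)) {
                return true; // connected immediately
            }
            channel.register(selector, SelectionKey.OP_CONNECT);
            long deadline = System.currentTimeMillis() + timeoutMs;
            long remaining;
            while ((remaining = deadline - System.currentTimeMillis()) > 0) {
                selector.select(remaining);
                for (SelectionKey key : selector.selectedKeys()) {
                    // finishConnect throws an IOException if the attempt failed, e.g. was refused.
                    if (key.isConnectable() && channel.finishConnect()) {
                        return true;
                    }
                }
                selector.selectedKeys().clear();
            }
            return false; // never completed within the timeout
        }
    }
}
```

If nothing is listening at the target address, the attempt either fails with an exception or never completes, which is the situation the client thread keeps running into.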
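On every iteration of its while loop, the Kafka client thread also checks for Calls whose deadline has passed and removes them from the list of pending calls.
Because the Kafka server cannot be reached, the call eventually runs past its deadline and the client throws the timeout exception.
Only then did I notice in the console output that the node address the Kafka client kept trying to connect to was 127.0.0.1. That is the local loopback address, and no Kafka is running locally, so the problem had to be in the server configuration. Deploying Kafka on the server had raised no errors, and the console producer and consumer commands worked normally there, so something must be missing from the configuration that causes the client to look for the node at the wrong IP.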
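The deadline check can be sketched with plain JDK types. This is a simplified model, assuming each call carries an absolute deadline in milliseconds; the CallTimeoutSketch class and its Call type are hypothetical, and the exception message is borrowed from the stack trace at the top of the article.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

final class CallTimeoutSketch {
    // Hypothetical stand-in for a pending admin call.
    static final class Call {
        final String name;
        final long deadlineMs;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Call(String name, long deadlineMs) {
            this.name = name;
            this.deadlineMs = deadlineMs;
        }
    }

    /** Removes every call whose deadline has passed, fails its future, and returns how many expired. */
    static int expireCalls(List<Call> pending, long now) {
        int expired = 0;
        for (Iterator<Call> it = pending.iterator(); it.hasNext(); ) {
            Call call = it.next();
            if (now > call.deadlineMs) {
                it.remove();
                call.future.completeExceptionally(
                    new TimeoutException("Timed out waiting to send the call. Call: " + call.name));
                expired++;
            }
        }
        return expired;
    }
}
```

With the values from the stack trace (deadlineMs=1636366763102, timed out at 1636366763108), the listNodes call is past its deadline and would be expired by such a check.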
    19519 [kafka-admin-client-thread | adminclient-1] WARN org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node 0 (localhost.localdomain/127.0.0.1:9092) could not be established. Broker may not be available.
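A quick way to spot this kind of problem is to classify the address that appears in such a log line. The helper below is a minimal sketch using java.net.InetAddress; the class name AdvertisedAddressCheck and the category labels are made up for illustration.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class AdvertisedAddressCheck {
    /** Classifies how a host name or IP literal looks from the machine running this code. */
    static String classify(String host) {
        try {
            InetAddress address = InetAddress.getByName(host);
            if (address.isLoopbackAddress()) return "loopback";
            if (address.isAnyLocalAddress()) return "wildcard";
            if (address.isSiteLocalAddress()) return "private";
            return "other";
        } catch (UnknownHostException e) {
            return "unresolvable";
        }
    }

    public static void main(String[] args) {
        // Hypothetical argument: the host from the "Connection to node ..." log line.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        System.out.println(host + " -> " + classify(host));
    }
}
```

A result of "loopback" or "private" for a broker that is supposed to be reached from outside its network is a strong hint that the broker is advertising the wrong address.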
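4. Troubleshooting from the Kafka server configuration
I then turned to the server configuration and looked at the log printed when the Kafka server is started from the command line: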
    [2021-11-08 18:51:46,987] INFO KafkaConfig values:
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        ...
        host.name = null
        ...
        listeners = null
        ...
        port = 9092
        ...
     (kafka.server.KafkaConfig)
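Many of the configuration values are null. In the startup configuration file server.properties under the config directory, the only one of these parameters that is mentioned is listeners, and even that only in a comment. At this point the investigation shifts to Kafka's configuration, so let's look up what these null parameters do.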
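listeners: A comma-separated list of URIs the broker listens on, together with their listener names. Specify the host name as 0.0.0.0 to bind to all interfaces; leave the host name empty to bind to the default interface. (This is the broker setting; Kafka Connect has a setting of the same name that configures its REST API server, which is not what matters here.)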
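advertised.host.name: Deprecated. Only used when advertised.listeners or listeners are not set; advertised.listeners replaces it completely. In IaaS environments, this may need to be different from the interface to which the broker binds. If it is not set, the value of host.name is used if configured; otherwise the value returned by java.net.InetAddress.getCanonicalHostName() is used.
advertised.port: Deprecated, used under the same conditions as above. In IaaS environments, this may need to be different from the port to which the broker binds. If it is not set, the broker publishes the same port it binds to.
advertised.listeners: The listeners published to ZooKeeper for clients to use, if different from the listeners configuration property. In IaaS environments, this may need to be different from the interface to which the broker binds. If it is not set, the value of listeners is used. Unlike listeners, it is not valid to advertise the 0.0.0.0 meta-address. Also unlike listeners, this property may contain duplicated ports, so that one listener can be configured to advertise another listener's address. This can be useful in some cases where external load balancers are used.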
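5. Conclusion
advertised.listeners is the key. This setting specifies the externally reachable address (the public IP) that the broker advertises, so that clients can connect to Kafka correctly. When it is not set, the value of listeners is used by default. Here listeners is not set either, so the broker falls back to its canonical host name, localhost.localdomain, which the client resolves to 127.0.0.1. More generally, a client outside the IaaS network that is handed an internal address has no way to reach the broker with it, so it keeps failing to connect until the call times out.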
    19519 [kafka-admin-client-thread | adminclient-1] WARN org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node 0 (localhost.localdomain/127.0.0.1:9092) could not be established. Broker may not be available.
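The fallback chain can be written down as a small function. This is a simplified model and not broker code: the class AdvertisedListenerSketch is hypothetical, the deprecated advertised.host.name and advertised.port settings are ignored, and the default of PLAINTEXT://:9092 for an unset listeners value is an assumption based on the port shown in the startup log.

```java
import java.util.Properties;

final class AdvertisedListenerSketch {
    /**
     * Returns the listener string a client would be told to use, given broker
     * properties and the broker's canonical host name.
     */
    static String effectiveAdvertised(Properties brokerProps, String canonicalHostName) {
        String advertised = brokerProps.getProperty("advertised.listeners");
        if (advertised != null && !advertised.isEmpty()) {
            return advertised; // explicitly configured: used as is
        }
        // Assumed default when listeners is not set either.
        String listeners = brokerProps.getProperty("listeners", "PLAINTEXT://:9092");
        // An empty host part is filled in with the canonical host name.
        return listeners.replace("://:", "://" + canonicalHostName + ":");
    }

    public static void main(String[] args) {
        Properties unset = new Properties();
        System.out.println(effectiveAdvertised(unset, "localhost.localdomain"));
    }
}
```

The point of the model is that, with nothing configured, the advertised address is derived from whatever the server thinks its own host name is, which need not be reachable or even meaningful from the client's side.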
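6. Solution
Set advertised.listeners correctly in server.properties, the configuration file used to start the Kafka server, pointing it at the public IP. Clients outside the network can then connect to Kafka.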
    ############################# Socket Server Settings #############################

    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    listeners=PLAINTEXT://:9092
    advertised.listeners=PLAINTEXT://<public-ip>:9092