While debugging Spark Streaming + Kafka recently, I ran into a connection error. The job log is as follows:
2019-06-22 15:36:47,516 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | Kafka version : 1.0.1 | org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:109)
2019-06-22 15:36:47,516 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | Kafka commitId : c0518aa65f25317e | org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:110)
2019-06-22 15:36:47,520 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set key.deserializer to org.apache.kafka.common.serialization.ByteArrayDeserializer, earlier value: | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,520 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set value.deserializer to org.apache.kafka.common.serialization.ByteArrayDeserializer, earlier value: | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,520 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set auto.offset.reset to none, earlier value: | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,520 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set group.id to spark-kafka-source-be3ec2f0-7031-4057-a255-1eb47dee271b--2020720649-executor, earlier value: | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,521 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set enable.auto.commit to false, earlier value: | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,521 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set receive.buffer.bytes to 65536 | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,537 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | Starting new streaming query. | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,614 | WARN | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-be3ec2f0-7031-4057-a255-1eb47dee271b--2020720649-driver-0] Connection to node -1 could not be established. Broker may not be available. | org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:241)
2019-06-22 15:36:47,617 | WARN | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-be3ec2f0-7031-4057-a255-1eb47dee271b--2020720649-driver-0] Connection to node -2 could not be established. Broker may not be available. | org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:241)
Just before the error, the ConsumerConfig values were printed, and bootstrap.servers shows the broker list we specified:
2019-06-22 15:36:47,463 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [10.80.123.9:9092, 10.80.123.17:9092, 10.80.123.20:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
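The warnings refer to node -1 and node -2, not to real broker IDs. Before the consumer has fetched any cluster metadata, it only knows the bootstrap.servers addresses. The Kafka client (`Cluster.bootstrap`) gives these addresses negative placeholder IDs: -1 for the first, -2 for the second, and so on. The sketch below is a simplified re-implementation of that numbering, applied to the configured list. It is not Kafka's code. `BootstrapNodeIds` and `assignBootstrapIds` are hypothetical names, and the sketch assumes one node per entry, kept in configuration order.

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapNodeIds {

    /**
     * Numbers the entries of a bootstrap.servers string the way the Kafka client numbers
     * bootstrap nodes: -1, -2, -3, ... (simplified; assumes every entry is "host:port").
     */
    static List<String> assignBootstrapIds(String bootstrapServers) {
        List<String> nodes = new ArrayList<>();
        int nodeId = -1;
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            if (hostPort.isEmpty()) {
                continue;
            }
            int colon = hostPort.lastIndexOf(':');
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            nodes.add("node " + nodeId + " -> " + host + ":" + port);
            nodeId--; // the next bootstrap address gets the next negative ID
        }
        return nodes;
    }

    public static void main(String[] args) {
        // The bootstrap.servers value printed in the ConsumerConfig log above
        String servers = "10.80.123.9:9092, 10.80.123.17:9092, 10.80.123.20:9092";
        assignBootstrapIds(servers).forEach(System.out::println);
        // prints:
        // node -1 -> 10.80.123.9:9092
        // node -2 -> 10.80.123.17:9092
        // node -3 -> 10.80.123.20:9092
    }
}
```

Negative IDs only ever belong to bootstrap nodes. The failed connections in the log are therefore to configured 10.80.123.x addresses, not to brokers discovered from metadata.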
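kafkatool could connect successfully through ZooKeeper, so I checked /etc/hosts and found that the brokers' addresses are 10.80.124.x. A Java test program confirmed that 10.80.124.x does work. Together with the code analysis below, this led to the conclusion that the bootstrap.servers value we had specified was wrong.
Next, let's look at the source code to see how NetworkClient works.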
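The original Java test program is not shown. As a rough stand-in, the sketch below probes each entry of a bootstrap.servers string with a plain TCP connect through `java.net.Socket`. It assumes a TCP handshake is a good enough first check. A successful connect only proves that something is listening on that host and port, not that it is a Kafka broker. `BrokerReachabilityCheck` and `check` are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

public class BrokerReachabilityCheck {

    /** Tries a TCP connect to each "host:port" entry; true means the handshake succeeded. */
    static Map<String, Boolean> check(String bootstrapServers, int timeoutMs) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            if (hostPort.isEmpty()) {
                continue;
            }
            int colon = hostPort.lastIndexOf(':');
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), timeoutMs);
                result.put(hostPort, true);
            } catch (IOException e) {
                // refused, timed out, unknown host, no route to host, ...
                result.put(hostPort, false);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Pass the bootstrap.servers value to test as the first argument;
        // defaults to the list printed in the ConsumerConfig log.
        String servers = args.length > 0 ? args[0]
                : "10.80.123.9:9092,10.80.123.17:9092,10.80.123.20:9092";
        check(servers, 3000).forEach((hostPort, ok) ->
                System.out.println(hostPort + " -> " + (ok ? "reachable" : "NOT reachable")));
    }
}
```

Running it once with the configured list and once with the addresses from /etc/hosts gives a quick side-by-side comparison without starting a Spark job.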
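Spark Streaming provides two categories of built-in input sources:
- Basic sources, such as file systems or socket connections.
- Advanced sources, such as Kafka, Flume and Kinesis, which depend on additional utility libraries.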
The package (class) dependency chain is:
spark-streaming-kafka-0-10 (Subscribe)
+ kafka-clients (KafkaConsumer)
+ kafka-clients (ConsumerNetworkClient)
+ kafka-clients (NetworkClient)
Subscribe's onStart method takes the TopicPartitions and offsets to start from and returns an object of the Consumer interface. Initializing that object creates the NetworkClient:
private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    try {
        // ... (earlier initialization omitted)
        int heartbeatIntervalMs = config.getInt("heartbeat.interval.ms");
        // Low-level client that owns the connections to every node, including the bootstrap nodes.
        // The literal 100 is the max number of in-flight requests per connection.
        NetworkClient netClient = new NetworkClient(
                new Selector(config.getLong("connections.max.idle.ms"), this.metrics, this.time, metricGrpPrefix, channelBuilder, logContext),
                this.metadata, clientId, 100,
                config.getLong("reconnect.backoff.ms"), config.getLong("reconnect.backoff.max.ms"),
                config.getInt("send.buffer.bytes"), config.getInt("receive.buffer.bytes"),
                config.getInt("request.timeout.ms"),
                ClientDnsLookup.forConfig(config.getString("client.dns.lookup")),
                this.time, true, new ApiVersions(), throttleTimeSensor, logContext);
        // ConsumerNetworkClient wraps NetworkClient; the coordinator and the fetcher both use it
        this.client = new ConsumerNetworkClient(logContext, netClient, this.metadata, this.time, this.retryBackoffMs, config.getInt("request.timeout.ms"), heartbeatIntervalMs);
        OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString("auto.offset.reset").toUpperCase(Locale.ROOT));
        this.coordinator = this.groupId == null ? null : new ConsumerCoordinator(logContext, this.client, this.groupId, maxPollIntervalMs, sessionTimeoutMs, new Heartbeat(this.time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, this.retryBackoffMs), this.assignors, this.metadata, this.subscriptions, this.metrics, metricGrpPrefix, this.time, this.retryBackoffMs, enableAutoCommit, config.getInt("auto.commit.interval.ms"), this.interceptors, config.getBoolean("exclude.internal.topics"), config.getBoolean("internal.leave.group.on.close"));
        this.fetcher = new Fetcher(logContext, this.client, config.getInt("fetch.min.bytes"), config.getInt("fetch.max.bytes"), config.getInt("fetch.max.wait.ms"), config.getInt("max.partition.fetch.bytes"), config.getInt("max.poll.records"), config.getBoolean("check.crcs"), this.keyDeserializer, this.valueDeserializer, this.metadata, this.subscriptions, this.metrics, metricsRegistry.fetcherMetrics, this.time, this.retryBackoffMs, this.requestTimeoutMs, isolationLevel);
        // ... (remaining initialization omitted)
    } catch (Throwable var24) {
        this.close(0L, true);
        throw new KafkaException("Failed to construct kafka consumer", var24);
    }
}
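NetworkClient is responsible for maintaining connections to, and communicating with, the nodes (Node) of the Kafka cluster. Its poll method drives the actual network I/O: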
public List<ClientResponse> poll(long timeout, long now) {
    this.ensureActive();
    if (!this.abortedSends.isEmpty()) {
        List<ClientResponse> responses = new ArrayList();
        this.handleAbortedSends(responses);
        this.completeResponses(responses);
        return responses;
    } else {
        // Starts a metadata update if needed; returns the time (ms) until the next update is due
        long metadataTimeout = this.metadataUpdater.maybeUpdate(now);
        try {
            // Socket I/O: finishes connects, sends and receives; failed connections end up in selector.disconnected()
            this.selector.poll(Utils.min(timeout, new long[]{metadataTimeout, (long)this.defaultRequestTimeoutMs}));
        } catch (IOException var10) {
            this.log.error("Unexpected error during I/O", var10);
        }
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList();
        this.handleCompletedSends(responses, updatedNow);
        this.handleCompletedReceives(responses, updatedNow);
        this.handleDisconnections(responses, updatedNow); // reports each failed or closed connection
        this.handleConnections();
        this.handleInitiateApiVersionRequests(updatedNow);
        this.handleTimedOutRequests(responses, updatedNow);
        this.completeResponses(responses);
        return responses;
    }
}
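The sketch below shows how a failed connection surfaces in a non-blocking poll loop, without going into Kafka's Selector. It is a minimal, self-contained example using plain `java.nio` and is not Kafka code. `NonBlockingConnectDemo`, `State` and `connect` are hypothetical names. A refused or unreachable connect plays the role of Kafka's NOT_CONNECTED channel state. For simplicity, a connect that does not finish within the timeout is treated the same way.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NonBlockingConnectDemo {

    /** Simplified stand-in for the channel states NetworkClient switches on. */
    enum State { READY, NOT_CONNECTED }

    /** Starts a non-blocking connect and waits on a Selector, roughly one poll() round. */
    static State connect(InetSocketAddress address, long timeoutMs) {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            if (channel.connect(address)) {
                return State.READY; // connected immediately
            }
            channel.register(selector, SelectionKey.OP_CONNECT);
            if (selector.select(timeoutMs) == 0) {
                return State.NOT_CONNECTED; // nothing happened before the timeout
            }
            // finishConnect() throws if the connection attempt failed (e.g. refused)
            return channel.finishConnect() ? State.READY : State.NOT_CONNECTED;
        } catch (IOException e) {
            return State.NOT_CONNECTED;
        }
    }

    public static void main(String[] args) throws IOException {
        // "Broker is up": a listening socket completes the TCP handshake even without accept()
        try (ServerSocket server = new ServerSocket(0)) {
            InetSocketAddress up = new InetSocketAddress("127.0.0.1", server.getLocalPort());
            System.out.println("listening port -> " + connect(up, 1000));
        }
        // "Broker is down": bind, then close, to get a port with nothing listening
        int closedPort;
        try (ServerSocket tmp = new ServerSocket(0)) {
            closedPort = tmp.getLocalPort();
        }
        InetSocketAddress down = new InetSocketAddress("127.0.0.1", closedPort);
        System.out.println("closed port -> " + connect(down, 1000));
    }
}
```

Depending on the OS, the refusal shows up either directly from `connect()` or later from `finishConnect()`. The sketch maps both to NOT_CONNECTED.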
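When a connection to a node fails or is closed, the Selector records that node and its channel state in its disconnected map during poll. After each poll, handleDisconnections walks those entries and reports each one. If anything disconnected, it also requests a metadata update.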
private void handleDisconnections(List<ClientResponse> responses, long now) {
    Iterator var4 = this.selector.disconnected().entrySet().iterator();
    while (var4.hasNext()) {
        Entry<String, ChannelState> entry = (Entry) var4.next();
        String node = (String) entry.getKey();
        this.log.debug("Node {} disconnected.", node);
        this.processDisconnection(responses, node, now, (ChannelState) entry.getValue());
    }
    if (this.selector.disconnected().size() > 0) {
        this.metadataUpdater.requestUpdate();
    }
}
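processDisconnection switches on the channel state at the moment of disconnection. The NOT_CONNECTED branch, for a connection that was never established, logs the "could not be established. Broker may not be available." warning seen in our log.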
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now, ChannelState disconnectState) {
    this.connectionStates.disconnected(nodeId, now);
    this.apiVersions.remove(nodeId);
    this.nodesNeedingApiVersionsFetch.remove(nodeId);
    switch (disconnectState.state()) {
        case AUTHENTICATION_FAILED:
            AuthenticationException exception = disconnectState.exception();
            this.connectionStates.authenticationFailed(nodeId, now, exception);
            this.metadataUpdater.handleAuthenticationFailure(exception);
            this.log.error("Connection to node {} ({}) failed authentication due to: {}", new Object[]{nodeId, disconnectState.remoteAddress(), exception.getMessage()});
            break;
        case AUTHENTICATE:
            this.log.warn("Connection to node {} ({}) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.", nodeId, disconnectState.remoteAddress());
            break;
        case NOT_CONNECTED:
            this.log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
    }
    Iterator var8 = this.inFlightRequests.clearAll(nodeId).iterator();
    while (var8.hasNext()) {
        NetworkClient.InFlightRequest request = (NetworkClient.InFlightRequest) var8.next();
        this.log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected", new Object[]{request.header.apiKey(), request.request, request.header.correlationId(), nodeId});
        if (!request.isInternalRequest) {
            responses.add(request.disconnected(now, disconnectState.exception()));
        } else if (request.header.apiKey() == ApiKeys.METADATA) {
            this.metadataUpdater.handleDisconnection(request.destination);
        }
    }
}
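To connect the pieces, here is a toy model of one pass through handleDisconnections and processDisconnection. Each node the selector reported as disconnected yields a log line chosen by its state. This is a hypothetical simplification, not Kafka's class. `DisconnectionModel` is an illustrative name, the messages for the authentication cases are shortened, and the example data assumes bootstrap IDs follow configuration order (-1 is the first entry of bootstrap.servers).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DisconnectionModel {

    /** Trimmed-down copy of the states the switch above handles, plus READY. */
    enum State { NOT_CONNECTED, AUTHENTICATE, AUTHENTICATION_FAILED, READY }

    /** Returns the log lines a single disconnection pass would emit, instead of logging them. */
    static List<String> handleDisconnections(Map<String, State> disconnected, Map<String, String> addresses) {
        List<String> logLines = new ArrayList<>();
        for (Map.Entry<String, State> e : disconnected.entrySet()) {
            String node = e.getKey();
            String address = addresses.getOrDefault(node, "unknown");
            switch (e.getValue()) {
                case AUTHENTICATION_FAILED:
                    logLines.add("ERROR Connection to node " + node + " (" + address + ") failed authentication");
                    break;
                case AUTHENTICATE:
                    logLines.add("WARN Connection to node " + node + " (" + address + ") terminated during authentication.");
                    break;
                case NOT_CONNECTED:
                    logLines.add("WARN Connection to node " + node + " (" + address
                            + ") could not be established. Broker may not be available.");
                    break;
                default:
                    break; // READY: an established connection was closed; no warning in the switch
            }
        }
        return logLines;
    }

    public static void main(String[] args) {
        // Bootstrap nodes from the ConsumerConfig log (assuming configuration order)
        Map<String, String> addresses = new LinkedHashMap<>();
        addresses.put("-1", "10.80.123.9:9092");
        addresses.put("-2", "10.80.123.17:9092");
        Map<String, State> disconnected = new LinkedHashMap<>();
        disconnected.put("-1", State.NOT_CONNECTED);
        disconnected.put("-2", State.NOT_CONNECTED);
        handleDisconnections(disconnected, addresses).forEach(System.out::println);
    }
}
```

Both bootstrap connections fail before the TCP handshake completes, so both land in the NOT_CONNECTED branch. This matches the two warnings in the job log.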
The end.