[2018-11-23 15:35:14,958] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.run(SocketServer.scala:421)
at java.lang.Thread.run(Thread.java:748)
Cause: a newer client SDK is talking to an older Kafka broker and sends a request type that the broker does not support.
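- The broker runs Kafka 0.9.0.1, which supports request ids up to 16. Id 18 is the ApiVersions request from newer Kafka versions, so ApiKeys.forId throws this exception when it tries to look the id up.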
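The failure mode can be reproduced with a small stand-alone sketch. It assumes the lookup is a plain array indexed by request id, which is what the ArrayIndexOutOfBoundsException in the stack trace suggests. The names `ApiKeyLookup`, `forIdUnchecked` and `forIdChecked` are made up for illustration and are not Kafka's actual source.

```scala
// Simplified model of an id-indexed lookup table.
// All names here are illustrative, not Kafka's real code.
object ApiKeyLookup {
  // Highest request id this hypothetical broker knows about.
  val MaxApiKey: Int = 16

  // One slot per known id, so the array has MaxApiKey + 1 = 17 entries.
  private val idToName: Array[String] =
    Array.tabulate(MaxApiKey + 1)(id => s"Request-$id")

  // Unchecked lookup: any id above MaxApiKey indexes past the end of the array.
  def forIdUnchecked(id: Int): String = idToName(id)

  // Checked lookup: unknown ids become None instead of an exception.
  def forIdChecked(id: Int): Option[String] =
    if (id >= 0 && id <= MaxApiKey) Some(idToName(id)) else None

  def main(args: Array[String]): Unit = {
    println(forIdUnchecked(16)) // last valid slot
    println(forIdChecked(18))   // unknown id, handled without throwing
    try {
      println(forIdUnchecked(18))
    } catch {
      case e: ArrayIndexOutOfBoundsException =>
        println("lookup failed: " + e.getMessage)
    }
  }
}
```

With a table of 17 entries, id 17 would fail in the same way as id 18, so any request type added after this broker version would hit the same path.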
- Tracing through the code, in SocketServer:

    try {
      val channel = selector.channel(receive.source)
      val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
        channel.socketAddress)
      val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
        buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
      requestChannel.sendRequest(req)
    } catch {
      case e @ (_: InvalidRequestException | _: SchemaException) =>
        // note that even though we got an exception, we can assume that receive.source is valid.
        // Issues with constructing a valid receive object were handled earlier
        error("Closing socket for " + receive.source + " because of error", e)
        isClose = true
        close(selector, receive.source)
    }
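- The ArrayIndexOutOfBoundsException is not handled where the Request is constructed, so it falls through to the outer try...catch and the processor goes straight into the next round of selector.poll(300). That selector.poll(300) call clears all of the previously received requests, so in this situation some requests may never be processed. That looks like a fairly serious problem.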
- The modified loop below also catches ArrayIndexOutOfBoundsException, closes only the offending connection, and carries on with the remaining receives:

    selector.completedReceives.asScala.foreach { receive =>
      var isClose = false
      try {
        val channel = selector.channel(receive.source)
        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
          channel.socketAddress)
        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
          buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
        requestChannel.sendRequest(req)
      } catch {
        case e @ (_: InvalidRequestException | _: SchemaException) =>
          // note that even though we got an exception, we can assume that receive.source is valid.
          // Issues with constructing a valid receive object were handled earlier
          error("Closing socket for " + receive.source + " because of error", e)
          isClose = true
          close(selector, receive.source)
        case e: ArrayIndexOutOfBoundsException =>
          // unsupported request id: close only this connection
          error("NotSupport Request | Closing socket for " + receive.source + " because of error", e)
          isClose = true
          close(selector, receive.source)
      }
      if (!isClose) {
        selector.mute(receive.source)
      }
    }
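The difference between the two handlers can be seen in a toy model of the loop. This is only a sketch of the behaviour described above. `Receive`, `parse`, `processOuterOnly` and `processPerReceive` are hypothetical stand-ins, not Kafka classes, and the "closed" list merely records which connections would have been closed.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Toy model of a processor loop over a batch of received requests.
// All names here are illustrative stand-ins, not Kafka's real code.
object BatchHandling {
  final case class Receive(source: String, apiKey: Int)

  val MaxApiKey: Int = 16

  // Stand-in for request construction: throws for unknown ids,
  // the way an unchecked array lookup would.
  def parse(r: Receive): String = {
    if (r.apiKey > MaxApiKey) throw new ArrayIndexOutOfBoundsException(r.apiKey)
    s"${r.source}:${r.apiKey}"
  }

  // Only an outer handler: the first bad receive aborts the whole batch,
  // so everything after it is never handled.
  def processOuterOnly(batch: Seq[Receive]): Seq[String] = {
    val handled = ArrayBuffer.empty[String]
    try {
      batch.foreach(r => handled += parse(r))
    } catch {
      case NonFatal(_) => () // a real loop would log here and move on to the next poll
    }
    handled.toSeq
  }

  // Per-receive handler: the bad receive is recorded as closed,
  // and the remaining receives are still handled.
  def processPerReceive(batch: Seq[Receive]): (Seq[String], Seq[String]) = {
    val handled = ArrayBuffer.empty[String]
    val closed = ArrayBuffer.empty[String]
    batch.foreach { r =>
      try {
        handled += parse(r)
      } catch {
        case _: ArrayIndexOutOfBoundsException => closed += r.source
      }
    }
    (handled.toSeq, closed.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq(Receive("a", 0), Receive("b", 18), Receive("c", 1))
    println(processOuterOnly(batch))  // "c" is lost along with "b"
    println(processPerReceive(batch)) // "c" is handled, only "b" is closed
  }
}
```

In this model the request from connection "c" is dropped by the outer-only handler even though it is perfectly valid, which is the kind of loss the per-receive catch is meant to prevent.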