ERROR Processor got uncaught exception.ERROR Processor got uncaught exception. (kafka.network.Processor) java.lang.ArrayIndexOutOfBoundsException: 18


[2018-11-23 15:35:14,958] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.run(SocketServer.scala:421)
at java.lang.Thread.run(Thread.java:748)

原因:新版sdk訪問較舊版的kafka, 發送kafka不支持的request

  1. 當前用的kafka版本為0.9.0.1, 支持的request最大id為16, 這個18是新版 kafka中的ApiVersion Request, 因此會拋這個異常出來;
    1. 跟了一下代碼, 在SocketServer中:
    2. try { val channel = selector.channel(receive.source) val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress) val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol) requestChannel.sendRequest(req) } catch { case e @ (_: InvalidRequestException | _: SchemaException) => // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier error("Closing socket for " + receive.source + " because of error", e) isClose = true close(selector, receive.source) }
    3. 在處理Request時並未處理這個異常,導致這個異常被其外層的 try...catch...處理, 直接進入了下一輪的 selector.poll(300), 而在這個 selector.poll(300)中會清理之前所有的接收到的Requests, 這就導致在這種情況下,可能會漏處理一些Request, 這樣看起來還是個比較嚴重的問題;
    4.  
      解決:
      selector.completedReceives.asScala.foreach { receive =>
                var isClose = false
      
                try {
                  val channel = selector.channel(receive.source)
                  val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
                    channel.socketAddress)
                  val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
                  requestChannel.sendRequest(req)
                } catch {
                  case e @ (_: InvalidRequestException | _: SchemaException) =>
                    // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
                    error("Closing socket for " + receive.source + " because of error", e)
                    isClose = true
                    close(selector, receive.source)
                  case e : ArrayIndexOutOfBoundsException =>
                    error("NotSupport Request | Closing socket for " + receive.source + " because of error", e)
                    isClose = true
                    close(selector, receive.source)
                }
                if (!isClose) {
                  selector.mute(receive.source)
                }
              }
      

        


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM