關於控制Broker端入站連接數的討論


Kafka Broker端處理請求采用Reactor模型。每台Broker上有個類似於Dispatcher的Acceptor線程,還有若干個處理請求的Processor線程(當然真正處理請求邏輯的線程不是Processor,實際上是KafkaRequestHandler)。每個Processor線程啟動后大致做以下這么幾件事情:

1. 設置新的入站連接

2. 處理新的請求響應(所謂的處理也就是放入到響應隊列中)

3. 執行Selector.select操作獲取那些准備完畢的IO操作

4. 接收新的入站請求

5. 執行已發送響應的回調邏輯

6. 處理已斷開連接

每個Broker啟動之后它創建的Processor線程會不停地執行以上這些動作,循環往復,直至Broker被關閉。

我們重點看看第一步中的邏輯,以下是1.1.1版本的源碼(選擇1.1.1版本不是特意的,其實所有2.3版本之前都是差不多的情形):

/**
   * Register any new connections that have been queued up
   */
  private def configureNewConnections() {
    while (!newConnections.isEmpty) {
      val channel = newConnections.poll()
      try {
        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
        selector.register(connectionId(channel.socket), channel)
      } catch {
        // We explicitly catch all exceptions and close the socket to avoid a socket leak.
        case e: Throwable =>
          val remoteAddress = channel.socket.getRemoteSocketAddress
          // need to close the channel here to avoid a socket leak.
          close(channel)
          processException(s"Processor $id closed connection from $remoteAddress", e)
      }
    }
  }

注意我標成紅色的語句。基本上Processor線程設置新入站連接的方式就是一次性處理完才罷休。代碼中的newConnections是java.util.concurrent.ArrayBlockingQueue實例。Acceptor線程也會訪問newConnections,因此必須是線程安全的。

這種一次性處理完成才收手的做法在某些情況下是有風險的,比如當Kafka集群遭遇到DDOS攻擊時,外部IP會創建海量的入站連接全部砸向newConnections中。此時Processor線程運行時會一直嘗試消耗掉這些新連接,否則它不會干其他事情——比如處理請求等。換句話說,目前Kafka對新入站連接的處理優先級要高於已有連接。當遭遇連接風暴時,Kafka Broker端會優先處理新連接,因此可能造成已有連接上的請求處理被暫停,並最終導致超時。這樣客戶端得到請求超時通知后會會進一步地發送新的請求,因而出現雪崩效應。

 

另外Broker端維護每個連接也不是沒有開銷的。連接信息本身肯定要占用一些內容資源。如果是啟用了SSL的連接,Kafka為額外為其維護一個48KB的臨時緩沖區。因此一旦遭遇連接風暴,OOM錯誤是很常見的。

 

鑒於這些原因,社區在2.3版本改進了Broker端處理新連接請求的方式。首先阻塞隊列保存新連接的個數不再是沒有限制了,而是被固定為20,即每個Processor的新連接隊列最大就是20個連接——這個寫死在代碼里面了,目前沒法修改。第二、社區引入了新參數max.connections,用於控制Broker端所允許連接的最大連接數。你可以調節這個參數來控制一個Broker最多能接收多少個入站連接。這個參數可以在server.properties中被設置,也可以使用kafka-configs腳本動態修改。max.connections是全局性的,你也可以給每個監聽器設置不同的連接數上限。比如你的監聽器中同時使用了PLAINTEXT和SSL,那么你能夠使用listener.name.plaintext.max.connections和listener.name.ssl.max.connections來為這兩個listeners配置各自的連接數,命令如下:

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config max.connections=100$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config listener.name.plaintext.max.connections=80
Completed updating config for broker: 0.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config listener.name.ssl.max.connections=80
Completed updating config for broker: 0.

 

第三是Kafka Broker的每個Processor線程會在每輪任務結束之前嘗試去關閉多余的連接。判斷是否需要關閉多余連接的依據有兩點:1. 總的連接數超過了max.connections值;2. 你為Broker設置了多個監聽器,但Kafka會保護Broker內部連接使用的那個監聽器。比如你如果設置了多個監聽器:PLAINTEXT://9092, SSL://9093,SASL://9094,然后設置inter.broker.listener.name=SSL,那么SSL這套監聽器下的連接是不會被Processor強行關閉的。

 

最后提一句,如果所有Processor的阻塞隊列都滿了, 那么前面的Acceptor線程會阻塞住,不會再接收任何入站請求。社區新增加了一個JMX指標來計算Acceptor線程被阻塞的時間比例:kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener={listenerName}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM