Kafka網絡模型和通信流程剖析


1.概述

最近有同學在學習Kafka的網絡通信這塊內容時遇到一些疑問,關於網絡模型和通信流程的相關內容,這里筆者將通過這篇博客為大家來剖析一下這部分內容。

2.內容

Kafka系統作為一個Message Queue,涉及到的網絡通信主要包含以下兩個方面:

  • Pull:Consumer從消息隊列中拉取消息數據;
  • Push:Producer往消息隊列中推送消息數據。

要實現高性能的網絡通信,可以使用更加底層的TCP協議或者UDP協議來實現。Kafka在Producer、Broker、Consumer之間設計了一套基於TCP層的通信協議,這套協議完全是為了Kafka系統自身需求而定制實現的。

提示:
這里需要注意的是,由於UDP協議是一種不可靠的傳輸協議,所以Kafka系統采用TCP協議作為服務間的通信協議。

2.1 基本數據類型

通信協議中的基本數據類型分為以下幾種:

  • 定長數據類型:例如,int8、int16、int32和、int64,對應到Java語言中,分別是byte、short、int和long
  • 可變數據類型:例如,Java語言中Map、List等
  • 數組:例如,Java語言中的int[]、String[]等

2.2 通信模型

Kafka系統采用的是Reactor多線程模型,即通過一個Acceptor線程處理所有的新連接,通過多個Processor線程對請求進行處理(比如解析協議、封裝請求、、轉發等)。

提示:
Reactor是一種事件模型,可以將請求提交到一個或者多個服務程序中進行處理。
當收到Client的請求后,Server處理程序使用多路分發策略,由一個非阻塞的線程來接收所有的請求,然后將這些請求轉發到對應的工作線程中進行處理。

之后,在Kafka的版本迭代中,新增了一個Handler模塊,它通過指定的線程數對請求進行處理。Handler和Processor之間通過一個Block Queue進行連接。如下圖所示:

 

 

 

這里 Acceptor是一個繼承於AbstractServerThread的線程類,Acceptor的主要目的是監聽並且接收Client的請求,同時,建立數據傳輸通道(SocketChannel),然后通過輪詢的方式交給一個Processor處理。其核心代碼在Acceptor的run方法中,代碼如下:

def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
          // to a select operation on a specific channel or a bad request. We don't want
          // the broker to stop responding to requests from other clients in these scenarios.
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      swallowError(serverChannel.close())
      swallowError(nioSelector.close())
      shutdownComplete()
    }
  }

這里還有一個塊通道(BlockingChannel),用於連接Processor和Handler,其代碼如下所示:

class BlockingChannel( val host: String, 
                       val port: Int, 
                       val readBufferSize: Int, 
                       val writeBufferSize: Int, 
                       val readTimeoutMs: Int ) extends Logging {
  private var connected = false
  private var channel: SocketChannel = null
  private var readChannel: ReadableByteChannel = null
  private var writeChannel: GatheringByteChannel = null
  private val lock = new Object()
  private val connectTimeoutMs = readTimeoutMs
  private var connectionId: String = ""

  def connect() = lock synchronized  {
    if(!connected) {
      try {
        channel = SocketChannel.open()
        if(readBufferSize > 0)
          channel.socket.setReceiveBufferSize(readBufferSize)
        if(writeBufferSize > 0)
          channel.socket.setSendBufferSize(writeBufferSize)
        channel.configureBlocking(true)
        channel.socket.setSoTimeout(readTimeoutMs)
        channel.socket.setKeepAlive(true)
        channel.socket.setTcpNoDelay(true)
        channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)

        writeChannel = channel
        // Need to create a new ReadableByteChannel from input stream because SocketChannel doesn't implement read with timeout
        // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
        readChannel = Channels.newChannel(channel.socket().getInputStream)
        connected = true
        val localHost = channel.socket.getLocalAddress.getHostAddress
        val localPort = channel.socket.getLocalPort
        val remoteHost = channel.socket.getInetAddress.getHostAddress
        val remotePort = channel.socket.getPort
        connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
        // settings may not match what we requested above
        val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d."
        debug(msg.format(channel.socket.getSoTimeout,
                         readTimeoutMs,
                         channel.socket.getReceiveBufferSize, 
                         readBufferSize,
                         channel.socket.getSendBufferSize,
                         writeBufferSize,
                         connectTimeoutMs))

      } catch {
        case _: Throwable => disconnect()
      }
    }
  }
  
  def disconnect() = lock synchronized {
    if(channel != null) {
      swallow(channel.close())
      swallow(channel.socket.close())
      channel = null
      writeChannel = null
    }
    // closing the main socket channel *should* close the read channel
    // but let's do it to be sure.
    if(readChannel != null) {
      swallow(readChannel.close())
      readChannel = null
    }
    connected = false
  }

  def isConnected = connected

  def send(request: RequestOrResponse): Long = {
    if(!connected)
      throw new ClosedChannelException()

    val send = new RequestOrResponseSend(connectionId, request)
    send.writeCompletely(writeChannel)
  }
  
  def receive(): NetworkReceive = {
    if(!connected)
      throw new ClosedChannelException()

    val response = readCompletely(readChannel)
    response.payload().rewind()

    response
  }

  private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
    val response = new NetworkReceive
    while (!response.complete())
      response.readFromReadableChannel(channel)
    response
  }

}

 

3.通信過程

Kafka系統的通信框架也是經過了不同的版本迭代的。例如,在Kafka老的版本中,以NIO作為網絡通信的基礎,通過將多個Socket連接注冊到一個Selector上進行監聽,只用一個線程就能管理多個連接,這極大的節省了多線程的資源開銷。

在Kafka之后的新版本中,依然以NIO作為網絡通信的基礎,也使用了Reactor多線程模型,不同的是,新版本將具體的業務處理模塊(Handler模塊)獨立出去了,並用單獨的線程池進行控制。如下圖所示:

 

 通過上圖,我們可以總結一下Kafka的通信流程:

  • Client向Server發送請求時,Acceptor負責接收TCP請求,連接成功后傳遞給Processor線程;
  • Processor線程接收到新的連接后,將其注冊到自身的Selector中,並監聽READ事件
  • 當Client在當前連接對象上寫入數據時,會觸發READ事件,根據TCP協議調用Handler進行處理
  • Handler處理完成后,可能會有返回值給Client,並將Handler返回的結果綁定Response端進行發送

通過總結和分析,我們可以知道Kafka新版中獨立Handler模塊,用這樣以下幾點優勢:

  • 能夠單獨指定Handler的線程數,便於調優和管理
  • 防止一個過大的請求阻塞一個Processor線程
  • Request、Handler、Response之間都是通過隊列來進行連接的,這樣它們彼此之間不存在耦合現象,對提升Kafka系統的性能很有幫助

這里需要注意的是,在Kafka的網絡通信中,RequestChannel為Processor線程與Handler線程之間數據交換提供了一個緩沖區,是通信中Request和Response緩存的地方。因此,其作用就是在通信中起到了一個數據緩沖隊列的作用。Processor線程將讀取到的請求添加至RequestChannel的全局隊列(requestQueue)中,Handler線程從請求隊列中獲取並處理,處理完成后將Response添加至RequestChannel的響應隊列(responseQueues)中,通過responseListeners喚醒對應的Processor線程,最后Processor線程從響應隊列中取出后發送到Client。實現代碼如下:

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  for(i <- 0 until numProcessors)
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

  newGauge(
    "RequestQueueSize",
    new Gauge[Int] {
      def value = requestQueue.size
    }
  )

  newGauge("ResponseQueueSize", new Gauge[Int]{
    def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
  })

  for (i <- 0 until numProcessors) {
    newGauge("ResponseQueueSize",
      new Gauge[Int] {
        def value = responseQueues(i).size()
      },
      Map("processor" -> i.toString)
    )
  }

  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

  /** Send a response back to the socket server to be sent over the network */
  def sendResponse(response: RequestChannel.Response) {
    responseQueues(response.processor).put(response)
    for(onResponse <- responseListeners)
      onResponse(response.processor)
  }

  /** No operation to take for the request, need to read more over the network */
  def noOperation(processor: Int, request: RequestChannel.Request) {
    responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction))
    for(onResponse <- responseListeners)
      onResponse(processor)
  }

  /** Close the connection for the request */
  def closeConnection(processor: Int, request: RequestChannel.Request) {
    responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction))
    for(onResponse <- responseListeners)
      onResponse(processor)
  }

  /** Get the next request or block until specified time has elapsed */
  def receiveRequest(timeout: Long): RequestChannel.Request =
    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
    requestQueue.take()

  /** Get a response for the given processor if there is one */
  def receiveResponse(processor: Int): RequestChannel.Response = {
    val response = responseQueues(processor).poll()
    if (response != null)
      response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
    response
  }

  def addResponseListener(onResponse: Int => Unit) {
    responseListeners ::= onResponse
  }

  def shutdown() {
    requestQueue.clear()
  }
}

4.總結

通過認真閱讀和分析Kafka的網絡通信層代碼,可以收獲不少關於NIO的網絡通信知識。通過對Kafka的源代碼進行閱讀和學習,這對大規模Kafka集群性能的調優和問題定位排查是很有幫助的。

5.結束語

這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那里點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM