(依據於0.10.0.0版本)
這個接口的唯一實現類就是NetworkClient,它被用於實現Kafka的consumer和producer. 這個接口實際上抽象出來了Kafka client與網絡交互的方式。
為了對它的API有清楚的認識,先要了解下Kafka protocol所要求的client和broker對於網絡請求的處理規則。
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well. The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput. i.e., clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer. All requests are initiated by the client, and result in a corresponding response message from the server except where noted.
這一段的信息量挺大的。
順序性
首先,broker按照請求被發送的順序處理請求,並且按照同樣的順序發送響應。因為Kafka對消息的順序性有如下的保證:
- Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a message M1 is sent by the same producer as a message M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
- A consumer instance sees messages in the order they are stored in the log.
為了實現這種順序性保證,最簡單可靠的行為就是"The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. ", 也就是說對於一個TCP連接,broker的請求處理鏈條中只會有一個正在處理的(in-flight)消息.
那么,Kafka在broker端需不需要緩存待處理的消息呢?
首先,如果緩存請求的話,可能會占用大量內存.其次,如果緩存請求的話,在請求處理出錯時,會使得Kafka client難以控制消息的順序,因為本質上,這種緩存使得client的請求是異步處理的.而如果不進行緩存,那么broker的行為對於client而言更容易理解.
所以,broker是不會在本地緩存請求的.當它從某個連接讀取一個請求之后,就會停止從這個連接繼續讀取請求.也就是說對於每個TCP連接,broker的處理流程是:接收一個請求 -> 處理請求 -> 發送響應 -> 接收下一個請求 -> ...
具體的做法,可以在kafka.network.Processor(也就是reactive模型里的subRactor) 找到,在其run方法中,對於已經完整讀取的request和發送完畢的response, 有以下的處理
selector.completedReceives.asScala.foreach { receive => try { val channel = selector.channel(receive.source) val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress) val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol) requestChannel.sendRequest(req) //把請求送入requestChannel,以后request handler會從中取出request來處理 selector.mute(receive.source) //停止從這個request的來源(並不只用host來區分)讀取消息 } catch { case e @ (_: InvalidRequestException | _: SchemaException) => // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier error("Closing socket for " + receive.source + " because of error", e) close(selector, receive.source) } } selector.completedSends.asScala.foreach { send => val resp = inflightResponses.remove(send.destination).getOrElse { throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`") } resp.request.updateRequestMetrics() selector.unmute(send.destination) //將已發送完畢的response的源設為可讀的 }
可見,對於正在處理的請求,broker不會從它的來源再讀取新的消息,直至請求被處理完畢,並且其響應被發送完畢。
預抓取
另一方面,對於client,如果它接收到上一個請求的響應之后,才開始生成新的請求,然后再發送新請求,那么在等待響應的過程中,client就處理等待狀態,這樣挺沒效率.因此,"clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer",也就是說client可以在等待響應的過程中繼續發送請求,因為即使broker不去通過網絡讀這些請求,這些請求也會被緩存在OS的socket buffer中,因此,當broker處理完之前的請求,就可以立即讀出來新的請求.不過,如果client這么做的話,會使得它的行為更復雜(因為涉及到出錯時的順序性).
對於consumer,在接收到響應之前難以確定下一次fetch開始的offset,因此在收到前一個fetch respones之后才發送下一次fetch request是比較穩妥的做法.不過如果可以比較准確判斷fetch響應包含消息的數目,比而提前發出fetch request,的確有可能會提交consumer的性能.
而且,"收到fetch respone"和"用戶處理完fetch到的消息"這兩個時間點還是有所不同的,在收到fetch response之后,把抓取到的消息交給用戶處理之前,發出下一個fetch request,這樣可以提高consumer抓取的效率.新的consumer-KafkaConsumer的確是這么做的.這是KafkaConsumer的poll方法里的一段代碼(用戶通過執行這個poll方法來獲取消息)
do { Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); if (!records.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches // and avoid block waiting for their responses to enable pipelining while the user // is handling the fetched records. // // NOTE that we use quickPoll() in this case which disables wakeups and delayed // task execution since the consumed positions has already been updated and we // must return these records to users to process before being interrupted or // auto-committing offsets fetcher.sendFetches(metadata.fetch()); client.quickPoll(); return this.interceptors == null ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records)); } long elapsed = time.milliseconds() - start; remaining = timeout - elapsed; } while (remaining > 0);
中間的那一大段就是在說這個事情,但是它考慮的情況比剛才提到的要復雜一些.
首先,如果pollOnce得到的records不為空,就要把這些records返回給用戶,所以在此之前要先發送一批fetch rquest(利用Fetcher#sendFetches).如果為空的話,在do-while循環里的pollOnce會發送新的fetch request.
其次,由於Fetcher的sendFetches並不會執行網絡IO操作,而只是生成並且緩存fetch request,所以還需要利用ConsumerNetworkClient的quickPoll方法來執行一次IO操作把這些fetch request發出去.但是由於此時用戶還沒有得到這次pollOnce返回的records, 因此不能進行auto-commit操作,否則就會把還沒返回給用戶的records給commit了,並且也不能使得處理的過程被別的線程中斷,因為這樣用戶也拿不到這些records了.所以,這里調用quickPoll,quickPoll會禁止wakeUp,並且不執行DelayedTasks(因為AutoCommitTask就是通過DelayedTask機制執行的).
對Kafka內部隊列選擇的影響
Kafka的broker是一個典型的Reactor模型的socket server。其中Processor相關於sub reactor,而HandlerPool相當於worker pool. Processor和Handler 都有各自的線程,它們之間通過一些隊列來傳遞請求和響應。Kafka把這些隊列封裝成了RequestChannel。
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup { private var responseListeners: List[(Int) => Unit] = Nil private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors)
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
... }
Kafka對於一個連接一次只處理一個請求的特性,決定了這里的兩種隊列的類型。其中,存放請求的隊列用的是ArrayBlockingQueue,隊列大小為queuSize,而存放響應的隊列用的是LinkedBlockingQueue,它的capcity是Integer.MAX_VALUE。
有界隊列 VS 無界隊列
存放請求的隊列必須用有界的阻塞隊列,否則可能會有太多的請求撐爆內存。而使用有界隊列,事實上可以阻塞Processor線程,使得在請求隊列滿的情況下,Broker拒絕新的請求。
但是響應隊列選用無界的隊列,其原因卻是很隱晦的。
總的待發送響應的個數由於請求隊列的限制,通常不會太大。但這也不意味着這種選擇不會出問題,因為在最差情況下,可能會有相當於總的連接數的待發送響應。想象一種情況,假設有非常多的consumer(比如1W個)發送fetch請求,每個請求抓取1M的數據,但這些consumer都不從socket中讀取響應,那么會有什么情況發生呢?不是會把內存爆掉嗎?事實上,由於Kafka在發送響應時的zero copy特性,使得FetchRepsonse本身不會占用太大內存,所以即使有非常多的待發送響應,但響應對象所占的大小跟要傳送的數據比,還是通常要小很多(取決於fetch請求的fetch size)。其它的響應,實際上也不會特別大,對於一個大集群,占用內存最大的也就是Metadata相關的響應了。
但是另一方面,如果這個隊列用有界的,那么當所有Handler都阻塞於往這些隊列put元素,而所有Processor都阻塞於往RequestQueue里put元素,那么整個server就死鎖了。所以Kafka還是用了無界的隊列。
非阻塞隊列
另一個有趣的隊列就是Processor和Acceptor之間存放新建立的連接的隊列了。
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
這里用了ConcurrentLinkedQueue,因為新連接的處理和消息的發送/接收是在同一個循環中的,所以存放消息的隊列是非阻塞的更合適一些。
API
KafkaClient,是producer和consumer與broker通信的接口,它的設計就建立在上邊的協議的基礎上。這個類包括了與連接狀態和請求-響應狀態有關的方法。producer和consumer實際使用的它的實現類是NetworkClient。以下方法的作用結合了KafkaClient和NetworkClient的注釋,但以NetworkClient的實現為標准。
public boolean isReady(Node node, long now) 查看某個結點是否准備好發送新請求了。由於是給client用的,因此這里的“node"就是broker
public boolean ready(Node node, long now)是到指定node的連接已經被創建好並且可以發送請求。如果連接沒有創建,就創建到這個node的連接。
public long connectionDelay(Node, long now) 基於連接狀態,返回需要等待的時間。連接的狀態有三種:disconnected, connecting, connected. 如果是disconnected狀態,就返回reconnect的backoff time。當connecting或者connected,就返回Long.MAX_VALUE,因為此時需要等待別的事件發生(比如連接成功,或者收到響應)
public long connectionFailed(Node node) 查看到這個node的連接是否失敗。
public void send(ClientRequest request, long now) 把這個request放入發送隊列。如果request是要發給還沒有連接好的node的,那么就會拋出IllegalStateException異常, 這是一個運行時異常。
public List<ClientResponse> poll(long timeout, long now) 對於socket進行讀寫操作。
public void close(String nodeId) 關閉到指定node的連接
public Node leastLoadedNode(long now) 選擇有最少的未發送請求的node,要求這些node至少是可以連接的。這個方法會優先選擇有可用的連接的節點,但是如果所有的已連接的節點都在使用,它就會選擇還沒有建立連接的節點。這個方法絕對不會選擇憶經斷開連接的節點或者正在reconnect backoff階段的連接。
public int inFlightRequestCount() 所有已發送但還沒收到響應的請求的總數
public int inFlightRequestCount(String nodeId) 對於某個特定node的in-flight request總數
public RequestHandler nextRequestHanlder(ApiKeys key) 為某種請求構造它的請求頭。按照Kafka Protoocl, request包括以下部分:
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
ApiKey => int16
ApiVersion => int16
CorrelationId => int32
ClientId => string
RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
|
而這個方法構造了ApiKey, ApiVersion, CoorelationId和ClientId,作為請求的頭部,request handler在源碼里有對應類org.apache.kafka.common.requests.RequestHandler。
ApiKey表示請求的種類, 如produce request, fetch request, metadata request等。
puclic RequestHandler nextRequestHandler(ApiKey key, short version) 構造請求的頭部,使用特定版本號。
public void wakeup() 如果這個client正在IO阻塞狀態,就喚醒它。
總結
Kafka protocol的一些細節,在Kafka client的接口設計中得到了體現.並且,有一些小細節是挺有意思的.
下面會看一下NetworkClient,它是KafkaClient接口的實現.