1. Introduction
- Version: 1.1.1
- The Kafka network layer is the entry point for every request Kafka serves. Its network model is a multi-Reactor, multi-threaded model built on NIO. Its core job is to accept connections, assemble TCP packets into Requests, hand them to the API layer, and send the Responses back once processing completes.
- Annotated source on GitHub: https://github.com/nlskyfree/kafka-1.1.1-sourcecode
2. Overall Architecture
2.1 Core Logic
- 1 Acceptor thread + N Processor threads (num.network.threads) + M Request Handler threads (num.io.threads)
- A multi-threaded, multi-Reactor model: the Acceptor owns its own selector, and each Processor has its own selector as well
- Each Processor owns a ConcurrentLinkedQueue[SocketChannel] named newConnections; the Acceptor round-robins over the Processors and drops each new connection into the chosen Processor's queue
- Each Processor's selector watches for read/write IO events on its channels
- On a read event, a Processor assembles the complete packet into a Request and puts it into the RequestChannel's global ArrayBlockingQueue (default capacity 500)
- After a Request Handler finishes Kafka's internal logic, it writes the Response to the LinkedBlockingQueue of the Processor that handled the Request
- On a write event, the data is written back to the Client
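The Acceptor-to-Processor handoff described above can be sketched as follows. This is an illustrative model only (the class shapes and the String stand-in for SocketChannel are ours, not Kafka's), showing just the round-robin assignment into per-processor ConcurrentLinkedQueues:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class HandoffSketch {
    static class Processor {
        // each processor privately owns its new-connection queue
        final ConcurrentLinkedQueue<String> newConnections = new ConcurrentLinkedQueue<>();

        void accept(String conn) {
            newConnections.add(conn); // called from the acceptor thread
            // the real Processor also calls wakeup() on its selector here
        }
    }

    static class Acceptor {
        final List<Processor> processors;
        int current = 0;

        Acceptor(List<Processor> processors) { this.processors = processors; }

        // round-robin: pick processors[current % n], then advance the counter
        void onNewConnection(String conn) {
            Processor p = processors.get(current % processors.size());
            current++;
            p.accept(conn);
        }
    }

    public static void main(String[] args) {
        List<Processor> ps = new ArrayList<>();
        for (int i = 0; i < 3; i++) ps.add(new Processor());
        Acceptor acceptor = new Acceptor(ps);
        for (int c = 0; c < 6; c++) acceptor.onNewConnection("conn-" + c);
        // 6 connections spread over 3 processors -> 2 per queue
        for (Processor p : ps) System.out.println(p.newConnections.size());
    }
}
```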
2.2 Core Classes and Methods
SocketServer //wrapper around Kafka's network layer
|-- Acceptor //wrapper around the Acceptor thread
|-- Processor //wrapper around a Processor thread
Selector //wraps the Java selector: the core poll, iteration over selectionKeys, event registration, etc.
KafkaChannel //wraps a Java SocketChannel and performs the actual read/write IO
TransportLayer //hides from KafkaChannel whether the transport is plaintext or SSL-encrypted
RequestChannel //the channel to the API layer: wraps the Request/Response types and their queues
|-- Request //the Request handed to the API layer
|-- Response //the Response returned by the API layer
The overall flow is shown in the figure:
3. Core Flow Analysis
3.1 Startup Flow
// 1. Kafka.scala
def main(args: Array[String]): Unit = {
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
// start the server
kafkaServerStartable.startup()
// block the main thread on a CountDownLatch until Kafka shuts down
kafkaServerStartable.awaitShutdown()
}
// 2. KafkaServerStartable.scala
private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)
def startup() {
// start the Kafka server
server.startup()
}
// 3. KafkaServer.scala
def startup() {
// start the SocketServer, i.e. the Acceptor thread; the Processors are started later, once KafkaServer finishes starting up
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)
// start the other components
······
// start the Processors inside SocketServer; network IO begins here
socketServer.startProcessors()
}
// 4. SocketServer.scala
def startup(startupProcessors: Boolean = true) {
this.synchronized {
// create and start the Acceptor, create the Processors
createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
if (startupProcessors) {
// whether to start the Processors right away; KafkaServer passes false here
startProcessors()
}
}
}
private def createAcceptorAndProcessors(processorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit = synchronized {
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
// handle each Endpoint; usually there is just one
endpoints.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
// create the Acceptor thread
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
// this only creates the Processors, it does not start them
addProcessors(acceptor, endpoint, processorsPerListener)
// start the thread in non-daemon mode
KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
// block until the thread has started successfully
acceptor.awaitStartup()
acceptors.put(endpoint, acceptor)
}
}
def startProcessors(): Unit = synchronized {
// iterate over all Acceptors and start their Processors
acceptors.values.asScala.foreach { _.startProcessors() }
}
private[network] def startProcessors(): Unit = synchronized {
// make sure this only runs once
if (!processorsStarted.getAndSet(true)) {
startProcessors(processors)
}
}
// start the Processors in non-daemon mode
private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
processors.foreach { processor =>
KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
processor).start()
}
}
When KafkaServer starts up, it initializes and starts the SocketServer:
- It creates and runs the Acceptor thread, which takes connections off the accept queue and hands them to Processors round-robin
- Once all other components are up, it starts the configured number of Processors, which own the SocketChannels and do the actual IO
3.2 Acceptor.run Flow
Only one Acceptor thread is started per Endpoint. The core code lives in the Acceptor class in SocketServer.scala; the class implements Runnable, and its run method is executed on a dedicated thread.
def run() {
// register for OP_ACCEPT
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
var currentProcessor = 0
while (isRunning) {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
val key = iter.next
// remove the key from the set once handled
iter.remove()
// pick a processor round-robin
val processor = synchronized {
currentProcessor = currentProcessor % processors.size
processors(currentProcessor)
}
// initialize the channel and put it into the chosen processor's newConnections queue
accept(key, processor)
// round robin to the next processor thread, mod(numProcessors) will be done later
currentProcessor = currentProcessor + 1
}
}
}
}
def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
connectionQuotas.inc(socketChannel.socket().getInetAddress)
// channel initialization
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socketChannel.socket().setSendBufferSize(sendBufferSize)
// put the connection into the processor's new-connection queue
processor.accept(socketChannel)
}
def accept(socketChannel: SocketChannel) {
// accept puts the new connection into the processor's ConcurrentLinkedQueue
newConnections.add(socketChannel)
// wake up this processor's multiplexer
wakeup()
}
What the Acceptor does is simple; in short, it listens for connections and hands each new one to a Processor in turn:
- Use a multiplexer to wait for connections in the accept queue
- When a connection arrives, round-robin over the processors array to pick a Processor
- Initialize the socketChannel: enable keepalive, disable Nagle's algorithm, set the send buffer
- Put the socketChannel into the chosen Processor's new-connection queue
3.3 Processor.run Flow
num.network.threads Processor threads are started. Each one drains its private new-connection queue, initializes each new connection, and registers it for IO events. Each Processor has its own selector watching for IO events: on read events it assembles packets and puts the result into the global requestQueue; on write events it takes Responses from its private responseQueue and writes them back to the Client.
override def run() {
while (isRunning) {
// setup any new connections that have been queued up
// consume the SocketChannels the acceptor queued up and register them with the selector for OP_READ
configureNewConnections()
// register any new responses for writing
// take responses destined for clients off the responseQueue, wrap each as a Send on its channel, and register OP_WRITE
processNewResponses()
/**
* 1. For channels with an OP_READ event: once the full packet has arrived, wrap it as a NetworkReceive and add it to completedReceives (at most one per channel)
* 2. For channels with an OP_WRITE event: send the Send staged on the channel; once fully sent, add it to completedSends
*/
poll()
// convert each fully assembled NetworkReceive into a Request, put it on the requestQueue (for the IO threads to pick up), and mute the channel (deregister OP_READ) so each channel has at most one request in flight
processCompletedReceives()
// unmute the channel (re-register OP_READ): the previous request is done, so the channel may accept the next one
processCompletedSends()
// handle closed connections: clean up collections and update statistics
processDisconnected()
}
}
Processor.run is well factored; the thread keeps looping over the following logic:
- Take new connections off newConnections, initialize each socketChannel, and register OP_READ
- Walk every RequestChannel.Response in responseQueue, stage each on its KafkaChannel as that channel's next pending Send, and register OP_WRITE on the corresponding SelectionKey
- poll runs the core NIO logic: call select and iterate over the selectionKeys with pending events
- For channels with an OP_READ event: once the full packet has arrived, wrap it as a NetworkReceive and add it to completedReceives (at most one per channel)
- For channels with an OP_WRITE event: send the Send staged on the channel; once fully sent, add it to completedSends
- Walk completedReceives, wrap each result as a Request, put it on the global requestQueue, and cancel the channel's OP_READ registration; OP_READ is re-registered only after the IO thread finishes and the Response has been sent
- Walk completedSends and re-register OP_READ for each channel on the selector
- Walk the connections that went down for whatever reason and do the cleanup work: tidy collections and state
The source for each step follows:
3.3.1 configureNewConnections
Handles the connections the Acceptor has handed to this Processor
// SocketServer.scala
private def configureNewConnections() {
while (!newConnections.isEmpty) {
val channel = newConnections.poll()
// register the new connection for read IO events; connectionId is the ip+port string that uniquely identifies the connection
selector.register(connectionId(channel.socket), channel)
}
}
// Selector.java
public void register(String id, SocketChannel socketChannel) throws IOException {
// make sure it is not registered twice
ensureNotRegistered(id);
// create the KafkaChannel and attach it to the SelectionKey
registerChannel(id, socketChannel, SelectionKey.OP_READ);
}
private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
// register with the selector
SelectionKey key = socketChannel.register(nioSelector, interestedOps);
// create the KafkaChannel and attach it to the SelectionKey
KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
this.channels.put(id, channel);
return key;
}
This step mainly does initialization work:
- Walk the newConnections queue and take out each new connection
- Register a read IO event with the Selector
- Create a KafkaChannel wrapping the SocketChannel
- Attach the KafkaChannel to the corresponding SelectionKey
3.3.2 processNewResponses
Handles the Responses of Requests that have finished processing
// SocketServer.scala
private def processNewResponses() {
var curr: RequestChannel.Response = null
// drain the responseQueue and handle every response
while ({curr = dequeueResponse(); curr != null}) {
// in theory each channel is visited at most once here, since a connection has at most one Request in flight
val channelId = curr.request.context.connectionId
curr.responseAction match {
case RequestChannel.NoOpAction =>
// There is no response to send to the client, we need to read more pipelined requests
// that are sitting in the server's socket buffer
updateRequestMetrics(curr)
trace("Socket server received empty response to send, registering for read: " + curr)
// an empty response means the request is done; unmute the KafkaChannel so it accepts requests again
openOrClosingChannel(channelId).foreach(c => selector.unmute(c.id))
case RequestChannel.SendAction =>
val responseSend = curr.responseSend.getOrElse(
throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
// note: this only stages responseSend as the KafkaChannel's pending Send and registers OP_WRITE on the SelectionKey
sendResponse(curr, responseSend)
case RequestChannel.CloseConnectionAction =>
updateRequestMetrics(curr)
trace("Closing socket connection actively according to the response code.")
close(channelId)
}
}
}
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
val connectionId = response.request.context.connectionId
// Invoke send for closingChannel as well so that the send is failed and the channel closed properly and
// removed from the Selector after discarding any pending staged receives.
// `openOrClosingChannel` can be None if the selector closed the connection because it was idle for too long
if (openOrClosingChannel(connectionId).isDefined) {
selector.send(responseSend)
inflightResponses += (connectionId -> response)
}
}
// Selector.java
public void send(Send send) {
String connectionId = send.destination();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
// this only stages the send on the channel; nothing is actually sent here
channel.setSend(send);
}
public void setSend(Send send) {
// only one send may exist at a time
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
// stage the send
this.send = send;
// transportLayer abstracts over plaintext vs encrypted communication; register interest in OP_WRITE
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
public void addInterestOps(int ops) {
key.interestOps(key.interestOps() | ops);
}
The core logic takes pending responses off the responseQueue, stages each one as its KafkaChannel's next Send, and registers OP_WRITE:
- Walk the responseQueue and take out the finished Responses
- If a Response is empty, unmute the channel and register OP_READ to wait for the next Request; otherwise call sendResponse to send it
- Wrap the pending Response as a Send bound to the KafkaChannel; only one pending Send may exist at a time (just as only one Request is processed at a time)
- Register OP_WRITE; the current Send is actually transmitted only when the event fires
3.3.3 poll
The method that actually calls select and handles the IO events that fire
// SocketServer.scala
private def poll() {
selector.poll(300)
}
// Selector.java
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
boolean madeReadProgressLastCall = madeReadProgressLastPoll;
clear();
boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
timeout = 0;
if (!memoryPool.isOutOfMemory() && outOfMemory) {
//we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
log.trace("Broker no longer low on memory - unmuting incoming sockets");
for (KafkaChannel channel : channels.values()) {
if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
channel.unmute();
}
}
outOfMemory = false;
}
/* check ready keys */
long startSelect = time.nanoseconds();
int numReadyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
// an IO event fired, or an immediatelyConnected connection exists, or a channel's data was not fully read last time
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {
keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
pollSelectionKeys(toPoll, false, endSelect);
}
// iterate over the selectionKeys handling read/write IO; fully read data goes into stagedReceives, and each KafkaChannel's staged Send is written out
// Poll from channels where the underlying socket has more data
pollSelectionKeys(readyKeys, false, endSelect);
// Clear all selected keys so that they are included in the ready count for the next select
readyKeys.clear();
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
immediatelyConnectedKeys.clear();
} else {
madeReadProgressLastPoll = true; //no work is also "progress"
}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// handle idle connections (default 10 min); connections that exceed the timeout are closed
// we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
maybeCloseOldestConnection(endSelect);
// move one NetworkReceive per channel from stagedReceives into completedReceives
// Add to completedReceives after closing expired connections to avoid removing
// channels with completed receives until all staged receives are completed.
addToCompletedReceives();
}
private int select(long timeoutMs) throws IOException {
if (timeoutMs < 0L)
throw new IllegalArgumentException("timeout should be >= 0");
if (timeoutMs == 0L)
return this.nioSelector.selectNow();
else
return this.nioSelector.select(timeoutMs);
}
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
// determineHandlingOrder shuffles the key set to avoid starvation
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
KafkaChannel channel = channel(key);
long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
// refresh the channel's idle expiry time
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
boolean sendFailed = false;
// read from the channel into stagedReceives; if stagedReceives already has data, a complete Request has formed, so stop reading
attemptRead(key, channel);
// can only be true for SSL transports
if (channel.hasBytesBuffered()) {
keysWithBufferedRead.add(key);
}
// write data out through the channel
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
Send send = null;
try {
// write out the channel's staged send; once fully sent, OP_WRITE is deregistered
send = channel.write();
} catch (Exception e) {
sendFailed = true;
throw e;
}
if (send != null) {
// add it to the completedSends collection
this.completedSends.add(send);
}
}
}
}
poll calls select and reacts to OP_READ and OP_WRITE events, performing the IO:
- Call select to get the SelectionKeys with pending IO events
- If an IO event fired, an immediatelyConnected connection exists, or a channel's data was not fully read last time, call pollSelectionKeys on the relevant keys
- Iterate over the SelectionKeys
- Call attemptRead to do the actual channel read
- On an OP_WRITE event, call channel.write to write out the channel's staged Send; once all data is sent, the Send goes into completedSends and OP_WRITE is deregistered
- Close connections that have been idle too long (default 10 minutes)
- Move the networkReceives in stagedReceives into completedReceives
attemptRead wraps the actual channel read:
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
//if channel is ready and has bytes to read from socket or buffer, and has no
//previous receive(s) already staged or otherwise in progress then read from it
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
&& !explicitlyMutedChannels.contains(channel)) {
NetworkReceive networkReceive;
// a non-null return from channel.read means a complete Request has been read
while ((networkReceive = channel.read()) != null) {
madeReadProgressLastPoll = true;
addToStagedReceives(channel, networkReceive);
}
// if the channel is muted here, it must have muted itself inside channel.read() because the memoryPool ran out of memory
if (channel.isMute()) {
outOfMemory = true; //channel has muted itself due to memory pressure.
} else {
madeReadProgressLastPoll = true;
}
}
}
// KafkaChannel.java
public NetworkReceive read() throws IOException {
NetworkReceive result = null;
if (receive == null) {
receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
}
// read data from the channel; internally this calls readFromReadableChannel()
receive(receive);
// if the read completed, a full Request has formed
if (receive.complete()) {
receive.payload().rewind();
result = receive;
receive = null;
} else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
//pool must be out of memory, mute ourselves.
mute();
}
return result;
}
// NetworkReceive.java
// very similar to ZooKeeper's network layer: the first 4 bytes carry the payload size, then a buffer of that size is allocated for the data
public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
int read = 0;
// size is a 4-byte bytebuffer; if it is not full yet, the first 4 bytes have not all arrived
if (size.hasRemaining()) {
int bytesRead = channel.read(size);
if (bytesRead < 0)
throw new EOFException();
read += bytesRead;
if (!size.hasRemaining()) {
size.rewind();
// the actual Request size
int receiveSize = size.getInt();
if (receiveSize < 0)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
if (receiveSize == 0) {
buffer = EMPTY_BUFFER;
}
}
}
// the 4-byte header has been fully read
if (buffer == null && requestedBufferSize != -1) { //we know the size we want but havent been able to allocate it yet
// allocate the buffer; memoryPool caps the network-layer buffer memory and is unbounded by default
buffer = memoryPool.tryAllocate(requestedBufferSize);
if (buffer == null)
log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
}
if (buffer != null) {
// read the actual payload
int bytesRead = channel.read(buffer);
if (bytesRead < 0)
throw new EOFException();
read += bytesRead;
}
return read;
}
On an OP_READ event, channel.read is called until a complete networkReceive has been read, which is then put into stagedReceives:
- First read the 4-byte size, which is the length of the whole payload
- Then read size bytes; the resulting networkReceive is a complete Request and goes into stagedReceives
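The framing these two steps describe (a 4-byte big-endian length, then the payload) can be sketched in isolation. The helper names below are ours, not Kafka's, and the read loops are written blocking-style for brevity, whereas the real code accumulates data across many non-blocking reads:

```java
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

public class Framing {
    // Prefix the payload with its 4-byte length (ByteBuffer is big-endian by default)
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length); // size header
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Read one complete frame: first fill the 4-byte size buffer, then the payload
    static byte[] readFrame(ReadableByteChannel ch) throws Exception {
        ByteBuffer size = ByteBuffer.allocate(4);
        while (size.hasRemaining() && ch.read(size) >= 0) { }
        size.flip();
        int len = size.getInt();           // the actual request size
        ByteBuffer payload = ByteBuffer.allocate(len);
        while (payload.hasRemaining() && ch.read(payload) >= 0) { }
        return payload.array();
    }

    public static void main(String[] args) throws Exception {
        ByteBuffer frame = encode("hello".getBytes(StandardCharsets.UTF_8));
        ReadableByteChannel ch =
            Channels.newChannel(new ByteArrayInputStream(frame.array()));
        System.out.println(new String(readFrame(ch), StandardCharsets.UTF_8)); // hello
    }
}
```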
channel.write wraps the actual channel write logic:
// KafkaChannel.java
public Send write() throws IOException {
Send result = null;
if (send != null && send(send)) {
result = send;
send = null;
}
return result;
}
private boolean send(Send send) throws IOException {
// the underlying API differs by Send type: RecordsSend uses zero-copy, while ByteBufferSend uses a normal channel write
send.writeTo(transportLayer);
// completion is judged by whether the bytes written have reached the size carried in the request's 4-byte header
if (send.completed())
// once fully sent, deregister OP_WRITE
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
return send.completed();
}
// RecordsSend.java
public long writeTo(GatheringByteChannel channel) throws IOException {
long written = 0;
// anything left to send?
if (remaining > 0) {
// send the remaining bytes
written = records.writeTo(channel, size() - remaining, remaining);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
remaining -= written;
}
pending = TransportLayers.hasPendingWrites(channel);
if (remaining <= 0 && pending)
channel.write(EMPTY_BYTE_BUFFER);
return written;
}
// FileRecords.java
public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;
int oldSize = sizeInBytes();
if (newSize < oldSize)
throw new KafkaException(String.format(
"Size of FileRecords %s has been truncated during write: old size %d, new size %d",
file.getAbsolutePath(), oldSize, newSize));
long position = start + offset;
int count = Math.min(length, oldSize);
final long bytesTransferred;
if (destChannel instanceof TransportLayer) {
TransportLayer tl = (TransportLayer) destChannel;
// internally also zero-copy: it ends up calling channel.transferTo
bytesTransferred = tl.transferFrom(channel, position, count);
} else {
// zero-copy: transfer the local file straight to the NIC
bytesTransferred = channel.transferTo(position, count, destChannel);
}
return bytesTransferred;
}
Kafka's send logic differs by Send subclass; only RecordsSend is shown here, which uses zero-copy to move disk file data to the NIC:
- Check whether the channel has a pending Send; if so, write it to the transportLayer
- In the following two scenarios the Send is a RecordsSend (backed by a disk file), so its writeTo writes the file data to the channel:
- a consumer fetching messages from a partition
- a replica follower fetching messages from the leader for replication
- RecordsSend uses channel.transferTo for zero-copy, moving the data to the NIC
- Once the send completes, deregister OP_WRITE (Java NIO selection is level-triggered, so OP_WRITE would otherwise keep firing whenever the socket is writable)
3.3.4 processCompletedReceives
Handles the NetworkReceives in completedReceives, wrapping each as a Request and putting it on the RequestChannel's global requestQueue for the API layer to consume
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
// look up the channel by connectionId
openOrClosingChannel(receive.source) match {
case Some(channel) =>
val header = RequestHeader.parse(receive.payload)
val context = new RequestContext(header, receive.source, channel.socketAddress,
channel.principal, listenerName, securityProtocol)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
requestChannel.sendRequest(req)
// deregister OP_READ: the next request on a connection is handled only after the current one finishes, guaranteeing per-connection ordering
selector.mute(receive.source)
case None =>
// This should never happen since completed receives are processed immediately after `poll()`
throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
}
}
}
1. Walk the networkReceives in completedReceives, wrap the payload of each as a RequestChannel.Request, and put it on the RequestChannel's global requestQueue
2. Mute the corresponding KafkaChannel, i.e. deregister OP_READ on its selectionKey (the reason is explained in Section 4)
3.3.5 processCompletedSends
Handles Responses that have finished sending: walk completedSends and unmute each KafkaChannel, i.e. re-register OP_READ on its selectionKey so the next Request can be received
private def processCompletedSends() {
selector.completedSends.asScala.foreach { send =>
val resp = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
updateRequestMetrics(resp)
// the response has been fully sent; unmute the channel and listen for OP_READ again
selector.unmute(send.destination)
}
}
// Selector.java
public void unmute(String id) {
KafkaChannel channel = openOrClosingChannelOrFail(id);
unmute(channel);
}
private void unmute(KafkaChannel channel) {
explicitlyMutedChannels.remove(channel);
channel.unmute();
}
// KafkaChannel.java
void unmute() {
if (!disconnected)
transportLayer.addInterestOps(SelectionKey.OP_READ);
muted = false;
}
3.3.6 processDisconnected
For connections that have closed, remove the entry from inflightResponses and decrement the corresponding connection-quota statistics
private def processDisconnected() {
selector.disconnected.keySet.asScala.foreach { connectionId =>
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
}.remoteHost
inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
// the channel has been closed by the selector but the quotas still need to be updated
connectionQuotas.dec(InetAddress.getByName(remoteHost))
}
}
4. Other Details
Ordering guarantee on a single connection
Each time a Processor receives a complete Request, it cancels the OP_READ registration on the selector and re-registers it only after the Response has been fully sent. On a single connection's channel, the server therefore processes requests strictly in arrival order.
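The mute/unmute pair that implements this guarantee reduces to flipping the OP_READ bit in the SelectionKey's interest set; a minimal sketch operating on a raw ops int (assuming the key already holds both OP_READ and OP_WRITE interest):

```java
import java.nio.channels.SelectionKey;

public class MuteSketch {
    // mute: clear the OP_READ bit so the selector stops reporting reads
    static int mute(int interestOps)   { return interestOps & ~SelectionKey.OP_READ; }
    // unmute: set the OP_READ bit again so the next request can arrive
    static int unmute(int interestOps) { return interestOps | SelectionKey.OP_READ; }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        ops = mute(ops);    // complete request received: stop reading this channel
        System.out.println((ops & SelectionKey.OP_READ) != 0); // false
        ops = unmute(ops);  // response fully sent: accept the next request
        System.out.println((ops & SelectionKey.OP_READ) != 0); // true
    }
}
```

On a real key this is `key.interestOps(key.interestOps() & ~SelectionKey.OP_READ)` and its inverse, which is exactly what TransportLayer.addInterestOps/removeInterestOps do.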
Why is there a transportLayer?
Mainly to abstract over plaintext vs SSL communication. For plaintext, the transportLayer is essentially a pass-through; for SSL, it hides the handshake, encryption, and decryption from the Kafka protocol code.
Why stagedReceives instead of putting results straight into completedReceives?
- Mainly because of SSL: the exact data length is unknown (once the 4-byte header is encrypted, you cannot tell how long the frame is). For example, a single OP_READ may yield 2 Requests; both must be stored in stagedReceives (hence one queue per channel) and then processed one at a time to preserve ordering. See also the discussion in the git commits linked in the next point
- This design was indeed awkward; Kafka later removed stagedReceives, making the code much cleaner: https://github.com/apache/kafka/pull/5920/commits
The requestQueue is a single queue; doesn't that cause lock contention?
Because Kafka processes data in batches, each batch competes for the lock only once, so the amortized locking cost is small. Tencent Cloud once tried replacing this with a lock-free queue and saw no significant IO performance improvement.
What is MemoryPool for?
It bounds the memory consumed by network-layer buffers, configured via queued.max.request.bytes; the default is unlimited, i.e. no cap.
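A minimal sketch of the idea (our own class, not Kafka's MemoryPool): a capped pool whose tryAllocate returns null under memory pressure, which is exactly the signal that makes KafkaChannel.read mute the channel in the code above:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

public class SimpleMemoryPool {
    private final AtomicLong available;

    SimpleMemoryPool(long capacityBytes) {
        this.available = new AtomicLong(capacityBytes);
    }

    // returns null instead of blocking when the cap would be exceeded
    ByteBuffer tryAllocate(int size) {
        long remaining = available.addAndGet(-size);
        if (remaining < 0) {          // over the cap: undo and signal pressure
            available.addAndGet(size);
            return null;
        }
        return ByteBuffer.allocate(size);
    }

    // caller returns the buffer's capacity to the pool when done
    void release(ByteBuffer buf) { available.addAndGet(buf.capacity()); }

    public static void main(String[] args) {
        SimpleMemoryPool pool = new SimpleMemoryPool(10);
        ByteBuffer a = pool.tryAllocate(8);
        System.out.println(a != null);                   // true: fits
        System.out.println(pool.tryAllocate(8));         // null: only 2 bytes left
        pool.release(a);
        System.out.println(pool.tryAllocate(8) != null); // true: fits again
    }
}
```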
How is zero-copy used?
Kafka's network layer does not use zero-copy for all communication. Only when the payload is message data already sitting in a disk file does it use zero-copy for efficiency; the trade-off is that no transformation of the data is possible on the way from disk to NIC. Other paths, such as produce writes and returning in-memory metadata, do not use zero-copy.
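The zero-copy path boils down to FileChannel.transferTo, which hands a file region to the destination channel without passing through a user-space buffer (for sockets, the kernel can move the data straight toward the NIC). A self-contained sketch; the destination here is an in-memory channel rather than a socket purely so the snippet runs anywhere:

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopyDemo {
    // Transfer the whole file into dest via transferTo, looping because a
    // single call may transfer fewer bytes than requested
    static long copy(Path path, WritableByteChannel dest) throws Exception {
        try (FileChannel src = FileChannel.open(path, StandardOpenOption.READ)) {
            long pos = 0, size = src.size();
            while (pos < size) pos += src.transferTo(pos, size - pos, dest);
            return pos;
        }
    }

    public static void main(String[] args) throws Exception {
        File f = File.createTempFile("records", ".log");
        f.deleteOnExit();
        Files.write(f.toPath(), "some log data".getBytes());

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (WritableByteChannel dest = Channels.newChannel(sink)) {
            System.out.println(copy(f.toPath(), dest)); // 13
            System.out.println(sink.toString());        // some log data
        }
    }
}
```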
Figure source: https://blog.csdn.net/lizhitao/article/details/43987319