什么是Metadata? Topic/Partion與broker的映射關系:每一個Topic的每一個Partion的Leader、Follower的信息。
它存在哪里?持久化在Zookeeper中;運行時存儲在Broker的內存中。
1 Metadata的2種更新機制
-
周期性的更新: 每隔一段時間更新一次。,這個通過 Metadata的lastRefreshMs, lastSuccessfulRefreshMs 這2個字段來實現。對應的ProducerConfig配置項為:
- metadata.max.age.ms //缺省300000,即10分鍾1次
-
失效檢測,強制更新:檢查到metadata失效以后,調用
metadata.requestUpdate()
強制更新。 requestUpdate()函數里面其實什么都沒做,就是把needUpdate置成了false
每次Sender.poll
的時候,都檢查這2種更新機制,達到了,就觸發更新。
那如何判定Metadata失效了呢?這個在代碼中很分散,有很多地方,會判定Metadata失效。
2. Metadata失效檢測
條件1:initConnect的時候 - NetworkClient.java
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
this.connectionStates.connecting(nodeConnectionId, now);
selector.connect(nodeConnectionId,
new InetSocketAddress(node.host(), node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) {
/* attempt failed, we'll try again after the backoff */
connectionStates.disconnected(nodeConnectionId, now);
/* maybe the problem is our metadata, update it */
metadataUpdater.requestUpdate(); //判定metadata失效
log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
}
}
條件2:poll里面IO的時候,連接斷掉了 - NetworkClient.java
private void handleDisconnections(List<ClientResponse> responses, long now) {
for (String node : this.selector.disconnected()) {
log.debug("Node {} disconnected.", node);
processDisconnection(responses, node, now);
}
if (this.selector.disconnected().size() > 0)
metadataUpdater.requestUpdate(); //判定metadata失效
}
條件3:有請求超時 - NetworkClient.java
private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
for (String nodeId : nodeIds) {
this.selector.close(nodeId);
log.debug("Disconnecting from node {} due to request timeout.", nodeId);
processDisconnection(responses, nodeId, now);
}
if (nodeIds.size() > 0)
metadataUpdater.requestUpdate(); //判定metadata失效
}
條件4:發消息的時候,有partition的leader沒找到 - Sender.java
public void run(long now) {
Cluster cluster = metadata.fetch();
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (result.unknownLeadersExist)
this.metadata.requestUpdate();
條件5:返回的response和請求對不上的時候
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
int correlationId = response.request().request().header().correlationId();
if (response.wasDisconnected()) {
log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
.request()
.destination());
for (RecordBatch batch : batches.values())
completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);
總之:發生各式各樣的異常,數據不同步,都認為metadata可能出問題了,要求更新。
3.Metadata更新特點
Metadata的更新,還有以下幾個特點:
- 更新請求MetadataRequest是nio異步發送的,在
Sender.poll
的返回中,處理MetadataResponse的時候,才真正更新Metadata。
這里有個關鍵點:Metadata的cluster對象,每次是整個覆蓋的,而不是局部更新。所以cluster內部不用加鎖。
- 更新的時候,是從metadata保存的所有Node,或者說Broker中,選負載最小的那個,也就是當前接收請求最少的那個。向其發送MetadataRequest請求,獲取新的Cluster對象。
4 Sender poll()更新Metadata
從下面可以看出,Metadata的更新,是在while循環,每次調用client.poll()的時候更新的。
Sender 是KafkaProducer的一個線程類。
public void run() {
// main loop, runs until close is called
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
。。。
}
public void run(long now) {
Cluster cluster = metadata.fetch();
。。。
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); //遍歷消息隊列中所有的消息,找出對應的,已經ready的Node
if (result.unknownLeadersExist) //如果一個ready的node都沒有,請求更新metadata
this.metadata.requestUpdate();
。。。
//client的2個關鍵函數,一個發送ClientRequest,一個接收ClientResponse。底層調用的是NIO的poll。關於nio, 后面會詳細介紹
for (ClientRequest request : requests)
client.send(request, now);
this.client.poll(pollTimeout, now);
}
//NetworkClient
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now); //關鍵點:每次poll的時候判斷是否要更新metadata
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow); //在返回的handler中,會處理metadata的更新
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
return responses;
}
//DefaultMetadataUpdater
@Override
public long maybeUpdate(long now) {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
// if there is no node available to connect, back off refreshing metadata
long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
waitForMetadataFetch);
if (metadataTimeout == 0) {
// highly dependent on the behavior of leastLoadedNode.
Node node = leastLoadedNode(now); //找到負載最小的Node
maybeUpdate(now, node); //把更新Metadata的請求,發給這個Node
}
return metadataTimeout;
}
private void maybeUpdate(long now, Node node) {
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
// mark the timestamp for no node available to connect
this.lastNoNodeAvailableMs = now;
return;
}
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId)) {
Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();
this.metadataFetchInProgress = true;
ClientRequest metadataRequest = request(now, nodeConnectionId, topics); //關鍵點:發送更新Metadata的Request
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
doSend(metadataRequest, now); //這里只是異步發送,返回的response在上面的handleCompletedReceives里面處理
} else if (connectionStates.canConnect(nodeConnectionId, now)) {
log.debug("Initialize connection to node {} for sending metadata request", node.id());
initiateConnect(node, now);
} else { // connected, but can't send more OR connecting
this.lastNoNodeAvailableMs = now;
}
}
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
ClientRequest req = inFlightRequests.completeNext(source);
ResponseHeader header = ResponseHeader.parse(receive.payload());
// Always expect the response version id to be the same as the request version id
short apiKey = req.request().header().apiKey();
short apiVer = req.request().header().apiVersion();
Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
correlate(req.request().header(), header);
if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
}
}
@Override
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
short apiKey = req.request().header().apiKey();
if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
handleResponse(req.request().header(), body, now);
return true;
}
return false;
}
//關鍵函數
private void handleResponse(RequestHeader header, Struct body, long now) {
this.metadataFetchInProgress = false;
MetadataResponse response = new MetadataResponse(body);
Cluster cluster = response.cluster(); //從response中,拿到一個新的cluster對象
if (response.errors().size() > 0) {
log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
}
if (cluster.nodes().size() > 0) {
this.metadata.update(cluster, now); //更新metadata,用新的cluster覆蓋舊的cluster
} else {
log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
this.metadata.failedUpdate(now); //更新metadata失敗,做失敗處理邏輯
}
}
//更新成功,version+1, 同時更新其它字段
public synchronized void update(Cluster cluster, long now) {
this.needUpdate = false;
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
this.version += 1;
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster); //如果有人監聽了metadata的更新,通知他們
this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster; //新的cluster覆蓋舊的cluster
notifyAll(); //通知所有的阻塞的producer線程
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
//更新失敗,只更新lastRefreshMs
public synchronized void failedUpdate(long now) {
this.lastRefreshMs = now;
}