kafka客戶端中使用了很多的回調方式處理請求。基本思路是將回調函數暫存到ClientRequest中,而ClientRequest會暫存到inFlightRequests中,當返回response的時候,從inFlightRequests中讀取對應的ClientRequest,並調用request中的回調函數完成處理。
inFlightRequests是請求和響應處理的橋梁.
1. 接口和抽象類
無論是producer還是consumer,回調函數類都是實現了RequestCompletionHandler接口。
public interface RequestCompletionHandler {
public void onComplete(ClientResponse response);
}
consumer的回調函數類不但實現了RequestCompletionHandler,還繼承了RequestFuture。RequestFuture是一個有狀態的類,在調用中會設置響應的狀態,可以持有RequestFuture的引用,用來判斷請求的狀態。
public class RequestFuture<T> {
private boolean isDone = false;
private T value;
private RuntimeException exception;
private List<RequestFutureListener<T>> listeners = new ArrayList<>();
// 省略其他方法
}
2. producer
producer是在sender線程中創建的ClientRequest,如下:
private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
return requests;
}
// 創建request
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
for (RecordBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
produceRecordsByPartition.put(tp, batch.records.buffer());
recordsByPartition.put(tp, batch);
}
ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
RequestSend send = new RequestSend(Integer.toString(destination),
this.client.nextRequestHeader(ApiKeys.PRODUCE),
request.toStruct());
// 回調函數
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
// 回調函數保存到request中, 然后request被保存到了inFlightRequests
return new ClientRequest(now, acks != 0, send, callback);
}
在NetworkClient#poll(..)最后會處理會調用對應的回調函數
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) { // response中封裝了request中的回調函數
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response); //調用回調函數
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
return responses;
}
3. Consumer
consumer使用回調函數和producer使用方式類似,但是比producer復雜一些。前面說了Consumer的回調函數不但實現了RequestCompletionHandler,還繼承了RequestFuture。
public static class RequestFutureCompletionHandler
extends RequestFuture<ClientResponse>
implements RequestCompletionHandler {
@Override
public void onComplete(ClientResponse response) {
if (response.wasDisconnected()) {
ClientRequest request = response.request();
RequestSend send = request.request();
ApiKeys api = ApiKeys.forId(send.header().apiKey());
int correlation = send.header().correlationId();
log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
api, request, correlation, send.destination());
raise(DisconnectException.INSTANCE);
} else {
complete(response); // 關鍵, complete方法會設置RequestFuture的狀態
}
}
}
}
public void complete(T value) { // 設置RequestFuture狀態
if (isDone)
throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
this.value = value;
this.isDone = true;
fireSuccess(); // 循環調用RequestFuture中的listeners
}
private void fireSuccess() {
for (RequestFutureListener<T> listener : listeners)
listener.onSuccess(value);
}
private void fireFailure() {
for (RequestFutureListener<T> listener : listeners)
listener.onFailure(exception);
}
與producer類似,請求被放到一個map中,不過名字是unsent。如下ConsumerNetworkClient#send(..):
public RequestFuture<ClientResponse> send(Node node,
ApiKeys api,
AbstractRequest request) {
long now = time.milliseconds();
RequestFutureCompletionHandler future = new RequestFutureCompletionHandler(); // 回調函數
RequestHeader header = client.nextRequestHeader(api);
RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
put(node, new ClientRequest(now, true, send, future)); // request方法哦unsent中
return future; // 並返回回調函數類的引用
}
在調用ConsumerNetworkClient#send(..)后又緊接着調用了Future#compose(..)。如下:
private RequestFuture<Void> sendGroupCoordinatorRequest() {
Node node = this.client.leastLoadedNode();
if (node == null) {
return RequestFuture.noBrokersAvailable();
} else {
log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest) // send后返回FutureRequest,然后又調用compose方法
.compose(new RequestFutureAdapter<ClientResponse, Void>() {
@Override
public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
handleGroupMetadataResponse(response, future);
}
});
}
}
Future#compose(..)方法又兩個作用
- 添加FutureRequest的listeners
- 返回一個新的FutureRequest,用新FutureRequest來判斷狀態
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
final RequestFuture<S> adapted = new RequestFuture<S>(); // 返回新的RequestFuture
addListener(new RequestFutureListener<T>() { // 添加到原先FutureRequest中的listeners中
@Override
public void onSuccess(T value) {
adapter.onSuccess(value, adapted); // 返回response后會調用listeners,從而會設置新的RequestFuture狀態,我們就可以根據這個新的RequestFuture來判斷response處理狀態。
}
@Override
public void onFailure(RuntimeException e) {
adapter.onFailure(e, adapted);
}
});
return adapted;
}
所以將ClientRequest放到map中后,最終我們持有的是compose中新建的FutureRequest,如AbstractCoordinator#ensureCoordinatorReady(..):
public void ensureCoordinatorReady() {
while (coordinatorUnknown()) {
RequestFuture<Void> future = sendGroupCoordinatorRequest();// 最終返回compose返回的future。
client.poll(future); // 在poll中不停的輪訓future的狀態
if (future.failed()) {
if (future.isRetriable())
client.awaitMetadataUpdate();
else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
coordinatorDead();
time.sleep(retryBackoffMs);
}
}
}
public void poll(RequestFuture<?> future) {
while (!future.isDone()) // 輪訓future狀態,當response做相應處理會調用回調函數,從而設置future相應狀態。
poll(Long.MAX_VALUE);
}
總結
kafka客戶端中使用了大量的回調函數做請求的處理,理解回調函數很重要,附回調函數鏈接:
http://www.cnblogs.com/set-cookie/p/8996951.html