Zeebe Source Code Analysis
Zeebe Call Flow: Source Code Analysis
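ZeebeClient calls the Gateway over gRPC. The Gateway forwards the request to a Broker over Netty, and the Broker executes the processing logic.
Clients interact with Zeebe through ZeebeClient. The most common use is registering a Worker to complete jobs: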
zeebeClient.newWorker()
    .jobType(ZeebeConfigProperties.CONFIG_UPDATE_PIPELINE_EXECUATION_ERROR)
    .handler((jobClient, job) -> configChangeWorkers.errorHandle(jobClient, job))
    .fetchVariables(Workers.CONTEXT_ID, Workers.CHANGE_REQUEST, Workers.DEVICE_INFO,
        Workers.MANAGED_OBEJCT)
    .open();
The open() method instantiates JobWorkerImpl. The JobWorkerImpl constructor schedules a recurring task, tryActivateJobs, that keeps polling for jobs:
public JobWorkerImpl(
    final int maxJobsActive,
    final ScheduledExecutorService executor,
    final Duration pollInterval,
    final JobRunnableFactory jobRunnableFactory,
    final JobPoller jobPoller) {
  this.maxJobsActive = maxJobsActive;
  // activationThreshold is 30% of maxJobsActive, rounded to the nearest integer.
  this.activationThreshold = Math.round((float) maxJobsActive * 0.3F);
  this.remainingJobs = new AtomicInteger(0);
  this.executor = executor;
  this.jobRunnableFactory = jobRunnableFactory;
  this.jobPoller = new AtomicReference<>(jobPoller);
  // Run tryActivateJobs immediately, then again pollInterval after each run finishes.
  executor.scheduleWithFixedDelay(
      this::tryActivateJobs, 0L, pollInterval.toMillis(), TimeUnit.MILLISECONDS);
}
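To make the threshold arithmetic in the constructor concrete, here is a minimal sketch. The class name ActivationThresholdSketch and the rule inside shouldActivate (poll once the unfinished jobs are at or below the threshold) are assumptions for illustration. The source only shows how activationThreshold is computed and that the number of jobs requested is maxJobsActive minus the remaining jobs.

```java
class ActivationThresholdSketch {
    // Same arithmetic as the JobWorkerImpl constructor: 30% of maxJobsActive, rounded.
    static int activationThreshold(int maxJobsActive) {
        return Math.round((float) maxJobsActive * 0.3F);
    }

    // Assumption: a new poll is allowed once the unfinished jobs drop to the threshold.
    static boolean shouldActivate(int remainingJobs, int maxJobsActive) {
        return remainingJobs <= activationThreshold(maxJobsActive);
    }

    // Request only as many jobs as fit, so the worker never holds more than maxJobsActive.
    static int jobsToRequest(int remainingJobs, int maxJobsActive) {
        return maxJobsActive - remainingJobs;
    }

    public static void main(String[] args) {
        int max = 32;
        System.out.println("threshold = " + activationThreshold(max));
        System.out.println("poll with 20 remaining? " + shouldActivate(20, max));
        System.out.println("poll with 8 remaining? " + shouldActivate(8, max));
        System.out.println("jobs to request with 8 remaining = " + jobsToRequest(8, max));
    }
}
```

Under these assumptions, a worker keeps a buffer of jobs instead of waiting until it is completely idle before asking for more.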
In the activateJobs method, jobPoller.poll fetches jobs, and this::submitJob is the callback that handles each job once it has been fetched:
private void activateJobs() {
  // Take the poller out of the reference; null means a poll is already in flight.
  final JobPoller jobPoller = this.jobPoller.getAndSet(null);
  if (jobPoller != null) {
    final int currentRemainingJobs = this.remainingJobs.get();
    if (this.shouldActivateJobs(currentRemainingJobs)) {
      final int maxActivatedJobs = this.maxJobsActive - currentRemainingJobs;
      try {
        jobPoller.poll(
            maxActivatedJobs,
            this::submitJob,
            (activatedJobs) -> {
              this.remainingJobs.addAndGet(activatedJobs);
              // Put the poller back so a later run can poll again.
              this.jobPoller.set(jobPoller);
            },
            this::isOpen);
      } catch (final Exception e) {
        LOG.warn("Failed to activate jobs", e);
        this.jobPoller.set(jobPoller);
      }
    } else {
      this.jobPoller.set(jobPoller);
    }
  }
}
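The getAndSet(null) call in activateJobs works as a lock-free guard: whoever takes the poller out of the AtomicReference owns the poll, and everyone else sees null until it is put back. A minimal sketch of just that pattern follows. SinglePollGuardSketch, Poller, tryAcquire and release are hypothetical names, not part of the Zeebe client.

```java
import java.util.concurrent.atomic.AtomicReference;

class SinglePollGuardSketch {
    // Hypothetical stand-in for JobPoller; only its presence in the reference matters here.
    static final class Poller {}

    private final AtomicReference<Poller> poller = new AtomicReference<>(new Poller());

    // Returns the poller if no poll is in flight, or null if another caller already holds it.
    Poller tryAcquire() {
        return poller.getAndSet(null);
    }

    // Called when the poll has finished (successfully or not) to allow the next poll.
    void release(Poller p) {
        poller.set(p);
    }

    public static void main(String[] args) {
        SinglePollGuardSketch guard = new SinglePollGuardSketch();
        Poller first = guard.tryAcquire();
        Poller second = guard.tryAcquire();
        System.out.println("first acquired: " + (first != null));
        System.out.println("second acquired while in flight: " + (second != null));
        guard.release(first);
        System.out.println("acquired after release: " + (guard.tryAcquire() != null));
    }
}
```

This is why every branch of activateJobs that does not start a poll, including the catch block, puts the poller back: otherwise the worker would never poll again.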
The poll method calls Zeebe over gRPC:
private void poll() {
  LOG.trace("Polling at max {} jobs for worker {} and job type {}",
      this.requestBuilder.getMaxJobsToActivate(), this.requestBuilder.getWorker(), this.requestBuilder.getType());
  // The poller itself is passed as the response observer (see onNext).
  this.gatewayStub.withDeadlineAfter(this.requestTimeout, TimeUnit.MILLISECONDS)
      .activateJobs(this.requestBuilder.build(), this);
}
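The onNext method handles each response: it adds the number of returned jobs to activatedJobs, wraps every job in an ActivatedJobImpl, and hands it to jobConsumer so the worker can process it: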
public void onNext(final ActivateJobsResponse activateJobsResponse) {
  this.activatedJobs += activateJobsResponse.getJobsCount();
  activateJobsResponse.getJobsList().stream()
      .map(job -> new ActivatedJobImpl(this.objectMapper, job))
      .forEach(this.jobConsumer);
}
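The observer pattern used here can be tried out without a gRPC dependency. The sketch below mirrors the shape of onNext with plain strings in place of activated jobs. PollObserverSketch, its constructor parameters, and the behavior of onCompleted (reporting the total to a callback when the stream ends) are assumptions for illustration; the source does not show where the completion callback is invoked.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

class PollObserverSketch {
    private final Consumer<String> jobConsumer; // receives each activated job
    private final IntConsumer doneCallback;     // assumed: receives the total when the stream ends
    private int activatedJobs;

    PollObserverSketch(Consumer<String> jobConsumer, IntConsumer doneCallback) {
        this.jobConsumer = jobConsumer;
        this.doneCallback = doneCallback;
    }

    // One response batch: count the jobs, wrap each one, and hand it to the consumer.
    void onNext(List<String> jobsInResponse) {
        activatedJobs += jobsInResponse.size();
        jobsInResponse.stream().map(job -> "activated:" + job).forEach(jobConsumer);
    }

    // End of stream: report how many jobs were activated in total.
    void onCompleted() {
        doneCallback.accept(activatedJobs);
    }

    public static void main(String[] args) {
        PollObserverSketch observer =
            new PollObserverSketch(System.out::println, n -> System.out.println("total: " + n));
        observer.onNext(Arrays.asList("job-1", "job-2"));
        observer.onNext(Arrays.asList("job-3"));
        observer.onCompleted();
    }
}
```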
Zeebe's proto file is defined in the gateway-protocol project.
GatewayGrpcService is the entry point of Zeebe's API:
@Override
public void activateJobs(
    final ActivateJobsRequest request,
    final StreamObserver<ActivateJobsResponse> responseObserver) {
  endpointManager.activateJobs(
      request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
}

@Override
public void cancelProcessInstance(
    final CancelProcessInstanceRequest request,
    final StreamObserver<CancelProcessInstanceResponse> responseObserver) {
  endpointManager.cancelProcessInstance(
      request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
}

@Override
public void completeJob(
    final CompleteJobRequest request,
    final StreamObserver<CompleteJobResponse> responseObserver) {
  endpointManager.completeJob(
      request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
}
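The completeJob method on endpointManager calls sendRequest. RequestMapper::toCompleteJobRequest is a method reference that converts the request, and ResponseMapper::toCompleteJobResponse is a method reference that converts the response: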
public void completeJob(
    final CompleteJobRequest request,
    final ServerStreamObserver<CompleteJobResponse> responseObserver) {
  sendRequest(
      request,
      RequestMapper::toCompleteJobRequest,
      ResponseMapper::toCompleteJobResponse,
      responseObserver);
}
sendRequest first converts the gRPC request into a brokerRequest, then forwards it to the broker through brokerClient:
private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequest(
    final GrpcRequestT grpcRequest,
    final Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> requestMapper,
    final BrokerResponseMapper<BrokerResponseT, GrpcResponseT> responseMapper,
    final ServerStreamObserver<GrpcResponseT> streamObserver) {
  final BrokerRequest<BrokerResponseT> brokerRequest;
  try {
    brokerRequest = requestMapper.apply(grpcRequest);
  } catch (final Exception e) {
    // A mapping failure is reported to the caller without contacting the broker.
    streamObserver.onError(e);
    return;
  }
  brokerClient.sendRequestWithRetry(
      brokerRequest,
      (key, response) -> consumeResponse(responseMapper, streamObserver, key, response),
      streamObserver::onError);
}
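The value of this design is that every gateway endpoint becomes one sendRequest call with two mapping functions. A self-contained sketch of the same idea, using only java.util.function types, might look like the following. MapperPipelineSketch and its Broker interface are hypothetical stand-ins, and the simplified signature (plain Function for both mappers, separate onNext and onError consumers) is an assumption, not the gateway's actual API.

```java
import java.util.function.Consumer;
import java.util.function.Function;

class MapperPipelineSketch {
    // Hypothetical stand-in for the broker client: sends a request, then calls back
    // with either a response or an error.
    interface Broker<Q, R> {
        void send(Q request, Consumer<R> onResponse, Consumer<Throwable> onError);
    }

    static <GrpcReq, BrokerReq, BrokerResp, GrpcResp> void sendRequest(
            GrpcReq grpcRequest,
            Function<GrpcReq, BrokerReq> requestMapper,
            Function<BrokerResp, GrpcResp> responseMapper,
            Broker<BrokerReq, BrokerResp> broker,
            Consumer<GrpcResp> onNext,
            Consumer<Throwable> onError) {
        final BrokerReq brokerRequest;
        try {
            brokerRequest = requestMapper.apply(grpcRequest);
        } catch (Exception e) {
            onError.accept(e); // mapping failed, so the broker is never contacted
            return;
        }
        broker.send(
            brokerRequest,
            response -> onNext.accept(responseMapper.apply(response)),
            onError);
    }

    public static void main(String[] args) {
        // Toy broker: "completes" any job whose key is positive.
        Broker<Long, Boolean> broker = (jobKey, ok, err) -> ok.accept(jobKey > 0);
        Function<String, Long> toBroker = Long::parseLong;
        Function<Boolean, String> toGrpc = done -> done ? "COMPLETED" : "REJECTED";
        Consumer<String> onNext = s -> System.out.println("response: " + s);
        Consumer<Throwable> onError =
            e -> System.out.println("error: " + e.getClass().getSimpleName());

        sendRequest("42", toBroker, toGrpc, broker, onNext, onError);
        sendRequest("not-a-number", toBroker, toGrpc, broker, onNext, onError);
    }
}
```

The second call never reaches the toy broker, which mirrors the early return in the try/catch around requestMapper.apply.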
sendRequestInternal first determines which broker should handle the request, sender.send issues the request, and actor.runOnCompletion handles the response:
private <T> void sendRequestInternal(
    final BrokerRequest<T> request,
    final CompletableFuture<BrokerResponse<T>> returnFuture,
    final TransportRequestSender sender,
    final Duration requestTimeout) {

  final BrokerAddressProvider nodeIdProvider;
  try {
    nodeIdProvider = determineBrokerNodeIdProvider(request);
  } catch (final PartitionNotFoundException e) {
    returnFuture.completeExceptionally(e);
    GatewayMetrics.registerFailedRequest(
        request.getPartitionId(), request.getType(), "PARTITION_NOT_FOUND");
    return;
  } catch (final NoTopologyAvailableException e) {
    returnFuture.completeExceptionally(e);
    GatewayMetrics.registerFailedRequest(
        request.getPartitionId(), request.getType(), "NO_TOPOLOGY");
    return;
  }

  final ActorFuture<DirectBuffer> responseFuture =
      sender.send(clientTransport, nodeIdProvider, request, requestTimeout);
  final long startTime = System.currentTimeMillis();

  actor.runOnCompletion(
      responseFuture,
      (clientResponse, error) -> {
        RequestResult result = null;
        try {
          if (error == null) {
            final BrokerResponse<T> response = request.getResponse(clientResponse);
            result = handleResponse(response, returnFuture);
            if (result.wasProcessed()) {
              final long elapsedTime = System.currentTimeMillis() - startTime;
              GatewayMetrics.registerSuccessfulRequest(
                  request.getPartitionId(), request.getType(), elapsedTime);
              return;
            }
          } else {
            returnFuture.completeExceptionally(error);
          }
        } catch (final RuntimeException e) {
          returnFuture.completeExceptionally(new ClientResponseException(e));
        }
        registerFailure(request, result, error);
      });
}
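The control flow above (resolve a target, fail fast if none is known, otherwise send and complete the caller's future from the transport's result) can be sketched with plain CompletableFuture. Everything in this example is hypothetical: BrokerRoutingSketch, the partition-to-address map, and the idea that the target is looked up per partition are assumptions chosen for illustration. The source only shows that determineBrokerNodeIdProvider may throw PartitionNotFoundException or NoTopologyAvailableException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

class BrokerRoutingSketch {
    // Hypothetical topology: partition id -> address of the broker that should handle it.
    private final Map<Integer, String> targets = new HashMap<>();
    // Hypothetical transport: (address, payload) -> future response.
    private final BiFunction<String, String, CompletableFuture<String>> transport;

    BrokerRoutingSketch(BiFunction<String, String, CompletableFuture<String>> transport) {
        this.transport = transport;
    }

    void setTarget(int partitionId, String address) {
        targets.put(partitionId, address);
    }

    CompletableFuture<String> send(int partitionId, String payload) {
        CompletableFuture<String> result = new CompletableFuture<>();
        String address = targets.get(partitionId);
        if (address == null) {
            // Fail fast without touching the network, like the catch blocks above.
            result.completeExceptionally(
                new IllegalStateException("No broker known for partition " + partitionId));
            return result;
        }
        // Complete the caller's future from the transport's outcome.
        transport.apply(address, payload).whenComplete((response, error) -> {
            if (error == null) {
                result.complete(response);
            } else {
                result.completeExceptionally(error);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        BrokerRoutingSketch router = new BrokerRoutingSketch(
            (address, payload) -> CompletableFuture.completedFuture(address + " handled " + payload));
        router.setTarget(1, "broker-0");
        System.out.println(router.send(1, "complete-job").join());
        System.out.println("unknown partition failed: " + router.send(2, "x").isCompletedExceptionally());
    }
}
```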
What sender.send actually invokes:
private static final TransportRequestSender SENDER_WITH_RETRY =
    (c, s, r, t) -> c.sendRequestWithRetry(s, BrokerRequestManager::responseValidation, r, t);
**The final call is messagingService.sendAndReceive(nodeAddress, requestContext.getTopicName(), requestBytes, calculateTimeout), which reaches the broker over Netty.**
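On the receiving side, dispatch looks up the handler registered for the message subject, and handler.accept(message, this) processes the request: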
@Override
public void dispatch(final ProtocolRequest message) {
  final String subject = message.subject();
  final BiConsumer<ProtocolRequest, ServerConnection> handler = handlers.get(subject);
  if (handler != null) {
    log.trace("Received message type {} from {}", subject, message.sender());
    handler.accept(message, this);
  } else {
    log.debug("No handler for message type {} from {}", subject, message.sender());
    byte[] subjectBytes = null;
    if (subject != null) {
      subjectBytes = StringUtil.getBytes(subject);
    }
    reply(message, ProtocolReply.Status.ERROR_NO_HANDLER, Optional.ofNullable(subjectBytes));
  }
}
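The dispatch method is a subject-keyed handler table: look up the subject, run the handler if there is one, otherwise answer with an error status. A minimal sketch of that pattern follows. SubjectDispatchSketch, Reply, the register method and the subject string are hypothetical, and the handler is simplified to a function that returns the reply payload directly instead of replying through a connection.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class SubjectDispatchSketch {
    enum Status { OK, ERROR_NO_HANDLER }

    static final class Reply {
        final Status status;
        final String payload;

        Reply(Status status, String payload) {
            this.status = status;
            this.payload = payload;
        }
    }

    private final Map<String, Function<String, String>> handlers = new ConcurrentHashMap<>();

    void register(String subject, Function<String, String> handler) {
        handlers.put(subject, handler);
    }

    Reply dispatch(String subject, String payload) {
        // ConcurrentHashMap rejects null keys, so a null subject is treated as "no handler".
        Function<String, String> handler = subject == null ? null : handlers.get(subject);
        if (handler == null) {
            // Echo the subject back so the sender can tell which message type was unknown.
            return new Reply(Status.ERROR_NO_HANDLER, subject);
        }
        return new Reply(Status.OK, handler.apply(payload));
    }

    public static void main(String[] args) {
        SubjectDispatchSketch server = new SubjectDispatchSketch();
        server.register("example-subject", String::toUpperCase);
        Reply ok = server.dispatch("example-subject", "ping");
        Reply missing = server.dispatch("unknown-subject", "ping");
        System.out.println(ok.status + " " + ok.payload);
        System.out.println(missing.status + " " + missing.payload);
    }
}
```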