Zeebe Call Flow: Source Code Analysis


ZeebeClient calls the Gateway over gRPC. The Gateway dispatches the request to a Broker over netty, and the Broker runs the processing logic.

Clients interact with Zeebe through ZeebeClient. The most common use is registering a Worker to complete jobs:

    zeebeClient.newWorker()
        .jobType(ZeebeConfigProperties.CONFIG_UPDATE_PIPELINE_EXECUATION_ERROR)
        .handler((jobClient, job) -> configChangeWorkers.errorHandle(jobClient, job))
        .fetchVariables(Workers.CONTEXT_ID, Workers.CHANGE_REQUEST, Workers.DEVICE_INFO,
            Workers.MANAGED_OBEJCT)
        .open();

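The open() method instantiates JobWorkerImpl. The JobWorkerImpl constructor schedules the recurring task tryActivateJobs, which keeps pulling jobs at the configured poll interval.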

    public JobWorkerImpl(final int maxJobsActive, final ScheduledExecutorService executor, final Duration pollInterval, final JobRunnableFactory jobRunnableFactory, final JobPoller jobPoller) {
        this.maxJobsActive = maxJobsActive;
        this.activationThreshold = Math.round((float)maxJobsActive * 0.3F);
        this.remainingJobs = new AtomicInteger(0);
        this.executor = executor;
        this.jobRunnableFactory = jobRunnableFactory;
        this.jobPoller = new AtomicReference(jobPoller);
        executor.scheduleWithFixedDelay(this::tryActivateJobs, 0L, pollInterval.toMillis(), TimeUnit.MILLISECONDS);
    }
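
The refill rule implied by activationThreshold can be sketched in isolation. This is a minimal model, not Zeebe's class: ActivationThresholdSketch and its method names are hypothetical, and the comparison in shouldActivateJobs (poll only when the backlog is at or below the threshold) is an assumption, since the article does not show that method's body.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the worker's refill rule; not Zeebe's actual class. */
final class ActivationThresholdSketch {
  private final int maxJobsActive;
  private final int activationThreshold;
  private final AtomicInteger remainingJobs = new AtomicInteger(0);

  ActivationThresholdSketch(final int maxJobsActive) {
    this.maxJobsActive = maxJobsActive;
    // Same arithmetic as the constructor above: 30% of the maximum, rounded.
    this.activationThreshold = Math.round((float) maxJobsActive * 0.3F);
  }

  int activationThreshold() {
    return activationThreshold;
  }

  // Assumption: a new poll is allowed only when the backlog is small enough.
  boolean shouldActivateJobs() {
    return remainingJobs.get() <= activationThreshold;
  }

  // Ask only for as many jobs as fit under the configured maximum.
  int jobsToRequest() {
    return maxJobsActive - remainingJobs.get();
  }

  void jobsActivated(final int count) {
    remainingJobs.addAndGet(count);
  }

  void jobFinished() {
    remainingJobs.decrementAndGet();
  }
}
```

With maxJobsActive set to 32, the threshold is 10, so under this assumption a worker holding 32 jobs would not poll again until 22 of them have finished.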

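In activateJobs, jobPoller.poll pulls the jobs, and this::submitJob handles each job once it has been pulled. The poller is taken out of the AtomicReference before polling and put back when the poll completes or fails, so only one poll runs at a time.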

    private void activateJobs() {
        JobPoller jobPoller = (JobPoller)this.jobPoller.getAndSet((Object)null);
        if (jobPoller != null) {
            int currentRemainingJobs = this.remainingJobs.get();
            if (this.shouldActivateJobs(currentRemainingJobs)) {
                int maxActivatedJobs = this.maxJobsActive - currentRemainingJobs;

                try {
                    jobPoller.poll(maxActivatedJobs, this::submitJob, (activatedJobs) -> {
                        this.remainingJobs.addAndGet(activatedJobs);
                        this.jobPoller.set(jobPoller);
                    }, this::isOpen);
                } catch (Exception var5) {
                    LOG.warn("Failed to activate jobs", var5);
                    this.jobPoller.set(jobPoller);
                }
            } else {
                this.jobPoller.set(jobPoller);
            }
        }

    }
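
The getAndSet(null) / set(jobPoller) pair above acts like a token hand-off. The following is a minimal sketch of that pattern on its own; PollerTokenSketch and its members are hypothetical names, and a String stands in for the real JobPoller.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration of the take-and-return token pattern. */
final class PollerTokenSketch {
  // The token stands in for the JobPoller: whoever holds it may poll.
  private final AtomicReference<String> token = new AtomicReference<>("poller");
  private final AtomicInteger pollsStarted = new AtomicInteger(0);

  /** Returns the token if this caller won it, or null if a poll is already running. */
  String tryStartPoll() {
    final String taken = token.getAndSet(null);
    if (taken != null) {
      pollsStarted.incrementAndGet();
    }
    return taken;
  }

  /** Puts the token back so the next scheduled run can poll again. */
  void finishPoll(final String taken) {
    token.set(taken);
  }

  int pollsStarted() {
    return pollsStarted.get();
  }
}
```

Because getAndSet is atomic, two overlapping scheduler runs cannot both obtain the token, which avoids an explicit lock.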

The poll method calls Zeebe over gRPC, passing the poller itself (this) as the response observer:

    private void poll() {
        LOG.trace("Polling at max {} jobs for worker {} and job type {}", new Object[]{this.requestBuilder.getMaxJobsToActivate(), this.requestBuilder.getWorker(), this.requestBuilder.getType()});
        ((GatewayStub)this.gatewayStub.withDeadlineAfter(this.requestTimeout, TimeUnit.MILLISECONDS)).activateJobs(this.requestBuilder.build(), this);
    }

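onNext handles the returned result: it counts the jobs in the response, wraps each one in an ActivatedJobImpl, and passes it to the jobConsumer of the matching worker for processing.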

    public void onNext(final ActivateJobsResponse activateJobsResponse) {
        this.activatedJobs += activateJobsResponse.getJobsCount();
        activateJobsResponse.getJobsList().stream().map((job) -> {
            return new ActivatedJobImpl(this.objectMapper, job);
        }).forEach(this.jobConsumer);
    }
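
The shape of onNext (count, wrap, forward) can be tried without gRPC. This is only a sketch under simplifying assumptions: ResponseObserverSketch is a hypothetical class, plain strings replace the protobuf job messages, and a "wrapped:" prefix stands in for constructing ActivatedJobImpl.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the poller's response callback. */
final class ResponseObserverSketch {
  private final Consumer<String> jobConsumer;
  private int activatedJobs = 0;

  ResponseObserverSketch(final Consumer<String> jobConsumer) {
    this.jobConsumer = jobConsumer;
  }

  /** May be called several times per poll, once for each response message. */
  void onNext(final List<String> jobsInResponse) {
    activatedJobs += jobsInResponse.size();
    jobsInResponse.stream()
        .map(job -> "wrapped:" + job) // stands in for new ActivatedJobImpl(...)
        .forEach(jobConsumer);
  }

  int activatedJobs() {
    return activatedJobs;
  }
}
```

The running total matters because the worker adds it to remainingJobs when the poll completes.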

Zeebe's proto file is defined in the gateway-protocol project.

GatewayGrpcService is the entry point of the Zeebe API:

  @Override
  public void activateJobs(
      final ActivateJobsRequest request,
      final StreamObserver<ActivateJobsResponse> responseObserver) {
    endpointManager.activateJobs(
        request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
  }

  @Override
  public void cancelProcessInstance(
      final CancelProcessInstanceRequest request,
      final StreamObserver<CancelProcessInstanceResponse> responseObserver) {
    endpointManager.cancelProcessInstance(
        request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
  }

  @Override
  public void completeJob(
      final CompleteJobRequest request,
      final StreamObserver<CompleteJobResponse> responseObserver) {
    endpointManager.completeJob(
        request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
  }

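Of these, endpointManager.completeJob calls sendRequest. RequestMapper::toCompleteJobRequest is a method reference that converts the request parameters, and ResponseMapper::toCompleteJobResponse is a method reference that converts the returned result.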

  public void completeJob(
      final CompleteJobRequest request,
      final ServerStreamObserver<CompleteJobResponse> responseObserver) {
    sendRequest(
        request,
        RequestMapper::toCompleteJobRequest,
        ResponseMapper::toCompleteJobResponse,
        responseObserver);
  }

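sendRequest first converts the gRPC request into a brokerRequest, then forwards it to the broker through brokerClient. If the conversion throws, the error goes straight to the stream observer and nothing is sent.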

  private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequest(
      final GrpcRequestT grpcRequest,
      final Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> requestMapper,
      final BrokerResponseMapper<BrokerResponseT, GrpcResponseT> responseMapper,
      final ServerStreamObserver<GrpcResponseT> streamObserver) {
    final BrokerRequest<BrokerResponseT> brokerRequest;

    try {
      brokerRequest = requestMapper.apply(grpcRequest);
    } catch (final Exception e) {
      streamObserver.onError(e);
      return;
    }

    brokerClient.sendRequestWithRetry(
        brokerRequest,
        (key, response) -> consumeResponse(responseMapper, streamObserver, key, response),
        streamObserver::onError);
  }
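
The map, send, map-back structure of sendRequest can be reduced to plain JDK functional interfaces. In this sketch everything is hypothetical: SendRequestSketch is not a Zeebe class, a String plays the role of the broker request and response, and fakeBrokerSend replaces brokerClient.sendRequestWithRetry with an in-process echo.

```java
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical reduction of the gateway's request/response mapping pipeline. */
final class SendRequestSketch {

  /** Stand-in for the broker client: replies with the request in upper case. */
  static void fakeBrokerSend(
      final String brokerRequest,
      final Consumer<String> onResponse,
      final Consumer<Throwable> onError) {
    onResponse.accept(brokerRequest.toUpperCase(Locale.ROOT));
  }

  static <ReqT, RespT> void sendRequest(
      final ReqT grpcRequest,
      final Function<ReqT, String> requestMapper,
      final Function<String, RespT> responseMapper,
      final Consumer<RespT> onNext,
      final Consumer<Throwable> onError) {
    final String brokerRequest;
    try {
      // Step 1: translate the incoming request into the broker's format.
      brokerRequest = requestMapper.apply(grpcRequest);
    } catch (final Exception e) {
      // A mapping failure is reported to the caller and nothing is sent.
      onError.accept(e);
      return;
    }
    // Step 2: send, then translate the broker's reply back for the caller.
    fakeBrokerSend(
        brokerRequest, response -> onNext.accept(responseMapper.apply(response)), onError);
  }
}
```

Passing the two mappers as parameters is what lets one generic method serve every endpoint: each endpoint only supplies its own pair of method references.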

sendRequestInternal first determines which broker should handle the request, then sender.send issues the call, and actor.runOnCompletion processes the returned result:

  private <T> void sendRequestInternal(
      final BrokerRequest<T> request,
      final CompletableFuture<BrokerResponse<T>> returnFuture,
      final TransportRequestSender sender,
      final Duration requestTimeout) {

    final BrokerAddressProvider nodeIdProvider;
    try {
      nodeIdProvider = determineBrokerNodeIdProvider(request);
    } catch (final PartitionNotFoundException e) {
      returnFuture.completeExceptionally(e);
      GatewayMetrics.registerFailedRequest(
          request.getPartitionId(), request.getType(), "PARTITION_NOT_FOUND");
      return;
    } catch (final NoTopologyAvailableException e) {
      returnFuture.completeExceptionally(e);
      GatewayMetrics.registerFailedRequest(
          request.getPartitionId(), request.getType(), "NO_TOPOLOGY");
      return;
    }

    final ActorFuture<DirectBuffer> responseFuture =
        sender.send(clientTransport, nodeIdProvider, request, requestTimeout);
    final long startTime = System.currentTimeMillis();

    actor.runOnCompletion(
        responseFuture,
        (clientResponse, error) -> {
          RequestResult result = null;
          try {
            if (error == null) {
              final BrokerResponse<T> response = request.getResponse(clientResponse);

              result = handleResponse(response, returnFuture);
              if (result.wasProcessed()) {
                final long elapsedTime = System.currentTimeMillis() - startTime;
                GatewayMetrics.registerSuccessfulRequest(
                    request.getPartitionId(), request.getType(), elapsedTime);
                return;
              }
            } else {
              returnFuture.completeExceptionally(error);
            }
          } catch (final RuntimeException e) {
            returnFuture.completeExceptionally(new ClientResponseException(e));
          }

          registerFailure(request, result, error);
        });
  }
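
The completion callback follows a common asynchronous pattern: decode on success, propagate on error, and wrap decoding failures. The sketch below shows a rough analog using the JDK's CompletableFuture.whenComplete instead of Zeebe's actor and ActorFuture; CompletionSketch and the decoder parameter are hypothetical, and IllegalStateException stands in for ClientResponseException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical JDK-only analog of the runOnCompletion callback. */
final class CompletionSketch {

  static <T> CompletableFuture<T> handle(
      final CompletableFuture<byte[]> responseFuture, final Function<byte[], T> decoder) {
    final CompletableFuture<T> returnFuture = new CompletableFuture<>();
    responseFuture.whenComplete(
        (bytes, error) -> {
          try {
            if (error == null) {
              // Success: decode the raw bytes into a typed response.
              returnFuture.complete(decoder.apply(bytes));
            } else {
              // Transport failure: pass the error through unchanged.
              returnFuture.completeExceptionally(error);
            }
          } catch (final RuntimeException e) {
            // Decoding failure: wrap it so the caller still gets a result.
            returnFuture.completeExceptionally(new IllegalStateException(e));
          }
        });
    return returnFuture;
  }
}
```

The try/catch inside the callback matters: without it, an exception thrown while decoding would leave returnFuture incomplete and the caller waiting.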

This is how sender.send is actually invoked:

  private static final TransportRequestSender SENDER_WITH_RETRY =
      (c, s, r, t) -> c.sendRequestWithRetry(s, BrokerRequestManager::responseValidation, r, t);

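The call ultimately ends up in messagingService.sendAndReceive(nodeAddress, requestContext.getTopicName(), requestBytes, calculateTimeout), which reaches the broker over netty.

On the receiving side, dispatch looks up the handler registered for the message subject, and handler.accept(message, this) processes the request: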

  @Override
  public void dispatch(final ProtocolRequest message) {
    final String subject = message.subject();
    final BiConsumer<ProtocolRequest, ServerConnection> handler = handlers.get(subject);
    if (handler != null) {
      log.trace("Received message type {} from {}", subject, message.sender());
      handler.accept(message, this);
    } else {
      log.debug("No handler for message type {} from {}", subject, message.sender());

      byte[] subjectBytes = null;
      if (subject != null) {
        subjectBytes = StringUtil.getBytes(subject);
      }

      reply(message, ProtocolReply.Status.ERROR_NO_HANDLER, Optional.ofNullable(subjectBytes));
    }
  }
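
The dispatch method above is a lookup in a subject-to-handler map with an explicit "no handler" reply. A minimal sketch of that idea follows; DispatchSketch, register, and the string-based replies are hypothetical simplifications, and the real code works with ProtocolRequest and ServerConnection rather than strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical subject-based dispatcher modelled on the method above. */
final class DispatchSketch {
  private final Map<String, Function<String, String>> handlers = new ConcurrentHashMap<>();

  void register(final String subject, final Function<String, String> handler) {
    handlers.put(subject, handler);
  }

  String dispatch(final String subject, final String payload) {
    // ConcurrentHashMap rejects null keys, so a null subject is treated as unknown.
    final Function<String, String> handler = subject == null ? null : handlers.get(subject);
    if (handler != null) {
      return "OK:" + handler.apply(payload);
    }
    // Mirrors the ERROR_NO_HANDLER reply: the sender learns which subject was unknown.
    return "ERROR_NO_HANDLER:" + subject;
  }
}
```

For example, after register("command-api", p -> "processed " + p), dispatching to "command-api" runs the handler, while any other subject yields the error reply instead of being silently dropped. The subject name here is made up for illustration.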

