When reposting, please credit the original URL: http://www.cnblogs.com/dongxiao-yang/p/8029356.html
Checkpointing is an important part of Flink's fault tolerance mechanism. The core class behind Flink checkpoints is org.apache.flink.runtime.checkpoint.CheckpointCoordinator.
Periodically triggered checkpoint events
Flink checkpoints are triggered on a schedule by a timer thread pool inside CheckpointCoordinator. The actual work is started by the Runnable class ScheduledTrigger.
private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint.", e);
        }
    }
}
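To make the timer mechanism concrete, here is a minimal sketch built only on java.util.concurrent. PeriodicTriggerSketch and runPeriodically are hypothetical names for illustration and are not Flink classes. The sketch only shows how a Runnable handed to scheduleAtFixedRate fires repeatedly, and why the trigger catches exceptions itself: an exception escaping a fixed-rate task suppresses all later runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the coordinator's timer; none of these names are Flink classes.
class PeriodicTriggerSketch {

    /** Fires a trigger every intervalMillis and returns once it has run at least `times` times. */
    static int runPeriodically(int times, long intervalMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger triggered = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);

        // Plays the role of ScheduledTrigger: it swallows exceptions so that
        // one failed attempt does not stop the periodic schedule.
        Runnable trigger = () -> {
            try {
                triggered.incrementAndGet(); // stands in for triggerCheckpoint(now, true)
                done.countDown();
            } catch (Exception e) {
                System.err.println("Exception while triggering checkpoint: " + e);
            }
        };

        try {
            ScheduledFuture<?> handle = timer.scheduleAtFixedRate(
                    trigger, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
            done.await();         // block until the trigger has fired `times` times
            handle.cancel(false); // stop the periodic trigger without interrupting a running task
            return triggered.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fired " + runPeriodically(3, 10) + " times");
    }
}
```

The returned count can be larger than the requested number, because another run may slip in between the latch opening and the cancel call.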
The triggerCheckpoint method as a whole breaks down into roughly three parts:
1 Environment pre-checks
// Sanity check
if (props.externalizeCheckpoint() && targetDirectory == null) {
    throw new IllegalStateException("No target directory specified to persist checkpoint to.");
}

// make some eager pre-checks
synchronized (lock) {
    // abort if the coordinator has been shutdown in the meantime
    if (shutdown) {
        return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
    }

    // Don't allow periodic checkpoint if scheduling has been disabled
    if (isPeriodic && !periodicScheduling) {
        return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
    }

    // validate whether the checkpoint can be triggered, with respect to the limit of
    // concurrent checkpoints, and the minimum time between checkpoints.
    // these checks are not relevant for savepoints
    if (!props.forceCheckpoint()) {
        // sanity check: there should never be more than one trigger request queued
        if (triggerRequestQueued) {
            LOG.warn("Trying to trigger another checkpoint while one was queued already");
            return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
        }

        // if too many checkpoints are currently in progress, we need to mark that a request is queued
        if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
            triggerRequestQueued = true;
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }
            return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
        }

        // make sure the minimum interval between checkpoints has passed
        final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
        final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

        if (durationTillNextMillis > 0) {
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }
            // Reassign the new trigger to the currentPeriodicTrigger
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    new ScheduledTrigger(),
                    durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

            return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
        }
    }
}

// check if all tasks that we need to trigger are running.
// if not, abort the checkpoint
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
    Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
    if (ee != null && ee.getState() == ExecutionState.RUNNING) {
        executions[i] = ee;
    } else {
        LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
                tasksToTrigger[i].getTaskNameWithSubtaskIndex());
        return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    }
}

// next, check if all tasks that need to acknowledge the checkpoint are running.
// if not, abort the checkpoint
Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

for (ExecutionVertex ev : tasksToWaitFor) {
    Execution ee = ev.getCurrentExecutionAttempt();
    if (ee != null) {
        ackTasks.put(ee.getAttemptId(), ev);
    } else {
        LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
                ev.getTaskNameWithSubtaskIndex());
        return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    }
}
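The code above runs a series of pre-checks before a checkpoint is created: whether a targetDirectory is set when the checkpoint must be externalized, whether the number of in-flight pendingCheckpoints has reached the limit, whether too little time has passed since the last completed checkpoint, and whether the tasks involved in the checkpoint are alive (tasks to trigger must be RUNNING, tasks to acknowledge must have a current execution attempt). If any one of these conditions fails, no actual checkpoint is performed.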
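The decision order of these pre-checks can be condensed into a small, self-contained sketch. PreCheckSketch, its Decline enum, and the check method are hypothetical simplifications and not Flink's API; the current time is passed in as a parameter so the minimum-pause arithmetic is easy to follow and test.

```java
// Hypothetical, simplified model of the eager pre-checks; not Flink's API.
class PreCheckSketch {

    enum Decline {
        NONE,
        COORDINATOR_SHUTDOWN,
        ALREADY_QUEUED,
        TOO_MANY_CONCURRENT_CHECKPOINTS,
        MINIMUM_TIME_BETWEEN_CHECKPOINTS
    }

    /** Returns the first reason to decline the trigger, or NONE if all checks pass. */
    static Decline check(
            boolean shutdown,
            boolean triggerRequestQueued,
            int pendingCheckpoints,
            int maxConcurrentCheckpointAttempts,
            long lastCheckpointCompletionNanos,
            long minPauseBetweenCheckpointsNanos,
            long nowNanos) {

        if (shutdown) {
            return Decline.COORDINATOR_SHUTDOWN;
        }
        if (triggerRequestQueued) {
            return Decline.ALREADY_QUEUED;
        }
        if (pendingCheckpoints >= maxConcurrentCheckpointAttempts) {
            return Decline.TOO_MANY_CONCURRENT_CHECKPOINTS;
        }
        // Same arithmetic as the excerpt: nanoseconds in, whole milliseconds out.
        long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
        long durationTillNextMillis = (earliestNext - nowNanos) / 1_000_000;
        if (durationTillNextMillis > 0) {
            return Decline.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
        }
        return Decline.NONE;
    }

    public static void main(String[] args) {
        // Last checkpoint completed at t=0, minimum pause 500 ms, now t=100 ms.
        System.out.println(check(false, false, 0, 1, 0L, 500_000_000L, 100_000_000L));
    }
}
```

Because the remaining time is truncated to whole milliseconds, a remainder of less than one millisecond counts as "pause already satisfied", which matches the integer division in the excerpt.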
2 Creating the PendingCheckpoint
final long checkpointID;
try {
    // this must happen outside the coordinator-wide lock, because it communicates
    // with external services (in HA mode) and may block for a while.
    checkpointID = checkpointIdCounter.getAndIncrement();
}
catch (Throwable t) {
    int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
    LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
    return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
}

final PendingCheckpoint checkpoint = new PendingCheckpoint(
        job,
        checkpointID,
        timestamp,
        ackTasks,
        props,
        targetDirectory,
        executor);

if (statsTracker != null) {
    PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
            checkpointID,
            timestamp,
            props);

    checkpoint.setStatsCallback(callback);
}

// schedule the timer that will clean up the expired checkpoints
final Runnable canceller = new Runnable() {
    @Override
    public void run() {
        synchronized (lock) {
            // only do the work if the checkpoint is not discarded anyways
            // note that checkpoint completion discards the pending checkpoint object
            if (!checkpoint.isDiscarded()) {
                LOG.info("Checkpoint " + checkpointID + " expired before completing.");

                checkpoint.abortExpired();
                pendingCheckpoints.remove(checkpointID);
                rememberRecentCheckpointId(checkpointID);

                triggerQueuedRequests();
            }
        }
    }
};
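A PendingCheckpoint represents a checkpoint that has been started but not yet completed. Each PendingCheckpoint carries a globally unique, monotonically increasing checkpointID, and a canceller is declared alongside it so that the checkpoint can be cleaned up and its resources released if it later times out.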
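The relationship between the ID counter, the pending map, and the canceller can be sketched in isolation. Everything in the block below (PendingCheckpointSketch, Pending, trigger, cancellerFor, complete) is a hypothetical simplification, not Flink's classes, and the counter starting at 1 is an assumption of the sketch. The point it illustrates is that the canceller only does work when the checkpoint has not already been discarded by completion.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified bookkeeping; not Flink's PendingCheckpoint or CheckpointCoordinator.
class PendingCheckpointSketch {

    static final class Pending {
        final long checkpointId;
        private boolean discarded;

        Pending(long checkpointId) {
            this.checkpointId = checkpointId;
        }

        boolean isDiscarded() {
            return discarded;
        }
    }

    private final Object lock = new Object();
    private final AtomicLong checkpointIdCounter = new AtomicLong(1); // start value is an assumption
    private final Map<Long, Pending> pendingCheckpoints = new HashMap<>();

    /** Allocates the next ID outside the lock, then registers the pending checkpoint under it. */
    Pending trigger() {
        long id = checkpointIdCounter.getAndIncrement();
        Pending pending = new Pending(id);
        synchronized (lock) {
            pendingCheckpoints.put(id, pending);
        }
        return pending;
    }

    /** Builds the cleanup action that would run if the checkpoint times out. */
    Runnable cancellerFor(Pending pending) {
        return () -> {
            synchronized (lock) {
                // Completion discards the pending object, so an already
                // completed checkpoint is left untouched here.
                if (!pending.isDiscarded()) {
                    pending.discarded = true;
                    pendingCheckpoints.remove(pending.checkpointId);
                }
            }
        };
    }

    /** Marks the checkpoint as completed and drops it from the pending map. */
    void complete(Pending pending) {
        synchronized (lock) {
            pending.discarded = true;
            pendingCheckpoints.remove(pending.checkpointId);
        }
    }

    int numberOfPending() {
        synchronized (lock) {
            return pendingCheckpoints.size();
        }
    }

    public static void main(String[] args) {
        PendingCheckpointSketch coordinator = new PendingCheckpointSketch();
        Pending first = coordinator.trigger();
        coordinator.cancellerFor(first).run(); // simulate a timeout
        System.out.println("pending after expiry: " + coordinator.numberOfPending());
    }
}
```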
// re-acquire the coordinator-wide lock
synchronized (lock) {
    // since we released the lock in the meantime, we need to re-check
    // that the conditions still hold.
    if (shutdown) {
        return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
    }
    else if (!props.forceCheckpoint()) {
        if (triggerRequestQueued) {
            LOG.warn("Trying to trigger another checkpoint while one was queued already");
            return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
        }

        if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
            triggerRequestQueued = true;
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }
            return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
        }

        // make sure the minimum interval between checkpoints has passed
        final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
        final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

        if (durationTillNextMillis > 0) {
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }

            // Reassign the new trigger to the currentPeriodicTrigger
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    new ScheduledTrigger(),
                    durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

            return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
        }
    }

    LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);

    pendingCheckpoints.put(checkpointID, checkpoint);

    ScheduledFuture<?> cancellerHandle = timer.schedule(
            canceller,
            checkpointTimeout, TimeUnit.MILLISECONDS);

    if (!checkpoint.setCancellerHandle(cancellerHandle)) {
        // checkpoint is already disposed!
        cancellerHandle.cancel(false);
    }
    // (the excerpt ends here, still inside the synchronized block)
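Before the PendingCheckpoint is actually launched, the pre-checks run a second time, because the lock was released while the checkpointID was being allocated. They mainly verify that there are not too many pending checkpoints and that enough time has passed since the last completed checkpoint. Once these checks pass, the canceller defined earlier is registered with the timer thread pool; if the checkpoint is still pending after checkpointTimeout milliseconds, the canceller reclaims its resources.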
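The timeout registration can be illustrated with a plain ScheduledExecutorService. CancellerTimerSketch and its run method are hypothetical names, and the sketch assumes a simplified model in which "completion" just cancels the handle; it is not how Flink wires PendingCheckpoint internally. It shows the two possible outcomes: the canceller fires after the timeout, or completion cancels the handle first and the canceller never runs.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a timeout-driven canceller; not Flink code.
class CancellerTimerSketch {

    /**
     * Schedules a canceller after timeoutMillis.
     * Returns true if the canceller fired (the checkpoint expired),
     * false if completion cancelled the handle first.
     */
    static boolean run(long timeoutMillis, boolean completeImmediately) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean expired = new AtomicBoolean(false);
        try {
            ScheduledFuture<?> cancellerHandle = timer.schedule(
                    () -> expired.set(true), timeoutMillis, TimeUnit.MILLISECONDS);

            if (completeImmediately) {
                // The checkpoint finished in time, so the pending cleanup is withdrawn.
                cancellerHandle.cancel(false);
            } else {
                // Nothing completes the checkpoint; wait until the canceller has run.
                cancellerHandle.get();
            }
            return expired.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("expired without completion: " + run(10, false));
        System.out.println("expired despite completion: " + run(3_600_000, true));
    }
}
```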
3 Starting checkpoint execution
The checkpointID and timestamp of this checkpoint are sent to each corresponding executor, that is, a TriggerCheckpoint message is sent to each TaskManager.
CheckpointOptions checkpointOptions;
if (!props.isSavepoint()) {
    checkpointOptions = CheckpointOptions.forCheckpoint();
} else {
    checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
}

// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}

numUnsuccessfulCheckpointsTriggers.set(0);
return new CheckpointTriggerResult(checkpoint);
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
    final SimpleSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
    }
}
@Override
public void triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        JobID jobId,
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions) {

    Preconditions.checkNotNull(executionAttemptID);
    Preconditions.checkNotNull(jobId);

    actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions));
}
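In the loop for (Execution execution: executions), executions holds all the input vertices, that is, the Flink source vertices. The checkpoint trigger, and with it the barrier event, is therefore first sent from the JobManager to all source tasks. The trigger, ack, and commit vertex sets are assigned while the job graph is generated: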
JobCheckpointingSettings settings = new JobCheckpointingSettings(
        triggerVertices,
        ackVertices,
        commitVertices,
        new CheckpointCoordinatorConfiguration(
                interval,
                cfg.getCheckpointTimeout(),
                cfg.getMinPauseBetweenCheckpoints(),
                cfg.getMaxConcurrentCheckpoints(),
                retentionAfterTermination,
                isExactlyOnce),
        serializedStateBackend,
        serializedHooks);

// jobGraph ... (statement truncated in the original post)

for (JobVertex vertex : jobVertices.values()) {
    if (vertex.isInputVertex()) {
        triggerVertices.add(vertex.getID());
    }
    commitVertices.add(vertex.getID());
    ackVertices.add(vertex.getID());
}
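The effect of that loop can be shown on a toy pipeline. VertexRolesSketch and its Vertex class are hypothetical stand-ins, not Flink's JobVertex. The sketch mirrors only the classification rule from the excerpt: input vertices go into the trigger set, and every vertex goes into both the ack and commit sets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the vertex classification; not Flink's JobVertex API.
class VertexRolesSketch {

    static final class Vertex {
        final String id;
        final boolean inputVertex; // true for sources, which have no upstream input

        Vertex(String id, boolean inputVertex) {
            this.id = id;
            this.inputVertex = inputVertex;
        }
    }

    final List<String> triggerVertices = new ArrayList<>();
    final List<String> ackVertices = new ArrayList<>();
    final List<String> commitVertices = new ArrayList<>();

    VertexRolesSketch(List<Vertex> vertices) {
        for (Vertex vertex : vertices) {
            if (vertex.inputVertex) {
                // Only sources receive the trigger from the coordinator.
                triggerVertices.add(vertex.id);
            }
            // Every vertex must acknowledge the checkpoint and is notified on commit.
            commitVertices.add(vertex.id);
            ackVertices.add(vertex.id);
        }
    }

    public static void main(String[] args) {
        VertexRolesSketch roles = new VertexRolesSketch(Arrays.asList(
                new Vertex("source", true),
                new Vertex("map", false),
                new Vertex("sink", false)));
        System.out.println("trigger: " + roles.triggerVertices);
        System.out.println("ack:     " + roles.ackVertices);
        System.out.println("commit:  " + roles.commitVertices);
    }
}
```

For a source, map, sink pipeline, only the source ends up in the trigger set, which is why the coordinator's loop over executions reaches source tasks only.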