Flink checkpoint source code analysis (part 2)


When reposting, please credit the original address: http://www.cnblogs.com/dongxiao-yang/p/8260370.html

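The previous article, Flink checkpoint source code analysis (part 1), mainly covered the JobManager-side code that periodically generates TriggerCheckpoint messages. This article continues by examining how the TaskManager handles a received TriggerCheckpoint message and performs the corresponding backup.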

On the TaskManager, a TriggerCheckpoint message is handled along the path handleMessage -> handleCheckpointingMessage -> Task.triggerCheckpointBarrier:

    public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {

        final AbstractInvokable invokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (executionState == ExecutionState.RUNNING && invokable != null) {
            if (invokable instanceof StatefulTask) {
                // build a local closure
                final StatefulTask statefulTask = (StatefulTask) invokable;
                final String taskName = taskNameWithSubtask;
                final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                    FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        // set safety net from the task's context for checkpointing thread
                        LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                        try {
                            boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                            if (!success) {
                                checkpointResponder.declineCheckpoint(
                                        getJobID(), getExecutionId(), checkpointID,
                                        new CheckpointDeclineTaskNotReadyException(taskName));
                            }
                        }
                        catch (Throwable t) {
                            if (getExecutionState() == ExecutionState.RUNNING) {
                                failExternally(new Exception(
                                    "Error while triggering checkpoint " + checkpointID + " for " +
                                        taskNameWithSubtask, t));
                            } else {
                                LOG.debug("Encountered error while triggering checkpoint {} for " +
                                    "{} ({}) while being not in state running.", checkpointID,
                                    taskNameWithSubtask, executionId, t);
                            }
                        } finally {
                            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                        }
                    }
                };
                executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
            }
            else {
                checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                        new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));

                LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
                        taskNameWithSubtask, executionId);
            }
        }
        else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
        }
    }
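Stripped of logging and the file-system safety net, the method above has three outcomes: a running checkpointable task triggers the checkpoint on another thread, while a non-checkpointing or non-running task declines immediately. The following minimal sketch models only that branching. SketchTask, its State enum and the message strings are hypothetical stand-ins, not Flink classes: a LongPredicate plays the role of StatefulTask.triggerCheckpoint, a single-thread executor plays the role of executeAsyncCallRunnable, and two lists record what Flink would send through checkpointResponder.declineCheckpoint or failExternally.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongPredicate;

// Hypothetical model of the branching in Task.triggerCheckpointBarrier; not Flink's API.
class SketchTask {
    enum State { RUNNING, CANCELING }

    // Stand-ins for checkpointResponder.declineCheckpoint and failExternally.
    final List<String> declines = new CopyOnWriteArrayList<>();
    final List<String> failures = new CopyOnWriteArrayList<>();
    // Stand-in for executeAsyncCallRunnable: the trigger never runs on the caller's thread.
    final ExecutorService asyncCalls = Executors.newSingleThreadExecutor();
    volatile State state = State.RUNNING;
    // null models a task that is not a StatefulTask; otherwise it returns the success flag.
    final LongPredicate statefulTask;

    SketchTask(LongPredicate statefulTask) {
        this.statefulTask = statefulTask;
    }

    void triggerCheckpointBarrier(long checkpointId) {
        if (state == State.RUNNING && statefulTask != null) {
            asyncCalls.submit(() -> {
                try {
                    if (!statefulTask.test(checkpointId)) {
                        declines.add(checkpointId + ": task not ready");
                    }
                } catch (Throwable t) {
                    if (state == State.RUNNING) {
                        failures.add(checkpointId + ": " + t.getMessage()); // Flink fails the task here
                    } // otherwise Flink only logs at debug level
                }
            });
        } else if (state == State.RUNNING) {
            declines.add(checkpointId + ": not a checkpointing task");
        } else {
            declines.add(checkpointId + ": task not running");
        }
    }

    // Demo helper: wait for pending async calls to finish.
    void shutdown() {
        asyncCalls.shutdown();
        try {
            asyncCalls.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

One plausible reading of the design, not something the code above states, is that handing the trigger to another thread keeps the TaskManager's message handling from blocking on a slow checkpoint.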

 

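Under normal conditions, triggerCheckpointBarrier calls the triggerCheckpoint() method implemented in StreamTask, which then follows the call chain

triggerCheckpoint -> performCheckpoint -> checkpointState -> CheckpointingOperation.executeCheckpointing

down to executeCheckpointing:
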
    public void executeCheckpointing() throws Exception {
        startSyncPartNano = System.nanoTime();

        boolean failed = true;
        try {
            for (StreamOperator<?> op : allOperators) {
                checkpointStreamOperator(op);
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                        checkpointMetaData.getCheckpointId(), owner.getName());
            }

            startAsyncPartNano = System.nanoTime();

            checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

            // at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
            runAsyncCheckpointingAndAcknowledge();
            failed = false;

            if (LOG.isDebugEnabled()) {
                LOG.debug("{} - finished synchronous part of checkpoint {}." +
                        "Alignment duration: {} ms, snapshot duration {} ms",
                    owner.getName(), checkpointMetaData.getCheckpointId(),
                    checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                    checkpointMetrics.getSyncDurationMillis());
            }
        } finally {
            if (failed) {
                // ... (cleanup of snapshotInProgressList, elided)
            }
        }
    }
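The split between the synchronous part (timed between startSyncPartNano and startAsyncPartNano) and runAsyncCheckpointingAndAcknowledge can be illustrated with a small sketch. It assumes operators are plain maps and that "persisting" just means rendering a string; SyncAsyncCheckpointSketch is hypothetical, and in Flink the synchronous part runs while the task holds its checkpoint lock, which the sketch does not model. What it shows is that the copy taken synchronously is what gets persisted, even if processing mutates the live state right afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Hypothetical sketch of the synchronous/asynchronous split in executeCheckpointing.
// Operators are modeled as mutable maps; "persisting" is just rendering a string.
class SyncAsyncCheckpointSketch {
    static CompletableFuture<List<String>> executeCheckpointing(
            long checkpointId, List<Map<String, Integer>> operators, ExecutorService asyncPool) {
        // Synchronous part: take a cheap, independent copy of every operator's state.
        List<Map<String, Integer>> snapshots = new ArrayList<>();
        for (Map<String, Integer> op : operators) {
            snapshots.add(new TreeMap<>(op));
        }

        // Asynchronous part: persist the copies off the calling thread; completing
        // the future plays the role of acknowledgeCheckpoint.
        return CompletableFuture.supplyAsync(() -> {
            List<String> persisted = new ArrayList<>();
            for (Map<String, Integer> snapshot : snapshots) {
                persisted.add("chk-" + checkpointId + ":" + snapshot);
            }
            return persisted;
        }, asyncPool);
    }
}
```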

 

executeCheckpointing performs two operations. The first is calling checkpointStreamOperator(op) on every StreamOperator of the task, that is, on every operator in its operator chain.

The code of checkpointStreamOperator:

    private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
        if (null != op) {
            // first call the legacy checkpoint code paths
            nonPartitionedStates.add(op.snapshotLegacyOperatorState(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions));

            OperatorSnapshotResult snapshotInProgress = op.snapshotState(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);

            snapshotInProgressList.add(snapshotInProgress);
        } else {
            nonPartitionedStates.add(null);
            OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
            snapshotInProgressList.add(emptySnapshotInProgress);
        }
    }
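checkpointStreamOperator adds an entry to both lists even when an operator is null. A plausible reason, assumed here rather than confirmed by the code above, is that the snapshot lists are matched to the operators of the chain by position, so entry i must always belong to operator i. The hypothetical PositionalSnapshotSketch below shows that invariant, with Supplier<String> standing in for an operator and "<empty>" for an empty OperatorSnapshotResult:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: a null operator still gets a placeholder entry,
// so the i-th snapshot always lines up with the i-th operator.
class PositionalSnapshotSketch {
    static final String EMPTY = "<empty>";

    static List<String> snapshotAll(List<Supplier<String>> operators) {
        List<String> snapshots = new ArrayList<>(operators.size());
        for (Supplier<String> op : operators) {
            // Never skip an index: skipping would shift every later snapshot by one.
            snapshots.add(op != null ? op.get() : EMPTY);
        }
        return snapshots;
    }
}
```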

 

The StreamOperator method snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) is ultimately given a final implementation in AbstractStreamOperator, the abstract class that implements StreamOperator:

    @Override
    public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {

        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();

        CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);

        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                checkpointId,
                timestamp,
                factory,
                keyGroupRange,
                getContainingTask().getCancelables())) {

            snapshotState(snapshotContext);

            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }

            throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
                getOperatorName() + '.', snapshotException);
        }

        return snapshotInProgress;
    }
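The try/catch in snapshotState follows a common pattern: each state backend contributes a not-yet-finished future, and if any step fails, every future collected so far is cancelled and cleanup errors are attached with addSuppressed before rethrowing. A minimal sketch of just that pattern, using java.util.concurrent.FutureTask; SnapshotResultSketch is a hypothetical stand-in for OperatorSnapshotResult, and each Callable stands in for one backend's snapshot call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical sketch of "collect futures, cancel all on failure"; not Flink's classes.
class SnapshotResultSketch {
    final List<RunnableFuture<String>> futures = new ArrayList<>();

    void cancel() {
        for (RunnableFuture<String> f : futures) {
            f.cancel(true);
        }
    }

    static SnapshotResultSketch snapshot(List<Callable<RunnableFuture<String>>> backends) throws Exception {
        SnapshotResultSketch result = new SnapshotResultSketch();
        try {
            for (Callable<RunnableFuture<String>> backend : backends) {
                result.futures.add(backend.call()); // each backend hands back an unfinished future
            }
        } catch (Exception snapshotException) {
            try {
                result.cancel(); // release whatever was already started
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }
            throw new Exception("Could not complete snapshot", snapshotException);
        }
        return result;
    }
}
```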

 

The snapshotState(snapshotContext) call in the code above is a hook that each concrete operator implements in its own way.
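Because the outer snapshotState is final while the inner snapshotState(snapshotContext) is overridable, this is the template-method pattern: the framework fixes the overall procedure, and each operator only fills in its own part. A hypothetical sketch of that shape; SketchOperatorBase, CountingOperator and the StringBuilder used as a "context" are invented for illustration:

```java
// Hypothetical sketch of the template-method shape: a final framework method
// wraps an overridable hook. Names are illustrative, not Flink's classes.
abstract class SketchOperatorBase {
    // Framework-controlled entry point; subclasses cannot change the overall procedure.
    public final String snapshotState(long checkpointId) {
        StringBuilder context = new StringBuilder("chk-" + checkpointId + ":");
        snapshotState(context);   // operator-specific hook writes into the context
        // ... the framework would snapshot its managed state backends here
        return context.toString();
    }

    // Hook with an empty default, for operators that have nothing extra to write.
    protected void snapshotState(StringBuilder context) {
    }
}

// A concrete operator contributes its own state through the hook only.
class CountingOperator extends SketchOperatorBase {
    private long count = 42;

    @Override
    protected void snapshotState(StringBuilder context) {
        context.append("count=").append(count);
    }
}
```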

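The second operation in executeCheckpointing is the call to runAsyncCheckpointingAndAcknowledge, which runs an AsyncCheckpointRunnable that performs all the operations persisting the state to files and then sends acknowledgeCheckpoint back to the JobManager: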

    private static final class AsyncCheckpointRunnable implements Runnable, Closeable {
        // ...
        // ... (inside run())

                if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
                        CheckpointingOperation.AsynCheckpointState.COMPLETED)) {

                    owner.getEnvironment().acknowledgeCheckpoint(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetrics,
                        subtaskState);
                }
        // ...
    }
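The compareAndSet from RUNNING to COMPLETED guards the acknowledgement. The sketch below assumes that a concurrent cancellation competes for the same flag by moving it from RUNNING to a discarded state; because both transitions start from RUNNING, at most one of them can succeed, whichever thread gets there first. AsyncCheckpointStateSketch and its counters are hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: "acknowledge" and "discard" both start with a CAS from RUNNING,
// so exactly one of them can take effect.
class AsyncCheckpointStateSketch {
    enum State { RUNNING, DISCARDED, COMPLETED }

    final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    final AtomicInteger acks = new AtomicInteger();
    final AtomicInteger discards = new AtomicInteger();

    // Called by the async thread once all state has been written.
    void onSnapshotFinished() {
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            acks.incrementAndGet();      // stands in for acknowledgeCheckpoint(...)
        }
    }

    // Called when the checkpoint or the task is cancelled.
    void onCancel() {
        if (state.compareAndSet(State.RUNNING, State.DISCARDED)) {
            discards.incrementAndGet();  // stands in for deleting the written state
        }
    }
}
```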

 

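Addendum: inside the performCheckpoint method mentioned above, before checkpointState is called, Flink first sends the checkpoint barrier to the downstream tasks so that the downstream operators can start their own checkpoints as early as possible. This is also where Flink's barrier mechanism creates the barrier.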

    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint

            // Since both state checkpointing and downstream barrier emission occurs in this
            // lock scope, they are an atomic operation regardless of the order in which they occur.
            // Given this, we immediately emit the checkpoint barriers, so the downstream operators
            // can start their checkpoint work as soon as possible
            operatorChain.broadcastCheckpointBarrier(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);

            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        } else {
            // ... (elided)
        }
    }

OperatorChain.broadcastCheckpointBarrier wraps the checkpoint id, timestamp and options in a CheckpointBarrier and broadcasts it as an event on every RecordWriterOutput of the chain:

    public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        try {
            CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
            for (RecordWriterOutput<?> streamOutput : streamOutputs) {
                streamOutput.broadcastEvent(barrier);
            }
        }
        catch (InterruptedException e) {
            throw new IOException("Interrupted while broadcasting checkpoint barrier");
        }
    }
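The comment in performCheckpoint argues that, because barrier emission and checkpointState run in the same lock scope, their relative order does not matter. A hypothetical sketch of that argument: record emission and the checkpoint both synchronize on one lock, so every record is either emitted before the barrier and counted in the snapshot, or emitted after the barrier and not counted. BarrierEmitterSketch and its string events are illustrative only, not OperatorChain or RecordWriterOutput.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: barrier emission and the state snapshot happen inside one lock
// scope, so no record can slip in between them.
class BarrierEmitterSketch {
    final Object lock = new Object();
    final List<List<String>> outputs = new ArrayList<>(); // one event list per output channel
    long recordsEmitted = 0;                              // the "state" being checkpointed

    BarrierEmitterSketch(int numOutputs) {
        for (int i = 0; i < numOutputs; i++) {
            outputs.add(new ArrayList<>());
        }
    }

    void emit(String record, int channel) {
        synchronized (lock) {
            outputs.get(channel).add(record);
            recordsEmitted++;
        }
    }

    // Mirrors performCheckpoint: broadcast the barrier first, then snapshot, atomically.
    long performCheckpoint(long checkpointId) {
        synchronized (lock) {
            for (List<String> out : outputs) {
                out.add("barrier-" + checkpointId); // like broadcastEvent on every output
            }
            return recordsEmitted;                  // like checkpointState
        }
    }
}
```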

 

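The trigger path described above applies to source tasks, since the JobManager only sends TriggerCheckpoint to sources. For all other, non-source tasks, the call chain is

StreamInputProcessor/StreamTwoInputProcessor.processInput() -> barrierHandler.getNextNonBlocked() -> processBarrier -> notifyCheckpoint -> triggerCheckpointOnBarrier

and StreamTask.triggerCheckpointOnBarrier then calls performCheckpoint, so from that point on the flow is the same as for a source task.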
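On the non-source side, the barrier handler for exactly-once mode aligns barriers: once a channel has delivered the barrier, its further input is held back until the barrier has arrived on every channel, and only then is the checkpoint triggered. The sketch below is a heavily simplified model of that idea under stated assumptions (one checkpoint in flight, no spilling, no cancellation, no closed channels). BarrierAlignerSketch is hypothetical; Flink's at-least-once mode uses a BarrierTracker, which does not block channels at all.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongConsumer;

// Hypothetical, heavily simplified model of aligned barrier handling for a task with
// several input channels; not Flink's BarrierBuffer.
class BarrierAlignerSketch {
    private final int numChannels;
    private final LongConsumer triggerCheckpointOnBarrier;
    private final Set<Integer> blockedChannels = new HashSet<>();
    private final Deque<String> heldBack = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();

    BarrierAlignerSketch(int numChannels, LongConsumer triggerCheckpointOnBarrier) {
        this.numChannels = numChannels;
        this.triggerCheckpointOnBarrier = triggerCheckpointOnBarrier;
    }

    // An event is either "barrier-<id>" or a data record.
    void onEvent(int channel, String event) {
        if (event.startsWith("barrier-")) {
            long checkpointId = Long.parseLong(event.substring("barrier-".length()));
            blockedChannels.add(channel);
            if (blockedChannels.size() == numChannels) {
                // Aligned: every input has delivered the barrier, so snapshot now...
                triggerCheckpointOnBarrier.accept(checkpointId);
                // ...then unblock and process the records that were held back.
                blockedChannels.clear();
                while (!heldBack.isEmpty()) {
                    processed.add(heldBack.poll());
                }
            }
        } else if (blockedChannels.contains(channel)) {
            heldBack.add(event);   // already past the barrier: belongs to the next checkpoint
        } else {
            processed.add(event);  // still before the barrier on this channel
        }
    }
}
```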

 

 


