If you repost this article, please cite the original address: http://www.cnblogs.com/dongxiao-yang/p/8260370.html
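The previous article, "flink checkpoint source code analysis (1)", covered the JobManager-side code that periodically generates TriggerCheckpoint messages. This article continues from there and looks at how the TaskManager handles a received TriggerCheckpoint message and performs the corresponding state backup.
Inside the TaskManager, a TriggerCheckpoint message is handled along the path handleMessage -> handleCheckpointingMessage -> Task.triggerCheckpointBarrier: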

    public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {

        final AbstractInvokable invokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (executionState == ExecutionState.RUNNING && invokable != null) {
            if (invokable instanceof StatefulTask) {
                // build a local closure
                final StatefulTask statefulTask = (StatefulTask) invokable;
                final String taskName = taskNameWithSubtask;
                final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                    FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        // set safety net from the task's context for checkpointing thread
                        LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                        try {
                            boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                            if (!success) {
                                checkpointResponder.declineCheckpoint(
                                    getJobID(), getExecutionId(), checkpointID,
                                    new CheckpointDeclineTaskNotReadyException(taskName));
                            }
                        }
                        catch (Throwable t) {
                            if (getExecutionState() == ExecutionState.RUNNING) {
                                failExternally(new Exception(
                                    "Error while triggering checkpoint " + checkpointID + " for " +
                                        taskNameWithSubtask, t));
                            } else {
                                LOG.debug("Encountered error while triggering checkpoint {} for " +
                                    "{} ({}) while being not in state running.", checkpointID,
                                    taskNameWithSubtask, executionId, t);
                            }
                        } finally {
                            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                        }
                    }
                };
                executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
            }
            else {
                checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));

                LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
                    taskNameWithSubtask, executionId);

            }
        }
        else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
        }
    }

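The branching in triggerCheckpointBarrier boils down to three outcomes. The following is a minimal standalone sketch of that decision, not Flink code: `TriggerDecisionSketch`, `Outcome` and the three boolean parameters are hypothetical names introduced only for illustration.

```java
// Hypothetical, simplified model of the branching in Task.triggerCheckpointBarrier.
// None of the names below are Flink APIs.
final class TriggerDecisionSketch {

    enum Outcome {
        TRIGGER_ASYNC,             // running and stateful: hand the trigger to an async-call thread
        DECLINE_NOT_CHECKPOINTING, // running, but the invokable is not a stateful task
        DECLINE_NOT_READY          // task is not RUNNING, or has no invokable yet
    }

    static Outcome decide(boolean running, boolean hasInvokable, boolean stateful) {
        if (running && hasInvokable) {
            return stateful ? Outcome.TRIGGER_ASYNC : Outcome.DECLINE_NOT_CHECKPOINTING;
        }
        return Outcome.DECLINE_NOT_READY;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, true));
        System.out.println(decide(true, true, false));
        System.out.println(decide(false, true, true));
    }
}
```

Note that in the real method the asynchronous branch can still end in a decline: if statefulTask.triggerCheckpoint returns false, the task reports CheckpointDeclineTaskNotReadyException from inside the runnable.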
Under normal conditions, triggerCheckpointBarrier calls the triggerCheckpoint() method implemented in StreamTask, which then follows the call chain
triggerCheckpoint -> performCheckpoint -> checkpointState -> CheckpointingOperation.executeCheckpointing

    public void executeCheckpointing() throws Exception {
        startSyncPartNano = System.nanoTime();

        boolean failed = true;
        try {
            for (StreamOperator<?> op : allOperators) {
                checkpointStreamOperator(op);
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                    checkpointMetaData.getCheckpointId(), owner.getName());
            }

            startAsyncPartNano = System.nanoTime();

            checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

            // at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
            runAsyncCheckpointingAndAcknowledge();
            failed = false;

            if (LOG.isDebugEnabled()) {
                LOG.debug("{} - finished synchronous part of checkpoint {}." +
                        "Alignment duration: {} ms, snapshot duration {} ms",
                    owner.getName(), checkpointMetaData.getCheckpointId(),
                    checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                    checkpointMetrics.getSyncDurationMillis());
            }
        .....

executeCheckpointing performs two operations. The first is to call checkpointStreamOperator(op) on every StreamOperator that belongs to the task.
The code of checkpointStreamOperator:

    private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
        if (null != op) {
            // first call the legacy checkpoint code paths
            nonPartitionedStates.add(op.snapshotLegacyOperatorState(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions));

            OperatorSnapshotResult snapshotInProgress = op.snapshotState(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions);

            snapshotInProgressList.add(snapshotInProgress);
        } else {
            nonPartitionedStates.add(null);
            OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
            snapshotInProgressList.add(emptySnapshotInProgress);
        }
    }

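The key point of checkpointStreamOperator is that the synchronous phase only collects one pending snapshot per operator, keeping list positions aligned even when an operator slot is null. A minimal sketch of that idea follows; `SyncPhaseSketch` and its `Op` interface are hypothetical stand-ins, and a plain `Supplier<String>` plays the role of the not-yet-materialized snapshot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: collect one pending snapshot per operator in the synchronous phase.
final class SyncPhaseSketch {

    /** Stand-in for a stream operator; returns deferred work instead of a real snapshot result. */
    interface Op {
        Supplier<String> snapshot(long checkpointId);
    }

    static List<Supplier<String>> collect(List<Op> operators, long checkpointId) {
        List<Supplier<String>> snapshotsInProgress = new ArrayList<>();
        for (Op op : operators) {
            if (op != null) {
                snapshotsInProgress.add(op.snapshot(checkpointId));
            } else {
                // keep the position occupied with an empty placeholder
                snapshotsInProgress.add(() -> null);
            }
        }
        return snapshotsInProgress;
    }
}
```

Nothing is materialized while `collect` runs; the suppliers are evaluated later, which mirrors how the collected snapshot results are handed over to the asynchronous part of the checkpoint.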
The snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) method declared by the StreamOperator interface is ultimately given a final implementation by the abstract base class AbstractStreamOperator:

    @Override
    public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {

        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
            keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();

        CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);

        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                checkpointId,
                timestamp,
                factory,
                keyGroupRange,
                getContainingTask().getCancelables())) {

            snapshotState(snapshotContext);

            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }

            throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
                getOperatorName() + '.', snapshotException);
        }

        return snapshotInProgress;
    }

The snapshotState(snapshotContext) call in the code above is a hook: each concrete operator provides its own implementation.
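This is the template-method pattern: a final wrapper fixes the overall procedure and error handling, while subclasses override only the hook. The sketch below illustrates the shape under simplified assumptions; `BaseOperatorSketch`, `CountingOperatorSketch` and the `List<String>` used as a snapshot context are hypothetical and not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a final snapshot wrapper with an overridable hook.
abstract class BaseOperatorSketch {

    /** Final wrapper: subclasses cannot change the surrounding procedure. */
    public final List<String> snapshot(long checkpointId) {
        List<String> context = new ArrayList<>();
        try {
            snapshotState(checkpointId, context); // operator-specific part
        } catch (RuntimeException e) {
            throw new IllegalStateException("Could not complete snapshot " + checkpointId, e);
        }
        return context;
    }

    /** Hook with an empty default; concrete operators override it. */
    protected void snapshotState(long checkpointId, List<String> context) {
    }
}

final class CountingOperatorSketch extends BaseOperatorSketch {
    private final long count;

    CountingOperatorSketch(long count) {
        this.count = count;
    }

    @Override
    protected void snapshotState(long checkpointId, List<String> context) {
        // write this operator's state into the snapshot context
        context.add("count=" + count + "@" + checkpointId);
    }
}
```

Calling `new CountingOperatorSketch(3).snapshot(5L)` returns a list holding the single entry `count=3@5`.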
The second operation of executeCheckpointing is to call runAsyncCheckpointingAndAcknowledge, which performs all the file operations that persist the state and then sends acknowledgeCheckpoint back to the JobManager.

    private static final class AsyncCheckpointRunnable implements Runnable, Closeable {
        .....
        .....
            if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
                    CheckpointingOperation.AsynCheckpointState.COMPLETED)) {

                owner.getEnvironment().acknowledgeCheckpoint(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetrics,
                    subtaskState);

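The compareAndSet in the excerpt guards the acknowledgement: only a runnable that is still in the RUNNING state may move to COMPLETED and acknowledge, so a checkpoint that was closed in the meantime is never acknowledged. Here is a minimal sketch of that guard, assuming a simplified three-state model; `AsyncAckSketch` and its two `Runnable` parameters are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: acknowledge at most once, and never after the operation was closed.
final class AsyncAckSketch implements Runnable {

    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final Runnable materialize; // stands in for writing the state to durable storage
    private final Runnable acknowledge; // stands in for the acknowledgement sent to the JobManager

    AsyncAckSketch(Runnable materialize, Runnable acknowledge) {
        this.materialize = materialize;
        this.acknowledge = acknowledge;
    }

    @Override
    public void run() {
        materialize.run();
        // Only the transition RUNNING -> COMPLETED is allowed to acknowledge.
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            acknowledge.run();
        }
    }

    /** Cancels the operation if it has not completed yet. */
    public void close() {
        state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    State currentState() {
        return state.get();
    }
}
```

Because both transitions start from RUNNING and are atomic, exactly one of them can win when `run()` and `close()` race on different threads.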
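One addition: inside the performCheckpoint method mentioned above, before checkpointState is called, Flink first sends the checkpoint barrier to the downstream tasks so that downstream operators can start their own checkpoint work as early as possible.
This is also the place where Flink's barrier mechanism generates the barriers.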

    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint

            // Since both state checkpointing and downstream barrier emission occurs in this
            // lock scope, they are an atomic operation regardless of the order in which they occur.
            // Given this, we immediately emit the checkpoint barriers, so the downstream operators
            // can start their checkpoint work as soon as possible
            operatorChain.broadcastCheckpointBarrier(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions);

            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        .....


    public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        try {
            CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
            for (RecordWriterOutput<?> streamOutput : streamOutputs) {
                streamOutput.broadcastEvent(barrier);
            }
        }
        catch (InterruptedException e) {
            throw new IOException("Interrupted while broadcasting checkpoint barrier");
        }
    }

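The comment in performCheckpoint states that barrier emission and state snapshotting are atomic because both happen under the same lock. The sketch below illustrates that property in a simplified model, assuming record emission takes the same lock: the barrier then always separates exactly the records reflected in the snapshot from those that come after it. `BarrierEmitSketch` and all of its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: emit the barrier and snapshot the state inside one lock scope.
final class BarrierEmitSketch {

    private final Object lock = new Object();
    private final List<String> output = new ArrayList<>(); // stands in for the downstream channel
    private final boolean running = true;
    private long recordCount;        // the operator state being checkpointed
    private long snapshotCount = -1; // state captured by the last checkpoint

    void emitRecord(String record) {
        synchronized (lock) {
            recordCount++;
            output.add(record);
        }
    }

    boolean performCheckpoint(long checkpointId) {
        synchronized (lock) {
            if (!running) {
                return false;
            }
            // Barrier first, so downstream can start its checkpoint work early ...
            output.add("BARRIER-" + checkpointId);
            // ... then the snapshot; no record can slip in between the two steps.
            snapshotCount = recordCount;
            return true;
        }
    }

    List<String> output() {
        synchronized (lock) {
            return new ArrayList<>(output);
        }
    }

    long snapshotCount() {
        synchronized (lock) {
            return snapshotCount;
        }
    }
}
```

After emitting `a` and `b`, checkpointing with id 1 and emitting `c`, the output is `a, b, BARRIER-1, c` and the snapshot holds a count of 2, matching the two records that precede the barrier.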
The call path described above applies to source tasks. For the remaining, non-source operators, the method chain is
StreamInputProcessor/StreamTwoInputProcessor.processInput() -> barrierHandler.getNextNonBlocked() -> processBarrier -> notifyCheckpoint -> triggerCheckpointOnBarrier
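For non-source tasks the checkpoint is therefore triggered by barriers arriving on the input channels rather than by a message from the JobManager. The sketch below shows the basic alignment idea for a task with several input channels: a channel is blocked once its barrier arrives, and the checkpoint is triggered when every channel has delivered it. It is a heavily simplified model with hypothetical names (`BarrierAlignmentSketch`, `onRecord`, `onBarrier`) and ignores cancellation, overlapping checkpoints and finished channels, which a real barrier handler has to deal with.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of barrier alignment across input channels.
final class BarrierAlignmentSketch {

    private final boolean[] blocked;                        // channels that already delivered the barrier
    private final Queue<String> buffered = new ArrayDeque<>();
    private int barriersSeen;

    final List<String> processed = new ArrayList<>();       // records handed to the operator
    final List<Long> triggered = new ArrayList<>();         // checkpoints triggered on this task

    BarrierAlignmentSketch(int numChannels) {
        this.blocked = new boolean[numChannels];
    }

    void onRecord(int channel, String record) {
        if (blocked[channel]) {
            buffered.add(record);   // belongs to the next checkpoint; hold it back
        } else {
            processed.add(record);
        }
    }

    void onBarrier(int channel, long checkpointId) {
        if (blocked[channel]) {
            return;                 // duplicate barrier; ignored in this sketch
        }
        blocked[channel] = true;
        barriersSeen++;
        if (barriersSeen == blocked.length) {
            triggered.add(checkpointId);      // all inputs aligned: trigger the checkpoint
            Arrays.fill(blocked, false);      // release the channels
            barriersSeen = 0;
            while (!buffered.isEmpty()) {
                processed.add(buffered.poll());
            }
        }
    }
}
```

With two channels, a record that arrives on channel 0 after its barrier is held back until the barrier from channel 1 arrives, at which point the checkpoint is triggered and the buffered record is released.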
References: