Flink Basics Tutorial


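As big data technology spreads across industries, more and more applications need to process massive volumes of data in real time, and the business logic of that processing keeps getting more complex. Traditional batch processing and early stream processing frameworks find it increasingly hard to meet ever stricter requirements on latency, throughput, fault tolerance and ease of use.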

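Against this backdrop, the newer stream processing framework Flink applies modern large-scale parallel processing techniques to stream processing in an innovative way. This greatly alleviates the problems of earlier stream processing frameworks.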

 

In one sentence: Flink is a tool for ETL.

Flink's layered structure, and the articles in this series that cover it:

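Running Flink example programs on Windows: briefly shows how to run an already packaged example program (jar) on Windows through the Flink web UI.

Analyzing Flink components through flink-example (1): WordCount batch in practice and source code analysis. Covers DataSet transformations.

Analyzing Flink components through flink-example (2): WordCount batch in practice and source code analysis. How does Flink run locally, and how is a Flink batch job executed locally?

Analyzing Flink components through flink-example (3): WordCount streaming in practice and source code analysis. How is a Flink streaming job executed locally?

Building batch and streaming applications with the Flink Table & SQL API (1): basic concepts of Table. Introduces the basic concepts of Table and how to use it.

Building batch and streaming applications with the Flink Table & SQL API (2): Table API overview. Introduces how to use Table.

Building batch and streaming applications with the Flink Table & SQL API (3): using Flink SQL. Introduces how to use SQL.

Flink DataSet API usage and internals: introduces the DataSet API.

Flink DataStream API usage and internals: introduces the DataStream API.

How are timestamps used in Flink? Watermark usage and internals: introduces watermarks, the foundation of the underlying implementation.

Flink window examples: introduces the concept of windows and how they work.

State and fault tolerance in Flink: introduces State and the checkpoint and savepoint fault-tolerance mechanisms.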

Flink features

The features listed on the official Flink website:

1. All streaming use cases

  • Event-driven Applications
  • Stream & Batch Analytics
  • Data Pipelines & ETL

2. Guaranteed correctness

  • Exactly-once state consistency
  • Event-time processing
  • Sophisticated late data handling

3. Layered APIs

  • SQL on Stream & Batch Data
  • DataStream API & DataSet API
  • ProcessFunction (Time & State)

4. Ease of operation

  • Flexible deployment
  • High-availability setup
  • Savepoints

5. Scalability

  • Scale-out architecture
  • Support for very large state
  • Incremental checkpointing

6. High performance

  • Low latency
  • High throughput
  • In-Memory computing

Flink architecture

1. Layered structure

2. Runtime architecture diagram

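1: Getting to know Flink
Flink originated in the Stratosphere project, a research project run from 2010 to 2014 by three universities in Berlin together with several other European universities. In April 2014 the Stratosphere code was forked and donated to the Apache Software Foundation. The initial members of this incubator project were the core developers of the Stratosphere system. In December 2014 Flink became a top-level project of the Apache Software Foundation.
The Flink home page stated the project's vision at the top: "Apache Flink is an open-source stream processing framework for distributed, high-performing, always-available, and accurate stream processing applications."
Flink treats batch processing (that is, processing finite, static data) as a special case of stream processing.

DataStream API: the interface for stream processing. DataSet API: the interface for batch processing.

FlinkML: machine learning. CEP: complex event processing. Gelly: graph processing. Table API: for both stream and batch processing.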

2: Flink basic architecture
2.1: JobManager and TaskManager
The Flink runtime consists of two types of processes: the JobManager and the TaskManager.

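2.2 Unbounded and bounded data streams
Unbounded streams: events are ingested in a specific order and processed immediately as they arrive.

Bounded streams: events do not need to be ingested in order, because a bounded data set can always be sorted. Processing bounded streams is also known as batch processing.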
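To make the difference concrete, here is a minimal, Flink-free Java sketch. The `Event` class and both method names are hypothetical, and a finite `Iterator` stands in for an endless stream. The bounded path can sort the whole input by timestamp before processing. The unbounded path has no end to wait for, so it must hand each event to the handler as it arrives.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event: a name plus an event timestamp in milliseconds.
final class Event {
    final String name;
    final long timestamp;

    Event(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }
}

final class BoundedVsUnbounded {

    // Bounded input: all data is available, so sort by timestamp first, then process (batch style).
    static List<String> processBounded(List<Event> input) {
        List<Event> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        List<String> out = new ArrayList<>();
        for (Event e : sorted) {
            out.add(e.name);
        }
        return out;
    }

    // Unbounded input: process every event immediately, in arrival order.
    // (A finite Iterator stands in for an endless stream in this sketch.)
    static void processUnbounded(Iterator<Event> input, Consumer<Event> handler) {
        while (input.hasNext()) {
            handler.accept(input.next());
        }
    }
}
```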

 

3: Setting up a Flink cluster

4: Flink runtime architecture
4.1 Job submission process

4.2 Task scheduling


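4.3 TaskManager and slots
Each TaskManager is a JVM process and may execute one or more subtasks in separate threads. A TaskManager controls how many tasks it can accept through task slots. Every TaskManager has at least one task slot.

Each task slot represents a fixed-size subset of the TaskManager's resources. If a TaskManager has three slots, it divides its managed memory into three equal parts, one per slot. Note that slots provide no CPU isolation.

A task slot is a static concept: it describes how many tasks a TaskManager is able to run concurrently, and it is configured with taskmanager.numberOfTaskSlots. Parallelism is a dynamic concept: it is the concurrency a program actually uses when it runs, and it can be set with parallelism.default.

Example: suppose there are 3 TaskManagers with 3 task slots each. Each TaskManager can then accept 3 tasks, for 9 task slots in total. If we set parallelism.default=1, the program runs with parallelism 1 and uses only 1 of the 9 slots, leaving 8 idle. To use the cluster efficiently, the parallelism has to be set appropriately.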
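The arithmetic in this example fits in a small Java sketch. It assumes slot sharing is enabled (Flink's default). Under slot sharing, a job occupies as many slots as its highest operator parallelism. `SlotMath` and its methods are hypothetical; only the two configuration keys named in the comments come from the text above.

```java
// Hypothetical helper mirroring the example above.
//   taskmanager.numberOfTaskSlots -> slotsPerTaskManager
//   parallelism.default           -> maxOperatorParallelism (when every operator uses the default)
final class SlotMath {

    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Assumes slot sharing: the job occupies as many slots as its highest operator parallelism.
    static int usedSlots(int maxOperatorParallelism, int totalSlots) {
        if (maxOperatorParallelism > totalSlots) {
            // this sketch simply rejects a parallelism the cluster cannot host
            throw new IllegalArgumentException(
                "parallelism " + maxOperatorParallelism + " exceeds " + totalSlots + " available slots");
        }
        return maxOperatorParallelism;
    }

    static int idleSlots(int taskManagers, int slotsPerTaskManager, int maxOperatorParallelism) {
        int total = totalSlots(taskManagers, slotsPerTaskManager);
        return total - usedSlots(maxOperatorParallelism, total);
    }
}
```

With the numbers from the example, `idleSlots(3, 3, 1)` gives 8. Raising the parallelism to 9 leaves no slot idle.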

5: Flink DataStream API
6: Time and Window
7: EventTime and Window

 

Checkpoint source code walkthrough:

  Flink MiniCluster startup

  Flink CheckpointCoordinator startup

  Flink Checkpoint flow

First, a simple piece of code:

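```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

// Assumed setup, not shown in the original snippet: environment, checkpointing and Kafka properties
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // assumed 60 s interval; without it no checkpoints are triggered
val prop = new Properties()
prop.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker address
prop.setProperty("group.id", "kafka_offset_group")      // placeholder consumer group

val kafkaSource = new FlinkKafkaConsumer[String]("kafka_offset", new SimpleStringSchema(), prop)
val kafkaSource1 = new FlinkKafkaConsumer[String]("kafka_offset", new SimpleStringSchema(), prop)
// note: this constructor defaults to at-least-once; exactly-once needs a constructor taking FlinkKafkaProducer.Semantic
val kafkaProducer = new FlinkKafkaProducer[String]("kafka_offset_out", new SimpleStringSchema(), prop)
val source = env
  .addSource(kafkaSource)
  .setParallelism(1)
  .name("source")
val source1 = env
  .addSource(kafkaSource1)
  .setParallelism(1)
  .name("source1")

source.union(source1)
  .map(node => {
    node.toString + ",flinkx"
  })
  .addSink(kafkaProducer)

env.execute("kafka_offset") // hypothetical job name
```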

It is very simple: read from Kafka and write back to Kafka. The focus here is the checkpoint flow, so the code itself does not matter much.

-------------------------------------------

A simplified checkpoint flow

1. The CheckpointCoordinator triggers a checkpoint at the source
2. Source
  1. Generates and broadcasts a CheckpointBarrier
  2. Snapshots its state (when done, acks the checkpoint to the CheckpointCoordinator)
3. Map
  1. Receives the CheckpointBarrier
  2. Broadcasts the CheckpointBarrier
  3. Snapshots its state (when done, acks the checkpoint to the CheckpointCoordinator)
4. Sink
  1. Receives the CheckpointBarrier
  2. Snapshots its state (when done, acks the checkpoint to the CheckpointCoordinator)
5. The CheckpointCoordinator receives all acks
  1. Sends notifyCheckpointComplete to all operators
6. Source, Map and Sink receive notifyCheckpointComplete
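The six steps can be traced with a single-threaded Java simulation. This is a toy model, not Flink's code. `Coordinator` and `Operator` are hypothetical classes, a "snapshot" just records an operator's counter, and RPC and network channels become a simple queue of calls on a linear source → map → sink chain.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the simplified checkpoint flow (not Flink code).
final class Coordinator {
    final List<String> log = new ArrayList<>();
    final List<Operator> operators = new ArrayList<>();
    final Deque<Runnable> network = new ArrayDeque<>(); // stands in for barrier transport
    private final Set<String> pendingAcks = new HashSet<>();
    private long pendingId = -1;

    // Step 1: trigger the checkpoint at the source, then deliver barriers until none are left.
    void trigger(long checkpointId, Operator source) {
        pendingId = checkpointId;
        pendingAcks.clear();
        for (Operator op : operators) {
            pendingAcks.add(op.name);
        }
        log.add("trigger " + checkpointId);
        network.add(() -> source.onBarrier(checkpointId));
        while (!network.isEmpty()) {
            network.poll().run();
        }
    }

    // Steps 2-4 end with an ack; step 5 runs once every operator has acked.
    void ack(long checkpointId, String operatorName) {
        if (checkpointId != pendingId || !pendingAcks.remove(operatorName)) {
            return; // stale or duplicate ack: ignored in this sketch
        }
        log.add("ack " + operatorName);
        if (pendingAcks.isEmpty()) {
            log.add("complete " + checkpointId);
            for (Operator op : operators) {
                op.notifyCheckpointComplete(checkpointId); // step 6
            }
        }
    }
}

final class Operator {
    final String name;
    final Coordinator coordinator;
    Operator downstream;  // null for the sink
    long counter;         // stands in for the operator's state
    final Map<Long, Long> snapshots = new HashMap<>();

    Operator(String name, Coordinator coordinator) {
        this.name = name;
        this.coordinator = coordinator;
        coordinator.operators.add(this);
    }

    void onBarrier(long checkpointId) {
        Operator next = downstream;
        if (next != null) {
            coordinator.network.add(() -> next.onBarrier(checkpointId)); // broadcast the barrier
        }
        snapshots.put(checkpointId, counter); // snapshot state
        coordinator.ack(checkpointId, name);  // ack to the coordinator
    }

    void notifyCheckpointComplete(long checkpointId) {
        coordinator.log.add("notify " + name);
    }
}
```

Wiring `source.downstream = map; map.downstream = sink;` and calling `trigger(1, source)` logs the trigger, the three acks in chain order, the completion, and then one notification per operator.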

 

A better diagram can be found here: https://www.cnblogs.com/bethunebtj/p/9168274.html#5-%E4%B8%BA%E6%89%A7%E8%A1%8C%E4%BF%9D%E9%A9%BE%E6%8A%A4%E8%88%AAfault-tolerant%E4%B8%8E%E4%BF%9D%E8%AF%81exactly-once%E8%AF%AD%E4%B9%89

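As the flow shows, a checkpoint is initiated and confirmed by the CheckpointCoordinator. The coordinator uses RPC to tell the individual operators on the TaskManagers to carry out the checkpoint.

Starting the checkpoint timer

Flink checkpoints are started under the coordination of the CheckpointCoordinator, which uses an inner class for this: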

```java
private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint for job {}.", job, e);
        }
    }
}
```

JobMaster.java starts the checkpoint timer:

```java
private void startScheduling() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    schedulerNG.registerJobStatusListener(jobStatusListener);
    // this calls into CheckpointCoordinator.scheduleTriggerWithDelay, which starts the first
    // checkpoint; after that, the CheckpointCoordinator triggers checkpoints on its own
    schedulerNG.startScheduling();
}
```

ScheduledTrigger is invoked periodically by the timer:

```java
private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
    return timer.scheduleAtFixedRate(
        new ScheduledTrigger(),
        initDelay, baseInterval, TimeUnit.MILLISECONDS);
}

/**
 * Executes the given command periodically. The first execution is started after the
 * {@code initialDelay}, the second execution is started after {@code initialDelay + period},
 * the third after {@code initialDelay + 2*period} and so on.
 * The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
 * is cancelled.
 *
 * @param command the task to be executed periodically
 * @param initialDelay the time from now until the first execution is triggered (when the first checkpoint starts)
 * @param period the time after which the next execution is triggered (the interval between subsequent checkpoints)
 * @param unit the time unit of the delay and period parameter
 * @return a ScheduledFuture representing the periodic task. This future never completes
 * unless an execution of the given task fails or if the future is cancelled
 */
ScheduledFuture<?> scheduleAtFixedRate(
    Runnable command,
    long initialDelay,
    long period,
    TimeUnit unit);
```
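The JDK's `ScheduledExecutorService.scheduleAtFixedRate` can reproduce the pattern `scheduleTriggerWithDelay` relies on. It has the same contract as the Javadoc quoted above: if one execution throws, all later executions are suppressed. That is why `ScheduledTrigger.run` catches and logs the exception instead of letting it escape. The minimal sketch below shows this. `PeriodicTrigger` and the simulated failure on the second attempt are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PeriodicTrigger {

    // Schedules "checkpoint triggers" every 10 ms; the 2nd attempt throws.
    // Waits (up to 500 ms) for `triggers` attempts, then returns how many attempts ran.
    static int run(boolean catchExceptions, int triggers) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(triggers);

        Runnable triggerCheckpoint = () -> {
            int n = attempts.incrementAndGet();
            done.countDown();
            if (n == 2) {
                throw new IllegalStateException("simulated trigger failure");
            }
        };

        // Wrapper in the style of ScheduledTrigger.run: log and keep the schedule alive.
        Runnable scheduledTrigger = catchExceptions
            ? () -> {
                try {
                    triggerCheckpoint.run();
                } catch (Exception e) {
                    System.err.println("Exception while triggering checkpoint: " + e.getMessage());
                }
            }
            : triggerCheckpoint;

        ScheduledFuture<?> future =
            timer.scheduleAtFixedRate(scheduledTrigger, 0, 10, TimeUnit.MILLISECONDS);
        try {
            done.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            future.cancel(false);
            timer.shutdownNow();
        }
        return attempts.get();
    }
}
```

Without the try/catch wrapper, the schedule dies after the second attempt. With it, triggering continues after the failure.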

ScheduledTrigger starts the checkpoint

ScheduledTrigger.run calls triggerCheckpoint to start executing the checkpoint:

```java
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic) {
    return triggerCheckpoint(checkpointProperties, null, isPeriodic, false);
}

@VisibleForTesting
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        boolean isPeriodic,
        boolean advanceToEndOfTime) {

    CheckpointTriggerRequest request = new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic, advanceToEndOfTime);
    requestDecider
        .chooseRequestToExecute(request, isTriggering, lastCheckpointCompletionRelativeTime)
        // calls startTriggeringCheckpoint if the decider chooses this request
        .ifPresent(this::startTriggeringCheckpoint);
    return request.onCompletionPromise;
}

private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {

    // get the executions (tasks) at which the checkpoint has to be triggered
    final Execution[] executions = getTriggerExecutions();
    // ...
    // no exception, no discarding, everything is OK
    final long checkpointId = checkpoint.getCheckpointId();
    snapshotTaskState(
        timestamp,
        checkpointId,
        checkpoint.getCheckpointStorageLocation(),
        request.props,
        executions,
        request.advanceToEndOfTime);

    coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));

    onTriggerSuccess();
    // ...
}


private void snapshotTaskState(
        long timestamp,
        long checkpointID,
        CheckpointStorageLocation checkpointStorageLocation,
        CheckpointProperties props,
        Execution[] executions,
        boolean advanceToEndOfTime) {

    // ... (checkpointOptions, built from props and checkpointStorageLocation, is omitted in this excerpt)

    // send the messages to the tasks that trigger their checkpoint:
    // each Execution here can be read as a Source operator, because a checkpoint starts at the sources
    for (Execution execution: executions) {
        if (props.isSynchronous()) {
            execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
        } else {
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }
    }
}
```

Execution.triggerCheckpoint (in Execution.java) calls triggerCheckpointHelper, which sends a triggerCheckpoint RPC request through the TaskManagerGateway.

Concretely, it calls RpcTaskManagerGateway.triggerCheckpoint, which then calls triggerCheckpoint on the TaskExecutorGateway. TaskExecutor implements TaskExecutorGateway, so this is where the call arrives on the TaskManager side.

```java
private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }

    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
    }
}
```

This completes the JobManager side of triggering the checkpoint. Next, the TaskManager receives the triggerCheckpoint message.

The TaskManager receives the triggerCheckpoint message

As shown above, the JobManager sends the checkpoint RPC to the TaskManager by calling RpcTaskManagerGateway.triggerCheckpoint. When the TaskManager receives the RPC, it is dispatched (via reflection) to TaskExecutor.triggerCheckpoint. From here on we are inside the TaskManager:

```java
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }
    // look up the task (here: a Source task) by its ExecutionAttemptID; if the job has several
    // sources, the JobManager sends one RPC per source and the TaskManager runs this once for each
    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        // trigger a checkpoint barrier on the task
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
    }
}
```

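On the TaskManager side, checkpoint processing starts at the sources. The JobManager sends a trigger-checkpoint RPC request to every source operator. When a TaskManager receives the checkpoint RPC for a source, it runs the checkpoint flow and also broadcasts a CheckpointBarrier to that source's downstream operators.

For the Kafka source, the method that performs the checkpoint is FlinkKafkaConsumerBase.snapshotState. The call stack from TaskExecutor down to FlinkKafkaConsumerBase.snapshotState is long, so only the called methods are listed below: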

```
TaskExecutor.triggerCheckpoint  --> task.triggerCheckpointBarrier
Task.triggerCheckpointBarrier  --> invokable.triggerCheckpointAsync
SourceStreamTask.triggerCheckpointAsync  --> super.triggerCheckpointAsync
StreamTask.triggerCheckpointAsync  --> triggerCheckpoint
StreamTask.triggerCheckpoint  --> performCheckpoint
StreamTask.performCheckpoint  --> subtaskCheckpointCoordinator.checkpointState
SubtaskCheckpointCoordinatorImpl.checkpointState  --> takeSnapshotSync  (broadcasts the CheckpointBarrier to downstream operators; after takeSnapshotSync succeeds, the asynchronous part finishes the snapshot and sends the ack to the JobMaster)
SubtaskCheckpointCoordinatorImpl.takeSnapshotSync  --> buildOperatorSnapshotFutures
SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures  --> checkpointStreamOperator
SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator  --> op.snapshotState
AbstractStreamOperator.snapshotState  --> stateHandler.snapshotState
StreamOperatorStateHandler.snapshotState  --> snapshotState  (calls operatorStateBackend.snapshot / keyedStateBackend.snapshot to persist the state to the state backend)
StreamOperatorStateHandler.snapshotState  --> streamOperator.snapshotState(snapshotContext)
AbstractUdfStreamOperator.snapshotState  --> StreamingFunctionUtils.snapshotFunctionState
StreamingFunctionUtils.snapshotFunctionState  --> trySnapshotFunctionState
StreamingFunctionUtils.trySnapshotFunctionState  --> ((CheckpointedFunction) userFunction).snapshotState  (this enters the UDF, here FlinkKafkaConsumerBase; for a custom source it is that source's snapshotState method)
FlinkKafkaConsumerBase.snapshotState
```

Kafka Source Checkpoint 

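Flink state can be divided into KeyedState and OperatorState. KeyedState is used after a keyBy operator. OperatorState is most commonly used to store the state of sources and sinks, for example the offsets a Kafka source is currently consuming. Any other operator that wants to use OperatorState has to implement CheckpointedFunction. Operator state is kept on the TaskManager's heap, so it is not recommended for large state.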
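The pattern described here has three parts: keep the working value in a field, copy it into a `ListState` in `snapshotState`, and read it back in `initializeState`. It can be sketched without a Flink dependency. `ListStateStub` and `OffsetTrackingSource` are hypothetical stand-ins. In real Flink code, the state object would come from `FunctionInitializationContext.getOperatorStateStore()` inside the `initializeState` method of a `CheckpointedFunction`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Flink's ListState; the backend copy survives a "restart".
final class ListStateStub<T> {
    private final List<T> items = new ArrayList<>();

    void clear() { items.clear(); }

    void add(T value) { items.add(value); }

    Iterable<T> get() { return new ArrayList<>(items); }
}

// Hypothetical source that tracks one offset per partition.
final class OffsetTrackingSource {
    private final Map<String, Long> offsets = new HashMap<>(); // working copy
    private ListStateStub<Map.Entry<String, Long>> state;

    // Mirrors initializeState: attach the state and restore whatever it holds.
    void initializeState(ListStateStub<Map.Entry<String, Long>> state) {
        this.state = state;
        for (Map.Entry<String, Long> e : state.get()) {
            offsets.put(e.getKey(), e.getValue());
        }
    }

    void emit(String partition, long offset) {
        offsets.put(partition, offset);
    }

    // Mirrors snapshotState: replace the old contents with the current offsets.
    void snapshotState() {
        state.clear();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            state.add(Map.entry(e.getKey(), e.getValue()));
        }
    }

    Long offsetOf(String partition) {
        return offsets.get(partition);
    }
}
```

A new instance restored from the same state sees the offset from the last snapshot. It does not see progress made after that snapshot, which is why a source replays from the checkpointed offsets after a failure.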

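The Kafka source's checkpoint is implemented in FlinkKafkaConsumerBase, in the snapshotState method.
Roughly, FlinkKafkaConsumerBase's checkpoint takes the current partition offsets from kafkaFetcher and writes them into the union list state: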

```java
/** Accessor for state in the operator state backend. */
// The state is kept in a ListState: at checkpoint time the source only has to put its
// (KafkaTopicPartition, offset) pairs into this object, and they are written to the state backend.
// All operator state works like this, including your own: just write the content into the state.
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;


@Override
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (!running) {
        LOG.debug("snapshotState() called on closed source");
    } else {
        // the consumer is still running:
        // clear the previous state
        unionOffsetStates.clear();

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            // the fetcher has not yet been initialized, which means we need to return the
            // originally restored offsets or the assigned partitions
            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
            }
        } else {
            // get the current KafkaTopicPartition -> offset mapping
            HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
            // remember it in pendingOffsetsToCommit (committed later, in notifyCheckpointComplete)
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
            }
            // write each KafkaTopicPartition and offset into the operator state
            for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                unionOffsetStates.add(
                        Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
            }
        }
        // evict the oldest pending (not yet committed) offsets beyond the limit
        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // truncate the map of pending offsets to commit, to prevent infinite growth
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingOffsetsToCommit.remove(0);
            }
        }
    }
}
```

When the checkpoint completes, FlinkKafkaConsumerBase.notifyCheckpointComplete is called and commits the offsets to Kafka. This completes the Kafka source's part of the checkpoint:

fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
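The sketch below models how `pendingOffsetsToCommit` and `notifyCheckpointComplete` fit together. It is a simplified model, not the connector's code. `PendingOffsets` is hypothetical, a `LinkedHashMap` replaces the connector's own ordered map, and "committing" just returns the offsets instead of calling the fetcher. The ideas taken from the code above are three: offsets are stashed per checkpoint id in `snapshotState`, the map is capped at a maximum size, and only the offsets of a completed checkpoint are handed to Kafka. Dropping the older entries on completion is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

final class PendingOffsets {
    static final int MAX_PENDING = 100; // hypothetical cap, cf. MAX_NUM_PENDING_CHECKPOINTS

    // checkpoint id -> (partition -> offset), kept in checkpoint order
    private final LinkedHashMap<Long, Map<String, Long>> pending = new LinkedHashMap<>();

    // Called from snapshotState: remember what this checkpoint would commit.
    void onSnapshot(long checkpointId, Map<String, Long> currentOffsets) {
        pending.put(checkpointId, new HashMap<>(currentOffsets));
        // Evict the oldest entries so the map cannot grow without bound.
        Iterator<Long> oldest = pending.keySet().iterator();
        while (pending.size() > MAX_PENDING) {
            oldest.next();
            oldest.remove();
        }
    }

    // Called from notifyCheckpointComplete: returns the offsets to commit, or null if the
    // id is unknown (e.g. already evicted). The entry and all older ones are dropped,
    // since committing newer offsets supersedes them.
    Map<String, Long> onCheckpointComplete(long checkpointId) {
        Map<String, Long> toCommit = pending.get(checkpointId);
        if (toCommit == null) {
            return null;
        }
        Iterator<Long> ids = pending.keySet().iterator();
        while (ids.hasNext()) {
            long id = ids.next();
            ids.remove();
            if (id == checkpointId) {
                break;
            }
        }
        return toCommit;
    }

    int size() {
        return pending.size();
    }
}
```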

After an operator's snapshot completes, it sends a message to the JobMaster saying the snapshot is done:

AsyncCheckpointRunnable.java

```java
private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics, CheckpointOptions options) {
    // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
    // AsyncCheckpointRunnable.run sends the checkpoint-completed message (ack) to the JobMaster
    executorService.execute(new AsyncCheckpointRunnable(
        snapshotFutures,
        metadata,
        metrics,
        System.nanoTime(),
        taskName,
        registerConsumer(),
        unregisterConsumer(),
        env,
        asyncExceptionHandler));
}
```

AsyncCheckpointRunnable.run

```java
public void run() {

    // ...
    if (asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {
        // report ack
        reportCompletedSnapshotStates(
            jobManagerTaskOperatorSubtaskStates,
            localTaskOperatorSubtaskStates,
            asyncDurationMillis);

    } else {
        LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
            taskName,
            checkpointMetaData.getCheckpointId());
    }
    // ...
}

private void reportCompletedSnapshotStates(
    TaskStateSnapshot acknowledgedTaskStateSnapshot,
    TaskStateSnapshot localTaskStateSnapshot,
    long asyncDurationMillis) {

    // ... (hasAckState and hasLocalState are derived from the two snapshots; elided in this excerpt)

    // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
    // to stateless tasks on restore. This enables simple job modifications that only concern
    // stateless without the need to assign them uids to match their (always empty) states.
    taskEnvironment.getTaskStateManager().reportTaskStateSnapshots(
        checkpointMetaData,
        checkpointMetrics,
        hasAckState ? acknowledgedTaskStateSnapshot : null,
        hasLocalState ? localTaskStateSnapshot : null);

}
```

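Following the calls further down leads to the place where the RPC message is sent.

Downstream operator: map checkpoint

To see the map operator's checkpoint flow, simply set a breakpoint in the Map operator and look at the call stack:

StreamTaskNetworkInput.emitNext processes input data and messages (including checkpoint barriers)

StreamTaskNetworkInput.processElement

OneInputStreamTask.emitRecord

StreamMap.processElement calls userFunction.map, which is the map in our code

Map has no state to keep, so it does not implement CheckpointedFunction. Only the CheckpointBarrier broadcast part is listed here.

If the message is a checkpoint barrier: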

StreamTaskNetworkInput.emitNext

```java
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {

    while (true) {
        // get the stream element from the deserializer
        if (currentRecordDeserializer != null) {
            DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            if (result.isFullRecord()) {
                // record processing path: from here the call goes all the way into the user's map function
                processElement(deserializationDelegate.getInstance(), output);
                return InputStatus.MORE_AVAILABLE;
            }
        }
        // read from the checkpointed input gate (this is where a CheckpointBarrier arrives)
        Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
        // ...
    }
}
```

CheckpointedInputGate.pollNext

```java
@Override
public Optional<BufferOrEvent> pollNext() throws Exception {
    while (true) {

        // ...
        Optional<BufferOrEvent> next = inputGate.pollNext();

        // ... (elided: handling of an empty result and of ordinary data buffers;
        //      bufferOrEvent is the element contained in next)
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
            // CheckpointBarrier handling
            barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelInfo());
            return next;
        }
        // ...
    }
}
```

The checkpoint handling in barrierHandler.processBarrier is much the same as the call flow that leads to FlinkKafkaConsumerBase.snapshotState.

barrierHandler.processBarrier calls down to SubtaskCheckpointCoordinatorImpl.checkpointState, which broadcasts the CheckpointBarrier downstream.
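In the example job, the map operator reads the union of two Kafka sources. It therefore has more than one input channel and receives one barrier per channel for each checkpoint. In exactly-once mode, the barrier handler aligns these barriers. Once a barrier arrives on a channel, records from that channel are held back until the barrier has arrived on every channel. Only then is the snapshot taken and the barrier forwarded. The Java sketch below is a simplified, single-threaded model of that idea. `AlignedBarrierHandler` and its methods are hypothetical and do not reproduce Flink's own barrier handler classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

final class AlignedBarrierHandler {
    private final int numChannels;
    private final Set<Integer> blocked = new HashSet<>();      // channels that already delivered the barrier
    private final Queue<String> heldBack = new ArrayDeque<>(); // records received on blocked channels
    final List<String> output = new ArrayList<>();             // what the operator processed, in order

    AlignedBarrierHandler(int numChannels) {
        this.numChannels = numChannels;
    }

    void onRecord(int channel, String record) {
        if (blocked.contains(channel)) {
            heldBack.add(record); // belongs after the checkpoint: buffer it
        } else {
            output.add(record);   // still before the barrier on this channel
        }
    }

    void onBarrier(int channel, long checkpointId) {
        blocked.add(channel);
        if (blocked.size() == numChannels) {
            // Aligned: every record before the barrier has been processed on all channels.
            output.add("snapshot+forward barrier " + checkpointId);
            blocked.clear();
            while (!heldBack.isEmpty()) {
                output.add(heldBack.poll()); // release the buffered records
            }
        }
    }
}
```

With two channels, a record that arrives on channel 0 after its barrier is processed only after the barrier from channel 1 has arrived. The snapshot therefore contains exactly the records that precede the barrier on both inputs.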

Kafka Sink Checkpoint

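The Kafka sink's checkpoint also starts when the sink receives a CheckpointBarrier.

Receiving the CheckpointBarrier works the same way as for Map. It is the same for all operators; the source is the operator that generates the CheckpointBarrier.

After that, the flow is the same as for the source, calling all the way down to FlinkKafkaProducer.snapshotState to take the snapshot.

Like the Kafka source, the Kafka sink puts what it will commit for this checkpoint into a ListState. FlinkKafkaProducer extends TwoPhaseCommitSinkFunction, which it uses to provide exactly-once semantics.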

FlinkKafkaProducer.java

```java
/**
 * State for nextTransactionalIdHint.
 */
private transient ListState<FlinkKafkaProducer.NextTransactionalIdHint> nextTransactionalIdHintState;

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    super.snapshotState(context);

    nextTransactionalIdHintState.clear();
    // To avoid duplication only first subtask keeps track of next transactional id hint. Otherwise all of the
    // subtasks would write exactly same information.
    if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
        checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
        long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;

        // If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
        // case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
        // scaling up.
        if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
            nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
        }
        // checkpoint state needed for exactly-once semantics
        nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(
            getRuntimeContext().getNumberOfParallelSubtasks(),
            nextFreeTransactionalId));
    }
}
```
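The scale-up adjustment in `snapshotState` is plain arithmetic. The sketch below isolates it. The class, method and parameter names are hypothetical; only the rule "if the parallelism grew, add `numberOfParallelSubtasks * kafkaProducersPoolSize`" is taken from the code above. For example, take a stored hint of 10, a previous parallelism of 2, a current parallelism of 4 and a pool size of 5. The next free id becomes 10 + 4 × 5 = 30. If the parallelism did not grow, it stays 10.

```java
final class TransactionalIdHint {

    // Same rule as FlinkKafkaProducer.snapshotState above: only after a scale-up,
    // skip past the id range that newly created subtasks may already have used.
    static long nextFreeTransactionalId(
            long storedNextFreeId,
            int lastParallelism,
            int currentParallelism,
            int kafkaProducersPoolSize) {
        long next = storedNextFreeId;
        if (currentParallelism > lastParallelism) {
            next += (long) currentParallelism * kafkaProducersPoolSize;
        }
        return next;
    }
}
```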

TwoPhaseCommitSinkFunction.java

```java
protected transient ListState<State<TXN, CONTEXT>> state;

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // this is like the pre-commit of a 2-phase-commit transaction
    // we are ready to commit and remember the transaction

    checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

    long checkpointId = context.getCheckpointId();
    LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);
    // pre-commit: for FlinkKafkaProducer this flushes the producer, sending buffered records to Kafka
    preCommit(currentTransactionHolder.handle);
    pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
    LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
    // start a new transaction for the records that follow this checkpoint
    currentTransactionHolder = beginTransactionInternal();
    LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

    state.clear();
    // write the transaction information into state
    state.add(new State<>(
        this.currentTransactionHolder,
        new ArrayList<>(pendingCommitTransactions.values()),
        userContext));
}
```

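By now you will have noticed that both FlinkKafkaConsumerBase and TwoPhaseCommitSinkFunction have a notifyCheckpointComplete method. Only in that method does the checkpoint actually take effect on the external system: the source commits its offsets and the sink commits its transaction.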
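The life cycle of a two-phase-commit sink across checkpoints can be modelled in a few lines. This is a simplified, Flink-free sketch: `TwoPhaseSink` and its list-of-strings "transactions" are hypothetical. In the sketch, `snapshotState` pre-commits the current transaction and opens a new one. `notifyCheckpointComplete` then commits every pending transaction up to and including the completed checkpoint id. Nothing becomes visible to readers before that point.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TwoPhaseSink {
    private List<String> current = new ArrayList<>();                          // open transaction
    private final TreeMap<Long, List<String>> pendingCommit = new TreeMap<>(); // pre-committed, by checkpoint id
    final List<String> visible = new ArrayList<>();                             // committed output

    void invoke(String record) {
        current.add(record); // written inside the open transaction
    }

    // Phase 1, on the checkpoint barrier: pre-commit and start a new transaction.
    void snapshotState(long checkpointId) {
        pendingCommit.put(checkpointId, current);
        current = new ArrayList<>();
    }

    // Phase 2, after the JobManager confirms the checkpoint: commit up to this id.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
            pendingCommit.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            visible.addAll(it.next().getValue());
            it.remove();
        }
    }
}
```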

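Note: after the sink finishes its snapshot, it sends an ack message to the JobMaster, just as the source does.

JobManager sends the confirmCheckpoint message to the TaskManagers

JobManager receives the snapshotState-completed messages

Once the JobManager has received the snapshotState-completed messages, it tells the TaskManagers that all operators have finished their snapshots. This calls each operator's notifyCheckpointComplete method and completes the whole checkpoint process.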

CheckpointCoordinator.receiveAcknowledgeMessage

CheckpointCoordinator.completePendingCheckpoint

CheckpointCoordinator.sendAcknowledgeMessages

```java
private void sendAcknowledgeMessages(long checkpointId, long timestamp) {
    // commit tasks
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }

    // commit coordinators
    for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) {
        coordinatorContext.checkpointComplete(checkpointId);
    }
}
```

Stepping into ee.notifyCheckpointComplete shows where the RPC message is sent.
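On the JobManager side, the step "all acks received, now send notifyCheckpointComplete" comes down to tracking which tasks have not yet acknowledged a pending checkpoint. Below is a minimal sketch under that assumption. `PendingCheckpointTracker`, its result enum and the task names are hypothetical. They are much simpler than the real CheckpointCoordinator and PendingCheckpoint classes: no timeouts, no declined checkpoints, no state handles.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PendingCheckpointTracker {
    enum AckResult { SUCCESS, DUPLICATE_OR_UNKNOWN, COMPLETED }

    final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final List<String> tasksToCommitTo;
    final List<String> notified = new ArrayList<>();

    PendingCheckpointTracker(long checkpointId, List<String> tasks) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasks);
        this.tasksToCommitTo = new ArrayList<>(tasks);
    }

    // Rough analogue of receiveAcknowledgeMessage -> completePendingCheckpoint -> sendAcknowledgeMessages.
    AckResult acknowledge(String task) {
        if (!notYetAcknowledged.remove(task)) {
            return AckResult.DUPLICATE_OR_UNKNOWN; // already acked or not part of this checkpoint
        }
        if (!notYetAcknowledged.isEmpty()) {
            return AckResult.SUCCESS;              // still waiting for other tasks
        }
        for (String t : tasksToCommitTo) {        // last ack: notify every task
            notified.add(t + ".notifyCheckpointComplete(" + checkpointId + ")");
        }
        return AckResult.COMPLETED;
    }
}
```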

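Kafka Source/Sink receive notifyCheckpointComplete

For the source, the flow starts when the TaskManager receives confirmCheckpoint.

Figure: the TaskManager receiving the confirmCheckpoint message

It then calls all the way down to FlinkKafkaConsumerBase.notifyCheckpointComplete, which commits the offsets to Kafka.

The sink works in much the same way, except that FlinkKafkaProducer's notifyCheckpointComplete is inherited from TwoPhaseCommitSinkFunction.

It calls FlinkKafkaProducer.commit, which commits the Kafka transaction: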

```java
@Override
protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        try {
            // call the KafkaProducer method that commits the transaction
            transaction.producer.commitTransaction();
        } finally {
            recycleTransactionalProducer(transaction.producer);
        }
    }
}
```

At this point, the main checkpoint flow is complete.

