Learning Flink (3): State and Fault Tolerance, State and Checkpoint


Excerpted from the official Apache website

 

I. Basic Concepts of State

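What is State? A quick search translates it as a "state mechanism". Flink introduced State and Checkpoint to guarantee at-least-once and exactly-once semantics. State can serve the following purposes: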

  • An intermediate result of a task/operator at a given moment
  • A snapshot
  • Recovery after the program crashes
  • Parameters of a machine learning model

 

II. The Kinds of State in Flink

Keyed State and Operator State

1. Keyed State is state based on a KeyedStream. It is bound to a specific key: every key on the KeyedStream may have its own state.

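2. Operator State. Unlike Keyed State, Operator State is bound to a specific operator rather than to a key: each parallel instance of the operator has one state. By comparison, a single operator on a KeyedStream can hold many keyed states, one per key. For example, the Kafka Connector in Flink uses Operator State: each connector instance stores the (partition, offset) mappings for all the topic partitions it consumes.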
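The difference between the two scopes can be sketched in plain Java. This is only a toy model under stated assumptions: the class names KeyedCounter and SourceInstance are made up for illustration and are not Flink APIs, and the real Kafka connector is considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the two state scopes; all names here are hypothetical, not Flink classes. */
class KeyedVsOperatorStateDemo {

    /** Keyed state: one independent value per key (here, a running count). */
    static final class KeyedCounter {
        private final Map<String, Long> countPerKey = new HashMap<>();

        /** Increments the count that belongs to this key only and returns the new count. */
        long add(String key) {
            return countPerKey.merge(key, 1L, Long::sum);
        }

        long get(String key) {
            return countPerKey.getOrDefault(key, 0L);
        }
    }

    /** Operator state: one state object per operator instance, e.g. partition -> offset. */
    static final class SourceInstance {
        private final Map<Integer, Long> offsetPerPartition = new HashMap<>();

        void recordConsumed(int partition, long offset) {
            offsetPerPartition.put(partition, offset);
        }

        /** Copies the whole mapping, as a snapshot of this instance's state would. */
        Map<Integer, Long> snapshotState() {
            return new HashMap<>(offsetPerPartition);
        }
    }
}
```

In the keyed case the key decides which value is touched; in the operator case the instance owns a single mapping regardless of which record it is processing.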

 

Some state primitives

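  • ValueState: a single-value state of type T. It is bound to the corresponding key and is the simplest kind of state. update() sets the value and value() reads it.

  • ListState: the state on a key is a list. add() appends a value to the list; get() returns an Iterable<T> for traversing the values.

  • ReducingState: each call to add() invokes the user-supplied reduceFunction, which merges the new value into a single state value.

  • MapState: the state is a map. Elements are added with put() or putAll().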
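To make the method semantics in the list above concrete, here is a minimal plain-Java imitation of three of these primitives for a single key. The Toy* classes are hypothetical stand-ins written for this note; they only mirror the method names mentioned above and are not Flink's interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Single-key imitations of the state primitives; hypothetical classes, not Flink's. */
class StatePrimitivesDemo {

    /** Imitates ValueState<T>: one value, read with value(), replaced with update(). */
    static final class ToyValueState<T> {
        private T current;

        T value() {
            return current;
        }

        void update(T newValue) {
            current = newValue;
        }
    }

    /** Imitates ListState<T>: add() appends, get() exposes the values as an Iterable. */
    static final class ToyListState<T> {
        private final List<T> items = new ArrayList<>();

        void add(T value) {
            items.add(value);
        }

        Iterable<T> get() {
            return items;
        }
    }

    /** Imitates ReducingState<T>: every add() folds the new value into one aggregate. */
    static final class ToyReducingState<T> {
        private final BinaryOperator<T> reduceFunction;
        private T current;

        ToyReducingState(BinaryOperator<T> reduceFunction) {
            this.reduceFunction = reduceFunction;
        }

        void add(T value) {
            // The first value is stored as-is; later values are merged into it.
            current = (current == null) ? value : reduceFunction.apply(current, value);
        }

        T get() {
            return current;
        }
    }
}
```

For instance, a ToyReducingState built with Integer::sum keeps only the running total, whereas a ToyListState fed the same values keeps every element.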

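The State interfaces above only define how to interact with state. The actual state values may live in memory, on disk, or in another distributed system; what we hold is effectively just a handle. So how do we obtain that handle? Flink uses a StateDescriptor to define a state. It is an abstract class that holds basic information such as the state name, type, and serializer. Matching the states above, ValueStateDescriptor, ListStateDescriptor, and the other descriptors derive from StateDescriptor.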
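The descriptor-to-handle relationship can be pictured with a small plain-Java model. Everything here (ToyDescriptor, ToyBackend, ToyHandle, setCurrentKey) is a hypothetical name chosen for the sketch, and the in-memory map merely stands in for whatever storage actually holds the values; it is not how Flink implements its state backends.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of descriptor -> handle lookup; all names are hypothetical, not Flink's. */
class DescriptorHandleDemo {

    /** Carries the metadata that identifies a state: name, value type, default value. */
    static final class ToyDescriptor<T> {
        final String name;
        final Class<T> type;
        final T defaultValue;

        ToyDescriptor(String name, Class<T> type, T defaultValue) {
            this.name = name;
            this.type = type;
            this.defaultValue = defaultValue;
        }
    }

    /** Owns the real values: state name -> (key -> value). */
    static final class ToyBackend {
        private final Map<String, Map<Object, Object>> tables = new HashMap<>();
        private Object currentKey;

        /** In this model the runtime would call this before each record is processed. */
        void setCurrentKey(Object key) {
            this.currentKey = key;
        }

        /** Returns a handle; no value is copied out of the backend. */
        <T> ToyHandle<T> getState(ToyDescriptor<T> descriptor) {
            Map<Object, Object> table = tables.computeIfAbsent(descriptor.name, n -> new HashMap<>());
            return new ToyHandle<>(this, table, descriptor);
        }
    }

    /** A handle whose reads and writes apply to whichever key is current at call time. */
    static final class ToyHandle<T> {
        private final ToyBackend backend;
        private final Map<Object, Object> table;
        private final ToyDescriptor<T> descriptor;

        ToyHandle(ToyBackend backend, Map<Object, Object> table, ToyDescriptor<T> descriptor) {
            this.backend = backend;
            this.table = table;
            this.descriptor = descriptor;
        }

        T value() {
            Object stored = table.get(backend.currentKey);
            return stored == null ? descriptor.defaultValue : descriptor.type.cast(stored);
        }

        void update(T newValue) {
            table.put(backend.currentKey, newValue);
        }
    }
}
```

The point of the design is that user code keeps one handle, obtained once from the descriptor, while the value it sees changes with the current key.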

 

3. A look at the internals of Keyed State

The get methods for each kind of State can be seen in the StreamingRuntimeContext class:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

import java.util.List;
import java.util.Map;

/**
 * Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext},
 * for streaming operators.
 */
@PublicEvolving
public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {

    /** The operator to which this function belongs. */
    private final AbstractStreamOperator<?> operator;

    /** The task environment running the operator. */
    private final Environment taskEnvironment;

    private final StreamConfig streamConfig;

    public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
                                    Environment env, Map<String, Accumulator<?, ?>> accumulators) {
        super(env.getTaskInfo(),
                env.getUserClassLoader(),
                operator.getExecutionConfig(),
                accumulators,
                env.getDistributedCacheEntries(),
                operator.getMetricGroup());

        this.operator = operator;
        this.taskEnvironment = env;
        this.streamConfig = new StreamConfig(env.getTaskConfiguration());
    }

    // ------------------------------------------------------------------------

    /**
     * Returns the input split provider associated with the operator.
     *
     * @return The input split provider.
     */
    public InputSplitProvider getInputSplitProvider() {
        return taskEnvironment.getInputSplitProvider();
    }

    public ProcessingTimeService getProcessingTimeService() {
        return operator.getProcessingTimeService();
    }

    // ------------------------------------------------------------------------
    //  broadcast variables
    // ------------------------------------------------------------------------

    @Override
    public boolean hasBroadcastVariable(String name) {
        throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
    }

    @Override
    public <RT> List<RT> getBroadcastVariable(String name) {
        throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
    }

    @Override
    public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
        throw new UnsupportedOperationException("Broadcast variables can only be used in DataSet programs");
    }

    // ------------------------------------------------------------------------
    //  key/value state
    // ------------------------------------------------------------------------

    @Override
    public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
        stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
        return keyedStateStore.getState(stateProperties);
    }

    @Override
    public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
        stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
        return keyedStateStore.getListState(stateProperties);
    }

    @Override
    public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
        stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
        return keyedStateStore.getReducingState(stateProperties);
    }

    @Override
    public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
        stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
        return keyedStateStore.getAggregatingState(stateProperties);
    }

    @Override
    public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
        stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
        return keyedStateStore.getFoldingState(stateProperties);
    }

    @Override
    public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
        stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
        return keyedStateStore.getMapState(stateProperties);
    }

    private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {
        Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
        KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
        Preconditions.checkNotNull(keyedStateStore, "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
        return keyedStateStore;
    }

    // ------------------ expose (read only) relevant information from the stream config -------- //

    /**
     * Returns true if checkpointing is enabled for the running job.
     *
     * @return true if checkpointing is enabled.
     */
    public boolean isCheckpointingEnabled() {
        return streamConfig.isCheckpointingEnabled();
    }

    /**
     * Returns the checkpointing mode.
     *
     * @return checkpointing mode
     */
    public CheckpointingMode getCheckpointMode() {
        return streamConfig.getCheckpointMode();
    }

    /**
     * Returns the buffer timeout of the job.
     *
     * @return buffer timeout (in milliseconds)
     */
    public long getBufferTimeout() {
        return streamConfig.getBufferTimeout();
    }

}

 

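Every state descriptor inherits from the StateDescriptor class. A quick look at its constructor shows that it takes three parameters:

  • Name
  • Type: the class of the value stored in the state
  • Default value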
protected StateDescriptor(String name, Class<T> type, T defaultValue) {
    this.name = requireNonNull(name, "name must not be null");
    requireNonNull(type, "type class must not be null");

    try {
        this.typeInfo = TypeExtractor.createTypeInfo(type);
    } catch (Exception e) {
        throw new RuntimeException(
                "Could not create the type information for '" + type.getName() + "'. " +
                "The most common reason is failure to infer the generic type information, due to Java's type erasure. " +
                "In that case, please pass a 'TypeHint' instead of a class to describe the type. " +
                "For example, to describe 'Tuple2<String, String>' as a generic type, use " +
                "'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", e);
    }

    this.defaultValue = defaultValue;
}
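The error message in this constructor refers to Java's type erasure. The following sketch, using only the JDK reflection API, shows why a plain Class object cannot describe a generic type while an anonymous subclass can. TypeCapture is a hypothetical class written for this illustration; it demonstrates the general trick and is not Flink's TypeHint.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

/** Illustrates type erasure and the anonymous-subclass workaround; TypeCapture is hypothetical. */
class TypeErasureDemo {

    /** An anonymous subclass of this class keeps its generic argument in its class metadata. */
    abstract static class TypeCapture<T> {
        final Type captured;

        protected TypeCapture() {
            // The generic superclass of the anonymous subclass is TypeCapture<actual type>.
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }
    }

    /** At runtime both lists share one Class object, so the element type is lost. */
    static boolean sameRuntimeClass() {
        return new ArrayList<String>().getClass() == new ArrayList<Integer>().getClass();
    }

    /** Returns the full generic type name recovered through the anonymous subclass. */
    static String describeCapturedType() {
        return new TypeCapture<List<String>>() {}.captured.getTypeName();
    }
}
```

Here sameRuntimeClass() returns true, which is the erasure problem, while describeCapturedType() still reports the String type argument.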

 

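For more detail, dig into the source yourself; it is worth reading if you run into it at work or are curious about the internals. Tying it to a real application will also help it sink in faster.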

 

III. Fault Tolerance

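1. Flink's core fault tolerance mechanism continuously draws snapshots of the data stream and operator state. When the system rolls back, these snapshots act as checkpoints. The snapshot mechanism is inspired by the Chandy-Lamport algorithm.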
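The idea of rolling back to a snapshot can be sketched with a toy single-operator model in plain Java. The assumptions are loud ones: a snapshot here is just a (source offset, running sum) pair, a failure is simulated by an index, and the names Snapshot, sumWithRecovery, interval, and crashAt are invented for this example. Flink's distributed checkpointing is far more elaborate.

```java
import java.util.List;

/** Toy model of snapshot-based recovery; names are hypothetical, not Flink APIs. */
class CheckpointRecoveryDemo {

    /** A snapshot pairs the source read position with the operator state at that position. */
    static final class Snapshot {
        final int offset;
        final long sum;

        Snapshot(int offset, long sum) {
            this.offset = offset;
            this.sum = sum;
        }
    }

    /**
     * Sums the input, taking a snapshot every `interval` records. If `crashAt` is a valid
     * index, processing fails once just before that record and resumes from the latest
     * snapshot, replaying the records read since then.
     */
    static long sumWithRecovery(List<Integer> input, int interval, int crashAt) {
        Snapshot last = new Snapshot(0, 0L);
        int offset = 0;
        long sum = 0L;
        boolean crashed = false;
        while (offset < input.size()) {
            if (!crashed && offset == crashAt) {
                crashed = true;
                offset = last.offset; // rewind the source to the snapshot position
                sum = last.sum;       // restore the operator state from the snapshot
                continue;
            }
            sum += input.get(offset);
            offset++;
            if (offset % interval == 0) {
                last = new Snapshot(offset, sum);
            }
        }
        return sum;
    }
}
```

Because the offset and the sum are restored together, replayed records are not counted twice, so the result with a simulated failure matches the result without one.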

I read the paper to study the algorithm. The English was hard for me to follow, so I found a Chinese write-up.

 

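Roughly speaking, the example passes a token between processes, and marker messages sent along the channels tell each process when to record its state; the recorded states form a consistent snapshot that shows what needs to be restored after a failure.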
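As a rough illustration of the marker idea, here is a two-process simulation in plain Java. It is a simplified sketch under assumptions: each process has exactly one incoming FIFO channel, messages are money transfers rather than a token, delivery order is scripted by hand, and all names (SnapshotProcess, MARKER, and so on) are made up. It follows the marker rules of the Chandy-Lamport algorithm but is not Flink's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** One process in a toy two-process Chandy-Lamport snapshot; hypothetical names throughout. */
class SnapshotProcess {
    static final int MARKER = Integer.MIN_VALUE; // sentinel standing in for a marker message

    int balance;
    final Deque<Integer> inbox = new ArrayDeque<>(); // the single incoming FIFO channel
    SnapshotProcess peer;

    Integer recordedBalance = null;                           // local state in the snapshot
    final List<Integer> recordedInFlight = new ArrayList<>(); // channel state in the snapshot
    boolean recordingChannel = false;

    SnapshotProcess(int balance) {
        this.balance = balance;
    }

    void send(int amount) {
        balance -= amount;
        peer.inbox.addLast(amount);
    }

    /** Records the local state, then emits a marker on the outgoing channel. */
    private void recordAndSendMarker() {
        recordedBalance = balance;
        peer.inbox.addLast(MARKER);
    }

    /** The initiator records itself and starts recording its incoming channel. */
    void startSnapshot() {
        recordAndSendMarker();
        recordingChannel = true;
    }

    void receiveOne() {
        int msg = inbox.removeFirst();
        if (msg == MARKER) {
            if (recordedBalance == null) {
                recordAndSendMarker(); // first marker seen: this channel's state is empty
            }
            recordingChannel = false;  // a marker closes the recording of this channel
        } else {
            balance += msg;
            if (recordingChannel) {
                recordedInFlight.add(msg); // arrived after recording: part of channel state
            }
        }
    }

    int snapshotTotal() {
        int total = recordedBalance;
        for (int amount : recordedInFlight) {
            total += amount;
        }
        return total;
    }
}

class ChandyLamportDemo {
    /** Runs a scripted interleaving and returns the total money captured by the snapshot. */
    static int run() {
        SnapshotProcess p = new SnapshotProcess(100);
        SnapshotProcess q = new SnapshotProcess(50);
        p.peer = q;
        q.peer = p;
        p.send(10);        // P = 90, in flight to Q: [10]
        q.send(20);        // Q = 30, in flight to P: [20]
        p.startSnapshot(); // P records 90; in flight to Q: [10, MARKER]
        p.receiveOne();    // P receives 20 after recording, so it counts as channel state
        q.receiveOne();    // Q receives 10, Q = 40
        q.receiveOne();    // Q receives the marker, records 40, sends a marker back
        p.receiveOne();    // P receives the marker and stops recording
        return p.snapshotTotal() + q.snapshotTotal();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The snapshot captures 90 and 40 as local states plus the in-flight transfer of 20, which adds up to the 150 that the system started with, even though no single instant of the run had exactly those balances.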

For details, see:

https://www.jianshu.com/p/938001e998f5

 

