Flink源碼解讀之狀態管理


一、從何說起

State要能發揮作用,就需要持久化到可靠存儲中,flink中持久化的動作就是checkpointing,那么從TM中執行的Task的基類StreamTask的checkpoint邏輯說起。

1.streamTask

 1 StreamTask
 2 
 3 protected OperatorChain<OUT, OP> operatorChain;
 4 CheckpointStreamFactory createCheckpointStreamFactory(StreamOperator<?> operator)
 5 <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 6 
 7       TypeSerializer<K> keySerializer,
 8 
 9       int numberOfKeyGroups,
10 
11       KeyGroupRange keyGroupRange)
12 OperatorStateBackend createOperatorStateBackend(
13 
14       StreamOperator<?> op, Collection<OperatorStateHandle> restoreStateHandles)
15 CheckpointStreamFactory createSavepointStreamFactory(StreamOperator<?> operator, String targetLocation)
16 StateBackend createStateBackend()
17 boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
18 void triggerCheckpointOnBarrier(
19 
20       CheckpointMetaData checkpointMetaData,
21 
22       CheckpointOptions checkpointOptions,
23 
24       CheckpointMetrics checkpointMetrics)
25 boolean performCheckpoint(
26 
27       CheckpointMetaData checkpointMetaData,
28 
29       CheckpointOptions checkpointOptions,
30 
31       CheckpointMetrics checkpointMetrics)
32 void checkpointState(
33 
34       CheckpointMetaData checkpointMetaData,
35 
36       CheckpointOptions checkpointOptions,
37 
38       CheckpointMetrics checkpointMetrics)
triggerCheckpoint->performCheckpoint->checkpointState,最終來到了checkpointingOperation。

2.checkpointingOperation

 1 CheckpointingOperation
 2 void executeCheckpointing(){
 3 ……
 4 for (StreamOperator<?> op : allOperators) {
 5    checkpointStreamOperator(op);
 6 }
 7 
 8 ……
 9 }
10 void checkpointStreamOperator(StreamOperator<?> op)
11        ……
12        op.snapshotState(
13 
14        checkpointMetaData.getCheckpointId(),
15 
16        checkpointMetaData.getTimestamp(),
17 
18        checkpointOptions)
19        ……

這個類中,直接對streamTask中傳入的每一個operator調用其snapshotState方法。

那就再看Operator的基類。

3.StreamOperator

 1 StreamOperator
 2 OperatorSnapshotResult snapshotState(
 3 
 4    long checkpointId,
 5 
 6    long timestamp,
 7 
 8    CheckpointOptions checkpointOptions)
 9 void initializeState(OperatorSubtaskState stateHandles)
10 void notifyOfCompletedCheckpoint(long checkpointId)

StreamOperator是一個接口,其中包含了這三個接口,意味着繼承它的Operator都必須實現這幾個方法。

4.AbstractStreamOperator

 1 AbstractStreamOperator
 2 Final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions)
 3 
 4        ……
 5       snapshotState(snapshotContext);
 6        ……
 7        if (null != operatorStateBackend) {
 8 
 9            snapshotInProgress.setOperatorStateManagedFuture(
10 
11            operatorStateBackend.snapshot(checkpointId, timestamp, factory,              checkpointOptions));
12 
13        }
14 
15 
16 
17        if (null != keyedStateBackend) {
18 
19            snapshotInProgress.setKeyedStateManagedFuture(
20 
21            keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
22 
23        }
24       ……
25 void notifyOfCompletedCheckpoint(long checkpointId)
26       if (keyedStateBackend != null) {
27 
28            keyedStateBackend.notifyCheckpointComplete(checkpointId);
29 
30       }
31 void snapshotState(StateSnapshotContext context)
32 void initializeState(StateInitializationContext context)
AbstractStreamOperator是對StreamOperator的基礎實現,在它的snapshotState方法中,分別調用了OperatorStateBackend和KeyedStateBackend的snapshot方法。
特別注意,在調用這兩個方法之前的snapshotState(snapshotContext)這個調用,它一方面實現了Raw的State的snapshot,一方面也實現了用戶自定義的函數的State的更新。

再說一下,后面的兩個函數,snapshotState和initializeState,他們的形參都是一個context,是提供給用戶來重新實現用戶自己的state的checkpoints的。

 

這個類有一個很重要的子類,AbstractUdfStreamOperator,很多Operator都從這個類開始繼承。

5.AbstractUdfStreamOperator

AbstractUdfStreamOperator
void initializeState(StateInitializationContext context) throws Exception {

   super.initializeState(context);

   StreamingFunctionUtils.restoreFunctionState(context, userFunction);
void snapshotState(StateSnapshotContext context) throws Exception {

   super.snapshotState(context);

   StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction)
這里可以很明顯的看到,在實現父類的方法的過程中,它添加了東西,就是userFunction的restore和snapshot。

 

看看上面這些子類,真正會被實例化的Operator。

6.StreamingFunctionUtils

 1 StreamingFunctionUtils
 2 void snapshotFunctionState(
 3 
 4       StateSnapshotContext context,
 5 
 6       OperatorStateBackend backend,
 7 
 8       Function userFunction {
 9 ……
10 while (true) {
11 
12 
13 
14       if (trySnapshotFunctionState(context, backend, userFunction)) {
15 
16          break;
17 
18       }
19 
20 
21 
22       // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
23 
24       if (userFunction instanceof WrappingFunction) {
25 
26          userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
27 
28       } else {
29 
30          break;
31 
32       }
33 
34    }
35 
36 }
37 boolean trySnapshotFunctionState(
38 
39       StateSnapshotContext context,
40 
41       OperatorStateBackend backend,
42 
43       Function userFunction) throws Exception {
44 
45    if (userFunction instanceof CheckpointedFunction) {
46 
47       ……
48 
49       return true;
50 
51    }
52 
53    if (userFunction instanceof ListCheckpointed) {
54 
55       ……
56       return true;
57 
58    }
59 
60    return false;
61 
62 }

從上面可以看到,這個Util的作用,就用就是把用戶實現的CheckpointedFunction和ListCheckpointed來做restore和snapshot。

二、工廠

上面從task和operator的層面說明了state保存的過程,那么保存到哪里?就由下面的三個工廠類來提供。

7.State backend

 

 

 

 

MemoryStateBackend

FsStateBackend

RocksDBStateBackend

CheckpointStream

MemCheckpointStreamFactory

FsCheckpointStreamFactory

FsCheckpointStreamFactory

SavepointStream

MemCheckpointStreamFactory

FsSavepointStreamFactory

FsSavepointStreamFactory

KeyedStateBackend

HeapKeyedStateBackend

HeapKeyedStateBackend

RocksDBKeyedStateBackend

OperatorStateBackend

DefaultOperatorStateBackend

DefaultOperatorStateBackend

DefaultOperatorStateBackend

RocksDBStateBackend的構造函數可以傳入一個AbstractStateBackend,否則默認采用FsStateBackend

可以看到,從OperatorState的角度來講,目前Flink只有一個實現,即DefaultOperatorStateBackend,它將List風格的State保存在內存中。

從KeyedState的角度來講,目前有兩種實現,HeapKeyedStateBackend將state保存在內存中,而RocksDbKeyedStateBackend將State保存在TM本地的RocksDB中。相對而言,前者在內存中,速度會快,效率高,但一方面會限制state的大小,另一方面也會造成JVM自己的內存問題;后者在本地文件中,就會涉及序列化和反序列化,效率不及前者,但可以保存的state的大小會很大。

從checkpoint和savepoint的角度來看,Memory工廠方法都保存在內存中,顯然不能在生產環境使用,而Fs工廠方法和RocksDb工廠方法,則統一都放在文件系統中,比如HDFS。

三、房子

具體存儲State的目前有三種,以DefaultOperatorStateBackend作為OperatorState的例子,以及HeapKeyedStateBackend作為KeyedState的例子來看。

8.DefaultOperatorStateBackend

DefaultOperatorStateBackend
Map<String, PartitionableListState<?>> registeredStates;
RunnableFuture<OperatorStateHandle> snapshot(

      final long checkpointId,

      final long timestamp,

      final CheckpointStreamFactory streamFactory,

      final CheckpointOptions checkpointOptions)
……
if (registeredStates.isEmpty()) {

   return DoneFuture.nullValue();

}
……
for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) 
……
ListState<S> getListState(ListStateDescriptor<S> stateDescriptor)

 

這里截取了三個方法,其中registeredStates可以看到,其還是以map的方式在存儲,snapshotState方法具體實現了剛才在AbstractStreamOperator中調用snapshotState的方法,后面的getListState提供了在用戶編程中提供ListState實例的接口。

1 PartitionableListState<S>
2 /**
3 
4  * The internal list the holds the elements of the state
5 
6  */
7 
8 private final ArrayList<S> internalList;

由此可以看出 OperatorState都保存在內存中,本質上還是一個ArrayList。

 

9.HeapKeyedStateBackend

 

 1 * @param <K> The key by which state is keyed.
 2 
 3 HeapKeyedStateBackend<K>
 4 
 5 /**
 6  * Map of state tables that stores all state of key/value states. We store it centrally so
 7  * that we can easily checkpoint/restore it.
 8  *
 9  * <p>The actual parameters of StateTable are {@code StateTable<NamespaceT, Map<KeyT, StateT>>}
10  * but we can't put them here because different key/value states with different types and
11  * namespace types share this central list of tables.
12  */
13 private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
14 
15 <N, V> InternalValueState<N, V> createValueState(
16       TypeSerializer<N> namespaceSerializer,
17       ValueStateDescriptor<V> stateDesc){
18    StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
19    return new HeapValueState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
20 }
21 
22  
23 <N, T> InternalListState<N, T> createListState(
24       TypeSerializer<N> namespaceSerializer,
25       ListStateDescriptor<T> stateDesc)
26 
27       new HeapListState<>
28 <N, T> InternalReducingState<N, T> createReducingState(
29       TypeSerializer<N> namespaceSerializer,
30       ReducingStateDescriptor<T> stateDesc)
31 
32       new HeapReducingState<>
33 <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
34       TypeSerializer<N> namespaceSerializer,
35       AggregatingStateDescriptor<T, ACC, R> stateDesc)
36 
37       new HeapAggregatingState<>
38 <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
39       TypeSerializer<N> namespaceSerializer,
40       FoldingStateDescriptor<T, ACC> stateDesc)
41 
42       new HeapFoldingState<>
43 <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer,
44       MapStateDescriptor<UK, UV> stateDesc)
45 
46       new HeapMapState<>
47 RunnableFuture<KeyedStateHandle> snapshot(
48       final long checkpointId,
49       final long timestamp,
50       final CheckpointStreamFactory streamFactory,
51       CheckpointOptions checkpointOptions)
52 
53 ……
54 
55 if (!hasRegisteredState()) {
56    return DoneFuture.nullValue();
57 }
58 
59 ……

這里也類似,幾個create方法也都提供了在用戶編程中可以調用的接口,分別返回對應類型的State。snapshotState也是對AbstractStreamOperator中調用的具體實現。

四、通道

所謂通道,也就是通過用戶編程,如何使得用戶使用的State和上面的DefaultOperatorStateBackend和HeapKeyedStateBackend發生關聯。用戶編程中首先面對的就是StreamingRuntimeContext這個類。

10.StreamingRuntimeContext

 

1 StreamingRuntimeContext
2 
3 public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
4    KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
5    stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
6    return keyedStateStore.getState(stateProperties);
7 }

這里只截取了getState的方法,其他類型的State的方法類似,這里也很簡單,就是看看是否能拿到KeyedStateStore,然后用其去生成State。

11.PerWindowStateStore

 

 1 PerWindowStateStore
 2 
 3 @Override
 4 public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
 5    try {
 6       return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
 7    } catch (Exception e) {
 8       throw new RuntimeException("Could not retrieve state", e);
 9    }
10 }

PerWindowStateStore是KeyedStateStore的一個子類,具體實現了如何去拿。其中的getPartitionedState最終還是調到了AbstractStreamOperator。

12.AbstractStreamOperator

 

 1 AbstractStreamOperator
 2 
 3 protected <S extends State, N> S getPartitionedState(
 4       N namespace,
 5       TypeSerializer<N> namespaceSerializer,
 6       StateDescriptor<S, ?> stateDescriptor) throws Exception {
 7 
 8    /*
 9     TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace.
10     This method should be removed for the sake of namespaces being lazily fetched from the keyed
11     state backend, or being set on the state directly.
12     */
13 
14    if (keyedStateStore != null) {
15       return keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
16    } else {
17       throw new RuntimeException("Cannot create partitioned state. The keyed state " +
18          "backend has not been set. This indicates that the operator is not " +
19          "partitioned/keyed.");
20    }
21 }

這里也就是一個二傳手的作用,還是調回了keyedStateBackend的方法。

13.AbstractKeyedStateBackend

 1 AbstractKeyedStateBackend
 2 
 3 <N, S extends State> S getPartitionedState(
 4       final N namespace,
 5       final TypeSerializer<N> namespaceSerializer,
 6       final StateDescriptor<S, ?> stateDescriptor)
 7 
 8 <N, S extends State, V> S getOrCreateKeyedState(
 9       final TypeSerializer<N> namespaceSerializer,
10       StateDescriptor<S, V> stateDescriptor)
11 
12 // create a new blank key/value state
13 S state = stateDescriptor.bind(new StateBinder() {
14    @Override
15    public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
16       return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
17    }
18 
19    @Override
20    public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
21       return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
22    }
23 
24    @Override
25    public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
26       return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
27    }
28 
29    @Override
30    public <T, ACC, R> AggregatingState<T, R> createAggregatingState(
31          AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
32       return AbstractKeyedStateBackend.this.createAggregatingState(namespaceSerializer, stateDesc);
33    }
34 
35    @Override
36    public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
37       return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
38    }
39 
40    @Override
41    public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
42       return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
43    }
44 
45 });

可以看到這里才是真正實現State生成的邏輯,在stateDescriptor.bind這里實現了一個向上綁定,還是比較微妙的。其實在真正的運行中,這里的this就會變成HeapKeyedStateBacked或者RocksDbKeyedStateBackend,它們才真正負責最后的生成。

14.StateInitializationContextImpl

 

1 StateInitializationContextImpl
2 public OperatorStateStore getOperatorStateStore() {
3 
4    return operatorStateStore;
5 
6 }

這個是OperatorState的部分,最終也會調到DefaultOperatorStateBackend的getListState方法,創建state,並注冊state。

五、狀態

說完了用處,存儲和發生關聯,這里才是State本尊的介紹。先來看看如果要實現OperatorState怎么弄。

15.CheckpointedFunction and ListCheckpointed

1 interface CheckpointedFunction {
2 
3    void snapshotState(FunctionSnapshotContext context) throws Exception;
4 
5    void initializeState(FunctionInitializationContext context) throws Exception;
6 
7 }

1 public interface ListCheckpointed<T extends Serializable> {
2 
3       List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
4 
5       void restoreState(List<T> state) throws Exception;
6 
7 }

  1 public class BufferingSink
  2 
  3         implements SinkFunction<Tuple2<String, Integer>>,
  4 
  5         CheckpointedFunction {
  6 
  7 
  8 
  9     private final int threshold;
 10 
 11    
 12 //pay attention here, the definition of the state
 13 
 14     private transient ListState<Tuple2<String, Integer>> checkpointedState;
 15 
 16    
 17 
 18     private List<Tuple2<String, Integer>> bufferedElements;
 19 
 20 
 21 
 22     public BufferingSink(int threshold) {
 23 
 24         this.threshold = threshold;
 25 
 26         this.bufferedElements = new ArrayList<>();
 27 
 28     }
 29 
 30 
 31 
 32     @Override
 33 
 34     public void invoke(Tuple2<String, Integer> value) throws Exception {
 35 
 36         bufferedElements.add(value);
 37 
 38         if (bufferedElements.size() == threshold) {
 39 
 40             for (Tuple2<String, Integer> element: bufferedElements) {
 41 
 42                 // send it to the sink
 43 
 44             }
 45 
 46             bufferedElements.clear();
 47 
 48         }
 49 
 50     }
 51 
 52 
 53 
 54     @Override
 55 
 56     public void snapshotState(FunctionSnapshotContext context) throws Exception {
 57 
 58         checkpointedState.clear();
 59 
 60         for (Tuple2<String, Integer> element : bufferedElements) {
 61 
 62             checkpointedState.add(element);
 63 
 64         }
 65 
 66     }
 67 
 68 
 69 
 70     @Override
 71 
 72     public void initializeState(FunctionInitializationContext context) throws Exception {
 73 
 74         //new a descriptor
 75 ListStateDescriptor<Tuple2<String, Integer>> descriptor =
 76 
 77                 new ListStateDescriptor<>(
 78 
 79                         "buffered-elements",
 80 
 81                         TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
 82 
 83         //get the state by OperatorStateStor
 84 
 85         checkpointedState = context.getOperatorStateStore().getListState(descriptor);
 86 
 87         //unlike keyed state, flink will do the restore, user should take care of the restore of the operator state
 88 
 89         if (context.isRestored()) {
 90 
 91             for (Tuple2<String, Integer> element : checkpointedState.get()) {
 92 
 93                 bufferedElements.add(element);
 94 
 95             }
 96 
 97         }
 98 
 99     }
100 
101 }

創建一個ListStateDescriptor,然后從context中獲取OperatorStateStore,也就是剛才的DefaultOperatorStateStore來具體生成狀態。

這里關鍵的一點在於initializeState方法中的isRestored的判斷,需要用戶自己來決定如何恢復State。

16.RichFunction

獲取任何的KeyedState都必須在RichFunction的子類中才能進行。

 

 1 public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 2 
 3 
 4 
 5     /**
 6 
 7      * The ValueState handle. The first field is the count, the second field a running sum.
 8 
 9      */
10 
11     private transient ValueState<Tuple2<Long, Long>> sum;//the Keyed State definition
12 
13 
14 
15     @Override
16 
17     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
18 
19 
20 
21         // access the state value
22 
23         Tuple2<Long, Long> currentSum = sum.value();
24 
25 
26 
27         // update the count
28 
29         currentSum.f0 += 1;
30 
31 
32 
33         // add the second field of the input value
34 
35         currentSum.f1 += input.f1;
36 
37 
38 
39         // make sure to update the state
40 
41         sum.update(currentSum);
42 
43 
44 
45         // if the count reaches 2, emit the average and clear the state
46 
47         if (currentSum.f0 >= 2) {
48 
49             out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
50 
51             sum.clear();
52 
53         }
54 
55     }
56 
57 
58 
59     @Override
60 
61     public void open(Configuration config) {
62         //new a descriptor according to the Keyed State
63 
64         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
65 
66                 new ValueStateDescriptor<>(
67 
68                         "average", // the state name
69 
70                         TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
71 
72                         Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
73 
74         //using the context to get the Keyed State
75         sum = getRuntimeContext().getState(descriptor);
76 
77     }
78 
79 }
80 
81 
82 
83 // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
84 
85 env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
86 
87         .keyBy(0)
88 
89         .flatMap(new CountWindowAverage())
90 
91         .print();
92 
93 
94 
95 // the printed output will be (1,4) and (1,5)

這里的Open方法也類似,都是定義一個descriptor,然后直接在context上獲取對應的State。

 

17.State type

 

Managed State

Raw State

Keyed State

RichFunction

1,2

OperatorState

CheckpointedFunction

ListCheckpointed

1. AbstractStreamOperator.initializeState(StateInitializationContext context)

2. AbstractStreamOperator.snapshotState(StateSnapshotContext context)

 

Keyed State:

ValueState<T>:保持一個可以更新和獲取的值(每個Key一個value),可以用來update(T)更新,用來T value()獲取。

ListState<T>: 保持一個值的列表,用add(T) 或者 addAll(List<T>)來添加,用Iterable<T> get()來獲取。

ReducingState<T>: 保持一個值,這個值是狀態的很多值的聚合結果,接口和ListState類似,但是可以用相應的ReduceFunction來聚合。

AggregatingState<IN, OUT>:保持很多值的聚合結果的單一值,與ReducingState相比,不同點在於聚合類型可以和元素類型不同,提供AggregateFunction來實現聚合。

FoldingState<T, ACC>: 與AggregatingState類似,除了使用FoldFunction進行聚合。

MapState<UK, UV>: 保持一組映射,可以將kv放進這個狀態,使用put(UK, UV) or putAll(Map<UK, UV>)添加,或者使用get(UK)獲取。

 

18.FlinkKafkaConsumerBase

 

 1 FlinkKafkaConsumerBase
 2 
 3 final void initializeState(FunctionInitializationContext context) throws Exception {
 4    OperatorStateStore stateStore = context.getOperatorStateStore();
 5    ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
 6     stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 7    this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
 8          OFFSETS_STATE_NAME,
 9          TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
10 
11 ……
12 
13 final void snapshotState(FunctionSnapshotContext context){
14 
15       ……
16 
17       unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
18       ……

作為source和operator state的示例。

19.ElasticsearchSinkBase

 

 1 abstract class ElasticsearchSinkBase
 2 
 3 @Override
 4 public void initializeState(FunctionInitializationContext context) throws Exception {
 5    // no initialization needed
 6 }
 7 
 8 @Override
 9 public void snapshotState(FunctionSnapshotContext context) throws Exception {
10    checkErrorAndRethrow();
11 
12    if (flushOnCheckpoint) {
13       do {
14          bulkProcessor.flush();
15          checkErrorAndRethrow();
16       } while (numPendingRequests.get() != 0);
17    }
18 }

In all the subclass of this, no one override these two method.

作為sink和operatorstate的實例。

六、恢復

20.Restore

20.1 Introduction

 

 

 無狀態的重分布,直接數據重分布就可。有了狀態,就需要先把狀態存下來,然后再拆分,以一定的策略來重分布。

20.2 OperatorState

 

目前flink官方只實現了如下的重分布方案。

RoundRobinOperatorStateRepartitioner

20.3 KeyedState

20.3.1 key distribution

hash(key) mod parallelism

對keyedState而言,只是跟隨key的分布即可。但是為了提高效率,引入了KeyGroup的概念。

20.3.2 KeyGroup

20.3.2.1 Introduce of KeyGroup

Without KeyGroup, the keys in the subtask are wrote sequentially, which is not easy to rescale on parallelism adjust. KeyGroup may have a range of keys, and can be assigned to subtask. Then when checkpointing, keys within the KeyGroup will be wrote together, when rescaling, KeyState of the keys within the same KeyGroup will be read sequeatially. The number of KeyGroup is the upper limit for parallelism, and the number of KeyGroup must be determined before the job is started and cannot be changed after the fact.

 

20.3.2.2 Determine of KeyGroup

setMaxParallelism,the lower limit is 0<, and the upper limit is <=32768.

KeyGroup的數量和maxParallelism的值是一致的。

七、其他

21.Misc

1.能否在非keyby的語句后面直接接一個RichFunction來使用KeyedState?

在構造StreamGraph的過程中,會判斷當前的transform是否有keySelector,如果有,就會在streamNode上設置keySerializer。

然后在Operator的初始化過程中,會判斷是否有KeySerializer,如果有,才會生成KeyedStateBackend。

后續利用KeyedstateBackend來生成相應的KeyedState。

 

如果沒有keyby,直接實現一個RichMapFunction,則可以判斷出沒有KeyedStateBackend,在運行時會拋出異常。

 

2.究竟KeyedState中的ListState和OperatorState中的ListState是不是一回事?

首先來看ListState是個啥

public interface ListState<T> extends MergingState<T, Iterable<T>> {}

顯然它只是一個空接口,用命名的方式來增加一種約束說明。下面是它的繼承圖。

 

可以看到最初的基類以及中間的父類,分布都通過命名的方式來增加約束,其中State只定義了clear方法,AppendingState定義了get和add方法,MergingState的意義和ListState的類似。

然后我們看DefaultOperatorStateBackend中定義了生成state的接口,

1 <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor)

的確,它返回的是一個ListState,但別忘了,這只是一個接口,實際返回是什么了?是PartitionableListState<S>,那就來看看他的繼承關系:

 

可以看到,他實現了ListState這個接口,具體的代碼也比較簡單,內部以一個ArrayList來存儲泛型S類型的State。

 

好,回過頭來,我們看看KeyedState的邏輯,看看最外面的接口KeyedStateStore的聲明方式:

1 @PublicEvolving
2 public interface KeyedStateStore {
3 @PublicEvolving
4 <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);

看到這里,我們看到,聲明的出參和OperatorState的是一致,可是我們也知道這個只是個空接口,實際如何了?

還得回到HeapKeyedStateBackend來看下,

@Override
public <N, T> InternalListState<N, T> createListState(
      TypeSerializer<N> namespaceSerializer,
      ListStateDescriptor<T> stateDesc) throws Exception {
……
   return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}

中間部分我們都略去,看到這里其實變了,實際函數的出參是InternalListState,可以理解它是ListState的一個子類,但最終返回的是一個HeapListState,同樣,來看看它的繼承圖:

 

 從這個圖上也能看到,HeapListState實現了InternalListState進而間接實現了ListState,但其實這兩個接口都是空接口,都只是一種聲明,沒有任何的動作或者方法包含在里面。

 所以,回到問題上,KeyedState的ListState和OperatorState的ListState是一回事嗎?

還是不好回答,從語法上來講,的確是一回事,因為就是同一個類型啊;可是在實際運行當中,前面也看到了,還是有很大不同的兩個類。

那肯定又有人問了,PartitionableListState和HeapListState有什么區別?如果直接回答,一個是用在OperatorState中的,一個是用在KeyedState中,估計你肯定不滿意。孔子曰,神經病。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM