[Source Code Analysis] Understanding Flink Broadcast from Examples and Source Code
0x00 Abstract
This article walks through Flink's broadcast variable mechanism with source code analysis and a worked example.
0x01 Business Requirement
1. Scenario
We need to check traffic against an IP blacklist and filter it out. The blacklist grows and shrinks over time, so it must be dynamically configurable.
Assume the blacklist is stored in MySQL. When the Flink job starts, it loads the blacklist from MySQL and exposes it to the Flink operators as a variable.
2. Problem
We do not want to restart the job just to reload this variable, so we need a way to dynamically update a variable inside a running operator.
3. Solution
Use broadcast to implement dynamic configuration updates.
A broadcast stream differs from a regular data stream in that a single broadcast record is processed by every parallel instance of the downstream operator, whereas a regular record is processed by exactly one parallel instance. This property is what makes broadcast streams a good fit for dynamic configuration updates.
0x02 Overview
There are three tricky parts to broadcast: the usage steps, how to write the user-defined function, and how to read and write the state. Here is a quick overview of each.
1. Steps to use broadcast
- Create a MapStateDescriptor
- Call DataStream.broadcast to obtain a broadcast stream, BroadcastStream
- Call DataStream.connect to connect the business data stream with the BroadcastStream, which returns a BroadcastConnectedStream
- Call BroadcastConnectedStream.process, which dispatches records to processElement and processBroadcastElement respectively (a condensed sketch of these four steps follows)
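Here is a minimal, self-contained sketch of the four steps applied to the IP-blacklist scenario. All names (BlacklistSketch, the two streams, the "blacklist" state name) are illustrative and not taken from the Flink sources; the lookup logic is deliberately simplistic.
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object BlacklistSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Step 1: the descriptor that names and types the broadcast state
    val blacklistDesc = new MapStateDescriptor[String, String](
      "blacklist", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
    val ipStream = env.fromElements("1.2.3.4", "5.6.7.8")   // business records: IPs to check
    val blacklistUpdates = env.fromElements("1.2.3.4")      // config records: blacklisted IPs
    // Step 2: turn the config stream into a broadcast stream
    val blacklistBroadcast = blacklistUpdates.broadcast(blacklistDesc)
    // Steps 3 and 4: connect the two streams and process them
    ipStream
      .connect(blacklistBroadcast)
      .process(new BroadcastProcessFunction[String, String, String] {
        override def processElement(
            ip: String,
            ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext,
            out: Collector[String]): Unit = {
          // read-only view of the broadcast state: forward IPs that are not blacklisted
          if (ctx.getBroadcastState(blacklistDesc).get(ip) == null) out.collect(ip)
        }
        override def processBroadcastElement(
            ip: String,
            ctx: BroadcastProcessFunction[String, String, String]#Context,
            out: Collector[String]): Unit = {
          // writable view of the broadcast state: record the newly blacklisted IP
          ctx.getBroadcastState(blacklistDesc).put(ip, "blocked")
        }
      })
      .print()
    env.execute("broadcast blacklist sketch")
  }
}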
2. The user-defined process function
- BroadcastConnectedStream.process accepts two kinds of functions: KeyedBroadcastProcessFunction and BroadcastProcessFunction
- Both declare the abstract methods processElement and processBroadcastElement; KeyedBroadcastProcessFunction additionally defines an onTimer method, which is a no-op by default and may be overridden by subclasses (see the sketch after this list)
- processElement handles the business data stream
- processBroadcastElement handles the broadcast data stream
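The onTimer hook exists only on the keyed variant. The sketch below is a hypothetical KeyedBroadcastProcessFunction, not taken from the article's example: processElement registers a processing-time timer and onTimer fires when it expires; the class name and state name are illustrative.
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector
class RuleWithTimerFunction
  extends KeyedBroadcastProcessFunction[String, (String, Long), String, String] {
  // descriptor used to read/write the broadcast state (key: rule name, value: rule payload)
  private val ruleStateDesc = new MapStateDescriptor[String, String](
    "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
  override def processElement(
      value: (String, Long),
      ctx: KeyedBroadcastProcessFunction[String, (String, Long), String, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    // read-only access to the broadcast state
    val rule = ctx.getBroadcastState(ruleStateDesc).get(value._1)
    out.collect(s"${value._1} -> $rule")
    // timers can only be registered on the keyed (business) side
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 10000L)
  }
  override def processBroadcastElement(
      rule: String,
      ctx: KeyedBroadcastProcessFunction[String, (String, Long), String, String]#Context,
      out: Collector[String]): Unit = {
    // writable access: store the newest rule under its own name
    ctx.getBroadcastState(ruleStateDesc).put(rule, rule)
  }
  // onTimer is a no-op by default; only the keyed variant lets us override it
  override def onTimer(
      timestamp: Long,
      ctx: KeyedBroadcastProcessFunction[String, (String, Long), String, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    out.collect(s"timer fired for key ${ctx.getCurrentKey} at $timestamp")
  }
}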
3. Broadcast State
- Broadcast state is always represented as MapState, i.e. in map format, which is the most general state primitive Flink provides. It is a kind of managed state, i.e. state managed by the Flink framework, like ValueState, ListState and MapState
- You must create a MapStateDescriptor to obtain the corresponding state handle; it holds the state name, the type of the values the state holds, and possibly a user-specified function
- Broadcast state is included when a checkpoint is taken
- Broadcast state is kept in memory only; there is no RocksDB state backend for it
- Flink broadcasts the state to every task; the state is not propagated across tasks, and modifications to it only take effect in the task that makes them
- Downstream tasks may receive the broadcast events in different orders, so be careful when element processing depends on their arrival order
0x03 Sample Code
1. Sample code
We can find an ideal example directly in the Flink source code. The code below is excerpted from the Flink test StatefulJobWBroadcastStateMigrationITCase, with explanatory comments added.
@Test
def testRestoreSavepointWithBroadcast(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// The two descriptors below determine the types of data the broadcast stream emits; a broadcast stream can emit several types at the same time
lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
"broadcast-state-1",
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
"broadcast-state-2",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
env.setStateBackend(new MemoryStateBackend)
env.enableCheckpointing(500)
env.setParallelism(4)
env.setMaxParallelism(4)
// The business data stream. Here the data stream and the broadcast stream share the same kind of source, CheckpointedSource. The data stream then goes through a series of operators, e.g. flatMap
val stream = env
.addSource(
new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
.keyBy(
new KeySelector[(Long, Long), Long] {
override def getKey(value: (Long, Long)): Long = value._1
}
)
.flatMap(new StatefulFlatMapper)
.keyBy(
new KeySelector[(Long, Long), Long] {
override def getKey(value: (Long, Long)): Long = value._1
}
)
// The broadcast stream
val broadcastStream = env
.addSource(
new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
.broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)
// Connect the data stream with the broadcast stream
stream
.connect(broadcastStream)
.process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
.addSink(new AccumulatorCountingSink)
}
}
// The user-defined process function
class TestBroadcastProcessFunction
extends KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)] {
// Important: the firstBroadcastStateDesc and secondBroadcastStateDesc here are in fact unrelated to the two MapStateDescriptors passed to the broadcast stream above.
// These two MapStateDescriptors exist to read and write the BroadcastState, so that values can be passed from processBroadcastElement to processElement. We could just as well define brand-new MapStateDescriptors, as long as processBroadcastElement and processElement agree on them.
// The parameter "broadcast-state-1" is the name; Flink uses this name to store and retrieve the corresponding state in the runtime
lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
"broadcast-state-1",
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
"broadcast-state-2",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
override def processElement(
value: (Long, Long),
ctx: KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext,
out: Collector[(Long, Long)]): Unit = {
// In the Flink source the incoming business record is simply forwarded as-is
out.collect(value)
}
override def processBroadcastElement(
value: (Long, Long),
ctx: KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)]#Context,
out: Collector[(Long, Long)]): Unit = {
// Store the newly arrived broadcast value so that processElement can read and use it later. Concretely, the BroadcastState is looked up via the name of firstBroadcastStateDesc
ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
ctx.getBroadcastState(secondBroadcastStateDesc).put(value._1.toString, value._2.toString)
}
}
// The source used by both the broadcast stream and the data stream
private class CheckpointedSource(val numElements: Int)
extends SourceFunction[(Long, Long)] with CheckpointedFunction {
private var isRunning = true
private var state: ListState[CustomCaseClass] = _
// Simply emits the elements once, then stays alive
override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
ctx.emitWatermark(new Watermark(0))
ctx.getCheckpointLock synchronized {
var i = 0L
while (i < numElements) {
ctx.collect((i, i))
i += 1
}
}
// don't emit a final watermark so that we don't trigger the registered event-time
// timers
while (isRunning) Thread.sleep(20)
}
}
2. Technical difficulties
MapStateDescriptor
First, a few concepts:
- Flink has two basic kinds of state: Keyed State and Operator State.
- Keyed State and Operator State each exist in two forms: raw state and managed state.
- Managed state is state managed by the Flink framework, such as ValueState, ListState and MapState.
- Raw state is state whose concrete data structure is managed by the user; when checkpointing, the framework reads and writes the state content as byte[] and knows nothing about its internal structure.
- MapState is one kind of managed state: its value is a map. Users add entries via the put or putAll methods.
Back to our example: the broadcast variable is part of the Operator State and is kept as managed state in MapState form. The getBroadcastState function is implemented in DefaultOperatorStateBackend.
So we need a MapStateDescriptor to describe the broadcast state. Its use is quite flexible: since the state is accessed as key/value pairs, I personally find it convenient to use a whole class as the value type, which is especially handy for readers coming to Scala from other languages. A sketch of that idea follows.
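A minimal sketch of that suggestion, assuming a hypothetical BlacklistConfig case class as the value type (the names here are illustrative, not taken from the example above):
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala.createTypeInformation
object BlacklistDescriptors {
  // a whole config object as the map value, so one broadcast entry carries several fields
  case class BlacklistConfig(ips: Set[String], updatedAt: Long)
  val blacklistConfigDesc = new MapStateDescriptor[String, BlacklistConfig](
    "blacklist-config",                       // the name used to look the state up
    BasicTypeInfo.STRING_TYPE_INFO,           // key type
    createTypeInformation[BlacklistConfig])   // value type: the whole config class
}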
processBroadcastElement
// Since this function mainly plays a controlling role, its logic is relatively simple
override def processBroadcastElement(
    value: (Long, Long),
    ctx: KeyedBroadcastProcessFunction[Long, (Long, Long), (Long, Long), (Long, Long)]#Context,
    out: Collector[(Long, Long)]): Unit = {
  // Store the newly arrived broadcast value so that processElement can read it later, e.g.
  ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
}
processElement
// This function works in tandem with processBroadcastElement
override def processElement(
    value: (Long, Long),
    ctx: KeyedBroadcastProcessFunction[Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext,
    out: Collector[(Long, Long)]): Unit = {
  // Read back the broadcast values stored earlier by processBroadcastElement and use them to process the business record, e.g.
  val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
    "broadcast-state-2",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO)
  var actualSecondState = Map[String, String]()
  for (entry <- ctx.getBroadcastState(secondBroadcastStateDesc).immutableEntries()) {
    actualSecondState += (entry.getKey -> entry.getValue)
  }
  // As long as processBroadcastElement and processElement agree on the descriptor, the stored value can be of any type;
  // it does not have to match the type of the broadcast records. The key point is to declare a corresponding MapStateDescriptor.
  // MapStateDescriptor extends StateDescriptor, with MapState as the state type and Map as the value type
}
Putting it together
Owing to some constraints, the example below is borrowed from the web to illustrate the combined usage.
// The pattern is always stored in MapState with null as the key. Broadcast state is always represented as MapState, the most general state primitive that Flink provides.
MapStateDescriptor<Void, Pattern> bcStateDescriptor =
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
// When handling a broadcast element, the pattern is stored into the BroadcastState
public void processBroadcastElement(Pattern pattern, Context ctx,
Collector<Tuple2<Long, Pattern>> out) throws Exception {
// store the new pattern by updating the broadcast state
BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
// storing in MapState with null as VOID default value
bcState.put(null, pattern);
}
// When handling a business element, the pattern is read back from the BroadcastState; in both cases the lookup actually uses the name string "patterns" as the key.
public void processElement(Action action, ReadOnlyContext ctx,
Collector<Tuple2<Long, Pattern>> out) throws Exception {
// get current pattern from broadcast state
Pattern pattern = ctx.getBroadcastState(this.patternDesc)
// access MapState with null as VOID default value
.get(null);
// get previous action of current user from keyed state
String prevAction = prevActionState.value();
if (pattern != null && prevAction != null) {
// user had an action before, check if pattern matches
if (pattern.firstAction.equals(prevAction) &&
pattern.secondAction.equals(action.action)) {
// MATCH
out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
}
}
// update keyed state and remember action for next pattern evaluation
prevActionState.update(action.action);
}
0x04 Flink Source Code Analysis
1. The logical flow of broadcast
* The life cycle of the Broadcast:
* {@code
* -- Initialization -> combine the data stream and the broadcast stream via a BroadcastConnectedStream and perform the topology transformation
* |
* +----> businessStream = DataStream.filter.map....
* | // the data stream carrying the business logic; businessStream is a plain DataStream
* +----> broadcastStream = DataStream.broadcast(broadcastStateDesc)
* | // the broadcast stream carrying the configuration; broadcastStream is of type BroadcastStream
* +----> businessStream.connect(broadcastStream)
* | .process(new processFunction(broadcastStateDesc))
* | // combine the business stream and the broadcast stream into a BroadcastConnectedStream, then call process
* +----------> process @ BroadcastConnectedStream
* | TwoInputStreamOperator<IN1, IN2, OUT> operator =
* | new CoBroadcastWithNonKeyedOperator<>(clean(function),
* | broadcastStateDescriptors);
* | return transform(outTypeInfo, operator);
* | // build an operator of type TwoInputStreamOperator and call transform
* +----------------> transform @ BroadcastConnectedStream
* | transform = new TwoInputTransformation<>(
* | inputStream1.getTransformation(), // the business stream
* | inputStream2.getTransformation(), // the broadcast stream
* | functionName, // the name of the user's UDF
* | operator, // the operator: CoBroadcastWithNonKeyedOperator
* | outTypeInfo); // the output type
* | returnStream = new SingleOutputStreamOperator(transform);
* | getExecutionEnvironment().addOperator(transform)
* | // register the combined transformation of business stream and broadcast stream with the Env; this completes the topology transformation
* | // the final result is a SingleOutputStreamOperator.
* }
* Data structure:
* -- BroadcastStream.
* A thin wrapper around a DataStream that also records the StateDescriptors associated with this broadcast stream
public class BroadcastStream<T> {
private final StreamExecutionEnvironment environment;
private final DataStream<T> inputStream;
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
}
* Data structure:
* -- BroadcastConnectedStream.
* Combines the business stream and the broadcast stream; the operator and topology are generated from it
public class BroadcastConnectedStream<IN1, IN2> {
private final StreamExecutionEnvironment environment;
private final DataStream<IN1> inputStream1;
private final BroadcastStream<IN2> inputStream2;
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
}
* The actual computation:
* -- CoBroadcastWithNonKeyedOperator -> this is where the BroadcastProcessFunction is actually executed
public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, BroadcastProcessFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> {
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
private transient TimestampedCollector<OUT> collector;
private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
private transient ReadWriteContextImpl rwContext;
private transient ReadOnlyContextImpl rContext;
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
collector.setTimestamp(element);
rContext.setElement(element);
// When a new business record arrives from upstream, the user-defined processElement is invoked
// There, the previously stored broadcast configuration can be retrieved and used to process the business record
userFunction.processElement(element.getValue(), rContext, collector);
rContext.setElement(null);
}
@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
collector.setTimestamp(element);
rwContext.setElement(element);
// When a broadcast record arrives from upstream, the user-defined processBroadcastElement is invoked
// There, the newly delivered broadcast configuration can be stored
userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
rwContext.setElement(null);
}
}
2. Key methods of DataStream
// connect and broadcast, each producing the corresponding type of stream
public class DataStream<T> {
protected final StreamExecutionEnvironment environment;
protected final Transformation<T> transformation;
@PublicEvolving
public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
return new BroadcastConnectedStream<>(
environment,
this,
Preconditions.checkNotNull(broadcastStream),
broadcastStream.getBroadcastStateDescriptor());
}
@PublicEvolving
public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
}
}
3. The key data structure: MapStateDescriptor
Its main job is to declare various pieces of metadata. As we will see later, the system stores and retrieves the State associated with a MapStateDescriptor by its name, i.e. the first constructor argument. The three constructors below differ only in how the key/value types are specified; a short sketch follows after the class.
public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {
/**
* Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keySerializer The type serializer for the keys in the state.
* @param valueSerializer The type serializer for the values in the state.
*/
public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
super(name, new MapSerializer<>(keySerializer, valueSerializer), null);
}
/**
* Create a new {@code MapStateDescriptor} with the given name and the given type information.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keyTypeInfo The type information for the keys in the state.
* @param valueTypeInfo The type information for the values in the state.
*/
public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
}
/**
* Create a new {@code MapStateDescriptor} with the given name and the given type information.
*
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keyClass The class of the type of keys in the state.
* @param valueClass The class of the type of values in the state.
*/
public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
super(name, new MapTypeInfo<>(keyClass, valueClass), null);
}
}
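For reference, a small sketch (with hypothetical descriptor names) showing the three ways to construct a MapStateDescriptor: from explicit serializers, from TypeInformation (as the examples in this article do), or from plain classes:
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.common.typeutils.base.StringSerializer
object DescriptorConstruction {
  // 1) explicit TypeSerializers
  val bySerializer = new MapStateDescriptor[String, String](
    "broadcast-state-a", StringSerializer.INSTANCE, StringSerializer.INSTANCE)
  // 2) TypeInformation (what the examples in this article use)
  val byTypeInfo = new MapStateDescriptor[String, String](
    "broadcast-state-b", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
  // 3) plain classes; Flink derives the TypeInformation from them
  val byClass = new MapStateDescriptor[String, String](
    "broadcast-state-c", classOf[String], classOf[String])
}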
4. Storing and retrieving state
The state passed between processBroadcastElement and processElement is stored in Flink keyed by the name of the MapStateDescriptor, via calls like ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2). So we next need to introduce Flink's notion of State.
State vs checkpoint
First, two concepts need to be distinguished: state generally refers to the state of a specific task/operator, whereas a checkpoint is a global snapshot of an entire Flink job at a particular point in time, i.e. it contains the state of all tasks/operators. Flink achieves fault tolerance and recovery by taking checkpoints periodically.
Flink has two basic kinds of state: Keyed State and Operator State.
Keyed State
As the name suggests, this is state on a KeyedStream. The state is bound to a specific key: for each key in the KeyedStream there may be a corresponding state.
Operator State
Unlike Keyed State, Operator State is bound to one parallel instance of a specific operator, and the whole operator instance corresponds to a single state. By comparison, one operator instance may see many keys and therefore many keyed states.
For example, Flink's Kafka Connector uses operator state: each connector instance stores the mapping of all (partition, offset) pairs for the topics it consumes.
Raw and Managed State
This is another, orthogonal dimension.
Keyed State and Operator State each exist in two forms: managed and raw.
Managed state is state managed by the Flink runtime, for example in internal hash tables or RocksDB, such as "ValueState" and "ListState". The Flink runtime encodes these states and writes them into checkpoints.
The managed keyed state interfaces, for instance, provide access to different types of state that are all scoped to the key of the current input record. In other words, these states can only be used on a KeyedStream, which you obtain via stream.keyBy(...). Managed operator state, on the other hand, is used by implementing the CheckpointedFunction or ListCheckpointed interface.
Raw state is state whose concrete data structure is managed by the user and kept in the operator's own data structures. At checkpoint time Flink knows nothing about its content and merely writes a sequence of bytes into the checkpoint.
For state on a DataStream, managed state is generally recommended; raw state is typically only needed when implementing a custom operator. A sketch of managed operator state via CheckpointedFunction follows.
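A minimal sketch of managed operator state through CheckpointedFunction, closely following the buffering-sink pattern from the Flink documentation; the class name and threshold are illustrative:
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import scala.collection.mutable.ListBuffer
// Buffers elements and keeps the buffer in operator (non-keyed) state so it survives failures.
class BufferingSink(threshold: Int)
  extends SinkFunction[String] with CheckpointedFunction {
  @transient private var checkpointedState: ListState[String] = _
  private val bufferedElements = ListBuffer[String]()
  override def invoke(value: String): Unit = {
    bufferedElements += value
    if (bufferedElements.size >= threshold) {
      // flush to the external system here
      bufferedElements.clear()
    }
  }
  // Called on every checkpoint: copy the in-memory buffer into the operator state.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    bufferedElements.foreach(checkpointedState.add)
  }
  // Called on (re)start: register the state and, when restoring, refill the buffer from it.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[String](
      "buffered-elements", TypeInformation.of(classOf[String]))
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)
    if (context.isRestored) {
      val it = checkpointedState.get().iterator()
      while (it.hasNext) bufferedElements += it.next()
    }
  }
}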
Back to our example: the broadcast variable is part of the Operator State and is kept as managed state in MapState form. Concretely, the getBroadcastState function is implemented in DefaultOperatorStateBackend.
StateDescriptor
You must create a StateDescriptor to obtain the corresponding state handle. It holds the state name (you can create several states, and they must have unique names so that they can be referenced), the type of the value the state holds, and possibly a user-specified function such as a ReduceFunction. Depending on the kind of state, you create a ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor or MapStateDescriptor.
State is accessed through the RuntimeContext, so it is only available in rich functions; a sketch follows.
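A minimal sketch of accessing keyed state through the RuntimeContext inside a rich function; the class name and state name are illustrative:
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
// Keeps a running sum per key; the state handle comes from the RuntimeContext,
// which is why this only works in a rich function applied to a KeyedStream.
class RunningSum extends RichFlatMapFunction[(String, Long), (String, Long)] {
  @transient private var sumState: ValueState[java.lang.Long] = _
  override def open(parameters: Configuration): Unit = {
    val descriptor = new ValueStateDescriptor[java.lang.Long](
      "running-sum", TypeInformation.of(classOf[java.lang.Long]))
    sumState = getRuntimeContext.getState(descriptor)
  }
  override def flatMap(value: (String, Long), out: Collector[(String, Long)]): Unit = {
    val current = Option(sumState.value()).map(_.longValue()).getOrElse(0L)
    val updated = current + value._2
    sumState.update(updated)   // relies on Scala's Long-to-java.lang.Long boxing
    out.collect((value._1, updated))
  }
}
// usage (hypothetical): stream.keyBy(_._1).flatMap(new RunningSum)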
OperatorStateBackend
OperatorStateBackend is responsible for managing OperatorState. Currently there is only one implementation: DefaultOperatorStateBackend.
DefaultOperatorStateBackend
DefaultOperatorStateBackend stores its state in Maps. It constructs PartitionableListState instances (a kind of ListState), and all OperatorState is kept in memory.
public class DefaultOperatorStateBackend implements OperatorStateBackend {
/**
* Map for all registered operator states. Maps state name -> state
*/
private final Map<String, PartitionableListState<?>> registeredOperatorStates;
/**
* Map for all registered operator broadcast states. Maps state name -> state
*/
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
/**
* Cache of already accessed states.
*
* <p>In contrast to {@link #registeredOperatorStates} which may be repopulated
* with restored state, this map is always empty at the beginning.
*
* <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
*/
private final Map<String, PartitionableListState<?>> accessedStatesByName;
private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName; // cache of broadcast states accessed so far
// This is the API, mentioned earlier, for storing and retrieving broadcast state
public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
String name = Preconditions.checkNotNull(stateDescriptor.getName());
// if it was accessed before, return the cached instance
BackendWritableBroadcastState<K, V> previous =
(BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(
name);
if (previous != null) {
return previous;
}
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
BackendWritableBroadcastState<K, V> broadcastState =
(BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
if (broadcastState == null) {
broadcastState = new HeapBroadcastState<>(
new RegisteredBroadcastStateBackendMetaInfo<>(
name,
OperatorStateHandle.Mode.BROADCAST,
broadcastStateKeySerializer,
broadcastStateValueSerializer));
registeredBroadcastStates.put(name, broadcastState);
} else {
// has restored state; check compatibility of new state access
RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo = broadcastState.getStateMetaInfo();
// check whether new serializers are incompatible
TypeSerializerSchemaCompatibility<K> keyCompatibility =
restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
TypeSerializerSchemaCompatibility<V> valueCompatibility =
restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer);
broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
}
accessedBroadcastStatesByName.put(name, broadcastState); // otherwise cache it for subsequent accesses
return broadcastState;
}
}
0x05 References
Flink principles and implementation: a detailed look at state management in Flink https://yq.aliyun.com/articles/225623
Using broadcast in Flink for dynamic configuration updates https://www.jianshu.com/p/c8c99f613f10
A practical guide to Flink Broadcast State https://blog.csdn.net/u010942041/article/details/93901918
On Flink's Broadcast State https://www.jianshu.com/p/d6576ae67eae
Working with State https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/stream/state/state.html