flink中的计算分为有状态计算和无状态计算,
无状态计算每次只转换一条输入记录,并且只根据最新的输入记录输出结果。
有状态计算维护所有已处理的记录的状态值,并根据每条新输入的记录更新状态,因此输出记录反应的是综合考虑多个事件之后的结果。
无状态在这里不在赘述,这里来记录一下有状态计算。
flink中的状态分为算子状态(operatior state),键控状态(keyed state),状态后端(state backends)
1.算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。(也就是一个task下面的subtask共享这个状态)。
Flink 为算子状态提供三种基本数据结构:
⚫ 列表状态(List state)
将状态表示为一组数据的列表。
⚫ 联合列表状态(Union list state)
也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保
存点(savepoint)启动应用程序时如何恢复。
⚫ 广播状态(Broadcast state)
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应
用广播状态。
键控状态的作用范围
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护
一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护
和处理这个 key 对应的状态。(一般在keyby之后,keyby算子把经过hash得到的相同的key放到一个分区中,也就是内存中的一块管道处,让下游对应的subtask来拉取)
也就是每个key 都持有着对应的一个map结构
Flink 的 Keyed State 支持以下数据类型:
⚫ ValueState<T>保存单个的值,值的类型为 T。
o get 操作: ValueState.value()
o set 操作: ValueState.update(T value)
⚫ ListState<T>保存一个列表,列表里的元素的数据类型为 T。基本操作如下:
o ListState.add(T value)
o ListState.addAll(List<T> values)
o ListState.get()返回 Iterable<T>
o ListState.update(List<T> values)
⚫ MapState<K, V>保存 Key-Value 对。
o MapState.get(UK key)
o MapState.put(UK key, UV value)
o MapState.contains(UK key)
o MapState.remove(UK key)
⚫ ReducingState<T>
⚫ AggregatingState<I, O>
下面来看一段典型的状态的代码(reduce方法底层调用的)
open方法初始化拿到历史状态(定义一个状态描述器,然后根据状态描述器拿到对应Key的状态,注意这里是不出现数据的key的,只是去管道拿对应的内存块,也就是对应的map数据结构)
processElement方法更新历史状态并存储起来。
要知道即使是经过hash之后,得到的hashkey的分区,中间也是有不同的数据key 的,
比如 spark,1和hive,1现在被分到了0号分区,那么现在再来一条spark,1 。我们是把它加到之前的spark,1还是hive,1进行累加计数呢。所以在拿到对应hashkey的内存数据结构之后,还要判断到底是哪个组。现在主要来看一下
IN currentValue = this.values.value();
中的value方法
现在我们就看到了,在调用value方法,实际上在调用一个get方法,之所以没有在一开始选定key,是因为底层会根据这次新进来的数据拿到对应的key和分区内组号,这样就能唯一锁定对应key的历史状态了。至于更新历史状态update也是同样的道理,这里就不在赘述。
最后贴几张图片很典型的keystate代码段
valuestate
spark,1
hive,1
spark,1 计算相同key的累加次数
Mapstate
辽宁 梧州 1000
辽宁 松花江 2000
东北 大连 1000
辽宁 梧州 1000 最前面的Key进行keyby分区 ,第二个字段作为map结构的key,拿到累加的第三个字段
LIstState
A LIst(a,b,c,c,c,d)
B list(a,d,s,v,x,c)
A,B是keyby 的关键字,后面分别跟对应的行为序列list
状态的逻辑图
状态的物理图