Slot和TaskManager
- 首先Flink中每个真正执行任务的taskManager都是一个JVM进程,其在多线程环境中执行一个或者多个子任务,执行的任务可以看成一个线程,线程所占据的资源可以看做是slot。
- 那么为了控制一个JVM同时能运行的任务数量,flink引入了task slot的概念,每一个slot能独立执行某个任务。
- 每一个task solt代表了taskManager资源的一个子集,比如,一个拥有3个solt的TaskManager,每一个solt可以使用 1/3 taskManager所管理的内存。
- 进行资源分割意味着为子任务保留足够的内存,从而避免与其他子任务进行竞争。注意:当前solt还不能分割cpu资源,仅仅对当前taskManager的内存进行了分割。
- slots控制着静态的并行执行任务的能力,即使slot的数量非常多,但可以不用,主要还是看动态的并行度设置的多少
- 一般来说动态的并行度要小于可用的静态的slot个数
并行度 Parallelism
- 一个特定算子的子任务的个数被称之为该算子的并行度,并且在代码中可以显示的指定该算子的并行度
- 一个数据流的并行度,就是其所有算子中最大的并行度
- 怎么设置并行度:
- Flink集群配置文件 flink-conf.yaml parallelism.default:1 优先级最低
- Flink Client(提交任务时设置) -p 1 优先级其次
- 代码设置全局并行度 env.setParallelism(2) 也可以单个算子进行设置 优先级最高
Flink是怎么实现并行计算?
- 就是对算子设置并行度即可,在不同slot之间是并行计算的
对于并行的任务,需要占用多少slot?
- 当前任务的每一个并行的子任务都需要占用一个slot,那么所需的最少的slot个数为:不同slot组的最大任务并行度之和
Slot共享
- Flink允许不同的子任务共享一个slot
- 对于同一任务的子任务,需要分配到不同的slot上,但对于有先后顺序的不同任务的子任务,slot允许进行子任务共享
- slot共享更有益于资源的合理利用,不会造成资源的浪费,减少网络IO,并行子任务分配到slot上是随机的(默认随机,可以自己设置)
一个流处理任务需要多少个Slot
- 在Flink程序中,算子的最大并行度为需要的slots数量。只要有这么多slots,任务就能跑起来。
数据传输形式
由于不同算子,可能具有不同的并行度。算子之前的传输形式可以是 one-to-one(forwarding) 也可以是 redistributing的模式
- one-to-one:Stream维护着分区以及元素的顺序。比如map和filter 如果这两个算子的并行度是2的话,那么他们之间便是one-to-one一一对应的关系,数据之间不存在重分区操作 [类似于spark中的窄依赖]
- 如果map和filter的并行度不同,也会产生redistributing,Flink内部会采取重分区的操作,默认是采用轮询的方式来进行。
- map、filter、flatmap等算子应该都是one-to-one对应关系
- redistributing:涉及到的前后算子之间需要做重分区,Stream的分区会发生改变。 [redistributing类似于在spark中 前后算子是宽依赖]
- 每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。
- 例如keyby算子基于hashcode进行重分区
合并任务
- 前后算子并行度相同,且是one-to-one Flink将这样的算子连接起来形成一个task
- 这种技术被称之为任务链的优化技术,可以减少本地通信的开销,序列化与反序列化开销
Flink 任务(task) 总数
- 首先 所有的Flink程序都是由三部分组成的 source(数据来源) transformation(转换操作,数据加工) sink(数据输出) ,task总数 = (合并之后的task个数 * 合并之后task的并行度)之和
独享slot
- 某个算子因其逻辑太复杂了,非常耗费CPU,只能拿到一个slot里面执行,那么需要把该算子单独拿出来到一个slot里面,而不进行合并
- .filter.disableChaining() 表示当前filter算子不会和前后算子进行合并操作,会导致任务数量变多 但由于slot共享的限制,还不能做到独享slot
- 不是因为某个算子太复杂,而是因为任务数量过多,任务链太复杂,需要从中间进行拆分 一分为二 重新开始一个新的任务链
- .map.startNewChain() 表示从map开始生成一个新的任务链
- 以上两种方式,任务不进行合并或是生成一个新的任务链并没有实际意义,因为切分之后还是有可能会造成slot共享,所以拆开就变得没有意义了
- .flatMap.slotSharingGroup("a") 从当前的flatMap算子开始,往后的算子 都共享在slot组为a的slot里面,和前面的算子的slot组就分开了,然后不同slot组的任务,一定会被分配到不同的slot里面 。slotSharingGroup("default") 默认值 不设置的话默认就在default这个slot组
- 全局切断,env.disableOperatorChaining() 表示所有算子都不会进行合并操作
- 所以slot数量为 不同slot组的最大并行度之和