flink任务并行


Slot和TaskManager

  • 首先Flink中每个真正执行任务的taskManager都是一个JVM进程,其在多线程环境中执行一个或者多个子任务,执行的任务可以看成一个线程,线程所占据的资源可以看做是slot。
  • 那么为了控制一个JVM同时能运行的任务数量,flink引入了task slot的概念,每一个slot能独立执行某个任务。
  • 每一个task solt代表了taskManager资源的一个子集,比如,一个拥有3个solt的TaskManager,每一个solt可以使用 1/3 taskManager所管理的内存。
  • 进行资源分割意味着为子任务保留足够的内存,从而避免与其他子任务进行竞争。注意:当前solt还不能分割cpu资源,仅仅对当前taskManager的内存进行了分割。
  • slots控制着静态的并行执行任务的能力,即使slot的数量非常多,但可以不用,主要还是看动态的并行度设置的多少
  • 一般来说动态的并行度要小于可用的静态的slot个数

并行度 Parallelism

  • 一个特定算子的子任务的个数被称之为该算子的并行度,并且在代码中可以显示的指定该算子的并行度
  • 一个数据流的并行度,就是其所有算子中最大的并行度
  • 怎么设置并行度:
    • Flink集群配置文件 flink-conf.yaml parallelism.default:1 优先级最低
    • Flink Client(提交任务时设置)  -p 1 优先级其次
    • 代码设置全局并行度 env.setParallelism(2) 也可以单个算子进行设置 优先级最高

Flink是怎么实现并行计算?

  • 就是对算子设置并行度即可,在不同slot之间是并行计算的

对于并行的任务,需要占用多少slot?

  • 当前任务的每一个并行的子任务都需要占用一个slot,那么所需的最少的slot个数为:不同slot组的最大任务并行度之和

Slot共享

  • Flink允许不同的子任务共享一个slot
  • 对于同一任务的子任务,需要分配到不同的slot上,但对于有先后顺序的不同任务的子任务,slot允许进行子任务共享
  • slot共享更有益于资源的合理利用,不会造成资源的浪费,减少网络IO,并行子任务分配到slot上是随机的(默认随机,可以自己设置)

一个流处理任务需要多少个Slot

  • 在Flink程序中,算子的最大并行度为需要的slots数量。只要有这么多slots,任务就能跑起来。

数据传输形式

由于不同算子,可能具有不同的并行度。算子之前的传输形式可以是 one-to-one(forwarding) 也可以是 redistributing的模式
  • one-to-one:Stream维护着分区以及元素的顺序。比如map和filter 如果这两个算子的并行度是2的话,那么他们之间便是one-to-one一一对应的关系,数据之间不存在重分区操作 [类似于spark中的窄依赖]
    • 如果map和filter的并行度不同,也会产生redistributing,Flink内部会采取重分区的操作,默认是采用轮询的方式来进行。
    • map、filter、flatmap等算子应该都是one-to-one对应关系
  • redistributing:涉及到的前后算子之间需要做重分区,Stream的分区会发生改变。 [redistributing类似于在spark中 前后算子是宽依赖]
    • 每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。
    • 例如keyby算子基于hashcode进行重分区

合并任务

  • 前后算子并行度相同,且是one-to-one Flink将这样的算子连接起来形成一个task
  • 这种技术被称之为任务链的优化技术,可以减少本地通信的开销,序列化与反序列化开销

Flink 任务(task) 总数

  • 首先 所有的Flink程序都是由三部分组成的 source(数据来源) transformation(转换操作,数据加工) sink(数据输出) ,task总数 = (合并之后的task个数 * 合并之后task的并行度)之和

独享slot

  • 某个算子因其逻辑太复杂了,非常耗费CPU,只能拿到一个slot里面执行,那么需要把该算子单独拿出来到一个slot里面,而不进行合并
    • .filter.disableChaining() 表示当前filter算子不会和前后算子进行合并操作,会导致任务数量变多 但由于slot共享的限制,还不能做到独享slot
  • 不是因为某个算子太复杂,而是因为任务数量过多,任务链太复杂,需要从中间进行拆分 一分为二 重新开始一个新的任务链
    • .map.startNewChain() 表示从map开始生成一个新的任务链
  • 以上两种方式,任务不进行合并或是生成一个新的任务链并没有实际意义,因为切分之后还是有可能会造成slot共享,所以拆开就变得没有意义了
  • .flatMap.slotSharingGroup("a") 从当前的flatMap算子开始,往后的算子 都共享在slot组为a的slot里面,和前面的算子的slot组就分开了,然后不同slot组的任务,一定会被分配到不同的slot里面 。slotSharingGroup("default") 默认值 不设置的话默认就在default这个slot组
  • 全局切断,env.disableOperatorChaining() 表示所有算子都不会进行合并操作
  • 所以slot数量为 不同slot组的最大并行度之和


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM