flink系列-1、flink介绍,反压原理


一、flink介绍

 Apache Flink是一个分布式大数据处理引擎,可对 有界数据流无界数据流进行 有状态计算。 可部署在各种集群环境,对各种大小的数据规模进行快速计算。

 

1.1、有界数据流和无界数据流 

1、 无界流有一个开始但没有定义的结束。它们不会在生成时终止并提供数据。必须持续处理无界流,即必须在摄 取事件后立即处理事件。无法等待所有输入数据到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)摄取事件。 
2、 有界流具有定义的开始和结束。可以在执行任何计算之前通过摄取所有数据来处理有界流。处理有界流不需要 有序摄取,因为可以始终对有界数据集进行排序。有界流的处理也称为批处理。
Apache Flink擅长处理无界和有界数据集。精确控制时间和状态使Flink的运行时能够在无界流上运行任何类型的应 用程序。有界流由算法和数据结构内部处理,这些算法和数据结构专门针对固定大小的数据集而设计,从而产生出 色的性能。

1.2、flink的特点

  • 支持 java、python、 scala api 
  • 流(dataStream)批(dataSet)一体化 
  • 支持事件处理和无序处理通过DataStream API,基于DataFlow数据流模型 
  • 在不同的时间语义(事件时间-数据产生的时间,摄取时间:集群获取到数据、处理时间)下支持灵活的窗口(时间,滑动、翻滚,会话,自定义触发器) 
  • 支持有状态计算的Exactly-once(仅处理一次)容错保证 
  • 支持基于轻量级分布式快照checkpoint机制实现的容错 
  • 支持savepoints 机制,一般手动触发,在升级应用或者处理历史数据是能够做到无状态丢失和最小停机时间 
  • 兼容hadoop的mapreduce,集成YARN,HDFS,Hbase 和其它hadoop生态系统的组件 
  • 支持大规模的集群模式,支持yarn、Mesos。可运行在成千上万的节点上 
  • 在dataSet(批处理)API中内置支持迭代程序 
  • 图处理(批) 机器学习(批) 复杂事件处理(流) 
  • 自动反压机制 
  • 高效的自定义内存管理 

1.3、flink的分层模型

 flink自身提供了不同级别的抽象来支持我们开发流式或者批处理程序,上图描述了Flink 支持的四种不同级别的抽象。

Stateful Stream Processing 
  • 位于最底层, 是core API 的底层实现 
  • processFunction (处理函数)
  • 利用低阶,构建一些新的组件或者算子 
  • 灵活性高,但开发比较复杂 
  • 表达性最强,可以操作状态,time等
Core API 
  • DataSet - 批处理 API 
  • DataStream –流处理 API 
  • 封装了一些算子
Table 
API
 &
 SQL
  • 
构建在Table
之上,都需要构建Table
环境 
  • 不同的类型的Table
构建不同的Table
环境 
  • Table
可以与DataStream或者DataSet进行相互转换 
  • Streaming
SQL不同于存储的SQL,最终会转化为流式执行计划 

二、finlk的反压原理 

  • 反压是分布式处理系统中经常遇到的问题,当消费者速度低于生产者的速度时,则需要消费者将信息反馈给生产者使得生产者的速度能和消费者的速度进行匹配。
  • Stom 在处理背压问题上简单粗暴,当下游消费者速度跟不上生产者的速度时会直接通知生产者,生产者停止生产数据,这种方式的缺点是不能实现逐级反压,且调优困难。设置的消费速率过小会导致集群吞吐量低下,速率过大会导致消费者 OOM。
  • Spark Streaming 为了实现反压这个功能,在原来的架构基础上构造了一个“速率控制器”,这个“速率控制器”会根据几个属性,如任务的结束时间、处理时长、处理消息的条数等计算一个速率。在实现控制数据的接收速率中用到了一个经典的算法,即“PID 算法”。
  • Flink 没有使用任何复杂的机制来解决反压问题,Flink 在数据传输过程中使用了分布式阻塞队列。我们知道在一个阻塞队列中,当队列满了以后发送者会被天然阻塞住,这种阻塞功能相当于给这个阻塞队列提供了反压的能力。

具体过程如下:

如下图所示展示了 Flink 在网络传输场景下的内存管理。网络上传输的数据会写到 Task 的 InputGate(IG)中, 经过Task的处理后,再由Task写到ResultPartition(RS)中。每个 Task 都包括了输入和输入,输入和输出的 数据存在 Buffer 中(都是字节数据)。Buffer 是 MemorySegment 的包装类。 
  1. 根据配置,Flink 会在 NetworkBufferPool 中生成一定数量(默认2048,一个32K)的内存块 MemorySegment,内存块 的总数量就代表了网络传输中所有可用的内存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之间共 享的,每个节点(TaskManager - 跑任务的进程,类似于spark的extour)只会实例化一个。
  2. Task 线程启动时,会向 NetworkEnvironment 注册,NetworkEnvironment 会为 Task 的 InputGate(IG)和 ResultPartition(RP) 分别创建一个 LocalBufferPool(缓冲池)并设置可申请的 MemorySegment(内存块)数量。IG 对应的缓冲池初始的内存块数量与 IG 中 InputChannel 数量一致,RP 对应的缓冲池初始的内存 块数量与 RP 中的 ResultSubpartition 数量一致。不过,每当创建或销毁缓冲池时,NetworkBufferPool 会计 算剩余空闲的内存块数量,并平均分配给已创建的缓冲池。注意,这个过程只是指定了缓冲池所能使用的内存 块数量,并没有真正分配内存块,只有当需要时才分配。为什么要动态地为缓冲池扩容呢?因为内存越多,意 味着系统可以更轻松地应对瞬时压力(如GC),不会频繁地进入反压状态,所以我们要利用起那部分闲置的 内存块。 
  3. 在 Task 线程执行过程中,当 Netty 接收端收到数据时,为了将 Netty 中的数据拷贝到 Task 中, InputChannel(实际是 RemoteInputChannel)会向其对应的缓冲池申请内存块(上图中的①)。如果缓冲池 中也没有可用的内存块且已申请的数量还没到池子上限,则会向 NetworkBufferPool 申请内存块(上图中的 ②)并交给 InputChannel 填上数据(上图中的③和④)。如果缓冲池已申请的数量达到上限了呢?或者 NetworkBufferPool 也没有可用内存块了呢?这时候,Task 的 Netty Channel 会暂停读取,上游的发送端会立 即响应停止发送,拓扑会进入反压状态。当 Task 线程写数据到 ResultPartition 时,也会向缓冲池请求内存 块,如果没有可用内存块时,会阻塞在请求内存块的地方,达到暂停写入的目的。
  4. 当一个内存块被消费完成之后(在输入端是指内存块中的字节被反序列化成对象了,在输出端是指内存块中的 字节写入到 Netty Channel 了),会调用 Buffer.recycle() 方法,会将内存块还给 LocalBufferPool (上 图中的⑤)。如果LocalBufferPool中当前申请的数量超过了池子容量(由于上文提到的动态容量,由于新注 册的 Task 导致该池子容量变小),则LocalBufferPool会将该内存块回收给 NetworkBufferPool(上图中的 ⑥)。如果没超过池子容量,则会继续留在池子中,减少反复申请的开销。 

2.1、反压的过程 

下面这张图简单展示了两个 Task 之间的数据传输以及 Flink 如何感知到反压的:
 
  1.  记录“A”进入了 Flink 并且被 Task 1 处理。(这里省略了 Netty 接收、反序列化等过程)
  2. 记录被序列化到 buffer 中。 
  3. 该 buffer 被发送到 Task 2,然后 Task 2 从这个 buffer 中读出记录。 
不要忘了:记录能被 Flink 处理的前提是,必须有空闲可用的 Buffer 
结合上面两张图看:Task 1 在输出端有一个相关联的 LocalBufferPool(称缓冲池1),Task 2 在输入端也有一个 相关联的 LocalBufferPool(称缓冲池2)。如果缓冲池1中有空闲可用的 buffer 来序列化记录 “A”,我们就序列化 并发送该 buffer。 
这里我们需要注意两个场景: 
  • 本地传输:如果 Task 1 和 Task 2 运行在同一个 worker 节点(TaskManager),该 buffer 可以直接交给下一 个 Task。一旦 Task 2 消费了该 buffer,则该 buffer 会被缓冲池1回收。如果 Task 2 的速度比 1 慢,那么 buffer 回收的速度就会赶不上 Task 1 取 buffer 的速度,导致缓冲池1无可用的 buffer,Task 1 等待在可用的 buffer 上。最终形成 Task 1 的降速。 
  • 远程传输:如果 Task 1 和 Task 2 运行在不同的 worker 节点上,那么 buffer 会在发送到网络(TCP Channel)后被回收。在接收端,会从 LocalBufferPool 中申请 buffer,然后拷贝网络中的数据到 buffer 中。 如果没有可用的 buffer,会停止从 TCP 连接中读取数据。在输出端,通过 Netty 的水位值机制(可配置)来 保证不往网络中写入太多数据。如果网络中的数据(Netty输出缓冲中的字节数)超过了高水位值,我们会等 到其降到低水位值以下才继续写入数据。这保证了网络中不会有太多的数据。如果接收端停止消费网络中的数 据(由于接收端缓冲池没有可用 buffer),网络中的缓冲数据就会堆积,那么发送端也会暂停发送。另外,这 会使得发送端的缓冲池得不到回收,writer 阻塞在向 LocalBufferPool 请求 buffer,阻塞了 writer 往 ResultSubPartition 写数据。 
这种固定大小缓冲池就像阻塞队列一样,保证了 Flink 有一套健壮的反压机制,使得 Task 生产数据的速度不会快 于消费的速度。我们上面描述的这个方案可以从两个 Task 之间的数据传输自然地扩展到更复杂的 pipeline 中,保 证反压机制可以扩散到整个 pipeline。 

2.2、反压监控

 Flink 的实现中,只有当 Web 页面切换到某个 Job 的 Backpressure 页面,才会对这个 Job 触发反压检测,因为 反压检测还是挺昂贵的。JobManager 会通过 Akka 给每个 TaskManager 发送TriggerStackTraceSample消息。默 认情况下,TaskManager 会触发100次 stack trace 采样,每次间隔 50ms(也就是说一次反压检测至少要等待5秒 钟)。并将这 100 次采样的结果返回给 JobManager,由 JobManager 来计算反压比率(反压出现的次数/采样的 次数),最终展现在 UI 上。UI 刷新的默认周期是一分钟,目的是不对 TaskManager 造成太大的负担。 
 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM