Flink 流量控制与反压机制完全总结

star2017 1年前 ⋅ 1034 阅读

来源: Flink

前言

笔者最近回顾自己对 Flink 技术栈细节的理解,发现对 Flink 的网络栈、流控与反压这一套机制存在比较大的盲区。虽然平时多次处理过作业反压的问题,但是不完全理解背后的实现显然说不过去。于是专门写一篇总结,站在大佬们的肩膀上彻底搞清楚 Flink 是怎么做流控与处理反压的。

Flink 网络传输的数据流向

Flink 网络传输的数据流向如下图所示。

Sender 在发送数据时,首先写入 TaskManager 内部的网络缓存,利用 Netty 进行传输——将待发送的数据存入 Netty 的 ChannelOutboundBuffer,再经由 Socket 的发送缓存发送出去。Receiver 在接收数据时是反过来的,同样要经过 3 层缓存,即 Socket 接收缓存 →Netty ChannelInboundBuffer→TaskManager 网络缓存。要实现流量控制,就是在上面的流程上做文章。

Flink 的反压传播

反压(back pressure)就是流式系统中关于处理能力的动态反馈机制,并且是从下游到上游的反馈。下图示出数据流在 Flink TaskManager 之间流动的逻辑。可见,一旦因为下游处理能力不足而出现反压,反压信号的传播应该分为两个阶段:一是从下游 TaskManager 的输入端(InputGate)传播到直接上游 TaskManager 的输出端(ResultPartition);二是在 TaskManager 内部从输出端传播到输入端。当然,我们要重点考虑的是跨 TaskManager 的反压传播,因为它的链路比较长(参考上一节的数据流向图),更有可能成为瓶颈。

下面先来介绍旧版本中的流控和反压机制。

Flink 1.5 之前:基于 TCP 的流控和反压 在 1.5 版本之前,Flink 并没有特别地去实现自己的流控机制,而是在传输层直接依靠 TCP 协议自身具备的滑动窗口机制(大学计算机网络课程必讲)。下面通过实例来复习 TCP 滑动窗口是如何实现流控的。

  1. 初始情况如下图所示。Sender 每单位时间发送 3 个包,发送窗口初始大小为 3;Receiver 每单位时间接收 1 个包,接收窗口初始大小为 5(与接收缓存的大小相同)。

  1. Sender 发送 1~3 三个包,Receiver 接收到之后将它们放入缓存。
  2. Receiver 消费一个包,接收窗口向前滑动一格,并告知 Sender ACK=4(表示可以从第 4 个包开始发送),以及 Window=3(表示接收窗口当前的空余量为 3)。
  3. Sender 接收到 ACK 消息后发送 4~6 三个包,Receiver 接收到之后将它们放入缓存。
  4. Receiver 消费一个包,接收窗口向前滑动一格,并告知 Sender ACK=7(表示可以从第 7 个包开始发送),以及 Window=1(表示接收窗口当前的空余量为 1)。Sender 接收到 ACK 消息之后,发现 Receiver 只能再接收 1 个包了,就将发送窗口的大小调整为 1 并发送包 7,达到了限流的目的。

接着这个流程分析下去,可以得知 Sender 最终会无法发送数据(因为 Receiver 报告 Window=0),直到 Receiver 消费掉缓存中的数据才能继续发送。同时 Sender 还会定时向 Receiver 发送 ZeroWindowProbe 探测消息,保证 Receiver 能够及时将消费能力报告给 Sender。

接下来用实例介绍反压流程。

  1. 如图所示,Sender 发送速度与 Receiver 接收速度的比是 2:1,起初是可以正常发送与接收的。



  1. 一段时间过后,Receiver 端 InputChannel 本身的缓存被耗尽,因此会向本地缓存池 LocalBufferPool 申请新的缓存。

  1. 一段时间过后,LocalBufferPool 的可用额度会被耗尽,因此会向网络缓存池 NetworkBufferPool 申请新的缓存。

  1. 随着数据不断积压,NetworkBufferPool 的额度也会被耗尽,此时没有空间再接收新的数据,Netty 的 auto read 会被关闭,不再从 Socket 缓存读取数据。

  1. Socket 缓存耗尽后,Receiver 报告 Window=0(参见上文的滑动窗口),Sender 的 Socket 就会停止发送数据。

  1. Sender 端的 Socket 缓存积压,导致 Netty 无法再发送数据。

  1. 待发送的数据都积压在 Sender 的 ChannelOutboundBuffer 中,当数据量超过 Netty 的 high watermark 之后,Channel 被置为不可写,ResultSubPartition 也就不再向 Netty 写数据。

  1. Sender 端的 ResultSubPartition 缓存满了之后,就会像 Receiver 端的 InputChannel 一样,不断地向 LocalBufferPool 和 NetworkBufferPool 申请新的缓存,直到缓存全部耗尽,RecordWriter 不能再写数据。

这样,我们就实现了反压向上游 TaskManager 的传递。

Flink 1.5 之后:基于 Credit 的流控和反压

基于 TCP 的流控和反压方案有两大缺点:

  • 只要 TaskManager 执行的一个 Task 触发反压,该 TaskManager 与上游 TaskManager 的 Socket 就不能再传输数据,从而影响到所有其他正常的 Task,以及 Checkpoint Barrier 的流动,可能造成作业雪崩;
  • 反压的传播链路太长,且需要耗尽所有网络缓存之后才能有效触发,延迟比较大。

Flink 1.5+ 版本为了解决这两个问题,引入了基于 Credit 的流控和反压机制。它本质上是将 TCP 的流控机制从传输层提升到了应用层——即 ResultPartition 和 InputGate 的层级,从而避免在传输层造成阻塞。具体来讲:

  • Sender 端的 ResultSubPartition 会统计累积的消息量(以缓存个数计),以 backlog size 的形式通知到 Receiver 端的 InputChannel;
  • Receiver 端 InputChannel 会计算有多少空间能够接收消息(同样以缓存个数计),以 credit 的形式通知到 Sender 端的 ResultSubPartition。

也就是说,Sender 和 Receiver 通过互相告知对方自己的处理能力的方式来精准地进行流控(注意 backlog size 和 credit 也是要通过传输层的,不是直接交换的)。接下来仍然通过实例来说明基于 Credit 的流控和反压流程。

  1. 仍然是 Sender 发送速度与 Receiver 接收速度的比是 2:1 的情景。Sender 端的 ResultSubPartition 积压了 2 个缓存的数据,因此会将该批次要发送的数据与 backlog size = 2 一同发往 Receiver。Receiver 收到当前批数据和 backlog size 之后,会计算 InputChannel 是否有足够的缓存来接收下一批数据,如果不够,则会去 LocalBufferPool/NetworkBufferPool 申请缓存,并将 credit = 3 通知到上游的 ResultSubPartition,表示自己能够接收 3 个缓存的消息。

  1. 随着 Receiver 端的数据不断积压,网络缓存最终被耗尽,因此会反馈给上游 credit = 0(相当于 TCP 滑动窗口中的 window = 0),Sender 端 ResultPartition 到 Netty 的链路会被阻断。按照上一节所述的流程,Sender 端的网络缓存会被更快地耗尽,RecordWriter 不能再写数据,从而达到反压的效果。

由上可知,反压信号在 TaskManager 之间不需要再通过传输层随着数据向上反馈,大大降低了反压的延迟。并且也不会因为一个 Task 反压而阻塞整个 Socket 链路,能够相当精确地在 Task 粒度控制流量,不仅轻量级,而且高效。


本文地址:https://www.6aiq.com/article/1604530106450
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出

更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: