Cloud Shuffle Service 在字节跳动 Spark 场景的应用实践
本文整理自字节跳动基础架构的大数据开发工程师魏中佳在 ApacheCon Aisa 2022 「大数据」议题下的演讲,主要介绍 Cloud Shuffle Service(CSS) 在字节跳动 Spark 场景下的设计与实现。
背景介绍
在大数据场景下,数据 Shuffle 表示了不同分区数据交换的过程,Shuffle 的性能往往会成为作业甚至整个集群的性能瓶颈。特别是在字节跳动每日上百 PB Shuffle 数据的场景下,Shuffle 过程暴露出来了很多问题,本文会逐个展开此类问题并介绍在字节跳动的优化实践。
External Shuffle Service
首先来看,在 Spark 3.0 及最新的 Spark 3.3 中,External Shuffle Service(以下简称 ESS)是如何完成 Shuffle 任务的?
如下图,每一个 Map Task,从 Mapper 1 到 Mapper M 都会在本地生成属于自己的 Shuffle 文件。这个 Shuffle 文件内部由 R 个连续的数据片段组成。每一个 Reduce Task 运行时都会分别连接所有的 Task,从 Mapper 1 一直到 Mapper M 。连接成功后,Reduce Task 会读取每个文件中属于自己的数据片段。
上述方式带来的问题是显而易见的:
由于每次读取的都是这个 Shuffle 文件的 1/R,通常情况下这个数据量是非常非常小的,大概是 KB 级别(从几百 KB 到几 KB 不等),这样会给磁盘(尤其是 HDD )带来大量随机的读请求。
同时,大家可以看到,Reduce 进行的 Shuffle Fetch 请求整体看是一个网状结构,也就是说会存在大量的网络请求,量级大概是 M 乘以 R,这个请求的数量级也是非常大的。
这两个问题随着作业规模的扩大,会带来越来越严重的 Shuffle Failure 问题。Shuffle Failure 意味着超时,Shuffle Failure 本身还有可能导致 Stage 重算,甚至导致作业失败,严重影响批式作业的稳定性,同时还会浪费大量的计算资源(因为 Fetch 等待超时的时候,CPU 是空闲的)。
Spark 在字节跳动的应用
日均 100 万左右个作业
日均 300 PB Shuffle 数据
大量作业签署 SLA,对稳定性要求非常高,超时严重还会严重影响下游
大量 HDD 机器和少量 SSD 机器
大量在线业务低峰出让的资源,可用磁盘空间非常小,需要把存储拉远
问题总结
Chunk Size 过小导致磁盘产生大量随机 IO,降低磁盘的吞吐,引发 Chunk Fetch 请求的堆积、超时甚至引发 Stage Retry; 磁盘 IOPS 无法在操作系统层面进行隔离,Shuffle 过程中不同 Application 作业会互相影响; 在离线混部场景下,我们希望利用在线服务业务低峰期的 CPU,但缺少对应的磁盘资源。
External Shuffle Service 的优化
参数调优
首先是参数调优。为了实现参数调优,我们研发了一个旁路系统,如下图。
首先,采集 Spark、Yarn 运行时的 Event Log 作为数据源;
其次,使用 Flink 对原始数据进行 Join 和计算,得到作业某个 Stage 的 Shuffle 量、Task 数量等指标;
针对上述指标, 一方面,在计算过程使用可插拔的启发式规则对单个作业进行诊断; 另一方面,同时存在着大量的周期作业重复运行生成该作业的历史画像;
最终,结合历史画像与特征诊断信息对特定作业进行自动调参。
spark.sql.adaptive.shuffle.targetPostShuffleInputSize:64M->512M
spark.sql.files.maxPartitionBytes:1G->40G
Shuffle 限流
Shuffle 限流主要解决的是磁盘的 IOPS 不易隔离的问题。我们通过对低优但高负载的作业进行限流,来减轻对同节点上高优作业的影响。整体的思路是当我们发现 ESS 响应请求的 Letency (延迟)升高到一定程度时,比如 10 秒或 15 秒,我们就认为这个节点当前处于异常状态,这时 ESS 就会针对内部正在排队的 Fetch 请求,按照 Application 分类进行分析,综合当前堆积的排队长度和作业的优先级,给每个作业划定一个合适的长度范围,超过范围的作业会被 ESS 告知对应的 Shuffle Client 进行休眠,暂停数据请求,通常暂停1~2分钟,这时该作业的客户端就进入休眠状态,进行等待,同时原本分配给它的 ESS 的服务能力提供给更高优或其他不受影响的作业。
正常任务打开限流没有影响,不会触发流量限制;
异常任务开启限流,不会让任务变慢或失败,大概率会使得任务变快 (限流减少重试,减轻 Server 压力);
此处有必要解释一下,为什么任务会变得更快呢?原因在于当 Latency 升高时,Chunkr Fetch 开始堆积,大量排队,此时往往容易形成恶性循环,请求过来-开始排队-超时-超时后重试-重试后继续排队-继续超时,Fetch 请求可能永远都得不到正常响应。 但当我们开启限流之后,我们主动地让客户端等待,而非发一个请求过来在服务端排队,由此就可以避免大量无效的 Fetch 请求。也正因如此,大概率即便是被限流的作业也会变得更快。
不同优先级的任务,在限流情况下,高优先级任务允许更高的流量;
上文提到,我们是根据排队的数量,及作业的优先级综合地划定一个合适的范围。在划定这个范围的时候,更高优的作业大概率是不会被限流的。
异常节点快速恢复,2min~5min 能恢复正常。
结合第二点,因为我们让一部分发送大量 Fetch 请求的作业的客户端进行了等待休眠,所以异常节点会得到一个非常快速的恢复,大概 2~5 分钟就能恢复正常,恢复正常后,就可以给所有的 Fetch 继续提供服务。
Cloud Shuffle Service 的设计与实现
基本思路
整体架构
Zookeeper WorkerList:我们使用 zookeeper 来提供服务发现的功能;
CSS Worker [Partitions / Disk | HDFS]:管理磁盘并提供 Shuffle Push 服务节点。每一个机器上都会启动 Worker 进程,当收到启动指令时,它就会向 Zookeeper 进行注册,并定时更新上报信息;
Spark Driver:集成启动 CSS Master 和 ClusterName + ZK CSS Master 的作用是规划和统计,Master 从 Zookeeper 中拉取所有 Worker 的信息,并对 Worker 进行分配,然后把 Worker 和 Shuffle 以及每个 Partition 的对应关系通知到 Executor ClusterName + ZK:通过配置的 ClusterName 在 ZK 中寻找对应的 Workerlist
CSS ShuffleClient:Writer 和 Read 的集合,负责跟 Worker通信,读取数据或写入数据。
读写过程
性能分析
Cloud Shuffle Service 的应用实践
上文分析了 Cloud Shuffle Service 的设计和实现,下面讲一下 Cloud Shuffle Service 的应用实践。
CSS Worker 数量 1000+,对应1000多台机器
部署模式灵活:Shell、Yarn、K8S
支持作业类型众多:Spark、MR、Flink Batch
接入作业数 6w+ 单日 Shuffle 量 9PB+
集群部署&作业接入
构建运维接入管理平台(CSS-Coordinator)
提供用户作业无感知接入功能:直接帮用户注入 CSS 相关的参数;
提供 Cluster|Queue|Job等维度的灰度模式:支持以各种纬度接入作业,用户仅需配置对应的接入纬度,该维度下的所有作业都会接入到 CSS 中;
异常作业的监控告警:作业运行结果会上报到 Coordinator 平台,对于运行失败的作业会进行报警
历史 Shuffle 作业的 HBO 优化:平台在作业接入过程中会针对作业历史的 Shuffle 数据量进行评估,当 CSS 集群资源不足时会拒绝大 Shuffle 的作业接入 CSS;
设置 spark.yarn.maxAppAttempts=2
保留用户原始配置
作业 CSS 失败自动 FallBack 到原生 Shuffle
踩坑记录
CSS 服务相关
超大 Register Shuffle 启动缓慢
在最初的设计中,Register Shuffle 会对所有 Worker 进行初始化工作。因此,在规模比较大的 Shuffle 的场景下,Register 就会非常慢,用户启动一个 Stage 可能需要 2-3 分钟。
后来,我们对 Register Shuffle 进行了精简,把 Worker 的初始化动作改成了 Lazy 模式,即只有第一次数据 Push 过来的时候,Worker 才针对这一个作业的 Partition 进行对应的初始化工作。在 Register Shuffle 的时候,只进行 Worker 和 Partition 之间的分配,大大缓解了超大 Register Shuffle 启动缓慢的问题。
Client 发送速率过快
因为我们是一个有状态的服务,无法把 QPS 通过负载均衡的方式降下来,只能通过一些负反馈的方式让 Client 降速,即当 Server 的服务能力无法满足请求时,就让请求在客户端等待。
后续我们尝试了很多方法,包括 Spark 原生的 Max Inflight 等,但效果都不太好,最终我们选择了 Netflix 的一个三方库。
大致原理是,针对最近一段时间的 RTT 做一个 Smoth 处理,得到一个理论的 RTT,然后拿当前的 RTT 与理论 RTT 做比较,如果小于这个值的话,就在 QPS 上做爬坡。如果大于这个值的话,系统就认为现在的 Server 有排队现象,然后就启动限流。
服务热上线,用户如何不感知
在 CSS beta 的过程中,每天都会有新的 Commit 合到主分支,每天也会产生新的问题。但是公司内部的 Spark 发展周期是比较长,跟 CSS 的迭代周期无法 Match。
最终,我们在 Spark 里只集成了一个最简单的接口,其他的实现都放到 HDFS 上,这样就把公司内 Spark 版本的周期与 CSS 的版本周期做了解耦,CSS 就可以做到小步快跑。在小步快跑的过程中,那我们解决了大量的问题。
Spark 集成相关
AQE Skew-Join 读放大问题
AQE Skew-Join 原理图
上图是 Spark 社区提供的 AQE Skew-Join 原理图,根据这个原理,当 Spark 发现某一个 Partition 数据非常大,远超其他 Partition 的时候,它会主动把该 Partition 的数据拆分成多份数据,然后分别去做 Join。这样最终每个 Task 处理的数据量就会更平均,整体作业的运营时间也会变短。
比如上文的例子,1T 的数据会被切分成很多份 512G 的文件,当 AQE Skew-Join 触发时,就不必把一个超大文件读很多遍,只需把这些 512G 的文件按需分配给不同的 Task 进行 Join 就可以。
Task Huge Partition 导致 Executor 内存占用过大
收益分析
未来展望
下面是 CSS 未来的规划和展望。
此前,Cloud Shuffle Service 已在 Github 上开源,基于字节跳动大规模实践的火山引擎批式计算 Spark 版也已经上线火山引擎,支持公有云、混合云及多云部署,全面贴合企业上云策略,欢迎扫码了解👇
往期推荐
谷歌成多国“提款机”
11个你不需要的 VS Code扩展
微信扫码关注该文公众号作者