Redian新闻
>
一文解析 ODPS SQL 任务优化方法原理

一文解析 ODPS SQL 任务优化方法原理

科技

阿里妹导读


本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。

一、背景

使用ODPS SQL进行离线数据研发时,开发同学不可避免会碰到任务性能问题,需要经常对ODPS SQL执行任务进行调优,以对重点场景任务产出时效进行保障,避免资源浪费。调优过程需要参考相关优化文档资料,发现技术网站中有很多文章介绍到相关的优化方法,但从ODPS底层执行计划来解释为什么要这样做优化以及背后的依据是什么的介绍文章比较少。本文尝试从ODPS底层逻辑计划拆解部分优化方法对应的优化原理,从知道怎么优化,到为什么这样优化,以及还能怎样优化。

二、ODPS基础架构

本节直接略过MAXCOMPUTE基本信息介绍,直接进入相关架构描述。


2 ODPS架构

ODPS按照功能逻辑划分为接入层、逻辑层、存储/计算层,对应着集群功能则是接入层、控制集群、计算集群。
  • ODPS接入层的最上层是通过LVS实现负载均衡,把请求发送给HTTP Server,该请求包括用户的AccessID和MD5签名信息,HTTP Server在接收到请求后,会把AccessID和MD5签名发给云账号服务进行用户认证,认证通过后,云账号服务会返回该用户的唯一AccountID,在后续执行逻辑中,发送的请求都是包含该AccountID,而不是AccessID。
  • 逻辑层又称作控制层,是MaxCompute的核心部分。实现用户空间和对象的管理、命令的解析与执行逻辑、数据对象的访问控制与授权等功能。在逻辑层有Worker、Scheduler和Executor三个角色:
  • Worker处理所有的RESTful请求,它可以本地处理一些作业,如对用户空间、表、资源、作业等的管理;而对于需要执行分布式计算的作业,如SQL、MR等,Worker会进一步把它提交给Scheduler处理;
  • Scheduler负责instance的调度,它会维护一个Instance列表,并把Instance分解成各个Task,生成这些Task的工作流——DAG图(Directed Acyclic Graph,有向无环图),把可以运行的Task放到TaskPool中,TaskPool是个优先级队列,后台线程会定时对该优先级队列进行排序;此外,Scheduler还会查询计算集群的资源状况,向计算集群的Fuxi master询问资源占用情况以进行流控(Fuxi slot满的时候,停止响应Executor的task申请)。
  • Executor会判断自身资源情况,如CPU、内存、正在运行的Task数(不能超过上限),如果资源满足,则会主动轮询Scheduler的TaskPool请求获取下一个Task,TaskPool会根据Task的优先级和计算集群的资源情况,把相应Task提交给Executor,Executor获取到Task后,会生成计算层的分布式作业描述文件,提交给计算层,监控这些任务的运行状态,并定时把状态汇报给Scheduler。

简单地说,当用户提交一个ODPS作业请求时,接入层先进行用户认证,然后发送给控制层的Worker,Worker判断是否为同步请求,如果为同步请求,则本地执行并返回。如果是异步请求,Worker会先做些检查(如表是否存在,版本号是否最新等),生成InstanceID,把请求进一步发送给Scheduler,并返回给客户端。Scheduler把作业分解成各个Task,Executor主动轮询Scheduler,获取相应Task,提交给计算层执行,并定时将自己持有的Task的状态汇报给Scheduler。
  • 计算层就是飞天内核(Apsara Core),运行在和控制层相互独立的计算集群上。包括Pangu(分布式文件系统)、Fuxi(资源调度系统)、Nuwa/ZK(Naming服务)、Shennong(监控模块)等。MaxCompute中的元数据存储在阿里云计算的另一个开放服务OTS(Open Table Service,开放结构化数据服务)中,元数据内容主要包括用户空间元数据、Table/Partition Schema、ACL、Job元数据、安全体系等。

三、ODPS基础概念


3 ODPS作业概念

官方文档描述ODPS元数据模型:

通常情况下,一个odps job对应一个odps instance(会产生一个instance_id), 一个odps instance对应一个odps task, 一个odps task对应一个活多个fuxi job,一个fuxi job可以基于DAG被拆分为多个类型的task如map、reduce和joiner。
一个odps instance对应两个fuxi job的case(小文件合并):

四、ODPS运行时监控


4 Logview2.0框架

参考链接:https://help.aliyun.com/zh/maxcompute/user-guide/use-logview-v2-0-to-view-job-information
后续的任务调优都会基于logview2.0作业运行时监控进行。

五、ODPS执行计划


5.1SQL执行顺序

5.1.1SQL执行顺序

通用的SQL 逻辑算子:

5.1.2SHUFFLE概念
目前大部分的sql性能问题都会和Shuffle强相关,本节重点介绍shuffle基本概念。
目前基本所有的SQL优化问题都会涉及到Shuffle过程,所以先来了解Shuffle的原理,参考Hadoop Shuffle过程原理(Hadoop权威指南):
在Hadoop中数据从Map阶段传递给Reduce阶段的过程就叫Shuffle,Shuffle机制是整个MApReduce框架中最核心的部分。

5.1.3ODPS SQL逻辑执行计划算子

  • ODPS SQL Task Operator结构(截取自ODPS官方文档)

注:可在ODPS SQL前添加EXPLAIN 执行流程得到,EXPLAIN主要有以下的作用:

1、检查SQL语法;

2、检查读取的表和分区是否符合预期,这样可以排除掉很多分区读错的尴尬;

3、检查mapreduce运行结构是否符合预期,检查mapjoin等特性有没有生效;
  • 各operator算子含义


5.2离线ODPS SQL优化方法分析

基于上面的介绍的基本概念,本小节基于ODPS SQL的Explain功能查询静态SQL的逻辑执行计划,分析SQL任务优化前后的差异,结合任务实际运行过程中Logview的监控输出,分析给出任务优化生效的原因。
5.2.1Multi Distinct优化分析

技术网站文章中有大量介绍Multi-Distinct问题的优化方法,先从下面的执行计划来看下。
CASE1:不带Distinct的Count算子使用
EXPLAIN SELECT  app_id        ,count(user_id)FROM    xxx.table_vst_user_testWHERE   dt = '20230816'GROUP BY app_id;
逻辑执行计划:

分析:可以看到在Map Task输出阶段,会以app_id字段进行Hash分区传输,输出的临时结果是 app_id和__agg_0_count字段,数据已经预聚合,不存在带有user_id的明细数据shuffle传输,所以任务运行速度较快。

CASE2:带Distinct的Count算子使用
EXPLAIN SELECT  app_id        ,count(DISTINCT user_id)FROM    xxx.table_vst_user_testWHERE   dt = '20230816'GROUP BY app_id;

分析:可以看到在Map Task输出阶段,会以app_id字段进行Hash分区传输,输出的临时结果值是 app_id&user_id。Map Task输出的中间结果无法预聚合,需要将带有user_id的明细数据传输,所以运行速度较慢,如果某个小程序对应的访问用户量较大时,极易在Reducer阶段产生数据倾斜。
CASE3:带多Distinct的Count算子使用
EXPLAIN SELECT  app_id        ,count(DISTINCT user_id)        ,count(DISTINCT cy23_source_name_l1)        ,count(DISTINCT cy23_source_name_l2)        ,count(DISTINCT cy23_source_name_l3)        ,count(DISTINCT cy23_source_name_l4)FROM    xxx.table_vst_user_testWHERE   dt = '20230816'GROUP BY app_id;

分析:可以看到在Map Task输出阶段,还是会以app_id字段进行Hash分区传输,输出的临时结果值是 app_id&user_id&cy23_source_name_l1&cy23_source_name_l2&cy23_source_name_l3&cy23_source_name_l4。Map Task输出的中间结果无法预聚合,需要将带有user_id及其他的待去重字段的明细数据传输,字段越多,数据传输量越大,所以运行速度较慢,如果某个小程序对应的访问用户量较大时,极易在Reducer阶段产生数据倾斜。
CASE4:带Distinct的Count算子的优化代码(该CASE是对CASE2的代码优化)
EXPLAIN SELECT  app_id        ,COUNT(user_id)FROM(    SELECT  app_id            ,user_id    FROM    xxx.table_vst_user_test    WHERE   dt = '${bizdate}'    GROUP BY app_id            ,user_id)tGROUP BY app_id

分析:优化后的代码,在逻辑计划里多增加了一个Reducer阶段,但在MAP Task的输出阶段,从原先的以app_id进行Hash分区改为了以app_id&user_id进行Hash分区,可以避免数据在传输到Reduce阶段因为热点数据导致的数据倾斜。在第一个Reducer执行阶段,会对Map段传输的数据进行预聚合,不存在带有明细字段的数据向下一个Reducer阶段传输,避免了数据倾斜的发生。整体来看,该优化方法,没有减少Shuffle过程中的明细数据传输,只是对于Map Task的Hash字段从app_id调整为app_id和user_id,减少了热点数据聚集的可能,通过增加计算阶段进行运行时间的优化。
5.2.2系统参数odps.sql.groupby.skewindata=True分析

CASE1:带Distinct的Count算子使用
EXPLAIN SELECT  app_id        ,COUNT(DISTINCT user_id)FROM    xxx.table_vst_user_testWHERE   dt = '${bizdate}'GROUP BY app_id

分析:同5.2.1中的CASE2
CASE2:Case1代码前加入系统优化参数
SET odps.sql.groupby.skewindata = true;
EXPLAIN SELECT app_id ,COUNT(DISTINCT user_id)FROM xxx.table_vst_user_testWHERE dt = '${bizdate}'GROUP BY app_id

分析:可以看到加入系统优化参数后的逻辑执行计划同5.2.1中的Case4,优化后,Map阶段的输出,app_id进行Hash分区改为了以app_id&user_id进行Hash分区,避免热点数据的聚集,通过增加计算阶段进行运行时间的优化。
5.2.3.Join(Map Join/Inner Join/Left Join)
CASE1:大小表关联(SortMergeJoin)
EXPLAIN SELECT  mini_cat_name_l1        ,COUNT(DISTINCT user_id)FROM    (            --主表            SELECT  app_id                    ,user_id            FROM    xxx.table_vst_user_test            WHERE   dt = '20230816'            GROUP BY app_id                    ,user_id        ) t1LEFT JOIN   (                --维表                SELECT  app_id                        ,mini_cat_name_l1                FROM    xxx.dim_category                WHERE   dt = '20230816'            ) t2ON      t1.app_id = t2.app_idGROUP BY mini_cat_name_l1;

下图来自Logview中的执行计划:

J4_1_3内部结构:

分析:逻辑执行计划中,M1阶段,主要针对右表小程序维表xxx.dim_category进行数据加工提取,由于左右表关联的Key是app_id,所以Hash分区的key也是app_id,输出的中间结果是app_id&mini_cate_name_l1。M2阶段,针对访问事件表xxx.table_vst_user_test进行数据加工,Hash分区的key是app_id&user_id,输出的中间结果是是app_id&user_id,因为Hash分区的key是app_id&user_id,所以在R3_2阶段执行时,不存在热点数据聚集导致的数据倾斜。但参看逻辑执行计划,R3_2的输出会以app_id作为Hash key进行数据传输,数据会在J4_1_3阶段进行整合,并跟M1阶段的小程序维表数据进行MergeJoin,存在数据倾斜的可能。同时在R5_4阶段,Hash分区key是mini_cate_name_l1,不同的行业类目下的用户量差异较大,也会存在可能的数据倾斜。基于Logview的执行计划,可以看到两表关联使用的是MergeJoin的算法(参考上图)。

Sort Merge Join算法原理:

算法执行过程:
1. Shuffle阶段:将两张表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理;
2. Sort阶段:对单个分区节点的两表数据,分别进行排序;
3. Merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,如果不同,左边小就继续取左边,反之取右边(即用即取即丢),见下图示意:

可以看出,无论分区有多大,Sort Merge Join都不用把某一侧的数据全部加载到内存中,而是即用即取即丢。
CASE2:大小表关联使用mapjoin hint(BroadcastHashJoin)
EXPLAIN SELECT  /*+mapjoin(t2)*/mini_cat_name_l1        ,COUNT(DISTINCT user_id)FROM    (            SELECT  app_id                    ,user_id            FROM    xxx.table_vst_user_test            WHERE   dt = '20230816'            GROUP BY app_id                    ,user_id        ) t1LEFT JOIN   (                SELECT  app_id                        ,mini_cat_name_l1                FROM    xxx.dim_category                WHERE   dt = '20230816'            ) t2ON      t1.app_id = t2.app_idGROUP BY mini_cat_name_l1;

分析:逻辑执行计划中,M1阶段,主要针对右表小程序维表xxx.dim_category进行数据加工提取,由于左右表关联的Key是app_id,但明确使用的是Mapjoin,所以不存在Hash分区字段,输出的中间结果是app_id&mini_cate_name_l1。M2阶段,针对访问事件表xxx.table_vst_user_test进行数据加工,Hash分区的key是app_id&user_id,输出的中间结果是是app_id&user_id,因为Hash分区的key是app_id&user_id,所以在J3_1_2阶段执行时,不存在热点数据聚集导致的数据倾斜。数据会在J3_1_2阶段进行整合,并跟M1阶段的小程序维表数据进行Broadcast Hash Join。同时在R4_3阶段,Hash分区key是mini_cate_name_l1,不同的行业类目下的用户量差异较大,也会存在可能的数据倾斜。基于逻辑执行计划和Logview的执行计划,可以看到两表关联使用的是BroadcastHashJoin的算法。可以看到针对Case1的代码进行优化后,两表关联算法从SortMergeJoin改为了BroadcastHashJoin,特定场景下,减少了可能的数据倾斜,利用资源空间换时间。
下图来自Logview中的执行计划:

Broadcast Hash Join算法:

SparkSQL中broadcast hash join定义:是将其中一张小表广播分发到大表所在的所有节点上,供打标使用。executor存储小表的全部数据,一定程度上牺牲了空间,换区shuffle操作大量的耗时。

HashJoin的伪代码逻辑:

CASE3:大小表关联使用distributed mapjoin hint
XPLAIN SELECT  /*+ DISTMAPJOIN(t2 (shard_count = 2)) */        mini_cat_name_l1        ,COUNT(DISTINCT user_id)FROM    (            SELECT  app_id                    ,user_id            FROM    xxx.table_vst_user_test            WHERE   dt = '20230816'            GROUP BY app_id                    ,user_id        ) t1LEFT JOIN   (                SELECT  app_id                        ,mini_cat_name_l1                FROM    xxx.dim_category                WHERE   dt = '20230816'            ) t2ON      t1.app_id = t2.app_idGROUP BY mini_cat_name_l1;

下图来自Logview中的执行计划:

J4_2_3内部结构

基于可以看到在Join Task中,使用的是DistributeMapJoin算法。
分析:Case1中的执行计划为原执行计划,M1是小表,上图为使用Distributed MapJoin之后的Plan。
  • 小表一侧分为M1,R2_1 两个Stage。M1阶段读表并进行Shuffle,Shuffle的过程将数据分片(shard=2),使得具有相同hash value的数据分发到同一个worker。R2_1(HashTableBuilder1)作为server端,完成HashTable的构建并常驻内存,接受client端(J4_2_3 DistributedMapJoin1)请求完成Lookup查询并返回values。多个shard共同组合成一个分布式的hash table services,shard数量可以手动调整。各shard的service一旦启动,需要等待client端(DistributedMapJoin1)完成所有的request请求后才stop。

  • 大表一侧为Stage M3和J4_2_3。J4_2_3(DistributedMapJoin1)作为client端,通过网络传输方式将大表端的join keys,分batches往server端(HashTableBuilder1)发起request请求并获取返回values。由于server端的数据已经按照hash value分shard,client端可以根据数据的特征只请求特定的shard。

相比于原Query,使用Distributed MapJoin后,大表侧需要通过RPC建立网络通讯获取小表侧HashTable查询返回的数据,建议大表数据量应该远大于小表,否则带来的收益有限,甚至有可能因为网络的波动导致性能回退。从硬件发展趋势来看,相比于网络带宽,磁盘IO往往更容易成为瓶颈,所以长远看更有益,但是现阶段使用Distributed MapJoin时,要求大表应远大于小表数据量。
注意,本case仅仅是为了对DistributedMapJoin的逻辑执行计划进行分析,与CASE1进行对比,该优化方法不一定适用该测试sql语句。具体适用场景及用法请查询参考资料章节中的DistributedMapJoin链接。

六、总结

本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,精力有限,仅覆盖了部分调优方法的分析,希望能给大家日常SQL优化工作带来一些启发。由于掌握的ODPS底层执行原理资料有限且线上生产环境HBO对于执行计划有影响,存在理解不完全正确的可能,望读者谅解。

参考资料:

  • Sort-Merge Join:https://www.sparkcodehub.com/spark-what-is-a-sort-merge-join-in-spark-sql

  • Join实现原理:https://www.jianshu.com/p/97e76dddcbfb

  • SparkSQL中的三种Join及实现:https://blog.csdn.net/wlk_328909605/article/details/82933552


参与话题讨论赢礼品


你时常焦虑吗?一般是在什么场景,工作或生活?我们是否掉入了“别人贩卖的焦虑”(PUA、35岁危机)的陷阱?


点击阅读原文参加话题讨论,截止2024年1月14日24时,我们将会选出 2 名幸运用户和 2 个优质回答分别获得阿里云开发者无线充电器一个。


微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
一文解读:如何理解“大模型时代”的狂飙趋势?|投资笔记第162期三篇论文解决「语义分割的优化和评估」难题!鲁汶/清华/牛津等联合提出全新方法有多少人对公关行业有偏见 ?一文解析!mysql8.0流程控制一文拿捏MySQL 8.2 正式可用,支持读写分离免费试听|爬虫模型项目实战+SQL面试真题解析三周掌握,华丽变身数据大神!如何设计一款基于 MySQL 实现的 Message Queue4 种 MySQL 同步 ES 方案,yyds!MySQL如何性能调优?上篇Redis缓存与Mysql如何保证双写一致Timescale 推出无服务器数据库的替代方案,Dynamic PostgreSQL罗马最负盛名的古迹(上)基于 MySQL 多通道主主复制的机房容灾方案PostgreSQL数据脱敏方式盘点聊聊优化慢SQL那些事《跟着月亮回故乡》&《相见》1.8w 字详解 SQL 优化美团调整架构,科技与境外业务优先级提升 / 中国成韩国最大跨境电商进口来源地丨36氪出海·要闻回顾技术数商,空间计算,太空5G,出海遇阻,光环褪去,一文解读2024AIoT的推力与阻力Python如何使用MySQL 8.2读写分离?MySQL备份恢复最佳实践:终极指南相聚多伦多(二十三)求不得一文让你对mysql索引底层实现明明白白MySQL主从同步延迟原因与解决方案中日不同的“数字”命名车站EITC-美国对低工作收入家庭的税务优惠读清状元书法MySQL 分库分表实践PostgreSQL夺冠巴菲特公开投资中所使用的量化方法Mysql集群之PXC-Docker安装浅谈SQL优化小技巧MYSQL事务的底层原理为何在中国MySQL远比PostgreSQL流行SQL Server死锁总结
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。