Redian新闻
>
Blink实时计算:Explorer大基数表的写入性能优化

Blink实时计算:Explorer大基数表的写入性能优化

科技

阿里妹导读


在研发实时数据的过程中碰到了需要update写入Explore的大基数实时数据表的场景。本文记录了经过一系列方式调优后,在流量正常的情况下,任务不再出现explorer链接失败报错和延迟的全过程。

一、背景

最近在研发实时数据的过程中碰到了需要update写入Explore的大基数实时数据表的场景。为了支持支付宝某个广告位的实时看数需求,需要计算实时累计的流量效果数据。比如曝光pv、点击pv,曝光uv、点击uv,pv曝光点击率。
  • 注:Explorer是一款蚂蚁自研海量数据规模下低延时响应的实时分析型(OLAP)数据库, 目标是提供聚合查询能力和一些高阶分析功能。对标业界ClickHouse和阿里的云原生数据仓库AnalyticDB。

现根据业务需求,需要将用户多次行为记录按特定去重规则合并成1条计算。因此设计了一张在user_id、request_id粒度上的大基数explorer表,目标写入数据量在50~80万行/min左右。希望能在此基础上计算出准确的实时累计曝光pv数据。
Blink任务上线后,sink节点数据写入量在30w行/min以上时会持续报错explorer链接失败,任务频繁失败重启,导致延迟持续上涨。本文总结了常用的优化思路和操作,供参考。
com.alibaba.blink.streaming.connectors.common.exception.BlinkRuntimeException: ************ERR_ID:CON-02010602CAUSE:Explorer operation failed, msg: Cannot get a connection, pool error Timeout waiting for idle objectACTION:Flush to explorer table failed, contact blink/explorer admin for help.DETAIL:
************
explorer表例子:

二、问题出现的直接原因

直接原因是explorer集群中负载不均匀。部分机器负载过高,大批量写入的时候cpu使用率很高导致写入响应慢,最终造成写入超时报错,Blink任务失败重启。比如,explorer表配置的shard数是10(hashBucket) * 2(shardConfig_task_replicants) = 20,假如explorer集群的机器只有14台,理论上讲会有6台机器的负载比其他机器高,大批量写的时候cpu使用率会很高,写入响应慢, 导致Blink写入超时报错,任务失败重启。

三、优化思路


3.1. 均匀机器负载:通过优化explorer表分片数

  • 表的分片数需要根据集群有多少机器进行调整。比如集群机器有14台,可以通过设置hash_bucket=14,让链接都均匀的分布在14台机器上, 不让部分机器负载过高。

{  "shardConfig_partition_columns": "test_column", // 根据该列进行哈希运算  "hash_bucket": "14", // 哈希分桶个数  "update_model": "Row", // 更新模式:追加写入/覆盖写入  "shardConfig_task_replicants": "2", -- 副本数量  "storage_engine": "test_engine",  "storage_explorer_tier": "test",}


3.2. 调大超时时间配置

(最简单粗暴的方式,不能从本质上解决问题)
blink是通过jdbc链接的explorer,创建explorer结果表示例如下:
create table explorer_output(  user_id varchar,   request_id varchar,   ...,  primary key(rowkey)) WITH(  -- 写入explorer时的各种参数配置  `user`='test_name'  ,`url`='jdbc:mysql:///${test_ip}/cheetah?characterEncoding=utf8&autoReconnect=true&connectTimeout=10000&socketTimeout=30000&rewriteBatchedStatements=true''  ,`zdalpassword`='${test_password}'  ,`tablename`='test_table'  ,`type`='explorer'  ,`cache`='ALL'  ,`batchInsertSize`='20000'  ,`partitionBy`='rowkey')

超时配置注意事项:

  • blink链接explorer的超时时间由url中connectTimeout和socketTimeout配置。connectTimeout 是blink与explorer TCP建联的超时时间,socketTimeout是blink到explorer TCP读写数据的超时时间。

经尝试,实际将socketTimeOut适当调大后,报错的频率会减少一些。


3.3. 减少写入数据量

3.3.1. Blink sql逻辑优化:通过去重减少输出到sink算子的量

  • 【方法1】having count(*) = 1, 使得同维度下有多条数据时,只取第一条。效果类似first_value() OVER partion by xxx order by窗口函数。但是经实测后发现有问题,会丢失数据。怀疑是开了miniBatch微批处理后第一次count就超过1,数据被过滤掉了。

SELECT  `user_id`,  `request_id`,   ...FROM `expo_detail`WHERE `request_id` IS NOT NULLGROUP BY  `user_id`,  `request_id`,   ...having count(*) = 1 
  • 【方法2】后续改成row_number 方案,相同维度根据日志时间排序取第一条记录即可,示例代码如下:
SELECT       `user_id`,      `behavior`,      `request_id`,        ...      ROW_NUMBER() OVER (        PARTITION BY             `behavior`,          `request_id`,            ...          ORDER BY loged_time -- 顺序排就行      ) AS rn    FROM    (         SELECT          `user_id`,          `behavior`,          `request_id`,            ...        FROM `expo_detail`        WHERE `request_id` IS NOT NULL    ) ) WHERE rn <= 1 -- 只取第一条记录,不丢失即可

topN语句参考:https://help.aliyun.com/apsara/enterprise/v_3_15_0_20210816/sc/enterprise-developer-guide/topn-statement.html?spm=a2c4g.14484438.10001.163

3.3.2. Blink sql逻辑优化:过滤冗余数据不参与计算

  • 在读取上游数据时可以尽量过滤掉冗余数据,不参与后续算子的计算。

  • 本案例中的日志时间有两个,客户端事实发生的日志时间和服务端上报的日志时间。

  • 客户端日志时间会存在乱序的情况(比如几天前的数据延迟到达、由于时区不同导致的“未来”的时间)。通过限制log_time(客户端日志时间)在当天内且<=当前最新时间,可以过滤掉不需要的数据,减少一定数据量。

  • loged_time服务端上报时间相比客户端日志时间,乱序的情况会少一些。此处由于业务需要,以客户端日志时间为准。

3.3.3. Blink参数优化:限制读取日志流的tps

  • 可通过限制上游输入的tps,让数据稳定快速的被处理完输出出去。

  • 如果设置过大,在tps高峰出现时,source节点输入tps量会暴涨,给任务带来较大的性能压力,最终也会影响sink节点写入explorer的稳定性。

CREATE TABLE test_table (  ....) WITH(  -- blink参数配置`batchGetSize`='5', -- 适当调小,缓解tps高峰时带来的性能压力  ...);

参数释义:https://help.aliyun.com/apsara/enterprise/v_3_15_0_20210816/sc/enterprise-developer-guide/create-a-log-service-source-table-1.html?spm=a2c4g.14484438.10001.114

3.3.4. Blink执行计划优化:减少source节点并发

  • 可以通过减少source节点的并发,减少下游算子压力

  • 如图,可以在Flink UI界面上查看source节点的并发数。一般source节点的并发和上游分片数保持一致。适当调小也可以减少下游写入压力。


3.4. 其他常见延迟调优方法

3.4.1. Blink参数优化:调整explorer写入参数

可以改配置控制Blink写入explorer的频率和一次写入的数据量,提高写入的效率。Explorer写入Blink参数配置说明:
参数
注释说明
备注
batchInsertSize
一次写入的条数
可选,默认200
flushPoolSize
写入线程数
可选,默认1,超过1,写数据会乱序, 如果是update表不能开启
workQueueSize
executor的工作队列大小,和buffer大小成正比,调大就允许更多数据在缓存中
可选,默认是20
flushIntervalMs
刷数据周期,写入速度 1次/2s,30次/m
默认2s,表示每2s会刷一次数据
CREATE TABLE test_table (`user_id`   VARCHAR,`rowkey`    VARCHAR,`loged_time`    TIMESTAMP, ...primary key(`user_id`, `rowkey`)) WITH(  -- 部分参数示例`tablename`='test_table',`type`='explorer',`cache`='ALL',`batchInsertSize`='500' -- 一次写入的条数 默认200,`partitionBy`='rowkey' ,`workeQueueSize`='50' -- executor的工作队列大小,默认20,`flushIntervalMs`='2000' -- 刷数据周期,默认2s,每2s刷一次数据);

补充:TaskManager/subTask/slot和线程池/线程和insertBatchSize的关系

一个slot对应一个subTask,一个taskManager假设有32个slot,有32个subTask, 那么一个subTask对应一个线程。整个connectionPool共32个线程,会同时有活跃和非活跃的线程。Sink节点并发数和cpu数会影响subTask的个数。创建explorer结果表的配置insertBatchSize会影响一次写入的数据量。flushInterval影响写入的频率。update表一次写入的数据量增加,耗时会增大。

3.4.2. 避免频繁Full GC

垃圾回收期间,任务会中断执行,影响Blink性能。如何发现Full GC:通过监控Flink metrics可以直接发现。 

也可以在Flink UI界面上点击某一个taskManager,点击Metric查看Old Generation GC次数,太大说明存在频繁GC的情况,该taskManager的heap内存不够。
怎么判断内存不够?一般需要缓存大量的数据的地方就需要调大Task Off-Heap堆内存。比如做开窗计算时,需要缓存window窗口段内的数据。补充:
Flink内存模型:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/deployment/memory/mem_setup_tm/

3.4.3. 其他内存怎么调

  • native 内存:开窗计算,聚合计算, 维表关联,去重,这些操作需要调大native 内存。

  • Direct memory:当出现DirectBuffer 内存溢出(Out Of Memory)报错时时,可通过修改blink任务参数调大Direct memory。 

  • 参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/deployment/memory/mem_setup_tm/

四、总结

经过以上方式调优后,在流量正常的情况下,任务不再出现explorer链接失败报错和延迟,写入explorer表的数据量稳定在60-80w行/min。 
但从解决指标去重的业务问题的角度来讲,通过存uid、request_id级明细数据的方式计算实时累计曝光pv在这个场景下并不是一个最好的方案。 
换个思路,该页面区块的曝光pv需要在uid、request_id级别上去重,看起来是一个多维度去重的问题,但实际上uid和request_id是1对多的关系,不是多对多,对于每一个uid,一个request_id下的多次曝光需要去重,可以转化为直接对request_id去重。计算uv是单维度去重问题的典型例子,参考常见的uv去重的方式,可以采用了hyperLogLog算法(原理见参考文献3)计算曝光pv数据。此处再附一个几种去重方案的性能对比数据,可以看出方案4 explorer存储15min级分时adm数据+hyperLogLog算法计算去重曝光pv比较好(前提是业务看数可以接受1-5%由hyperLogLog带来的误差)。

参考文档/文献

  1. Flink官方文档:https://nightlies.apache.org/flink/flink-docs-master/zh/docs/concepts/flink-architecture/

  2. 阿里云-实时计算Blink用户文档: https://help.aliyun.com/apsara/enterprise/v_3_15_0_20210816/sc/enterprise-user-guide/what-is-realtime-compute1.html?spm=a2c4g.14484438.10001.25

  3. Flajolet, P., Fusy, É., Gandouet, O., & Meunier, F. (2007). Hyperloglog: the analysis of a near-optimal cardinality estimation algorithm. Discrete mathematics & theoretical computer science, (Proceedings).


阿里云开发者社区,千万开发者的选择


阿里云开发者社区,百万精品技术内容、千节免费系统课程、丰富的体验场景、活跃的社群活动、行业专家分享交流,欢迎点击【阅读原文】加入我们。

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
AmEx Refer Jail (AmEx refer 功能突然消失)现象及解决方案Linux性能优化9张图(收藏)Exploration:基于铁电忆容器阵列的超低能耗存内算‘Yellow Fever’ Expat Soccer Team Forced to Change Name壹号本 OnexPlayer 新款飞行家掌机公布:R7 7840U + 7 英寸 1080p 屏俄罗斯科技巨头Yandex创建大模型YandexGPT,声称性能优于ChatGPT世外、包校代表的国际学校和Rectory、Bement代表的美初升入顶级美高的申请路径分享SpringCloud 组件性能优化技巧几个 Nginx 性能优化方法5078 血壮山河之武汉会战 黄广战役 10钉钉协作Tab前端进化之路【极致性能优化总结】XM announces plans to become model city for express deliveryChina Railway Express (Xiamen) breaks 100K TEU thresholdGas Explosion in Northwest China Restaurant Leaves 31 Dead消息队列CKafka跨洋数据同步性能优化几个Nginx性能优化方法咀外文嚼汉字(243)“九轮草”与“报春花”点击领取 IEEE Xplore MOOC 2023 秋季课程最新课表啦!中国正在背离邓小平模式CodeGeeX2-6B开源,最低6GB显存,性能优于 StarCoder[电脑] 当 ROG 遇上 XPG——ROG Z790 HERO+XPG ROG 认证内存+太阳神装机展示CTO偷偷传我的系统性能优化十大绝招(万字干货)2015 macbook pro 13in(i5 2.7gh/8g/256g)low battery circle *exploRedis10大性能优化策略【60k 开卡奖励】Chase UA Explorer 信用卡杏林春暖字节跳动微服务架构下的高性能优化实践Linux 性能优化的全景指南,都在这一篇里了,建议收藏~你好,我是筚(bì)篥( lì)!Nature Catalysis | 季泉江/申怀宗合作在微型基因编辑器工作机制解析和性能优化方面取得新进展After String of Gas Explosions, China Updates Safety Guidelines精选DS岗位 | Tesla、American Express、Apple 等公司持续热招!美国国会大厦升国旗 向李洪志先生致敬Extreme Drinking Claims Another Chinese LivestreamerIntex Explorer K2 二人独木舟5.3折 149.98元包邮!
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。