紧急生产问题:线上kafka百万消息积压如何处理?
👉 这是一个或许对你有用的社群
🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入「芋道快速开发平台」知识星球。下面是星球提供的部分资料:
《项目实战(视频)》:从书中学,往事上“练” 《互联网高频面试题》:面朝简历学习,春暖花开 《架构 x 系统设计》:摧枯拉朽,掌控面试高频场景题 《精进 Java 学习指南》:系统学习,互联网主流技术栈 《必读 Java 源码专栏》:知其然,知其所以然
👉这是一个或许对你有用的开源项目
国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。
功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号、CRM 等等功能:
Boot 仓库:https://gitee.com/zhijiantianya/ruoyi-vue-pro Cloud 仓库:https://gitee.com/zhijiantianya/yudao-cloud 视频教程:https://doc.iocoder.cn 【国内首批】支持 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 双版本
前言
大家在日常开发中,是否处理过大批量消息积压的问题 呢?
它一般由于代码bug(比如消费逻辑处理有误) 、或者生产者的生产速度大于消费者的消费速度(如大促、抢购等活动期间导致消息数量激增,或者消费者处理速度极慢 ),就可能导致生产环境出现百万、甚至千万的消息积压。
那么,假设发生kafka百万消息堆积,如何解决呢?
先排查是不是bug,如果是,要快速修复 优化消费者代码逻辑 临时紧急扩容,新建临时topic
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/ruoyi-vue-pro 视频教程:https://doc.iocoder.cn/video/
1. 先排查是不是bug,如果是,要快速修复
遇到消息积压问题时,我们需要先排查,是不是有bug产生了 ,比如消费者未正确提交偏移量(Offset)。
消费者在处理完消息后未提交偏移量,导致重复消费或消费停滞。进而形成大量消息积压。
给个伪代码反例 :
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
// 未提交偏移量
}
}
在处理完消息后,要正确提交偏移量。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
//提交偏移量
consumer.commitSync();
}
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/yudao-cloud 视频教程:https://doc.iocoder.cn/video/
2. 优化消费者代码逻辑
如果不是bug,那就可能是消费者速度不给力,导致的消息积压。我们可以优化一下消费者代码逻辑。
可以使用多线程处理 ,可以减少每条消息的处理时间(比如减少不必要的计算),从而提高消息处理速度。
假设消费者有两台 机器,消费者代码优化前是,1秒处理100条消息。代码优化后,l秒可以处理消息500条 。
一个小时,可以处理消息:2* 500 * 3600 = 3600 000
可以发现,如果累积了3百多万消息的话,处理完也要一个小时。如果是生产环境,一些比较敏感或者特殊 的业务,是不允许很长的时间延迟的。
3. 临时紧急扩容,新建临时topic
业务紧急的话,我们可以临时紧急扩容,新建临时topic。
比如原来的topic 只有两个partition分区,因为消费者处理很耗时等操作 ,导致了百万消息积压,这时候需要紧急快速处理。
这时候,消费者的代码,我们可以做一些调整,就是不再处理其他业务操作。而是新建临时的topic ,把消息转发到临时的topic,并且partition 分区增加到原来的 10倍
然后我们原来消费者业务逻辑处理的代码,放在新的临时消息那里处理。
等快速消费完积压数据之后,得恢复原先部署的架构,下掉临时消费者,重新用原先的 consumer 机器来消费消息。
最后
对于线上kafka 消息大量积压的问题,我总结了这几点:
我们要做好监控和告警,当消息积压到一定程度的时候,就要告警,通知负责人 ,提前处理。 不要上来就新建临时topic,去快速处理大量积压问题。应该先排查是不是bug,优化消费者的代码 。 如果消息设置了超时时间,因为百万消息积压,没来得及处理就过期清理,可以设置定时任务拉起来重发一下 。
欢迎加入我的知识星球,全面提升技术能力。
👉 加入方式,“长按”或“扫描”下方二维码噢:
星球的内容包括:项目实战、面试招聘、源码解析、学习路线。
文章有帮助的话,在看,转发吧。
谢谢支持哟 (*^__^*)
微信扫码关注该文公众号作者