Redian新闻
>
一碰就头疼的 Kafka 消息重复问题,立马解决!

一碰就头疼的 Kafka 消息重复问题,立马解决!

公众号新闻

点击上方“芋道源码”,选择“设为星标

管她前浪,还是后浪?

能浪的浪,才是好浪!

每天 10:33 更新文章,每天掉亿点点头发...

源码精品专栏

 
来源:juejin.cn/post/
7172897190627508237

一、前言

数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。

通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。

整理下消息重复的几个场景:
  1. 生产端: 遇到异常,基本解决措施都是 重试
  • 场景一:leader分区不可用了,抛 LeaderNotAvailableException 异常,等待选出新 leader 分区。
  • 场景二:Controller 所在 Broker 挂了,抛 NotControllerException 异常,等待 Controller 重新选举。
  • 场景三:网络异常、断网、网络分区、丢包等,抛 NetworkException 异常,等待网络恢复。
  1. 消费端: poll 一批数据,处理完毕还没提交 offset ,机子宕机重启了,又会 poll 上批数据,再度消费就造成了消息重复。
怎么解决?

先来了解下消息的三种投递语义:

  • 最多一次( at most once): 消息只发一次,消息可能会丢失,但绝不会被重复发送。例如:mqttQoS = 0
  • 至少一次( at least once): 消息至少发一次,消息不会丢失,但有可能被重复发送。例如:mqttQoS = 1
  • 精确一次( exactly once): 消息精确发一次,消息不会丢失,也不会被重复发送。例如:mqttQoS = 2
了解了这三种语义,再来看如何解决消息重复,即如何实现精准一次,可分为三种方法:
  1. Kafka 幂等性 Producer 保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话)
  2. Kafka 事务: 保证生产端发送消息幂等。解决幂等 Producer 的局限性。
  3. 消费端幂等:保证消费端接收消息幂等。蔸底方案。
1)Kafka 幂等性 Producer

幂等性指 :无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。

幂等性使用示例:在生产端添加对应配置即可

Properties props = new Properties();
props.put("enable.idempotence", ture); // 1. 设置幂等
props.put("acks""all"); // 2. 当 enable.idempotence 为 true,这里默认为 all
props.put("max.in.flight.requests.per.connection"5); // 3. 注意
  1. 设置幂等,启动幂等。

  2. 配置 acks,注意:一定要设置 acks=all,否则会抛异常。

  3. 配置 max.in.flight.requests.per.connection 需要 <= 5 ,否则会抛异常 OutOfOrderSequenceException

  • 0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
  • Kafka >= 1.1, max.in.flight.request.per.connection <= 5
[**为了更好理解,需要了解下\ \Kafka 幂等机制:](https://mp.weixin.qq.com/s/PiAxqEhkR8g1AOYGGS5Yqw)
  1. Producer 每次启动后,会向 Broker 申请一个全局唯一的 pid。(重启后 pid 会变化,这也是弊端之一)

  2. Sequence Numbe:针对每个 <Topic, Partition> 都对应一个从0开始单调递增的 Sequence,同时 Broker端会缓存这个 seq num

  3. 判断是否重复:<pid, seq num>Broker 里对应的队列 ProducerStateEntry.Queue(默认队列长度为 5)查询是否存在

  • 如果 nextSeq == lastSeq + 1,即 服务端seq + 1 == 生产传入seq,则接收。
  • 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即刚初始化,也接收。
  • 反之,要么重复,要么丢消息,均拒绝。
这种设计针对解决了两个问题:
  1. 消息重复: 场景 Broker 保存消息后还没发送 ack 就宕机了,这时候 Producer 就会重试,这就造成消息重复。
  2. 消息乱序: 避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。
那什么时候该使用幂等:
  1. 如果已经使用 acks=all,使用幂等也可以。
  2. 如果已经使用 acks=0 或者 acks=1,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。
2)Kafka 事务

使用 Kafka 事务解决幂等的弊端:单会话且单分区幂等。

Tips 这块篇幅较长,这先稍微提及下使用,之后另起一篇。

事务使用示例:分为生产端 和 消费端

Properties props = new Properties();
props.put("enable.idempotence", ture); // 1. 设置幂等
props.put("acks""all"); // 2. 当 enable.idempotence 为 true,这里默认为 all
props.put("max.in.flight.requests.per.connection"5); // 3. 最大等待数
props.put("transactional.id""my-transactional-id"); // 4. 设定事务 id

Producer<String, String> producer = new KafkaProducer<String, String>(props);

// 初始化事务
producer.initTransactions();

try{
    // 开始事务
    producer.beginTransaction();

    // 发送数据
    producer.send(new ProducerRecord<String, String>("Topic""Key""Value"));
 
    // 数据发送及 Offset 发送均成功的情况下,提交事务
    producer.commitTransaction();
catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // 数据发送或者 Offset 发送出现异常时,终止事务
    producer.abortTransaction();
finally {
    // 关闭 Producer 和 Consumer
    producer.close();
    consumer.close();
}

这里消费端 Consumer 需要设置下配置:isolation.level 参数

  • read_uncommitted 这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
  • read_committed 表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
3)消费端幂等

“如何解决消息重复?” 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。

只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。

典型的方案是使用:消息表,来去重:

  • 上述栗子中,消费端拉取到一条消息后,开启事务,将消息Id 新增到本地消息表中,同时更新订单信息。
  • 如果消息重复,则新增操作 insert 会异常,同时触发事务回滚。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

二、案例:Kafka 幂等性 Producer 使用

环境搭建可参考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic

准备工作如下:

1、Zookeeper:本地使用 Docker 启动

$ docker run -d --name zookeeper -p 2181:2181 zookeeper
a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4

2、Kafka:版本 2.7.1,源码编译启动(看上文源码搭建启动)

3、启动生产者:Kafka 源码中 exmaple

4、启动消息者:可以用 Kafka 提供的脚本


> 基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
>
> * 项目地址:<https://github.com/YunaiV/yudao-cloud>
> * 视频教程:<https://doc.iocoder.cn/video/>

# 举个栗子:topic 需要自己去修改
$ cd ./kafka-2.7.1-src/bin
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

创建 topic 1副本,2 分区

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2

# 查看
$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe

生产者代码:

public class KafkaProducerApplication {

    private final Producer<String, String> producer;
    final String outTopic;

    public KafkaProducerApplication(final Producer<String, String> producer,
                                    final String topic)
 
{
        this.producer = producer;
        outTopic = topic;
    }

    public void produce(final String message) {
        final String[] parts = message.split("-");
        final String key, value;
        if (parts.length > 1) {
            key = parts[0];
            value = parts[1];
        } else {
            key = null;
            value = parts[0];
        }
        final ProducerRecord<String, String> producerRecord
            = new ProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,
                (recordMetadata, e) -> {
                    if(e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
                    }
                }
        );
    }

    public void shutdown() {
        producer.close();
    }

    public static void main(String[] args) {

        final Properties props = new Properties();

        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        final String topic = "myTopic";
        final Producer<String, String> producer = new KafkaProducer<>(props);
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);

        String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
        try {
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream().filter(l -> !l.trim().isEmpty())
                    .forEach(producerApp::produce);
            System.out.println("Offsets and timestamps committed in batch from " + filePath);
        } catch (IOException e) {
            System.err.printf("Error reading file %s due to %s %n", filePath, e);
        } finally {
            producerApp.shutdown();
        }
    }
}

启动生产者后,控制台输出如下:

启动消费者:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
修改配置 acks
``

启用幂等的情况下,调整 acks 配置,生产者启动后结果是怎样的:

  • 修改配置 acks = 1
  • 修改配置 acks = 0

会直接报错:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer.
Otherwise we cannot guarantee idempotence.
修改配置 max.in.flight.requests.per.connection
``

启用幂等的情况下,调整此配置,结果是怎样的:

将  max.in.flight.requests.per.connection > 5 会怎样?

当然会报错:

Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.


欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
白纸坦荡荡IPO被否,财务真实性被反复问询,项目质量一言难尽......程序员头疼的4种原因所有墨尔本人注意!这两天在海边做这件事,立马罚款200刀!疫情刚散,经济又衰退,法拉盛八大道华人最头疼的难题,是时候彻底解决了...狗子在家“天女散花”被抓包,立马四脚朝天躺地装死:不是我桌游不光是玩,顺便记住令人头疼的恐龙英文名字| 试玩《骰笔恐龙岛》王小波:人在年轻时,最头疼的一件事边牧对猫呲牙却喷出鼻涕泡,立马尴尬地躲起来哈哈美丽的凡尔赛花园早上跳广场舞,下午打牌,一带孩子就头疼,婆婆患有“公主病”怎么治?澳洲航空业强劲复苏!机票价格即将大跳水!最让航司头疼的问题是这个!加息重压下,哪些科技公司值得关注?成本高不赚钱,台积电为何非要去美国建厂?|直播预告很多男人出门头疼的事,可以解决了恐怖!载106人波音突然冒烟,迫降加拿大!"啥也别拿,立马下机!"还在为圣诞节送礼物头疼的小伙伴们看过来!关于奥密克戎致病率、后遗症、重复感染等问题,钟南山最新研判……秋行南意—小村的故事(2)小初攻克英语,立马想到的,还是这套“过时经典”恐怖!载106人波音突然冒烟,迫降加拿大… 飞机喷火俯冲… "啥也别拿,立马下机"!“你再给我顶一句,立马让你走!”管理人员怒斥员工,上市公司回应:已降其为一线环卫工巴基斯坦这个生存问题,必须解决!柴犬看到主人吃苹果派,立马把自己塞进椅子中间讨吃:我也要刘强东回国后,立马干了一件大事两会专访|全国人大代表、中科院量子信息重点实验室副主任郭国平:产业链协同助力量子计算穿越商业化“死亡谷”小黑狗看到主人在准备沐浴用品,立马躲到…瞬间消失哈哈从亨廷顿的预言到特朗普的MAGA(六)毛主席大“党领导一切”, 在不同的时候含义不同, 未必非得是党员来代表党组织怕被GPT4取代?看完这些案例,立马用起来孩子看见数学就头疼?那一定要看看这篇文章!弱者逃避问题,强者解决问题,智者消除问题拿到Offer后,还有100+问题待解决!新生入学指导资源最全汇总彩票中$20亿,30岁穷光蛋秒变富豪,立马在好莱坞买$2550万豪宅给一看到娃写作文就头疼的您开一剂良药!柯基见“好友”跳上保时捷想傍大款,立马喊来主人:差点让你小子飞黄腾达
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。