Redian新闻
>
这操作,挽救了一次大事故!

这操作,挽救了一次大事故!

科技

你好,我是yes。

这次事故是有关消息的场景,实际也涉及到分布式事务的内容,我们先简单来回顾下分布式事务。

之前我写过分布式事务相关的文章,基本上把常见的几个分布式事务都捋了捋。

其中我分析了挺多,例如 2pc、TCC、本地消息表、事务消息等等,更多的可以看之前的那篇文章。

这次事故主要是有关本地消息表的实现,借着这篇我也给个实战代码,拿来即用,这里我先再简单把原理介绍一遍。

本地消息表原理

本地消息就是利用了本地事务,会在数据库中存放一张本地事务消息表,在进行本地事务操作中加入了本地消息的插入,即将业务的执行和将消息放入消息表中的操作放在同一个事务中提交,

这样本地事务执行成功的话,消息肯定也插入成功,然后再调用其他服务,如果调用成功就修改这条本地消息的状态。

如果失败也不要紧,会有一个后台线程扫描,发现这些状态的消息,会一直调用相应的服务,一般会设置重试的次数,如果一直不行则特殊记录,待人工介入处理。

可以看到还是很简单的,也是一种最大努力通知思想。

实战

上面的原理大家应该都清晰了,这篇我就基于本地消息表补偿消息来实现事务和消息发送的事务一致性

在我们日常业务中,MQ的应用必不可少,相信你肯定会遇到这个场景:一个 service 方法里执行一些业务,修改了一些数据落库,然后再发送一条MQ消息,触发下一个流程。

那么问题来了,如何保证当前 service 修改的数据事务提交了,消息一定就发出去了呢?

这个简单,将同步发送消息的逻辑写在事务内部,就能保证发送失败,事务不会提交。

那么问题又来了,如果消息发送成功了,最后事务提交失败了呢?那发出去的消息还能追回吗?

因此我们要解决的第一个问题其实是:当前 service 事务提交后,才能发送消息,不然就可能导致消息发出去了,实际事务是没执行成功的。

而上述的操作使得我们兜兜转转又回到第一个问题:如何保证当前 service 修改的数据事务提交了,消息一定就发出去了呢?,万一事务提交了应用就挂了呢?消息不就没了,后续的流程也就中断了。

这归根结底是分布式事务问题,是数据库操作跟MQ消息的爱恨情仇,关于这个 RocketMQ 提供了解决方案即事务消息,但是它的侵入性比较大,需要修改接口适配事务消息的实现。

而本地消息表则非常简单,接下来我们开始操作!

首先我们需要建立一张本地消息表(当前这个设计主要是为了MQ消息的事务场景):

CREATE TABLE `message` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status_delete` tinyint NOT NULL DEFAULT '0' COMMENT '删除标记 0正常 1删除',
  `topic` varchar(64CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'topic',
  `tag` varchar(128CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'tag',
  `msg_id` varchar(64CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '消息id',
  `msg_key` varchar(64COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '消息key',
  `data` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息json串',
  `try_num` int NOT NULL DEFAULT '0' COMMENT '重试次数',
  `status` tinyint NOT NULL DEFAULT '0' COMMENT '发送状态 0-未发送 1-已发送',
  `next_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下次驱动开始时间'
  PRIMARY KEY (`id`),
  KEY `idx_key` (`msg_key`),
  KEY `idx_nexttime_status` (`next_time`,`status`),
  KEY `idx_msgid` (`msg_id`)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='本地消息记录表';

然后再写个 MessageService 来包装下消息的发送流程,把本地消息记录保存封装在里面。

@Service
public class MessageService implements IMessageService{
    @Resource
    private Producer producer;
    @Resource
    private MessageMapper messageMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void send(String topicString tagString keyObject obj
{
        sendDelay(topic, tag, key, obj, 0L);
    }
    
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void sendDelay(String topicString tagString keyObject objLong period
{
        //计算时间,防止定时任务扫描将还在正常流程中的消息进行重试
        int time = (period == 0L ? 10 : period.intValue() / 1000);
        Date nextTime = DateUtil.getAfterNewDateSecond(new Date(), time);
        String data = JSON.toJSONString(obj);
        Message message = new Message()
                .setStatusDelete(0)
                .setTopic(topic)
                .setTag(tag)
                .setMsgId("")
                .setMsgKey(key)
                .setData(data)
                .setTryNum(0)
                .setStatus(0)
                .setNextTime(nextTime);
        // 保存本地消息记录
        messageMapper.save(message);

        // 当前事务提交后,再执行发送消息和更改本地消息记录状态
        TransactionSynchronizationManager.registerSynchronization(
                new TransactionSynchronizationAdapter() {
                    @Override
                    public void afterCommit() {
                        String messageId;
                        try {
                            if (period == 0L) {
                                messageId = producer.send(topic, tag, key, data);
                            } else {
                                messageId = producer.sendDelay(topic, tag, key, data, period);
                            }
                            Message update = new Message()
                                    .setId(message.getId())
                                    .setMsgId(messageId)
                                    .setStatus(1);
                            messageMapper.updateById(update);
                        } catch (Exception e) {
                            log.error("..");
                        }
                    }
                }
        );
    }
}

定时任务的逻辑就很简单了,就是扫描 nextTime 到期且未发送的消息,重新发送即可,这里不多赘述。

最终的使用就非常简单了:

@Transactional(rollbackFor = Exception.class)
public void doSth(xx
{
    saveA();
    saveB();
    messageService.send(xxx);
}

我们来分析一下:

  1. 假设数据库事务提交失败,那么无事发生,消息也没发出去,此时业务正常。
  2. 假设数据库操作成功,但是数据库事务提交后,服务宕机了,那么消息没发出去,此时 saveA 和 saveB 都保存成功,那么 message 肯定也插入了(它们在同一个事务中),message 的 status 是 0 ,那么我们有个定时任务,根据 nextTime 和 status 来扫描得到未成功发送的消息,进行重试即可,后续消息可正常发送
  3. 假设数据库操作成功,但是数据库事务提交了,MQ有问题,使得消息发不出去,同理第二条,后续定时任务扫描重试即可。

就在两个月前,公司用的阿里云的MQ故障,导致消息发送频繁超时,就是因为我们的消息发送都做了以上的改造,因此没有影响业务(数据都正常落库,事务正常提交),部分消息发送超时,由后续补偿任务自动补偿重试。

可以想象,如果没有这个机制可能会发送两种情况:

  1. 如果消息在事务内发送,由于消息发送出错,那么事务提交失败,业务会直接受到影响,线上频繁报错(还解决不了,因为这是阿里云MQ底层升级导致的问题),妥妥P0故障。
  2. 如果消息在事务提交后发送,又没落库记录,那么消息发送超时,后续流程中断,后续需要手动补数据,能累死个人。

简要分析

一般 service 事务相关方法都用 @Transactional 修饰, messageService.send 也被 @Transactional,默认事务传播级别是 PROPAGATION_REQUIRED,继承外部事务,因此它们处于同一个事务。

然后 TransactionSynchronizationManager 可以管理当前线程的事务,内部的 TransactionSynchronizationAdapter 是一个抽象类

可以看到,它能让我们在事务提交前、后、暂停等各阶段实现一些自己的逻辑。

最后

具体操作还是很简单的,仅需一张表,一个服务的所有消息发送都能复用。

回头看看现在的业务代码,看看是不是有业务执行了但是消息没发送成功的风险?小心遇到我之前的问题,有的话赶紧改造吧!

其实这篇也呼应了我之前的这篇文章,RocketMQ事务消息和QMQ(去哪儿)事务消息实现的对比,有兴趣的朋友可以看看。

我是yes,从一点点我亿点点我们下篇见~

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
“当南方人第一次吃冻梨??”哈哈哈哈哈哈哈这操作属实牛X这操作,漂亮!悉尼华人区被曝大量交通隐患,校园周边险酿多起事故!学童安全引忧虑纽约地铁大事故!数百人疏散!旧金山|施工现场发生塌方事故!一名年仅25岁的工人不幸活埋丧命~宫颈癌患者出现医疗事故!医生好心办坏事?徘徊在加拿大落基山脉(二)澳洲一大型工厂发生化学品泄漏事故!五人烧伤德州租客占房敲诈房东,超20户业主被坑?这操作...太专业了!31起事故!17起烧伤!BestBuy爆款高压锅召回:看看你家是不是用这款做了一次大型断舍离,更觉得这些钱没白花 | 96小时限时震惊!车损最大的网红摩托车队事故!疯狂大货车一路碾压!zt寿命也是时势之一 Kung Fu痛”字的草书书法JobKeeper到底多有用?澳政府:至少挽救了30个工作岗位!纽约I-84重大交通事故!满载高中生巴士侧翻 2死40余伤一夜之间取关所有演员,这操作绝了澳华人区被曝大量交通隐患,校园周边险酿多起事故!学童安全引忧虑​被严重警告!日本核电站又发生事故!西北师范大学保研出现重大事故!保研率高达15%!官方通报:严重失误,启动追责问责...[当心]爆4000多起事故!加拿大紧急召回这些!千万别再用!李强私下见拜登,转达习近平歉意澳洲发生火车撞人事故!2条线路被迫停运,部分列车被公交替代痛心!悉尼13岁儿童下公交时遭卡车撞伤!不治身亡!学生频频发生事故!此前还有人被撞飞!炸裂!Chatswood被曝存在大量交通隐患!校园周边险酿多起事故!引发大批华人担忧!语雀突发 P0 级事故!宕机 8 小时被网友怒喷,运维又背锅?Poem by yy56突发事故!16人遇难!澳洲真正的英雄!澳洲大爷用一杯茶挽救了500人的生命!台士兵这操作,他们看了“直摇头”墨尔本发生火车撞人事故!2条线路被迫停运,部分列车被公交替代突发 | 香港突发夺命事故!特首发话严查!她这操作,真不怕被同行封杀吗Kylie Jenner 陷自拍事故!床头柜惊现黑色不明物体引发争议?无忧周报|周六有机会看到日环食,10-12月为撞鹿高发期,这些麻州城镇最多撞鹿事故!红线部份路段将停驶16天
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。