Redian新闻
>
Redis和MySQL双写一致性如何保证?这个方案够优雅!

Redis和MySQL双写一致性如何保证?这个方案够优雅!

公众号新闻

点击上方“芋道源码”,选择“设为星标

管她前浪,还是后浪?

能浪的浪,才是好浪!

每天 10:33 更新文章,每天掉亿点点头发...

源码精品专栏

 

来源:阿Q说代码


最近不是正好在研究 canal 嘛,刚巧前两天看了一篇关于解决缓存与数据库一致性问题 的文章,里边提到了一种解决方案是结合 canal 来操作的,所以就想趁热打铁,手动来实现一下。

架构

文中提到的思想是:

  • 采用先更新数据库,后删除缓存 的方式来解决并发引发的一致性问题;
  • 采用异步重试 的方式来保证“更新数据库、删除缓存”这两步都能执行成功;
  • 可以采用订阅变更日志 的方式来清除 Redis 中的缓存;

基于这种思想,阿Q脑海中搭建了以下架构

  • APP 从 Redis 中查询信息,将数据的更新写入 MySQL 数据库中;
  • Canal 向 MySQL 发送 dump 协议,接收 binlog 推送的数据;
  • Canal 将接收到的数据投递给 MQ 消息队列;
  • MQ 消息队列消费消息,同时删除 Redis 中对应数据的缓存;

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

环境准备

这篇文章中有 mysql 的安装教程:mysql 安装

这篇文章中有 canal 的安装教程以及对 mysql 的相关配置:canal安装

考虑到我们服务器之前安装过 RabbitMQ ,所以我们就用 RabbitMQ 来充当消息队列吧。

Canal 配置

修改 conf/canal.properties 配置


> 基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
>
> * 项目地址:<https://github.com/YunaiV/yudao-cloud>
> * 视频教程:<https://doc.iocoder.cn/video/>

# 指定模式
canal.serverMode = rabbitMQ
# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
canal.destinations = example

# rabbitmq 服务端 ip
rabbitmq.host = 127.0.0.1
# rabbitmq 虚拟主机
rabbitmq.virtual.host = /
# rabbitmq 交换机
rabbitmq.exchange = xxx
# rabbitmq 用户名
rabbitmq.username = xxx
# rabbitmq 密码
rabbitmq.password = xxx
rabbitmq.deliveryMode =

修改实例配置文件 conf/example/instance.properties

#配置 slaveId,自定义,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10

# 数据库地址:配置自己的ip和端口
canal.instance.master.address=ip:port

# 数据库用户名和密码
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx

# 指定库和表
canal.instance.filter.regex=.*\\..* // 这里的 .* 表示 canal.instance.master.address 下面的所有数据库

# mq config
# rabbitmq 的 routing key
canal.mq.topic=xxx

然后重启 canal 服务。

这篇文章中有 RabbitMQ 的安装教程:RabbitMQ安装

这篇文章中有 Redis 的安装教程:Redis安装

数据库

建表语句

CREATE TABLE `product_info` (
  `id` bigint(20NOT NULL AUTO_INCREMENT,
  `name` varchar(255DEFAULT NULL,
  `price` decimal(10,4DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  `update_date` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8

数据初始化

INSERT INTO cheetah.product_info
(idname, price, create_date, update_date)
VALUES(1'从你的全世界路过'14.0000'2020-11-21 21:26:12''2021-03-27 22:17:39');
INSERT INTO cheetah.product_info
(idname, price, create_date, update_date)
VALUES(2'乔布斯传'25.0000'2020-11-21 21:26:42''2021-03-27 22:17:42');
INSERT INTO cheetah.product_info
(idname, price, create_date, update_date)
VALUES(3'java开发'87.0000'2021-03-27 22:43:31''2021-03-27 22:43:34');

实战

项目引入的依赖比较多,为了不占用过多的篇幅,MySQL 和 Redis 的相关配置在此不再赘述;

RabbitMQ 配置

@Configuration
public class RabbitMQConfig {

    public static final String CANAL_QUEUE = "canal_queue";//队列
    public static final String DIRECT_EXCHANGE = "canal";//交换机,要与canal中配置的相同
    public static final String ROUTING_KEY = "routingkey";//routing-key,要与canal中配置的相同

    /**
     * 定义队列
     **/

    @Bean
    public Queue canalQueue(){
        return new Queue(CANAL_QUEUE,true);
    }

    /**
     * 定义直连交换机
     **/

    @Bean
    public DirectExchange directExchange(){
       return new DirectExchange(DIRECT_EXCHANGE);
    }

    /**
     * 队列和交换机绑定
     **/

    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(canalQueue()).to(directExchange()).with(ROUTING_KEY);
    }
}

商品信息入缓存

/**
 * 获取商品信息:
 * 先从缓存中查,如果不存在再去数据库中查,然后将数据保存到缓存中
 * @param productInfoId
 * @return
 */

@Override
public ProductInfo findProductInfo(Long productInfoId) {
 //1.从缓存中获取商品信息
 Object object = redisTemplate.opsForValue().get(REDIS_PRODUCT_KEY + productInfoId);
 if(ObjectUtil.isNotEmpty(object)){
  return (ProductInfo)object;
 }
 //2.如果缓存中不存在,从数据库获取信息
 ProductInfo productInfo = this.baseMapper.selectById(productInfoId);
 if(productInfo != null){
  //3.将商品信息缓存
  redisTemplate.opsForValue().set(REDIS_PRODUCT_KEY+productInfoId, productInfo,
    REDIS_PRODUCT_KEY_EXPIRE, TimeUnit.SECONDS);
  return productInfo;
 }
 return null;
}

执行方法后,查看 Redis 客户端是否有数据存入

更新数据入MQ

/**
 * 更新商品信息
 * @param productInfo
 * @return
 */

@PostMapping("/update")
public AjaxResult update(@RequestBody ProductInfo productInfo){
 productInfoService.updateById(productInfo);
 return AjaxResult.success();
}

当我执行完 update 方法的时候,去RabbitMQ Management 查看,发现并没有消息进入队列。

问题描述

通过排查之后我在服务器中 canal 下的 /usr/local/logs/example/example.log文件里发现了问题所在。

原因就是meta.dat中保存的位点信息和数据库的位点信息不一致导致 canal 抓取不到数据库的动作。

于是我找到 canal 的 conf/example/instance.properties 实例配置文件,发现没有将canal.instance.master.address=127.0.0.1:3306设置成自己的数据库地址。

解决方案

  • 先停止 canal 服务的运行;
  • 删除meta.dat文件;
  • 再重启 canal,问题解决;

再次执行 update 方法,会发现 RabbitMQ Management中已经有我们想要的数据了。

MQ接收数据

编写 RabbitMQ 消费代码的逻辑

@RabbitListener(queues = "canal_queue")//监听队列名称
public void getMsg(Message message, Channel channel, String msg) throws IOException {
 long deliveryTag = message.getMessageProperties().getDeliveryTag();
 try {
  log.info("消费的队列消息来自:" + message.getMessageProperties().getConsumerQueue());

  //删除reids中对应的key
  ProductInfoDetail productInfoDetail = JSON.parseObject(msg, ProductInfoDetail.class);
  log.info("库名:"+ productInfoDetail.getDatabase());
  log.info("表名: "+ productInfoDetail.getTable());
  if(productInfoDetail!=null && productInfoDetail.getData()!=null){
   List<ProductInfo> data = productInfoDetail.getData();
   ProductInfo productInfo = data.get(0);
   if(productInfo!=null){
    Long id = productInfo.getId();
    redisTemplate.delete(REDIS_PRODUCT_KEY+id);
    channel.basicAck(deliveryTag, true);
    return;
   }
  }
  channel.basicReject(deliveryTag ,true);
  return;
 }catch (Exception e){
  channel.basicReject(deliveryTag,false);
  e.printStackTrace();
 }
}

当我们再次调用 update接口时,控制台会打印以下信息

从图中打印的信息可以看出就是我们的库和表以及消息队列,Redis 客户端中缓存的信息也被删除了。

拓展

看到这,你肯定会问:RabbitMQ 是阅后即焚 的机制,它确认消息被消费者消费后会立刻删除,如果此时我们的业务还没有跑完,没来的及删除 Redis 中的缓存就宕机了,岂不是缓存一直都得不到更新了吗?

首先我们要明确的是 RabbitMQ 是通过消费者回执 来确认消费者是否成功处理消息的,即消费者获取消息后,应该向 RabbitMQ 发送 ACK 回执,表明自己已经处理消息了。

为了不让上述问题出现,消费者返回 ACK 回执的时机就显得非常重要了, 而 SpringAMQP 也为我们提供了三种可选的确认模式:

  • manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack;
  • auto:自动 ack ,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack,抛出异常则返回 nack;
  • none:关闭 ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除;

由此可知在 none 模式下消息投递最不可靠,可能会丢失消息;在默认的 auto 模式下如果出现服务器宕机的情况也是会丢失消息的,本次实战中,阿Q为了防止消息丢失采用的是 manual 这种模式,配置信息如下:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #开启手动确认

所以在代码中也就出现了

//用于肯定确认
channel.basicAck(deliveryTag, true);
//用于否定确认
channel.basicReject(deliveryTag ,true);

当然此种模式虽然不会丢失消息,但是会导致效率变低。

今天的内容到这里就结束了,赶快动手体验一下吧!



欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
MySQL 单表数据最大不要超过多少行?为什么?阿里:MySQL 单表数据最大不要超过多少行?为什么?技术派中的缓存一致性解决方案MySQL 运维常用脚本微服务架构中的数据一致性:解决方案与实践生产环境遇到MySQL数据页损坏问题如何解决?线上 MySQL 的自增 id 用尽怎么办?重名那些MySQL 8.0中的隐藏特性美国有多少种性别?为什么不建议在 Docker 中跑 MySQL ?MySQL高级进阶:索引优化MyBatis 动态 SQL 最全教程,这样写 SQL 太爽了!MySQL 之父:不要把一个优秀的开发者提升为管理者,那会是种资源浪费昨日天涯MySQL 巨坑:永远不要在 MySQL 中使用 UTF-8!!一文了解MySQL全新版本模型技术同学必会的MySQL设计规约,都是惨痛的教训为什么Uber的底层存储从Postgres换成MySQL了?活动招募 | 经营婚姻 守护财富——女性如何保护自己在婚姻关系中的权益我们最大的不可知分布式PostgreSQL基准测试:Azure Cosmos DB、CockroachDB和YugabyteDBPHP程序员薪资竟然垫底、PG取代MySQL成为最流行数据库MySQL 不一样的 NULLMySQL 调整版本控制模型,发布首个创新版本 8.1.04 种 MySQL 同步 ES 方案从MySQL到OBOracle:如何处理自增列?用雪花 id 和 uuid 做 MySQL 主键,被领导怼了硬核观察 #1037 PostgreSQL 超过 MySQL 成为开发者首选数据库MySQL 被 PG 干翻了。。Nginx 代理 MySQL 连接,并限制可访问IPVLDB顶会论文解读 | PolarDB MySQL高性能强一致集群核心技术详解汪精卫其诗其人MySQL 之父,和 Amazon、科大讯飞、宝洁、字节、用友等企业专家齐聚深圳 ArchSummit 架构师峰会!高效方案:30万条数据插入 MySQL 仅需13秒
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。