Redian新闻
>
一种异步延迟队列的实现方式

一种异步延迟队列的实现方式

公众号新闻




一、应用场景


目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒延迟发送、token刷新、会员卡过期等等。通过延时处理,极大地节省系统的资源,不必轮询数据库处理任务。

目前大部分功能通过定时任务完成,定时任务还分使用quartz及xxljob两种类型轮询时间短,每秒执行一次,对数据库造成一定的压力,并且会有1秒的误差。轮询时间久,如30分钟一次,03:01插入一条数据,正常3:31执行过期,但是3:30执行轮询时,扫描3:00-3:30的数据,是扫描不到3:31的数据的,需要4:00的时候才能扫描到,相当于多延迟了29分钟!



二、演示处理方式调研


1.DelayQueue

实现方式:

jvm提供的延迟阻塞队列,通过优先级队列对不同延迟时间任务进行排序,通过condition进行阻塞、睡眠dealy时间 获取延迟任务。

当有新任务加入时,会判断新任务是否是第一个待执行的任务,若是,会解除队列睡眠,防止新加入的元素时需要执行的元素而不能正常被执行线程获取到。

存在的问题:
  • 单机运行,系统宕机后,无法进行有效的重试

  • 没有执行记录和备份

  • 没有重试机制

  • 系统重启时,会将任务清空!

  • 不能分片消费

优势:
实现简单,无任务时阻塞,节省资源,执行时间准确

2.延迟队列mq

实现方式:依赖mq,通过设置延迟消费时间,达到延迟消费功能。像rabbitMq、jmq都可以设置延迟消费时间。RabbitMq通过将消息设置过期时间,放入信队列进行消费实现。

存在的问题:时间设置不灵活,每个queue是固定的到期时间,每次新创建延时队列,需要创建新的消息队列

优点:依靠jmq,可以有效的监控、消费记录、重试,具备多机同时消费能力,不惧怕宕机

3.定时任务

通过定时任务轮询符合条件的数据

缺点:

  • 必须要读业务数据库,对数据库造成一定的压力,

  • 存在延时

  • 一次扫描数据量过大时,占用过多的系统资源。

  • 无法分片消费


优点:
  • 消费失败后,下次还能继续消费,具备重试能力,

  • 消费能力稳定


4.redis

任务存储在redis中,使用redis的 zset队列根据score进行排序,程序通过线程不断获取队列数据消费,实现延时队列

优点:

  • 查询redis相比较数据库快,set队列长度过大,会根据跳表结构进行查询,效率高

  • redis可根据时间戳进行排序,只需要查询当前时间戳内的分数的任务即可

  • 无惧机器重启

  • 分布式消费


缺点:

  • 受限于redis性能,并发10W

  • 多个命令无法保证原子性,使用lua脚本会要求所有数据都在一个redis分片上。


5. 时间轮

通过时间轮实现的延迟任务执行,也是基于jvm单机运行,如kafka、netty都有实现时间轮,redisson的看门狗也是通过netty的时间轮实现的。

缺点:不适合分布式服务的使用,宕机后,会丢失任务。





三、实现目标


兼容目前在使用的异步事件组件,并提供更可靠,可重试、有记录、可监控报警、高性能的延迟组件。

  • 消息传输可靠性:消息进入到延迟队列后,保证至少被消费一次。

  • Client支持丰富:支持多重语言。

  • 高可用性:支持多实例部署。挂掉一个实例后,还有后备实例继续提供服务。

  • 实时性:允许存在一定的时间误差。

  • 支持消息删除:业务使用方,可以随时删除指定消息。

  • 支持消费查询

  • 支持手动重试

  • 对当前异步事件的执行增加监控




四、架构设计






五、延迟组件实现方式


1.实现原理

目前选择使用jimdb通过zset实现延时功能,将任务id和对应的执行时间作为score存在在zset队列中,默认会按照score排序,每次取0-当前时间内的score的任务id,

发送延迟任务时,会根据时间戳+机器ip+queueName+sequence 生成唯一的id,构造消息体,加密后放入zset队列中。

通过搬运线程,将达到执行时间的任务移动到发布队列中,等待消费者获取。

监控方通过集成ump

消费记录通过redis备份+数据库持久化完成。

通过缓存实现的方式,只是实现的一种,可以通过参数控制使用哪一种实现方式,并可通过spi自由扩展。

2.消息结构

每个Job必须包含下几个属性:

  • Topic:Job类型,即QueueName

  • Id:Job的唯一标识。用来检索和删除指定的Job信息。

  • Delay:Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)

  • Body:Job的内容,供消费者做具体的业务处理,以json格式存储。

  • traceId:发送线程的traceId,待后续pfinder支持设置traceId后,可与发送线程公用同一个traceiD,便于日志追踪


具体结构如下图表示:

TTR的设计目的是为了保证消息传输的可靠性。

3.数据流转及流程图

基于redis-disruptor方式进行发布、消费,可以作为消息来进行使用,消费者采用原有异步事件的disruptor无锁队列消费,不同应用、不同queue之间无锁

1)支持应用只发布,不消费,达到消息队列的功能。

2)支持分桶,针对大key问题,若事件多,可以设置延迟队列和任务队列桶的数量,减小因大key造成的redis阻塞问题。

3)通过ducc配置,进行性能的扩展,目前只支持开启消费和关闭消费。 

4)支持设置超时时间配置,防止消费线程执行过久

瓶颈:消费速度慢,生产速度过快,会导致ringbuffer队列占满,当前应用既是生产者也是消费者时,生产者会休眠,性能取决于消费速度,可通过水平扩展机器,直接提升性能。监控redis队列的长度,若不断增长,可考虑增加消费者,直接提高性能。

可能出现的情况:因一个应用公用一个disruptor,拥有64个消费者线程,如果某一个事件消费过慢,导致64个线程都在消费这个事件,会导致其他事件无消费线程消费,生产者线程也被阻塞,导致所有事件的消费都被阻塞。

后期观察是否有这个性能瓶颈,可给每一个queue一个消费者线程池。



六、demo示例


增加配置文件

判断是否开启jd.event.enable:true

<dependency> <groupId>com.jd.car</groupId> <artifactId>senna-event</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
配置
jd:senna:event:enable: truequeue:retryEventQueue:bucketNum: 1handleBean: retryHandle
消费代码:

package com.jd.car.senna.admin.event;
import com.jd.car.senna.event.EventHandler;import com.jd.car.senna.event.annotation.SennaEvent;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;
/*** @author zhangluyao* @description* @create 2022-02-21-9:54 下午*/@Slf4j@Component("retryHandle")public class RetryQueueEvent extends EventHandler {
@Overrideprotected void onHandle(String key, String eventType) {log.info("Handler开始消费:{}", key);}
@Overrideprotected void onDelayHandle(String key, String eventType) {log.info("delayHandler开始消费:{}", key);}}

注解形式:


package com.jd.car.senna.admin.event;
import com.jd.car.senna.event.EventHandler;import com.jd.car.senna.event.annotation.SennaEvent;import lombok.extern.slf4j.Slf4j;
/*** @author zhangluyao* @description* @create 2022-02-21-9:54 下午*/@Slf4j@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)public class TestQueueEvent extends EventHandler {
@Overrideprotected void onHandle(String key, String eventType) {log.info("Handler开始消费:{}", key);}
@Overrideprotected void onDelayHandle(String key, String eventType) {log.info("delayHandler开始消费:{}", key);}}

发送代码:


package com.jd.car.senna.admin.controller;
import com.jd.car.senna.event.queue.IEventQueue;import lombok.extern.slf4j.Slf4j;import org.springframework.context.annotation.Lazy;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.ResponseBody;import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;import java.util.concurrent.CompletableFuture;

/*** @author zly*/@RestController@Slf4jpublic class DemoController {
@Lazy@Resource(name = "testQueue")private IEventQueue eventQueue;
@ResponseBody@GetMapping("/api/v1/demo")public String demo() {log.info("发送无延迟消息");eventQueue.push("no delay 5000 millseconds message 3");return "ok";}
@ResponseBody@GetMapping("/api/v1/demo1")public String demo1() {log.info("发送延迟5秒消息");eventQueue.push(" delay 5000 millseconds message,name",1000*5L);return "ok";}
@ResponseBody@GetMapping("/api/v1/demo2")public String demo2() {log.info("发送延迟到2022-04-02 00:00:00执行的消息");eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));return "ok";}
}

参考有赞设计:https://tech.youzan.com/queuing_delay/



七、目前应用


  1. 云修到店排队24小时后自动取消

  2. 美团请求token定时刷新。

  3. 质保卡延期24小时生成

  4. 结算单延期生成

  5. 短信延迟发送


END



卡巴斯基实锤拼多多APP恶意代码


这里有最新开源资讯、软件更新、技术干货等内容

点这里 ↓↓↓ 记得 关注✔ 标星⭐ 哦

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
漫话 | 运维如何给女朋友解释 IO 中的阻塞、非阻塞、同步、异步?消息队列的过去、现在和未来用了8年MQ!聊聊消息队列的技术选型,哪个最香!过年穿新衣:分放新队服欢乐记我的消息队列专栏不要看不起小师妹整理的实验protocol,她做的实验一直被导师夸!如果有人热捧权威,无论是人还是偶像(包括由他们自己定义的科学),那么对他的言行,你要当心了。厦大夏宁邵教授团队设计开发一种超快速、高灵敏度、低成本的实时荧光定量PCR系统用 Redis 实现延迟队列,我研究了两种方案,发现并不简单详解Redisson分布式限流的实现原理实现异步的方式,你知道几个?NSO专题:化学助力碳中和的实现(特邀编辑:张锦)高精度光学气体传感器及整体解决方案提供商「敢为科技」获得数千万元融资,助力碳中和的实现|36氪首发2022年底波特兰、西雅图、温哥华游Cell重磅:蝙蝠能携带大量病毒的原因—已经进化出能够耐受大量病毒序列的机制一段青绿色的快闪第一家公司卖了119亿美元,第二次他们创业瞄准同种异体CAR-T操作系统:文件系统的实现Spring Boot 实现跨域的 5 种方式,总有一种适合你,建议收藏消息队列之 MetaQ 和 Kafka 哪个更香!物流路由线路配载前端算法逻辑实现方案在种植蘑菇计算机的实验室里,科学家用菌丝体实现神经形态电路不要看不起小师妹整理的实验protocol,她做的实验深受我导的心!跨越70年的对话,Dior新系列的女性主义之歌《流浪地球》里的引力弹弓人类真的实现过!张朝阳在线手推旅行者号木星之旅Flutter异步编程指南Sentinel为什么这么强,我扒了扒背后的实现原理砂子和眼睛,匣子和宝珠,语法和语言:论人在语言中的地位掌声响爆! 马云在以色列的演讲...一篇关于使用Python实现财务报表自动化的实用指南消息链路拆分最佳实践:钉钉审批异步链路重构【总结】第一批去以色列的VC/PE已经赚到钱了源自以色列的编程游戏,通过“闯关”教孩子写真实的编程语言“我爱你,但我们不合适”:你的爱情底色,决定你适合哪种异性蚂蚁:实现异步的8种方式,你知道几个?
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。