Redian新闻
>
RocketMQ生产者负载均衡(轮询机制)核心原理

RocketMQ生产者负载均衡(轮询机制)核心原理

公众号新闻


RocketMQ生产者为什么需要负载均衡?

    在RocketMQ中,队列是消息发送的基本单位。每个Topic下可能存在多个队列,因此一个生产者实例可以向不同的队列发送消息。当生产者发送消息时,如果不能均衡的将消息发送到不同的队列,那么会导致队列里的消息分布不均衡,这样最终会导致消息性能下降,因此生产者负载均衡机制也是非常重要的。


RocketMQ生产者原理分析

既然生产者负载均衡如此重要,我们看下是如何实现的。

我们通常使用如下方法发送消息:

构建消息Message msg = new Message("TopicTest",    "TagA",    "OrderID188",    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));//发送消息    SendResult sendResult = producer.send(msg);

    RocketMQ发送消息的核心逻辑在DefaultMQProducerImplsendDefaultImpl


    在发送消息流程利里面有一行非常关键的逻辑,selectOneMessageQueue,看方法名称就可以知道其含义,选择一个消息队列。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);    }

里面是通过策略类来实现的。

策略类最终通过org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String) 实现。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //生产者第一次发消息 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //非第一次,重试发消息的情况, for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //重试的情况,不取上一个broker的队列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }第一次发消息选择队列核心逻辑在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()
//线程安全的indexprivate volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
public MessageQueue selectOneMessageQueue() { //获取一个基础索引,每次自增1 这个全局存在TopicPublishInfo 每一个topic int index = this.sendWhichQueue.getAndIncrement(); // 基础索引和 消息写队列大小 进行取模 用来实现轮训的算法 int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }

    哈哈,这里就是生产者负载均衡轮询机制的核心逻辑了,使用到了ThreadLocal技术,sendWhichQueue为每个生产者线程维护一个自己的下标索引。

    基础索引计算器,使用ThreadLocal技术针对不同的生产者线程第一次随机,后面递增,可以更加负载均衡。

public class ThreadLocalIndex {    //关键技术    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();    private final Random random = new Random();
public int getAndIncrement() { Integer index = this.threadLocalIndex.get(); if (null == index) { //第一次随机 index = Math.abs(random.nextInt()); if (index < 0) index = 0; this.threadLocalIndex.set(index); } //第二次索引位置开始自增1 index = Math.abs(index + 1); if (index < 0) index = 0;
this.threadLocalIndex.set(index); return index; }}

    哈哈,有没有觉得这个实现非常巧妙了。不同的生产者线程都拥有自己的索引因子,分配队列更加均衡。

总结

    本文分析了RocketMQ生产者底层的实现,设计地方有巧妙之处,值得我们学习,上面是发送非顺序消息的场景, 如果是顺序消息,我们作为使用者可以指定负载均衡策略。

接:https://juejin.cn/post/7300102080902217739

(版权归原作者所有,侵删)

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
RabbitMQ vs Kafka:正面交锋全球资管排名出炉,再见了BlackRock...资管一哥 | BlackRock 2025 Analyst Program已开启,留学生速冲ChatGPT强劲对手推出重磅更新/Altman和Brockman将不会加入OpenAI董事会/拼多多或进入大模型领域香港BlackRock开启2024暑期实习生招聘,英国留学生可申[评测]ASRock AMD Radeon RX 7700 XT Challenger 12GB OC 评测帖子从今日热点被撤了?BlackRock/Point72/Millennium开放海量Quant实习岗, 留学生快冲!红色日记 办学习班 10.16-31七律 步月下感怀韵和【首发】多肽药物核心原料领军企业泰和伟业融资近3亿元同学会想到精选SDE岗位 | ARM、Bayer、BlackRock公司岗位发布!解密网络流量管理:深入了解负载均衡类型英国秋招实习集中开岗!BlackRock、小摩…新开200+岗位!速投!口罩后面那双美丽的大眼睛为啥现在要缓和跟美国的关系呢?其实核心原因只有三点使用 Docker Compose 部署 RabbitMQ 的一些经验与踩坑记录精选DS岗位 | Amazon、Microsoft、BlackRock公司岗位发布!精选SDE岗位 | BlackRock、Zoom、BlackBerry公司岗位发布!深度好文|为什么Blackstone叫黑石,而Blackrock叫贝莱德?Docker的使用案例以及未来发展、Docker Hub 服务、环境安全、容器部署安全BlackRock专场 | 大量岗位来袭!Old Markets, New Appeal: Young Chinese Rediscover Wet Markets最强资管!DBC职梦学员已收到BlackRock (US) 全职一面邀请贝莱德BlackRock招聘2024 Placement Program,26届可投[评测]ASRock AMD Radeon RX 7800 XT Steel Legend 16GB OC 评测2023全球资管公司排名出炉,BlackRock归来仍是王者!Spring Week | 顶级资管BlackRock急招本硕生,直通2025暑期实习【Locker Room】Locker Room运动上新!女篮、飞盘&腰旗橄榄球,你准备好了吗?BlackRock 2024秋招已开!海外求职:资管(投资策略、研究、交易、投资组合管理)今日实习|BlackRock推出Placement Program,26届毕业生可报名!今日实习|BlackRock推出Placement Program EMEA项目,26届毕业生可报名!为什么Blackstone叫黑石,而Blackrock叫贝莱德?40 个定时任务,带你理解 RocketMQ 设计精髓!
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。