构建下一代万亿级云原生消息架构:Apache Pulsar 在 vivo 的探索与实践
vivo 移动互联网为全球 4 亿 + 智能手机用户提供互联网产品与服务。其中,vivo 分布式消息中间件团队主要为 vivo 所有内外销实时计算业务提供高吞吐、低延时的数据接入、消息队列等服务,覆盖应用商店、短视频、广告等业务。业务集群已达每天十万亿级的数据规模。
图 1. vivo 分布式消息中间件系统架构
上图为系统的整体架构,其中数据接入层包括数据接入、数据采集服务,支持 SDK 直连;消息中间件由 Kafka 和 Pulsar 共同承担,其中 Pulsar 的承载量达到千亿级别;数据处理部分使用 Flink、Spark 等组件。
目前,Kafka 采用多集群方式,根据不同的业务量级、重要性分别使用不同的集群提供服务,比如计费集群、搜索集群、日志集群。在 Kafka 集群的内部,则采用物理隔离的方式,根据不同业务的重要性,将不同业务的 Topic 控制在不同的资源组内,避免业务之间相互影响。
图 2. Kafka 集群资源隔离
图 3. Kafka 集群流量均衡
资源组内部则会针对 Topic 流量、分区分布、磁盘容量、机器机架等指标生成迁移计划进行流量均衡,以此增强 Kafka 可靠性。目前 Kafka 已在多集群部署、资源隔离、流量均衡三个方面保障了基本的稳定性和资源利用率,但是在此之外,系统仍存在一些问题。
过去几年来,Kafka 集群承载的业务量迅速增长,流量上涨数十倍,带来诸多问题:
Topic 及 Topic 分区总量不断增加,集群性能受到影响:Kafka 高性能依赖于磁盘的顺序读写,磁盘上大量分区导致随机读写加重;
业务流量增加迅速,存量集群变大,需要将老的业务进行资源组隔离迁移或者集群拆分。无论是资源组隔离还是集群的隔离的方式,由于集群不可以进行动态扩缩容,机器不能够进行灵活调配,都存在利用率不高、运维成本增加的问题;
机器扩容慢,需要做长时间流量均衡,难以应对突发流量。集群规模越大,问题越突出;
消费端性能扩展太依赖分区扩容,导致集群元数据疯狂增长;
集群数量对应的机器基数大,硬件故障概率高,出现硬件故障时影响会直接传导到客户端,缺少中间层容错。
面对庞大的集群、流量和多样化的业务场景,综合考虑集群的稳定性和维护成本等因素,vivo 需要一个功能更丰富、适用更多场景、扩展能力更强的消息组件。
Pulsar 如何解决 vivo 存在的问题,可以首先看一下 Pulsar 的架构设计。Pulsar 采用计算存储层分离架构。计算层的 Broker 节点是对等且无状态的,可以快速扩展;存储层使用 BookKeeper 作为节点,同样节点对等。这种分离架构支持计算和存储层各自独立扩展。
图 4. Pulsar 存储计算分离
其次,Pulsar 的各个节点都是轻量化的,在出现故障和宕机时可以快速恢复。一般情况下可以通过快速上下线来解决某个节点机器的问题。同时 Broker 层可以作为 BookKeeper 层的容错层,可以防止故障直接传导至用户端。
Pulsar 扩容时无需长时间的数据迁移,且支持实时均衡。Broker 层抽象了 Bundle 概念,可以用有限的 Bundle 映射海量 Topic,Topic 可以随着 Bundle 迁移,通过动态迁移 Bundle 可以更好地应对流量突发场景。BookKeeper 分层分片的架构让数据分布均匀,在 Broker 层有一个选择机制,在扩容时可以将数据写入存储量小的节点,扩容时无需数据迁移,提供更好的流量高峰应对能力。Bookie 进行数据刷盘时会对批量数据自动进行数据排序,可以避免 Kafka 中的随机读写。
Pulsar 提供了四种消息模型:Exclusive、Failover、Shared 和 Key_Shared,其中 Shared 模型允许一个分区同时被多个消费实例订阅消费,并采用 Round Robin(轮询)方式将数据推送到各个消费实例。因此消费能力的扩展不会过于依赖分区扩容,慢消费的消费实例也可以在 Shared 模型中得到解决。Key_Shared 模型则是在 Shared 的基础上对应对顺序性有要求的场景,可以按照 Key 来消费。
图 5. Pulsar 订阅模型
Pulsar 的设计架构带来了海量分区支撑、消费扩展、精准限流、流量均衡、快速扩缩容、故障恢复、分层存储、云原生容器部署、异地多活等特性和优势,可以帮助集群更好地实现高可用、高扩展,提高了更高的弹性。
下面我们从流量控制和数据管理方面,分享 vivo 在使用 Pulsar 过程中的集群管理经验。
在集群流量控制层面,比较关键的一点就是 Bundle 的管理。Bundle 负责控制用户流量到 Broker 的具体分布。Broker 与 Topic 之间没有直接联系,而是在 Broker 之上抽象出 Bundle 概念,通过 Bundle 与 Topic 建立关系;Topic 通过名称计算哈希值,并散列分布到一致性哈希环中,而哈希环的每一段都是一个 Bundle。另外 Load Manager 根据 Bundle 的负载情况将后者分配到对应的 Broker 上,将 Bundle 数据存储在 ZooKeeper 中。由此以来就间接实现了 Topic 与 Broker 之间的联系(可参考近期 StreamNative 发布的 Broker 负载均衡技术文章)。
图 6. Bundle 与 Topic 建立关系
这里需要注意:
Bundle 的个数影响均衡效果,因为通过一致性哈希来确认 Topic 应该落在哪个 Bundle 上, Topic 与 Bundle 会存在不均衡分配,某些 Bundle 分配的 Topic 可能较多或较少。Bundle 越多,每个 Bundle 承载的 Topic 越少,粒度越细。依赖于 Pulsar 的负载均衡算法,均衡效果更好;否则若 Bundle 太大,无论如何卸载都很难平衡负载;
Bundle 数据和 Broker 映射元数据都维护在 ZooKeeper 中,需要做好 Bundle 数量的规划。
针对以上两点,我们根据 Broker 来设置 Bundle 数量设置最小最大值来控制,还可以对流量较大的 Topic 针对性扩大分区,让分区均匀分配到 Broker Bundle 上。
Pulsar 虽然提供了海量分区能力,但是过多的 Topic 或者分区产生的 lookup 也会对集群产生较大的压力。集群管理者需要提前规划 Bundle 和分区设置,杜绝滥用。
另外对 Bundle 的操作需要注意:
Pulsar 本身提供了卸载操作,可以解除 Bundle 和 Broker 的关联关系,将 Bundle 重新分配。线上流量较大时应卸载 Bundle 而不是整个命名空间,因为卸载后者会导致其上的全部 Bundle 与对应的生产者、消费者断开,重新进行 lookup。
利用 Bundle split 对流量较大的 Bundle 进行拆分,增加命名空间的 Bundle 数量,降低影响。
总体而言,用户需要注意流量的均衡与集群的稳定性,在集群管理之初就做好 Bundle 的数量管理和相关测试,谨慎对待大批量 Bundle 卸载等运维操作。
接下来我们从数据的存储、过期、删除三个方面来分析。
首先介绍数据写入 ledger 的过程。每一个 Topic 分区在一段时间内只创建一个 Ledger 维护分区写入的 Entry 的数据归属。Topic 分区写入的数据以 Entry 的形式,经过 Broker 写入 Netty 线程处理队列,线程依次根据 Entry 的 Ledger Id,对 Ledger 目录数取模,写入到目标磁盘 Ledger 目录,最终以 Entry Log 和 RocksDB 的索引方式存储。需要注意,Ledger 是一个分区在一段时间内写入数据的逻辑管理单位,维护了这段数据存储的 Bookie 位置。一个 Topic 分区在一段时间内写入的数据只被一个活跃 Ledger 管理,待该 Ledger 达到翻转条件后才会关闭 Ledger 并重新计算,创建新 Ledger 继续写入。
图 7. Ledger 翻转示意
Ledger 翻转后,数据才会写入新的数据目录。在 Pulsar 中,在满足 Ledger 最小翻转时间以及以下条件之一后触发 Ledger 翻转:
已达到 Ledger 最大翻转时间;
已达到 Ledger 的最大 Entry 数量;
已达到 Ledger 的最大大小。
默认值:
触发ledger翻转的最小时间:
managedLedgerMinLedgerRolloverTimeMinutes=10
触发ledger翻转的最长时间:
managedLedgerMaxLedgerRolloverTimeMinutes=240
触发ledger翻转的最大entry数:
managedLedgerMaxEntriesPerLedger=50000
触发ledger翻转的最大大小:
managedLedgerMaxSizePerLedgerMbytes=2048
注意两个问题:
Ledger 过大:最小翻转时间是防止 Ledger 元数据过快增长的手段,但实践发现如果 Topic 分区流量较大,Ledger 的实际值可能远超上述设置的上限阈值。Ledger 只有在翻转后才会创建新的 Ledger,Ledger 过大会导致某段时间内写入某个磁盘的数据过多,产生磁盘存储不均衡的问题;针对 Ledger 为对象的一些操作也会受到影响,产生无法及时卸载数据到二级存储、数据卸载时间较长、还未卸载成功但 Ledger 已经过期等问题。
Ledger 间不均衡:Ledger ID 以集群维度进行递增。在分区的维度,按照 Ledger ID 对 Ledger 存储目录数进行取模的方式无法对多磁盘进行均衡写入。但保持 Ledger 间的大小一致,在一定程度上会对多磁盘目录的写入均衡有比较大的改善。
总而言之,建议根据业务消息情况适当调整 Ledger 翻转参数和有针对性地增加大流量 Topic 分区数量,可以防止 Ledger 过大、大小不均衡的问题。
数据过期主要分为四个阶段:
第一阶段:未被 Ack 的消息
Backlog 消息:该段数据不会被删除
第二阶段:已经 Ack 的消息
订阅主动 Ack 后,标记为非 backlog 消息,有多个订阅时以最慢的为准
TTL:若某 Topic 没有活跃订阅,超过 TTL 存活时间的消息会被主动 Ack ,本质上是移动 cursor
第三阶段:消息保留时间检查
Retention:对已经 Ack 的消息的保留策略,按保留周期和保留大小设置来保留消息
第四阶段:消息删除
Deleted:超过 Retenion 范围的消息则被删除。超过 rentention 保留周期和保留大小的消息,系统会从当前已经 ack 消息的最新位置往前检查并获取已经过期的 ledger,将其标记删除。
图 8. 消息保留时间检查与消息删除
从上述的消息阶段演化来看,Pulsar 提供了较大的消息管理空间,但也略显复杂。建议集群维护者建立简单统一的规则处理数据保留策略,如可以设置 TTL = Retention 保留周期值。
此处介绍数据的物理删除。Bookie 在处理数据写入过程时,会将同一段时间内的数据经过排序 flush 到同一个 Entry Log 文件中,将索引存放在 RocksDB 中。由于多个 Ledger 的数据可能会同时写入同一个 Entry Log,因此 Entry Log 便不能被简单直接的删除。对此 BookKeeper 会启动一个 GC(GarbageCollector) 线程进行检查和物理删除操作。
图 9. 数据物理删除流程
Entry Log 维护元数据信息( EntryLogMetadata),该元数据记录了 Ledger 列表、大小与剩余有效数据比例。
GC 清理线程在每个 gcWaitTime 时间间隔:
扫描 Entry Log 的元数据信息,对于已经没有有效数据的 entry log 直接进行删除。
判断是否满足 compaction 条件,满足 compaction 条件后 GC 线程会读取每一个 Entry 判断其是否过期,一旦过期就会丢弃,否则会将数据写入新的 Entry Log。
Compaction 分为 minorCompaction 和 majorCompaction,二者区别在于阈值。默认情况下,minorCompaction 清理间隔 1 小时,阈值 0.2;majorCompaction 清理间隔 24 小时,阈值 0.8。阈值是 Entry Log File 中的剩余有效数据占比。
minorCompactionInterval=3600
minorCompactionThreshold=0.2
majorCompactionThreshold=0.8
majorCompactionInterval=86400
在实际使用中,如果机器节点的磁盘较小且数据迟迟得不到删除,为了及时清除数据,应该按照业务流量和磁盘空间适当调整数据清理间隔时间、有效数据阈值,并配合 compaction 限速策略减小对集群的影响。
vivo 的 Pulsar 指标监控链路架构如下:
图 10. vivo 针对 Pulsar 监控指标搭建的监控架构
该架构中:
采用 Prometheus 采集 Pulsar 指标;
应用 Prometheus 远程存储特性将格式化后的指标发送到 Kafka;
Druid 消费 Kafka 数据后可以作为 Grafana 的数据源,配置 Grafana 面板查询指标。
为什么不使用 Prometheus 存储数据?因为有些数据较久远,一旦集群规模增加,监控指标数量级会很大。Prometheus 对资源依赖重,我们只采用了它的采集能力。
下图是常用的关键指标:
图 11. 关键监控指标
指标类型分为:
客户端指标:用来排查客户端出现的异常
Broker 端指标:监控 topic 流量、调整 broker 间流量差距
Bookie 端指标:排查读写延迟等问题
除了官方指标外,团队还开发了 Bundle 相关的一些指标:
分区数、流量等在 Bundle 的分布
Broker 端记录读写延迟的 P95/P99 值
基于请求对列实现 Broker 端网络负载指标等。
负载均衡的目的是对资源平均分配,差异大会影响稳定性。对负载均衡设置的目标是节点流量偏差 20% 以内,每天均衡频次在 10 次以内,否则客户端会频繁断连、重连。优化后的参数如下:
# load shedding strategy, support OverloadShedder and ThresholdShedder, default is OverloadShedder
loadBalancerLoadSheddingStrategy=org.apache.pulsar.Broker.loadbalance.impl.ThresholdShedder
# enable/disable namespace Bundle auto split
loadBalancerAutoBundleSplitEnabled=false
# enable/disable automatic unloading of split Bundles
loadBalancerAutoUnloadSplitBundlesEnabled=false
#计算新资源使用量时的CPU使用权重(默认1.0)
loadBalancerCPUResourceWeight=0.0
#计算新的资源使用量时的堆内存使用权重(默认1.0)
loadBalancerMemoryResourceWeight=0.0
#计算新资源使用量时的直接内存使用权重(默认1.0)
loadBalancerDirectMemoryResourceWeight=0.0
下面三个参数改为零,是因为集群使用了相同的机型,团队更关注流量均衡,对内存和 CPU 不是特别关注。
以一个具体产品案例来看,其中有 1 个 Topic、30 个分区、180 个 Bundle:
图 12. 1 个 Topic、30 个分区、180 个 Bundle 的每秒入流量
上图节点间流量差异较大,由 Bundle unload 导致。
图 13. 1 个 Topic、30 个分区、180 个 Bundle 下,Bundle 上 Topic 分区情况
上图可看出,有两个 Bundle 分配了四个分区,远超其他 Bundle。实践中出现以下问题:
均衡频次高,一天大概有 200 多次
客户端连接频繁切换,流量波动大
每个 Bundle 的分区数量分布差异大
图 14. 1 个 Topic、30 个分区、180 个 Bundle 的入流量分布
优化过程中,关键在于将分区打散到不同 Bundle 上,但分区数量太少很难做到。Topic 通过哈希算法分配到 Bundle 上在前文已经介绍。此案例中,问题在于分区数量少。
于是团队将分区增加到 120 个,效果如下:
节点间流量差异小
均衡频次降低,一天大概有 10 次左右
客户端连接切换减少,流量波动较小
每个 bundle 的分区数量分布差异降低
图 15. 1 个 Topic、120 个分区、180 个 Bundle 的每秒入流量
图 16. 1 个 Topic、120 个分区、180 个 Bundle 下,Bundle 上 Topic 分区情况
图 17. 1 个 Topic、120 个分区、180 个 Bundle 的入流量分布
在和上述业务相同的场景中,分区数量增加后,系统滚动重启后出现了流量下降情况:
图 18. 单个 Topic,30 个分区增加到 120 个,系统滚动重启后流量下降
客户端配置参数:
memoryLimitBytes=209715200 (默认为 0)
maxPendingMessages=2000 (默认 1000)
maxPendingMessagesAcrossPartitions=40000 (默认 50000)
batchingMaxPublishDelayMicros=50 (默认 1 毫秒)
batchingMaxMessages=2000 (默认 1000)
batchingMaxBytes=5242880 (默认 128KB)
满足三个 batch 数据中的任何一个的情况下就会触发打包、发送。
图 19. 重启后 maxPendingMessages(队列长度)出现下降
这里 maxPendingMessages(队列长度)=min(maxPendingMessages, maxPendingMessagesAcrossPartitions/partitionNum)
。而分区数添加(30 -> 120)后,需要重启客户端才对队列长度生效。重启后 maxPendingMessages 队列长度 从 40000/30 = 1333 变为 40000/120 = 333,出现了明显下降。
另外,测试发现 batchingMaxMessages 调小后性能提升 10 倍之多:
图 20. 单个 Topic,30 个分区增加到 120 个,调整后性能提升
建议 batchingMaxPublishDelayMicros
不要过大,确保 batchingMaxMessages
比 maxPendingMessages
要大,否则等待 batchingMaxPublishDelayMicros
才会发送。
某个分区队列满后会导致发送线程阻塞,影响所有分区的整体发送和集群稳定性:
图 21. 执行 Kill-9 一台 Broker 后,其他 Broker 流量下降
图 22. 第四个分区已满,发送线程阻塞在 canEnqueRequest 上,等待时间长,其他未满分区的发送也被影响。
图 23. 极端情况下,第四个分区已满,其他分区等待中。发送线程会在第四个分区阻塞等待,其他线程无法发送。
针对这一问题的优化思路,首先是能者多劳,让发送快的分区尽可能多发送;然后是将阻塞点从 ProducerImpl 移到 PartitionedProducerImpl;如果分区 ProducerImpl 出现队列已满阻塞较长时间,就将该分区排除。
图 24. 宕机导致集群流量骤降优化思路
实践中可分为可用 Producer 和不可用 Producer 两个列表。在 ① 中,两个列表都处于初始化状态并可用;在 ② 中,某个可用分区阻塞一段时间后可以等待一段时间;若不可用就移动到不可用列表中,如 ③ 所示;当分区可用比例达到阈值再挪回可用列表,如 ④ 所示。
经过优化后,宕机 Broker 流量可以快速转移到其他 Broker:
图 25. 优化后 Broker 流量分流并上涨
注:优化只支持 RoundRobinPartitionMessageRouterImpl
路由策略。
在单个 ProducerImpl 对应的 Broker 出现处理慢、网络慢等导致发送响应慢的情况,都可能会导致发送线程阻塞,业务发送消息的速度受限于最慢的 ProducerImpl 的速度。
本文分享了 vivo 在 Pulsar 集群管理与监控的经验,并介绍 vivo 在负载均衡等方面的最佳实践。
由于服务端的问题很难通过监控指标进行分析,vivo 在未来计划实现生产端到消费端的全链路监控能力。大数据团队希望整合大数据组件,支撑 Flink、Spark、Druid 等核心下游组件打通落地。
同时,vivo 内部目前 Pulsar 与 Kafka 同时运行,团队将尝试基于 KoP 对存量 Kafka 万亿流量尝试迁移,降低 Kafka 迁移成本;并探索容器化落地,充分发挥 Pulsar 云原生优势。
全利民,vivo 大数据工程师,负责 vivo 分布式消息中间件建设
陈建波,vivo 大数据工程师,曾任微服务应用架构师,负责 vivo 分布式消息中间件的建设
奇葩事儿:删除用户云数据还无法恢复,只赔 3 万;微信键盘来了,体积 524MB;谷歌希望将效率提高 20%:暗示将裁员?| Q资讯
微信扫码关注该文公众号作者