Redian新闻
>
golang中使用kafka的综合指南

golang中使用kafka的综合指南

公众号新闻

介绍

kafka是一个比较流行的分布式、可拓展、高性能、可靠的流处理平台。在处理kafka的数据时,这里有确保处理效率和可靠性的多种最佳实践。本文将介绍这几种实践方式,并通过sarama实现他们。

以下是一些kafka消费的最佳实践:

  1. 选择合适的提交策略:Kafka提供两种提交策略,自动和手动。虽然自动操作很容易使用,但它可能会导致数据丢失或重复。手动提交提供了更高级别的控制,确保消息至少处理一次或恰好一次,具体取决于用例。

  2. 尽可能减少Kafka的传输次数:大批量读取消息可以显著提高吞吐量。这可以通过调整 fetch.min.bytes 和 fetch.max.wait.ms 等参数来实现。

  3. 尽可能使用消费者组:Kafka允许多个消费者组成一个消费者组来并行消费数据。这使得 Kafka 能够将数据分发给一个组中的所有消费者,从而实现高效的数据消费。

  4. 调整消费者缓冲区大小:通过调整消费者的缓冲区大小,如 receive.buffer.bytes 和 max.partition.fetch.bytes,可以根据消息的预期大小和消费者的内存容量进行调整。这可以提高消费者的表现。

  5. 处理rebalance:当新的消费者加入消费者组,或者现有的消费者离开时,Kafka会触发rebalance以重新分配负载。在此过程中,消费者停止消费数据。因此,快速有效地处理重新平衡可以提高整体吞吐量。

  6. 监控消费者:使用 Kafka 的消费者指标来监控消费者的性能。定期监控可以帮助我们识别性能瓶颈并调整消费者的配置。

选择合适的提交策略

1.自动提交

Sarama 的 ConsumerGroup 默认情况下会自动提交偏移量。这意味着它会定期提交已成功消费的消息的偏移量,这允许消费者在重新启动或消费失败时从中断的地方继续。

下面是一个自动提交的消费者组消费消息的例子:

config := sarama.NewConfig()  config.Version = sarama.V2_0_0_0 config.Consumer.Offsets.AutoCommit.Enable = true  config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second    ConsumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)  if err != nil {      log.Panicf( "创建消费者组客户端时出错: %v" , err)  }    Consumer := Consumer{}  ctx := context.Background()    for {      err := ConsumerGroup.Consume(ctx, [] string {topic}, Consumer)      if err != nil {          log.Panicf( "来自消费者的错误: %v" , err)      }  }

根据config.Consumer.Offsets.AutoCommit.Interval 可以看到,消费者会每秒自动提交offset。

2. 手动提交

手动提交使我们更好地控制何时提交消息偏移量。下面是一个手动提交的消费者组消费消息的例子:

config := sarama.NewConfig()  config.Version = sarama.V2_0_0_0 config.Consumer.Offsets.AutoCommit.Enable = false   consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID , config)  if err != nil {      log.Panicf( "创建消费者组客户端时出错: %v" , err)  }    Consumer := Consumer{}  ctx := context.Background()    for {      err := ConsumerGroup.Consume( ctx, [] string {topic}, Consumer)      if err != nil {          log.Panicf( "Error from Consumer: %v" , err)      }  }    
type Consumer struct {} func (consumer Consumer) Setup (_ sarama.ConsumerGroupSession) error { return nil } func (consumer Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (consumer Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { for msg : = range Claim.Messages() { fmt.Printf( "Message topic:%q partition:%d offset:%d\n" , msg.Topic, msg.Partition, msg.Offset)
sess.MarkMessage(msg, "" ) } return nil }


在该示例中, 使用MarkMessage手动将消息标记为已处理,最终根据Consumer.Offsets.CommitInterval 配置提交。另外这个例子省略了错误处理部分,开发时需要注意正确处理生产过程中出现的错误。

译者注:这篇文章虽然是今年5月发布,但是这里的提交方式还是有些过时了,目前sarama已经废弃了Consumer.Offsets.CommitInterval,相关配置目前在 Consumer.Offsets.AutoCommit

尽可能减少Kafka的传输次数

减少kafka的传输次数可以通过优化从kafka中读取和写入数据的方式来实现:

1. 增加批次的大小

使用kafka批量发送消息的效果优于逐个发送消息,批次越大,kafka发送数据效率就越高。但是需要权衡延迟和吞吐量之间的关系。较大的批次虽然代表着更大的吞吐量,但也会增加延迟。因为批次越大,填充批次的时间也越久。

在Go中,我们可以在使用sarama包生成消息时设置批次大小:

config := sarama.NewConfig()  config.Producer.Flush.Bytes = 1024 * 1024

以及获取消息的批次大小

config := sarama.NewConfig()  config.Consumer.Fetch.Default = 1024 * 1024

2. 使用长轮询

长轮询是指消费者轮询时如果Kafka中没有数据,则消费者将等待数据到达。这减少了往返次数,因为消费者不需要在没有数据时不断请求数据。

config := sarama.NewConfig() config .Consumer.MaxWaitTime = 500 *time.Millisecond

该配置告诉消费者在返回之前会等待500毫秒

3. 尽可能使用消费者组

消费者组是一组协同工作消费来自kafka主题的消息的消费者。消费者组允许我们在多个消费者之间分配消息,从而提供横向拓展能力。使用消费者组时,kafka负责将分区分配给组中的消费者,并确保每个分区同时仅被一个消费者消费。

接下来是sarama中消费者组的使用:

  1. 使用消费者组需要实现一个ConsumerGroupHandler接口:

该接口具有三个方法: SetupCleanup、 和ConsumeClaim

type exampleConsumerGroupHandler struct { } 
func (h *exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *exampleConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { for message := range Claim.Messages() { fmt.Printf( "Message: %s\n" , string (message.Value))
session.MarkMessage(message, "" ) } 返回 nil }

  1. 创建 sarama.ConsumerGroup并开始消费:

brokers := []string{"localhost:9092"} topic := "example_topic"  groupID := "example_consumer_group"    
consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config) if err != nil { log.Fatalf("Error creating consumer group: %v", err) } defer consumerGroup.Close()
handler := &exampleConsumerGroupHandler{}
for { err := consumerGroup.Consume(context.Background(), []string{topic}, handler) if err != nil { log.Printf("Error consuming messages: %v", err) } }

该示例设置了一个消费组,用于消费来自“example_topic”的消息。消费者组可以通过添加更多消费者来提高处理能力。

使用消费者组时,记得处理消费期间rebalance和错误。

调整消费者缓冲区大小

在sarama中,我们可以调整消费者缓冲区的大小,以调整消费者在处理消息之前可以在内存中保存的消息数量。

默认情况下,缓冲区大小设置为256,这代表Sarama在开始处理消息之前将在内存中保存最多256条消息。如果消费者速度很慢,增加缓冲区大小可能有助于提高吞吐量。但是,更大的缓冲区也会消耗更多的内存。

以下是如何增加缓冲区大小的例子:

config := sarama.NewConfig()  config.Consumer.Return.Errors = true  config.Version = sarama.V2_1_0_0  config.Consumer.Offsets.Initial = sarama.OffsetOldest  
config.ChannelBufferSize = 500
group, err := sarama.NewConsumerGroup([]string{broker}, groupID, config) if err != nil { panic(err) }
ctx := context.Background() for { topics := []string{topic} handler := exampleConsumerGroupHandler{}
err := group.Consume(ctx, topics, &handler) if err != nil { panic(err) } }

处理rebalance

当新消费者添加到消费者组或现有消费者离开消费者组时,kafka会重新平衡该组中的消费者。rebalance是kafka确保消费者组中的所有消费者不会消费同一分区的保证。

在sarama中,处理rebalance是通过 Setup 和CleanUp函数来完成的。

通过正确处理重新平衡事件,您可以确保应用程序正常处理消费者组的更改,例如消费者离开或加入,并且在这些事件期间不会丢失或处理两次消息。

译者注:其实更重要的是在ConsumeClaim函数在通道关闭时尽早退出,才能正确的进入CleanUp函数。

监控消费者

监控Kafka消费者对于确保系统的健康和性能至关重要,我们需要时刻关注延迟、处理时间和错误率的指标。

Golang没有内置对 Kafka 监控的支持,但有几个库和工具可以帮助我们。让我们看一下其中的一些:

  1. Sarama的Metrics:Sarama 提供了一个指标注册表,它报告了有助于监控的各种指标,例如请求、响应的数量、请求和响应的大小等。这些指标可以使用 Prometheus 等监控系统来收集和监控。

  2. JMX Exporter:如果您在 JVM 上运行 Kafka, 则可以使用 JMX Exporter 将kafka的 MBeans 发送给Prometheus

  3. Kafka Exporter:Kafka Exporter是一个第三方工具,可以提供有关Kafka的更详细的指标。它可以提供消费者组延迟,这是消费kafka消息时要监控的关键指标。

  4. Jaeger 或 OpenTelemetry:这些工具可用于分布式追踪,这有助于追踪消息如何流经系统以及可能出现瓶颈的位置。

  5. 日志:时刻关注应用程序日志,记录消费者中的任何错误或异常行为。这些日志可以帮助我们诊断问题。

  6. 消费者组命令, 可以使用kafka-consumer-groups 命令来描述消费者组的状态。

请记住,不仅要追踪这些指标,还要针对任何需要关注的场景设置警报。通过这些方法,我们可以在问题还在初始阶段时快速做出响应。

以上工作有助于确保使用kafka的应用程序健壮、可靠且高效。

链接:https://juejin.cn/post/7306756690728419354

(版权归原作者所有,侵删)

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
【LGA 已开业,JFK 即将开业!】Chase 机场休息室 Sapphire Lounge 介绍YOLO再升级!华为诺亚提出Gold-YOLO,聚集-分发机制打造新SOTA2024招聘季 | BCG中国区冬春季PTA正式启动!BCG中国区执行合伙人吴淳:中国医疗行业展望Heilongjiang Gymnasium Collapse Kills 3 Middle School Students敢“逼走”李宗盛,这档10年前的综艺太狠了Preparing for the 2023 Tax Year: Ensuring a Strong Tax Saving?注意!3月截止的综合性大学转学申请汇总,想申请的要抓紧了!KafkaFlow 入门指南:构建可扩展的 Kafka 事件驱动应用Golang中的观察者模式:优化订单处理系统是性格决定婚姻质量?-我妈三姐妹婚姻的启示“多巴胺”的“胺”应读为àn,“2023年十大语文差错”发布红色日记 胜利闭幕 10.13-15现在的综艺都这么敢了吗中使馆提醒中国公民注意冰岛申根签证有效期问题!墨西哥格雷罗州全境停电、航班停飞!中使馆发布安全提醒正视AI技术发展利弊,高校招生部如何看待文书中使用ChatGPT“多巴胺”的“胺”不读ān?岩浆喷发、紧急疏散,中使馆提醒!岩浆喷发、紧急疏散,中使馆提醒!复旦大学重磅开源:AIGC图像检测方法的综合测评平台实例解析,肺动脉高压合并右心衰的综合管理《现在就出发》:没有边界感的综艺能好看?招聘 | BCG中国区2024年冬春季PTA招聘如何用Kubernetes实战快速搭建typecho个人博客?招聘 | BCG中国区2024社会招聘全面启动Github Copilot Chat 公测:已可在 Visual Studio 和 VS Code 中使用大模型总弄错「事实」怎么办?这有一份汇聚了300多篇文献的综述聊聊曼哈顿我喜欢的中餐馆教你如何在 Bash 脚本中使用强大的 Linux test 命令你爱刷的综艺,可能都是他们做的 | 重叙计划02国庆黄金周的"黄金",英文是 gold 还是 golden?是时候基于云重新设计 Kafka 了!AutoMQ 如何实现 Kafka 十倍的降本增效邻家王姐为我偷书 (二)BCG中国区执行合伙人吴淳博士荣登2023年《财富》中国最具影响力的商界女性榜单童年追忆 (四)golang string和[]byte的对比
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。