golang中使用kafka的综合指南
介绍
kafka是一个比较流行的分布式、可拓展、高性能、可靠的流处理平台。在处理kafka的数据时,这里有确保处理效率和可靠性的多种最佳实践。本文将介绍这几种实践方式,并通过sarama实现他们。
以下是一些kafka消费的最佳实践:
选择合适的提交策略:Kafka提供两种提交策略,自动和手动。虽然自动操作很容易使用,但它可能会导致数据丢失或重复。手动提交提供了更高级别的控制,确保消息至少处理一次或恰好一次,具体取决于用例。
尽可能减少Kafka的传输次数:大批量读取消息可以显著提高吞吐量。这可以通过调整 fetch.min.bytes 和 fetch.max.wait.ms 等参数来实现。
尽可能使用消费者组:Kafka允许多个消费者组成一个消费者组来并行消费数据。这使得 Kafka 能够将数据分发给一个组中的所有消费者,从而实现高效的数据消费。
调整消费者缓冲区大小:通过调整消费者的缓冲区大小,如 receive.buffer.bytes 和 max.partition.fetch.bytes,可以根据消息的预期大小和消费者的内存容量进行调整。这可以提高消费者的表现。
处理rebalance:当新的消费者加入消费者组,或者现有的消费者离开时,Kafka会触发rebalance以重新分配负载。在此过程中,消费者停止消费数据。因此,快速有效地处理重新平衡可以提高整体吞吐量。
监控消费者:使用 Kafka 的消费者指标来监控消费者的性能。定期监控可以帮助我们识别性能瓶颈并调整消费者的配置。
选择合适的提交策略
1.自动提交
Sarama 的 ConsumerGroup 默认情况下会自动提交偏移量。这意味着它会定期提交已成功消费的消息的偏移量,这允许消费者在重新启动或消费失败时从中断的地方继续。
下面是一个自动提交的消费者组消费消息的例子:
config := sarama.NewConfig()
config.Version = sarama.V2_0_0_0
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
ConsumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
log.Panicf( "创建消费者组客户端时出错: %v" , err)
}
Consumer := Consumer{}
ctx := context.Background()
for {
err := ConsumerGroup.Consume(ctx, [] string {topic}, Consumer)
if err != nil {
log.Panicf( "来自消费者的错误: %v" , err)
}
}
根据config.Consumer.Offsets.AutoCommit.Interval
可以看到,消费者会每秒自动提交offset。
2. 手动提交
手动提交使我们更好地控制何时提交消息偏移量。下面是一个手动提交的消费者组消费消息的例子:
config := sarama.NewConfig()
config.Version = sarama.V2_0_0_0
config.Consumer.Offsets.AutoCommit.Enable = false
consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID , config)
if err != nil {
log.Panicf( "创建消费者组客户端时出错: %v" , err)
}
Consumer := Consumer{}
ctx := context.Background()
for {
err := ConsumerGroup.Consume( ctx, [] string {topic}, Consumer)
if err != nil {
log.Panicf( "Error from Consumer: %v" , err)
}
}
type Consumer struct {}
func (consumer Consumer) Setup (_ sarama.ConsumerGroupSession) error { return nil }
func (consumer Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (consumer Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error {
for msg : = range Claim.Messages() {
fmt.Printf( "Message topic:%q partition:%d offset:%d\n" , msg.Topic, msg.Partition, msg.Offset)
sess.MarkMessage(msg, "" )
}
return nil
}
在该示例中, 使用MarkMessage手动将消息标记为已处理,最终根据Consumer.Offsets.CommitInterval
配置提交。另外这个例子省略了错误处理部分,开发时需要注意正确处理生产过程中出现的错误。
译者注:这篇文章虽然是今年5月发布,但是这里的提交方式还是有些过时了,目前sarama已经废弃了
Consumer.Offsets.CommitInterval
,相关配置目前在Consumer.Offsets.AutoCommit
尽可能减少Kafka的传输次数
减少kafka的传输次数可以通过优化从kafka中读取和写入数据的方式来实现:
1. 增加批次的大小
使用kafka批量发送消息的效果优于逐个发送消息,批次越大,kafka发送数据效率就越高。但是需要权衡延迟和吞吐量之间的关系。较大的批次虽然代表着更大的吞吐量,但也会增加延迟。因为批次越大,填充批次的时间也越久。
在Go中,我们可以在使用sarama包生成消息时设置批次大小:
config := sarama.NewConfig()
config.Producer.Flush.Bytes = 1024 * 1024
以及获取消息的批次大小
config := sarama.NewConfig()
config.Consumer.Fetch.Default = 1024 * 1024
2. 使用长轮询
长轮询是指消费者轮询时如果Kafka中没有数据,则消费者将等待数据到达。这减少了往返次数,因为消费者不需要在没有数据时不断请求数据。
config := sarama.NewConfig()
config .Consumer.MaxWaitTime = 500 *time.Millisecond
该配置告诉消费者在返回之前会等待500毫秒
3. 尽可能使用消费者组
消费者组是一组协同工作消费来自kafka主题的消息的消费者。消费者组允许我们在多个消费者之间分配消息,从而提供横向拓展能力。使用消费者组时,kafka负责将分区分配给组中的消费者,并确保每个分区同时仅被一个消费者消费。
接下来是sarama中消费者组的使用:
使用消费者组需要实现一个
ConsumerGroupHandler
接口:
该接口具有三个方法: Setup
、Cleanup
、 和ConsumeClaim
type exampleConsumerGroupHandler struct {
}
func (h *exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
return nil
}
func (h *exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}
func (h *exampleConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error {
for message := range Claim.Messages() {
fmt.Printf( "Message: %s\n" , string (message.Value))
session.MarkMessage(message, "" )
}
返回 nil
}
创建
sarama.ConsumerGroup
并开始消费:
brokers := []string{"localhost:9092"}
topic := "example_topic"
groupID := "example_consumer_group"
consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
log.Fatalf("Error creating consumer group: %v", err)
}
defer consumerGroup.Close()
handler := &exampleConsumerGroupHandler{}
for {
err := consumerGroup.Consume(context.Background(), []string{topic}, handler)
if err != nil {
log.Printf("Error consuming messages: %v", err)
}
}
该示例设置了一个消费组,用于消费来自“example_topic”的消息。消费者组可以通过添加更多消费者来提高处理能力。
使用消费者组时,记得处理消费期间rebalance和错误。
调整消费者缓冲区大小
在sarama中,我们可以调整消费者缓冲区的大小,以调整消费者在处理消息之前可以在内存中保存的消息数量。
默认情况下,缓冲区大小设置为256,这代表Sarama在开始处理消息之前将在内存中保存最多256条消息。如果消费者速度很慢,增加缓冲区大小可能有助于提高吞吐量。但是,更大的缓冲区也会消耗更多的内存。
以下是如何增加缓冲区大小的例子:
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V2_1_0_0
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.ChannelBufferSize = 500
group, err := sarama.NewConsumerGroup([]string{broker}, groupID, config)
if err != nil {
panic(err)
}
ctx := context.Background()
for {
topics := []string{topic}
handler := exampleConsumerGroupHandler{}
err := group.Consume(ctx, topics, &handler)
if err != nil {
panic(err)
}
}
处理rebalance
当新消费者添加到消费者组或现有消费者离开消费者组时,kafka会重新平衡该组中的消费者。rebalance是kafka确保消费者组中的所有消费者不会消费同一分区的保证。
在sarama中,处理rebalance是通过 Setup 和CleanUp函数来完成的。
通过正确处理重新平衡事件,您可以确保应用程序正常处理消费者组的更改,例如消费者离开或加入,并且在这些事件期间不会丢失或处理两次消息。
译者注:其实更重要的是在ConsumeClaim函数在通道关闭时尽早退出,才能正确的进入CleanUp函数。
监控消费者
监控Kafka消费者对于确保系统的健康和性能至关重要,我们需要时刻关注延迟、处理时间和错误率的指标。
Golang没有内置对 Kafka 监控的支持,但有几个库和工具可以帮助我们。让我们看一下其中的一些:
Sarama的Metrics:Sarama 提供了一个指标注册表,它报告了有助于监控的各种指标,例如请求、响应的数量、请求和响应的大小等。这些指标可以使用 Prometheus 等监控系统来收集和监控。
JMX Exporter:如果您在 JVM 上运行 Kafka, 则可以使用 JMX Exporter 将kafka的 MBeans 发送给Prometheus
Kafka Exporter:Kafka Exporter是一个第三方工具,可以提供有关Kafka的更详细的指标。它可以提供消费者组延迟,这是消费kafka消息时要监控的关键指标。
Jaeger 或 OpenTelemetry:这些工具可用于分布式追踪,这有助于追踪消息如何流经系统以及可能出现瓶颈的位置。
日志:时刻关注应用程序日志,记录消费者中的任何错误或异常行为。这些日志可以帮助我们诊断问题。
消费者组命令, 可以使用
kafka-consumer-groups
命令来描述消费者组的状态。
请记住,不仅要追踪这些指标,还要针对任何需要关注的场景设置警报。通过这些方法,我们可以在问题还在初始阶段时快速做出响应。
以上工作有助于确保使用kafka的应用程序健壮、可靠且高效。
链接:https://juejin.cn/post/7306756690728419354
(版权归原作者所有,侵删)
微信扫码关注该文公众号作者