KafkaFlow 入门指南:构建可扩展的 Kafka 事件驱动应用
在本文中,我们将会探讨 KafkaFlow 提供提供的特性。如果你正在使用.NET 构建 Apache Kafka 生产者和消费者,那么本文将会介绍如何借助 KafkaFlow 来简化你的生活。
KafkaFlow 为 Confluent .NET Kafka 客户端提供了一个抽象层。它使得使用、维护和测试 Kafka 消费者和生产者均更加容易。
假设你要为市场营销活动创建一个客户端目录(Client Catalog)。我们需要一项服务来消费那些捕获新客户端的消息。当开始设计所需的服务时,你会发现现有的服务在如何消费消息方面并不一致。
常见的情形是,团队在解决一些简单的问题(如优雅关机)时,往往会陷入困境。你会发现整个组织有四种不同的 JSON 序列化器实现,这只是挑战之一。
采用 KafkaFlow 这样的框架能够简化流程并加快开发周期。KafkaFlow 拥有一系列旨在提升开发人员体验的特性:
中间件(Middleware):KafkaFlow 允许开发人员创建中间件来处理消息,从而实现对 Kafka 生产者 / 消费者管道的更多控制和定制。
处理器(Handler):引入了处理器的概念,允许开发人员将主题中的消息处理转发给专用消息类型的处理器。
反序列化算法(Deserialization Algorithms):提供了一套开箱即用的序列化和反序列化算法。
多线程消费者:提供了保证消息处理顺序的多线程功能,有助于优化系统资源的使用。
管理 API 和仪表盘:提供了 API 和仪表盘来管理消费者和消费者群组,可以在运行时进行暂停、恢复或倒回偏移。
消费者限流:提供了一种简便的方式,为主题的消费提供优先级。接下来,我们探讨一下这些特性,看看它们在解决类似问题方面的潜力。
我们从消息的生产者开始。
向 Kafka 中生成消息并不是什么高难的火箭科学。即便如此,KafkaFlow 还是为 Confluent 的.NET Kafka 客户端的生产者接口提供了更高级别的抽象,从而能够简化代码并提升可维护性。
下面是一个如何使用 KafkaFlow 生产者发送消息的样例:
await _producers["my-topic-events"]
.ProduceAsync("my-topic", message.Id.ToString(), message);
这样,我们就可以向 Kafka 生成消息,而无需直接处理序列化或底层 Kafka 客户端的其他复杂问题。不仅如此,定义和管理生产者还可以通过服务配置上的流畅接口(Fluent Interface)轻松实现。
services.AddKafka(kafka => kafka
.AddCluster(cluster => cluster
.WithBrokers(new[] { "host:9092" })
.AddProducer(
"product-events",
producer =>
producer
...
)
)
);
生产者往往很简单,但也有一些常见的问题需要解决,比如压缩或序列化。我们来探讨一下。
序列化 / 反序列化
在 Apache Kafka 中,一个很有吸引力的特性就是与数据格式无关。但是,这就将责任转移给了生产者和消费者。如果考虑不周全,可能会导致在整个系统中出现由多种方式实现同一种结果的现象。因此,序列化显然是一个由客户端框架处理的用例。
KafkaFlow 具有适用于 JSON、Protobuf 甚至 Avro 的序列化器。只需将它们添加到中间件配置中就可以使用。
.AddProducer<ProductEventsProducer>(producer => producer
...
.AddMiddlewares(middlewares => middleware
...
.AddSerializer<JsonMessageSerializer>()
)
)
鉴于我们可以为消息使用自定义的序列化器 / 反序列化器,所以这个列表并不局限于这三种。虽然 Confluent 的.NET Kafka 客户端已经支持自定义序列化 / 反序列化,但 KafkaFlow 通过提供更优雅的处理方式简化了这一过程。举例来说,要使用自定义序列化器,我们可以这样做:
public class MySerializer : ISerializer
{
public Task SerializeAsync(object message, Stream output, ISerializerContext context)
{
// 序列化逻辑在这里
}
public async Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
{
// 反序列化逻辑在这里
}
}
// 在设置 Kafka 消费者 / 生产者的时候,注册自定义的序列化器
.AddProducer<MyProducer>(producer => producer
...
.AddMiddlewares(middlewares => middleware
...
.AddSerializer<MySerializer>()
)
)
消费者带来了大量的问题和可能性。第一个问题就是“如何处理消息?”
我们从最简单的方式开始。随着像 MediatR 这样的库的出现,CQRS 和 Meditor 模式得到了普及,.NET 开发人员习惯于将消息处理器与请求 / 消息接收器解耦。KafkaFlow 将同样的原则引入到了 Kafka 消费者中。
KafkaFlow 消息处理器允许开发人员定义特定的逻辑来处理来自 Kafka 主题的消息。按照设计,KafkaFlow 的消息处理器结构能够更好地分离关注点,并使代码更整洁、更易于维护。
如下是一个消息处理器的示例:
public class MyMessageHandler : IMessageHandler<MyMessageType>
{
public Task Handle(IMessageContext context, MyMessageType message)
{
// 消息处理逻辑在这里
}
}
这个处理器可以在消费者配置中进行注册:
.AddConsumer(consumer => consumer
...
.AddMiddlewares(middlewares => middlewares
...
.AddTypedHandlers(handlers => handlers
.AddHandler<MyMessageHandler>()
)
)
)
通过这种方式,可以轻松地将消费者和处理器分开,从而提升了可维护性和可测性。如果你的微服务只处理具有一种消息类型的一个主题,这可能会显得引入了不必要的复杂性。在这种情况下,你可以使用中间件。
KafkaFlow 是面向中间件的。你可能已经注意到,在消息处理器的代码片段中提到了“中间件”。所以,你可能会问什么是中间件。
中间件使得类型化处理器(Typed Handler)成为可能。消息会被传递到一个中间件管道,该管道将会被依次调用。如果你使用过 MediatR 管道的话,可能会对这一概念有所了解。此外,中间件还可以用来进行一系列的转换。换句话说,给定的中间件可以将传入的消息转换到下一个中间件。
KafkaFlow 中的中间件封装了处理消息的逻辑。管道是可扩展的,允许开发人员在消息处理管道中添加行为。
如下是一个中间件的样例:
public class MyMiddleware : IMessageMiddleware
{
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
// 预处理逻辑位于这里
await next(context);
// 后处理逻辑位于这里
}
}
要使用该中间件,可以在消费者配置中进行注册:
.AddConsumer(consumer => consumer
...
.AddMiddlewares(middlewares => middlewares
...
.Add<MyMiddleware>()
)
)
通过这种方式,开发人员就可以在消息处理管道中插入自定义逻辑,从而提供灵活性和控制力。类型化处理器是中间件的一种形式。所以,你甚至可以在没有类型化处理器的情况下处理消息,实现自己的中间件,或者也可以使用中间件来构建消息管道,在处理消息之前执行校验、丰富化等操作。
一旦开始思考基础设施的效率,你就会发现许多 Kafka 消费者没有得到充分利用。最常见的实现方式是单线程的,这限制了资源的利用率。因此,当我们需要扩展的时候,只能进行横向扩展,以保持所需的吞吐量。
KafkaFlow 为实现基础设施的高效率带来了另外一种可选方案。KafkaFlow 让开发人员可以控制单个消费者可以并发处理多少消息。它使用了 Worker 的理念,这些 Worker 可以协同消费一个主题。这一功能能够让你优化 Kafka 消费者,使其更好地匹配系统的能力。
如下是一个如何为消费者设置并发 worker 数量的样例:
.AddConsumer(consumer => consumer
.Topic("topic-name")
.WithGroupId("sample-group")
.WithBufferSize(100)
.WithWorkersCount(10) // 设置 worker 的数量
.AddMiddlewares(middlewares => middlewares
...
)
)
即便有并发 worker,KafkaFlow 也能确保顺序。
随着规模的扩大,你将会面临延迟和吞吐量之间的权衡。为了解决这个问题,KafkaFlow 有一个重要的特性,叫做“批量消费”。这个特性满足了以批量方式消费和处理来自 Kafka 的消息时对效率和性能的要求。在需要一起处理一组消息,而不是单个处理消息的场景下,该特性发挥着重要作用。
在批量消费方式中,系统不是在收到消息后对其进行原子性地处理,而是将多条消息分组,然后一次性地对其进行处理。这种方法在处理大量数据时更为有效,尤其是在消息相互独立的情况下。批量执行操作会提高整体性能。
KafkaFlow 利用中间件系统提供批量处理功能。批量处理中间件能够让你根据批量大小或时间跨度(timespan)对消息进行分组。一旦达到其中的某个条件,中间件就会将这组消息转发给下一个中间件。
services.AddKafka(kafka => kafka
.AddCluster(cluster => cluster
.WithBrokers(new[] { "host:9092" })
.AddConsumer(
consumerBuilder => consumerBuilder
...
.AddMiddlewares(
middlewares => middlewares
...
.BatchConsume(100, TimeSpan.FromSeconds(10))
.Add<HandlingMiddleware>()
)
)
)
);
通过批量处理,开发人员可以在基于 Kafka 的应用程序中实现更高的吞吐量。它可以加快处理速度,因为与启动和完成每个处理任务相关的开销会大大减少。这将全面提升系统的性能。
同时,这种方式还能减少网络 I/O 操作,因为数据是以更大的分块获取的,这能够进一步提高处理速度,尤其是在需要关注网络延迟的系统中。
KafkaFlow 还简化了 Kafka 消费者管理相关的任务。通过 KafkaFlow 的管理 API,我们可以启动、停止、暂停消费者以及倒回偏移(rewind offset)。
管理 API 可以在编程接口、REST API 或 Dashboard UI 中使用。
KafkaFlow 的管理仪表盘
通常,底层技术可能无法像 Kafka 消费者那样以相同的方式应对高负载期。这会带来稳定性的问题,而这正是限流的用武之地。
消费者限流是一种管理消息消费的方式,它能够使应用程序根据指标动态调整消息消费的速度。
假设你正在运行一个应用程序,希望将原子操作和批量操作分隔到不同的消费者和主题中。与批量操作相比,你可能更愿意优先处理原子操作。按照传统方式,由于消息生成的速度可能存在差异,所以管理这种差异化可能很具挑战性。
在这种情况下,消费者限流就很有价值了,它允许我们监控那些负责原子操作的消费者的滞后(lag)情况。根据这一指标,我们可以对处理批量操作的消费者实施限流,确保优先处理原子操作。
那结果是什么呢?高效、灵活和优化的消费流程。
借助 KafkaFlow 的流畅接口,为消费者添加限流功能是非常简单的。下面是一个简单的样例:
.AddConsumer(
consumer => consumer
.Topic("bulk-topic")
.WithName("bulkConsumer")
.AddMiddlewares(
middlewares => middlewares
.ThrottleConsumer(
t => t
.ByOtherConsumersLag("singleConsumer")
.WithInterval(TimeSpan.FromSeconds(5))
.AddAction(a => a.AboveThreshold(10).ApplyDelay(100))
.AddAction(a => a.AboveThreshold(100).ApplyDelay(1_000))
.AddAction(a => a.AboveThreshold(1_000).ApplyDelay(10_000)))
.AddSerializer<JsonCoreSerializer>()
)
)
目前,KafkaFlow 在 Kafka 的基础上提供了一个健壮的、对开发人员友好的抽象,简化了使用.NET 构建实时数据处理应用程序的过程。但是,与其他活跃的开源项目一样,KafkaFlow 也在不断演进和完善。
从项目目前的发展轨迹来看,我们可以预测几个方面的发展方向。例如,KafkaFlow 可能会进一步增强其中间件系统,为消息处理提供更多的控制权和灵活性。我们可能还会看到更广泛的管理 API,为开发人员提供对 Kafka 集群更大的控制权。
由于设计上的可扩展性,我们可以期待 KafkaFlow 社区会不断壮大,带来更多的贡献、创新特性、扩展和支持。随着越来越多的开发人员和组织采用 KafkaFlow,我们会看到学习资源、教程、案例和其他社区内容不断涌现,这些内容可以帮助新用户入门,也可以帮助现有的用户从库中学习更多的知识。
KafkaFlow 是一个便利、对开发人员友好的工具,它简化了在.NET 中使用 Kafka 的工作。在开发人员体验和可用性方面,它均表现出色。该框架的设计非常适合整洁、可读性强的代码。在 Apache Kafka 上构建应用程序时,KafkaFlow 通过中间件、消息处理器以及对复杂问题的抽象,实现了清晰的分离,这有助于保持代码库的可管理性和可理解性。
除此之外,围绕 KafkaFlow 的社区在不断壮大。如果你正在使用 Kafka 并希望提高生产力和可靠性,那 KafkaFlow 绝对值得考虑。
原文链接:
Building Kafka Event-Driven Applications with KafkaFlow(https://www.infoq.com/articles/kafkaflow-dotnet-framework/)
声明:本文为 InfoQ 翻译,未经许可禁止转载。
点击底部阅读原文访问 InfoQ 官网,获取更多精彩内容!
微信扫码关注该文公众号作者