Redian新闻
>
KafkaFlow 入门指南:构建可扩展的 Kafka 事件驱动应用

KafkaFlow 入门指南:构建可扩展的 Kafka 事件驱动应用

公众号新闻

作者 | Guilherme Ferreira
译者 | 张卫滨
策划 | 丁晓昀

在本文中,我们将会探讨 KafkaFlow 提供提供的特性。如果你正在使用.NET 构建 Apache Kafka 生产者和消费者,那么本文将会介绍如何借助 KafkaFlow 来简化你的生活。

为何要关注它?

KafkaFlow 为 Confluent .NET Kafka 客户端提供了一个抽象层。它使得使用、维护和测试 Kafka 消费者和生产者均更加容易。

假设你要为市场营销活动创建一个客户端目录(Client Catalog)。我们需要一项服务来消费那些捕获新客户端的消息。当开始设计所需的服务时,你会发现现有的服务在如何消费消息方面并不一致。

常见的情形是,团队在解决一些简单的问题(如优雅关机)时,往往会陷入困境。你会发现整个组织有四种不同的 JSON 序列化器实现,这只是挑战之一。

采用 KafkaFlow 这样的框架能够简化流程并加快开发周期。KafkaFlow 拥有一系列旨在提升开发人员体验的特性:

  1. 中间件(Middleware):KafkaFlow 允许开发人员创建中间件来处理消息,从而实现对 Kafka 生产者 / 消费者管道的更多控制和定制。

  2. 处理器(Handler):引入了处理器的概念,允许开发人员将主题中的消息处理转发给专用消息类型的处理器。

  3. 反序列化算法(Deserialization Algorithms):提供了一套开箱即用的序列化和反序列化算法。

  4. 多线程消费者:提供了保证消息处理顺序的多线程功能,有助于优化系统资源的使用。

  5. 管理 API 和仪表盘:提供了 API 和仪表盘来管理消费者和消费者群组,可以在运行时进行暂停、恢复或倒回偏移。

  6. 消费者限流:提供了一种简便的方式,为主题的消费提供优先级。接下来,我们探讨一下这些特性,看看它们在解决类似问题方面的潜力。

KafkaFlow 生产者:简化消息的生成

我们从消息的生产者开始。

向 Kafka 中生成消息并不是什么高难的火箭科学。即便如此,KafkaFlow 还是为 Confluent 的.NET Kafka 客户端的生产者接口提供了更高级别的抽象,从而能够简化代码并提升可维护性。

下面是一个如何使用 KafkaFlow 生产者发送消息的样例:

await _producers["my-topic-events"]    .ProduceAsync("my-topic", message.Id.ToString(), message);

这样,我们就可以向 Kafka 生成消息,而无需直接处理序列化或底层 Kafka 客户端的其他复杂问题。不仅如此,定义和管理生产者还可以通过服务配置上的流畅接口(Fluent Interface)轻松实现。

services.AddKafka(kafka => kafka    .AddCluster(cluster => cluster        .WithBrokers(new[] { "host:9092" })        .AddProducer(            "product-events",            producer =>                producer            ...        )    ));

生产者往往很简单,但也有一些常见的问题需要解决,比如压缩或序列化。我们来探讨一下。

在 KafkaFlow 中自定义
序列化 / 反序列化

在 Apache Kafka 中,一个很有吸引力的特性就是与数据格式无关。但是,这就将责任转移给了生产者和消费者。如果考虑不周全,可能会导致在整个系统中出现由多种方式实现同一种结果的现象。因此,序列化显然是一个由客户端框架处理的用例。

KafkaFlow 具有适用于 JSON、Protobuf 甚至 Avro 的序列化器。只需将它们添加到中间件配置中就可以使用。

.AddProducer<ProductEventsProducer>(producer => producer       ...       .AddMiddlewares(middlewares => middleware           ...           .AddSerializer<JsonMessageSerializer>()       ))

鉴于我们可以为消息使用自定义的序列化器 / 反序列化器,所以这个列表并不局限于这三种。虽然 Confluent 的.NET Kafka 客户端已经支持自定义序列化 / 反序列化,但 KafkaFlow 通过提供更优雅的处理方式简化了这一过程。举例来说,要使用自定义序列化器,我们可以这样做:

public class MySerializer : ISerializer{       public Task SerializeAsync(object message, Stream output, ISerializerContext context)       {             // 序列化逻辑在这里       }
public async Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context) { // 反序列化逻辑在这里 }}
// 在设置 Kafka 消费者 / 生产者的时候,注册自定义的序列化器
.AddProducer<MyProducer>(producer => producer ... .AddMiddlewares(middlewares => middleware ... .AddSerializer<MySerializer>() ))
KafkaFlow 中的消息处理

消费者带来了大量的问题和可能性。第一个问题就是“如何处理消息?”

我们从最简单的方式开始。随着像 MediatR 这样的库的出现,CQRS 和 Meditor 模式得到了普及,.NET 开发人员习惯于将消息处理器与请求 / 消息接收器解耦。KafkaFlow 将同样的原则引入到了 Kafka 消费者中。

KafkaFlow 消息处理器允许开发人员定义特定的逻辑来处理来自 Kafka 主题的消息。按照设计,KafkaFlow 的消息处理器结构能够更好地分离关注点,并使代码更整洁、更易于维护。

如下是一个消息处理器的示例:

public class MyMessageHandler : IMessageHandler<MyMessageType>{    public Task Handle(IMessageContext context, MyMessageType message){        // 消息处理逻辑在这里    }}

这个处理器可以在消费者配置中进行注册:

.AddConsumer(consumer => consumer...       .AddMiddlewares(middlewares => middlewares           ...             .AddTypedHandlers(handlers => handlers                     .AddHandler<MyMessageHandler>()              )       ))

通过这种方式,可以轻松地将消费者和处理器分开,从而提升了可维护性和可测性。如果你的微服务只处理具有一种消息类型的一个主题,这可能会显得引入了不必要的复杂性。在这种情况下,你可以使用中间件。

KafkaFlow 中的中间件

KafkaFlow 是面向中间件的。你可能已经注意到,在消息处理器的代码片段中提到了“中间件”。所以,你可能会问什么是中间件。

中间件使得类型化处理器(Typed Handler)成为可能。消息会被传递到一个中间件管道,该管道将会被依次调用。如果你使用过 MediatR 管道的话,可能会对这一概念有所了解。此外,中间件还可以用来进行一系列的转换。换句话说,给定的中间件可以将传入的消息转换到下一个中间件。

KafkaFlow 中的中间件封装了处理消息的逻辑。管道是可扩展的,允许开发人员在消息处理管道中添加行为。

如下是一个中间件的样例:

public class MyMiddleware : IMessageMiddleware{    public async Task Invoke(IMessageContext context, MiddlewareDelegate next)    {         // 预处理逻辑位于这里                  await next(context);                   // 后处理逻辑位于这里           }}

要使用该中间件,可以在消费者配置中进行注册:

.AddConsumer(consumer => consumer       ...       .AddMiddlewares(middlewares => middlewares             ...             .Add<MyMiddleware>()         ))

通过这种方式,开发人员就可以在消息处理管道中插入自定义逻辑,从而提供灵活性和控制力。类型化处理器是中间件的一种形式。所以,你甚至可以在没有类型化处理器的情况下处理消息,实现自己的中间件,或者也可以使用中间件来构建消息管道,在处理消息之前执行校验、丰富化等操作。

在 KafkaFlow 中处理并发

一旦开始思考基础设施的效率,你就会发现许多 Kafka 消费者没有得到充分利用。最常见的实现方式是单线程的,这限制了资源的利用率。因此,当我们需要扩展的时候,只能进行横向扩展,以保持所需的吞吐量。

KafkaFlow 为实现基础设施的高效率带来了另外一种可选方案。KafkaFlow 让开发人员可以控制单个消费者可以并发处理多少消息。它使用了 Worker 的理念,这些 Worker 可以协同消费一个主题。这一功能能够让你优化 Kafka 消费者,使其更好地匹配系统的能力。

如下是一个如何为消费者设置并发 worker 数量的样例:

.AddConsumer(consumer => consumer.Topic("topic-name")       .WithGroupId("sample-group")       .WithBufferSize(100)       .WithWorkersCount(10) // 设置 worker 的数量       .AddMiddlewares(middlewares => middlewares        ...        ))

即便有并发 worker,KafkaFlow 也能确保顺序。

批处理

随着规模的扩大,你将会面临延迟和吞吐量之间的权衡。为了解决这个问题,KafkaFlow 有一个重要的特性,叫做“批量消费”。这个特性满足了以批量方式消费和处理来自 Kafka 的消息时对效率和性能的要求。在需要一起处理一组消息,而不是单个处理消息的场景下,该特性发挥着重要作用。

什么是批量消费?

在批量消费方式中,系统不是在收到消息后对其进行原子性地处理,而是将多条消息分组,然后一次性地对其进行处理。这种方法在处理大量数据时更为有效,尤其是在消息相互独立的情况下。批量执行操作会提高整体性能。

KafkaFlow 的批量消费方式

KafkaFlow 利用中间件系统提供批量处理功能。批量处理中间件能够让你根据批量大小或时间跨度(timespan)对消息进行分组。一旦达到其中的某个条件,中间件就会将这组消息转发给下一个中间件。

services.AddKafka(kafka => kafka    .AddCluster(cluster => cluster        .WithBrokers(new[] { "host:9092" })        .AddConsumer(            consumerBuilder => consumerBuilder            ...            .AddMiddlewares(                middlewares => middlewares                    ...                    .BatchConsume(100, TimeSpan.FromSeconds(10))                    .Add<HandlingMiddleware>()            )        )    ));
批量消费对性能的影响

通过批量处理,开发人员可以在基于 Kafka 的应用程序中实现更高的吞吐量。它可以加快处理速度,因为与启动和完成每个处理任务相关的开销会大大减少。这将全面提升系统的性能。

同时,这种方式还能减少网络 I/O 操作,因为数据是以更大的分块获取的,这能够进一步提高处理速度,尤其是在需要关注网络延迟的系统中。

KafkaFlow 的消费者管理

KafkaFlow 还简化了 Kafka 消费者管理相关的任务。通过 KafkaFlow 的管理 API,我们可以启动、停止、暂停消费者以及倒回偏移(rewind offset)。

管理 API 可以在编程接口、REST API 或 Dashboard UI 中使用。

KafkaFlow 的管理仪表盘

消费者限流

通常,底层技术可能无法像 Kafka 消费者那样以相同的方式应对高负载期。这会带来稳定性的问题,而这正是限流的用武之地。

消费者限流是一种管理消息消费的方式,它能够使应用程序根据指标动态调整消息消费的速度。

优先级

假设你正在运行一个应用程序,希望将原子操作和批量操作分隔到不同的消费者和主题中。与批量操作相比,你可能更愿意优先处理原子操作。按照传统方式,由于消息生成的速度可能存在差异,所以管理这种差异化可能很具挑战性。

在这种情况下,消费者限流就很有价值了,它允许我们监控那些负责原子操作的消费者的滞后(lag)情况。根据这一指标,我们可以对处理批量操作的消费者实施限流,确保优先处理原子操作。

那结果是什么呢?高效、灵活和优化的消费流程。

借助 KafkaFlow 的流畅接口,为消费者添加限流功能是非常简单的。下面是一个简单的样例:

.AddConsumer(    consumer => consumer        .Topic("bulk-topic")        .WithName("bulkConsumer")        .AddMiddlewares(            middlewares => middlewares                .ThrottleConsumer(                    t => t                        .ByOtherConsumersLag("singleConsumer")                        .WithInterval(TimeSpan.FromSeconds(5))                        .AddAction(a => a.AboveThreshold(10).ApplyDelay(100))                        .AddAction(a => a.AboveThreshold(100).ApplyDelay(1_000))                        .AddAction(a => a.AboveThreshold(1_000).ApplyDelay(10_000)))                .AddSerializer<JsonCoreSerializer>()        ))
KafkaFlow:展望未来

目前,KafkaFlow 在 Kafka 的基础上提供了一个健壮的、对开发人员友好的抽象,简化了使用.NET 构建实时数据处理应用程序的过程。但是,与其他活跃的开源项目一样,KafkaFlow 也在不断演进和完善。

从项目目前的发展轨迹来看,我们可以预测几个方面的发展方向。例如,KafkaFlow 可能会进一步增强其中间件系统,为消息处理提供更多的控制权和灵活性。我们可能还会看到更广泛的管理 API,为开发人员提供对 Kafka 集群更大的控制权。

由于设计上的可扩展性,我们可以期待 KafkaFlow 社区会不断壮大,带来更多的贡献、创新特性、扩展和支持。随着越来越多的开发人员和组织采用 KafkaFlow,我们会看到学习资源、教程、案例和其他社区内容不断涌现,这些内容可以帮助新用户入门,也可以帮助现有的用户从库中学习更多的知识。

结   论

KafkaFlow 是一个便利、对开发人员友好的工具,它简化了在.NET 中使用 Kafka 的工作。在开发人员体验和可用性方面,它均表现出色。该框架的设计非常适合整洁、可读性强的代码。在 Apache Kafka 上构建应用程序时,KafkaFlow 通过中间件、消息处理器以及对复杂问题的抽象,实现了清晰的分离,这有助于保持代码库的可管理性和可理解性。

除此之外,围绕 KafkaFlow 的社区在不断壮大。如果你正在使用 Kafka 并希望提高生产力和可靠性,那 KafkaFlow 绝对值得考虑。

原文链接:

Building Kafka Event-Driven Applications with KafkaFlow(https://www.infoq.com/articles/kafkaflow-dotnet-framework/)

声明:本文为 InfoQ 翻译,未经许可禁止转载。

点击底部阅读原文访问 InfoQ 官网,获取更多精彩内容!

今日好文推荐
AutoGPT 宣布不再使用向量数据库!向量数据库是小题大作的方案?
下一代 Docker 来了!1小时构建缩至1.5分钟,还能结合 LangChain、Ollama 等做 AI 应用开发
苹果中国App Store将不允许未备案应用上架;iPhone 15发热严重,问题源于第三方软件?Meta又要裁员了 | Q资讯
微软裁员内幕

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
从单体到微服务的系统改造:采用事件驱动架构优化会员系统从Snowflake迁移到Databricks,成本下降50%?Snowflake被迫解释传字节成立AI应用新部门Flow;淘天集团筹建大模型团队;浪潮信息开源千亿级大模型丨AIGC大事日报Myriam Kryger on How Rivers Inspire the Flow of Art, Ideas独家|OpenAI超级对齐负责人Jan Leike:如何破解对齐难题?用可扩展监督Bengio等人88页新论文:构建有意识的AI没有明显障碍AI 工程化:构建高效 AI 应用的全面指南建信量化事件驱动基金合同终止,公募量化优胜劣汰加剧KEDA:基于事件驱动扩展K8S应用的深度实践OpenAI破解对齐难题?超级对齐负责人Jan Leike采访实录:「可扩展监督」是良策2023美国感恩节出门指南,这些地方会关门,出门前一定注意!!卡特兰,半水培萌出新芽新根遇见。记下。放弃支持Windows GPU、bug多,TensorFlow被吐槽:2.0后慢慢死去OpenAI被曝“在憋大招”:构建ChatGPT应用成本暴降95%草原诗人——邢奇字节跳动成立新部门Flow,发力AI应用层|36氪独家AI早知道|传字节成立AI应用新部门Flow;Azure AI云开发平台新增40个大模型英国学者:构建人类命运共同体,谱写人类未来新篇章Spring Cloud :打造可扩展的微服务网关科学治理人工智能:构建最佳治理框架无限量访问GPT-4!ChatGPT企业版来了,可扩展32k上下文,代码解释器随便用微服务 vs. 事件驱动架构:重新开始理解差异DoorDash如何通过重构缓存来提升性能和可扩展性特斯拉回应成都连撞11车事故;爆款短剧制作方否认8天利润过亿;字节跳动成立新部门Flow,发力AI应用层丨邦早报古斯塔夫·克林姆特(Gustav Klimt)的金色2024,寻找“拐点”|德赛西威:保持开放,构建可持续产业生态温哥华小哥哥小姐姐们的烈酒入门指南!附年度烈酒盛会信息!Python实战 | 使用 Python 和 TensorFlow 构建卷积神经网络(CNN)进行人脸识别杀人偿命,欠债还钱!构建可比的交易信号Bengio等人88页新论文:构建有意识的AI没有明显障碍!全文发布:《共建“一带一路”:构建人类命运共同体的重大实践》Facebook 的全球网络大揭秘:构建社交帝国的科技奇迹《共建“一带一路”:构建人类命运共同体的重大实践》白皮书发布
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。