Redian新闻
>
通过 TCP/IP 每分钟发送数十亿条消息

通过 TCP/IP 每分钟发送数十亿条消息

公众号新闻

作者 | George Ball
译者 | 明知山
策划 | 丁晓昀

这个问题最常见的解决方案是定义一种可以被不同进程(甚至不同编程语言)理解的“规范”数据表示,并在发送数据之前将其转换为这种格式,然后在接收到数据后再转换为接收方自己的格式。现在已经有几种这样的“有线格式”,从基于文本的标准,如 YAML、JSON 或 XML,到二进制格式,如 Protobuf。

在 Chronicle 公司,我们开发了许多库来支持基于低延迟消息传递的应用程序,主要用于金融服务行业。我们为来自世界各地的客户提供定制的解决方案开发和咨询服务,其中大多数客户来自金融领域。

其中一个库是 Chronicle Wire,它为 Java 对象的内部 JVM 表示和持久化状态(或与其他 Java 进程通信的格式)之间提供了高性能转换。

Chronicle Wire 源于 Chronicle Queue 项目。在每秒数百万条消息的规模下,Chronicle Queue 可以为同一机器不同 JVM 之间的消息传递提供个位数的微秒级延迟,或者为不同的机器之间提供数十微秒的稳定延迟。Wire 现在成为 Chronicle 开发的大多数软件组件的关键组成部分,从用于组件之间对象状态的序列化和反序列化,到用于管理这些组件配置的高效模型。

随着软件架构越来越多地转向分布式、基于事件驱动,我们希望扩展 Chronicle Wire 的使用场景,支持组件之间的 TCP/IP 互连。本文将对此特性进行基本的概述,并提供一些如何使用这些特性的简单示例。

我们已经看到了一些令人吃惊的性能数据——例如,Peter Lawrey 的文章“如果不创建很多对象,Java 其实非常快”提供的一个基准测试显示,在一台机器上构建的环回 TCP/IP 网络每分钟能够传递超过 40 亿个事件。

我们将其与其他类似的数据交换技术(特别是 Jackson 和 BSON)进行了基准测试比较。在处理 100 字节消息的测试中,使用 Chronicle Wire,每条消息的 99.99 可用性处理时间约为 10.5 微秒,而使用 Jackson/BSON 则为 1400 微秒,这是一个显著的差异。

我们将在本文中介绍一些相关的关键概念。我们正在将这些特性设计得既灵活又高效,以后的文章将会展示一些更高级的用例。

什么是 Chronicle Wire

Chronicle Wire 作为应用程序和字节流之间的一个层,充当数据的源或接收器。Wire 序列化 Java 对象的状态并将其放到字节流中,或者从字节流中读取字节序列并基于消息中携带的信息将其反序列化成 Java 对象。

我们来看一个简单的例子。我们将模拟 Java 对象的持久化,将对象的状态序列化到 Wire,然后再读取成对象。我们将使用一个叫作 Person 的类。

public class Person extends SelfDescribingMarshallable {   private String name;   @NanoTime   private long timestampNS;   @Base85   private long userName;
}

这个类的完整代码可以在 Chronicle Wire 的 Github 代码库中找到。

父类型 SelfDescribingMarshallable 包含与 Wire 交互所必需的功能——它大致相当于 Java 序列化所使用的 java.io.Serializable 接口,不过它更强大并且不存在安全缺陷。SelfDescribingMarshallable 对象不需要额外的东西来支持编组和解组——比如 XML 的模式或 Protobuf(或 SBE)的代码生成器。此外,这个接口还提供了 Java 数据对象方法 equals()、hashcode() 和 toString() 的实现。

Chronicle Wire 使用 @NanoTime 注解将属性值编码为时间戳,使用 @Base85 注解编码短字符串来节省空间。这两个注解还提供了从紧凑的内部表示到友好的字符串表示的转换。

我们来创建一个 Chronicle Wire 实例,它将使用 Java 堆中的一个内存区域将对象编组成 YAML 并解组。

Wire yWire = Wire.newYamlWireOnHeap();

创建和初始化 Person 实例:

Person p1 = new Person()       .name("George Ball")       .timestampNS(CLOCK.currentTimeNanos())       .userName(Base85.INSTANCE.parse("georgeb"));System.out.println("p1: " + p1);

我们使用了重载方法和链式方法而不是 get…() 和 set…() 方法来访问和修改对象属性。代码打印出了 Person 对象的初始化状态,调用了父类 SelfDescribingMarshallable 的 toString() 方法:

p1: !Person {  name: George Ball,  timestampNS: 2022-11-11T10:11:26.1922124,  userName: georgeb}

现在我们将对象序列化到 Wire。因为创建的 Wire 使用了 YAML,所以可以很容易显示其中的内容:

Wire yWire = Wire.newYamlWireOnHeap();p1.writeMarshallable(yWire);System.out.println(yWire);

我们可以看到被序列化的属性:

name: George BalltimestampNS: 2022-11-11T10:11:54.7071341userName: georgeb

现在我们可以创建一个空的 Person 实例,然后用从 Wire 中回读的属性值来填充它,并将其打印出来:

Person p2 = new Person();p2.readMarshallable(yWire);System.out.println("p2: " + p2);

从输出可以看到,新创建的对象的状态是对的:

p2: !Person {  name: George Ball,  timestampNS: 2022-11-11T10:13:29.388,  userName: georgeb}

完整的代码可以在 Chronicle Wire 的 Github 代码库 中找到。

MethodWriter 和 MethodReader

通常,使用 Wire 序列化和反序列化的对象都是与我们的应用程序相关的某种类型的数据。如果使用 Chronicle Queue 作为消息传输,那么这些对象将构成消息的有效负荷,我们把它们叫作数据传输对象(Data Transfer Object,DTO)。

我们也可以从不同的角度来看待这个功能。序列化的 Person 对象包含了 YAML 格式的属性:

name: George BalltimestampNS: 2022-11-11T10:11:54.7071341userName: georgeb

如果再进一步,我们可以使用 Wire 编码和发送请求来调用带有参数的方法。由于消息传输的单向性,这些方法必须是 viod 的,即不能返回值。我们假设有一个可以操作 Person 对象的接口,暂时还没有提供方法的实现:

public interface PersonOps {   void addPerson(Person p);}

为简单起见,这里只指定了一个方法。它只接受一个 Person 类型的参数,并将其添加到集合中。根据前面的示例,我们或许会将这个类的实例编码为:

addPerson: {  name: George Ball,  timestampNS: 2022-11-11T10:11:54.7071341,  userName: georgeb}

然后解码为方法调用的形式:

personOps.addPerson(       Marshallable.fromString(Person.class, "" +               "name: Alice Smithl\n" +               "timestampNS: 2022-11-11T10:11:54.7071341\n" +               "userName: alices\n"));

Chronicle Wire 提供了这种对方法调用进行编码和解码的能力。发送方使用 MethodWriter 类,接收方使用 MethodReader 类。

例如,对于上面显示的 PersonOps 类,我们可以创建一个 MethodWriter:

final PersonOps personOps = yWire.methodWriter(PersonOps.class);

methodWriter 将返回一个包含 addPerson() 存根实现的接口实例,用于将调用请求编码到 Wire。我们可以这样调用这个方法:

personOps.addPerson(p1);
personOps.addPerson(new Person() .name("Bob Singh") .timestampNS(CLOCK.currentTimeNanos()) .userName(Base85.INSTANCE.parse("bobs")));

如果我们看一下 Wire,将会看到调用请求被编码成消息:

addPerson: {  name: Alice Smith,  timestampNS: 2022-11-11T10:11:54.7071341,  userName: alices}...addPerson: {  name: George Ball,  timestampNS: 2022-11-11T10:28:47.466,  userName: georgeb}...addPerson: {  name: Bob Singh,  timestampNS: 2022-11-11T10:28:48.3001121,  userName: bobs}...

在接收端,我们可以创建一个 MethodReader 对象,它将提供在解码时被调用的方法的实现:

MethodReader reader = yWire.methodReader(       (PersonOps) p -> System.out.println("added " + p));

当消息被读取和解码时,这个方法将被调用:

for (int i = 0; i < 3; i++)   reader.readOne();

当方法被调用时,我们将看到 System.out.println() 的输出:

added !Person {  name: Alice Smith,  timestampNS: 2022-11-11T10:11:54.7071341,  userName: alices}
added !Person { name: George Ball, timestampNS: 2022-11-11T10:28:47.466, userName: georgeb}
added !Person { name: Bob Jones, timestampNS: 2022-11-11T10:28:48.3001121, userName: bobj}

这看起来非常强大,因为它为我们提供了一种高度灵活和高效的方式来编码事件或消息,并将它们与处理程序关联起来。Wire 编码的灵活性都是可用的——文本格式或二进制格式——正如 Wire 许多不同类型的底层传输一样。

接下来,我们来了解一下基于 TCP/IP 网络通信的 Wire 传输将带来怎样的可能性。

概念简介

新的功能基于以下三个抽象概念。

Channel

Chronicle Channel 是对两个组件之间的双向点对点连接的抽象。在创建 Channel 时指定的通道类型定义了底层传输的类型。初始实现使用异步套接字或连接同一进程内两个端点的内部通道来支持 TCP/IP,主要目标是支持更高级的传输,如 GRPC、REST 或 WebSocket 等。

Channel 在两个组件之间来回传输被打包成 Chronicle Wire 消息的 Event。初始实现支持 TCP/IP 或“本地”(进程内)通道,但也可以为不同的传输定义 Chennel 类型。

Context

Context 是 Channel 的管理容器,负责管理 Channel 的配置和生命周期。

Handler

Handler 是与 Channel 绑定在一起的组件,它定义了如何处理传入的事件,以及如何传输传出(结果)事件。这样可以实现各种形式的会话管理。框架提供了许多预定义的 Handler,也支持自定义。

在建立连接时,一个 Handler 会与 Channel 相关联,通常由连接的“发起者”(即客户端)指定。

使用 Channel

我们来看一些实际的示例。

示例 1:Hello, World

一般来说,第一个示例是简单地打印“Hello”消息。代码注释中的编号表示关键点,并与下面的列表对应:

public class Channel1ReadWrite {
private static final String URL = System.getProperty("url", "tcp://:3334"); // ===> (1)
public static void main(String[] args) {
try (ChronicleContext context = ChronicleContext.newContext(URL).name("Channel1"); // ===> (2) ChronicleChannel channel = context.newChannelSupplier(new EchoHandler()).get()) {
Jvm.startup().on(Channel1.class, "Channel set up on port: " + channel.channelCfg().port());
Says says = channel.methodWriter(Says.class); // ===> (3) says.say("Well hello there");
StringBuilder eventType = new StringBuilder(); // ===> (4) String text = channel.readOne(eventType, String.class); Jvm.startup().on(Channel1.class, ">>>> " + eventType + ": " + text);
} }}

1. 创建 Channel 的关键参数是一个 URL 字符串。目前只支持 TCP/IP 作为传输机制,但未来会支持更多的传输机制。这个字符串在 Chronicle Channel 中的含义如下表所示。

2. 我们使用 try-with-resources 来确保所有创建的组件在使用完以后都会被关闭。首先,我们创建 Context,用于管理 Channel 的生命周期和配置。Context 提供了一个工厂方法,可以用来创建新的 Channel。在请求新的 Channel 时,我们指定使用哪个 Handler 来处理传入的事件。在本例中,我们使用 EchoHandler,顾名思义,它会将事件发送回发送方。

为连接设置服务器端套接字所需的工作都由工厂方法完成,它返回的 Channel 可以被我们使用。

3. TCP/IP 是全双工协议,所以我们获得的 Channel 是双向的。我们可以通过 Channel 发送事件,使用下面的类生成的 MethodWriter:

public interface Says extends Syncable {   void say(String say);}

Says says = channel.methodWriter(Says.class);says.say("Well hello there");

4. 然后,我们可以使用 Chronicle Wire 从通道读取回传的事件并将其显示出来。当运行这个简单的示例时,我们可以看到这样的输出:

[main] INFO run.chronicle.wire.channel.demo1.Channel1 - Channel set up on port: 3334[main] INFO run.chronicle.wire.channel.demo1.Channel1 - >>>> say: Well hello there
示例 2:客户端和服务器端分离

第一个示例太过简单,因为它将客户端和服务器端的功能都放在同一个进程里。这对于测试或调试来说可能比较方便,但现在我们希望将它们分离到各自的进程中。我们来看看分离之后的服务器:

public class ChannelService {   static final int PORT = Integer.getInteger("port", 4441);
public static void main(String[] args) throws IOException { System.setProperty("port", "" + PORT); // set if not set. ChronicleGatewayMain.main(args); }}

由于我们使用了辅助类 ChronicleGatewayMain,代码变得非常简短。辅助类封装了设置服务器端(Channel 接收器)、移除模板代码和尽可能多地使用默认设置的功能。

客户端的代码如下所示,注释中的编号表示关键点:

public class ChannelClient {
private static final String URL = System.getProperty("url", "tcp://localhost:" + ChannelService.PORT); // ===> (1)
public static void main(String[] args) {
try (ChronicleContext context = ChronicleContext.newContext(URL).name("ChannelClient"); // ===> (2) ChronicleChannel channel = context.newChannelSupplier(new EchoHandler()).get()) {
Jvm.startup().on(ChannelClient.class, "Channel set up on port: " + channel.channelCfg().port()); Says says = channel.methodWriter(Says.class); // ===> (3) says.say("Well hello there");
StringBuilder eventType = new StringBuilder(); String text = channel.readOne(eventType, String.class);
Jvm.startup().on(ChannelClient.class, ">>>> " + eventType + ": " + text); } }
  1. URL 字符串包含了主机名和端口号,它告诉创建通道的逻辑我们正在初始化客户端通道。

  2. 根据 URL 字符串格式创建客户端 Context。在从客户端 Context 创建通道时,我们指定了在接收端使用哪个 Handler。

  3. 通道建立起来之后,剩下的代码与第一个示例中的代码一样。当客户端和服务器端应用程序同时运行起来时,输出如下所示:

[main] INFO run.chronicle.wire.channel.demo2.ChannelClient - Channel set up on port: 4441[main] INFO run.chronicle.wire.channel.demo2.ChannelClient - >>>> say: Well hello there
示例 3:简单的请求 / 响应交互

前面我们已经了解如何使用 Wire 的 MethodReader 和 MethodWriter 来实现请求进程外方法调用。现在,我们可以扩展这个示例,演示使用基于 TCP/IP 通道的 Wire 来实现类似于远程过程调用的服务请求 / 响应处理能力。

服务本身很简单,只提供了一个方法——我们的目的是演示构造服务和访问服务所需的步骤。

这个例子涉及四个部分:

  1. Service——根据输入和输出的消息类型实现业务逻辑。

  2. Channel Handler——将服务连接到底层的 Channel。

  3. Service Driver——作为服务器端的入口点,创建和配置服务和 Channel Handler。

  4. Client——一个单独的应用程序,创建和发送请求,并接收响应。

Service

服务提供了一个可以处理受支持请求的接口,其定义为:

public interface PersonOps {   void addPerson ( Person p );}

Person 类与之前定义的一样。

Chronicle 中的消息传递是单向的,所以服务 API 的方法是 void 的。因此,我们需要为响应消息定义第二个接口:

public interface ResponseSender {   void respond(ReqStatus status);}

ReqStatus 类表示方法是否执行成功,其定义为:

public enum ReqStatus {   OK,   ERROR}

这两个接口连接在一起形成了一个处理传入请求的 Handler:

public class PersonOpsProcessor implements PersonOpsHandler {   private transient ResponseSender responder;                                                  // ===> (1)   public PersonOpsProcessor responder(ResponseSender responseSender) {        // ===> (2)       this.responder = responseSender;       return this;   }   @Override   public void addPerson(Person p) {                                                                  // ===> (3)       responder.respond(ReqStatus.OK);   }}
  1. 这个字段将保存对该服务的输出的引用。

  2. 在本例中,ResponseSender 是通过 setter 方法注入的,当然也可以通过构造函数注入。

  3. 实现了 PersonOps 接口中的方法,为简单起见,它只发送一个成功的状态响应。

    Channel Handler

根据之前的概念简介,Channel Handler 的职责是处理在其关联通道上传递的消息 / 事件。

对于本例,我们需要定义一个类,它将通道上的传入消息分派给服务的 Handler,并将服务输出连接到通道:

public class PersonSvcHandler extends AbstractHandler<PersonSvcHandler> {                  // ===> (1)   private final PersonOpsHandler personOpsHandler;                                                       // ===> (2)   public PersonSvcHandler(PersonOpsHandler personOpsHandler) {                                  // ===> (3)       this.personOpsHandler = personOpsHandler;   }   public void run(ChronicleContext context, ChronicleChannel channel) {                           // ===> (4)       channel.eventHandlerAsRunnable(           personOpsHandler.responder(channel.methodWriter(ResponseSender.class))       ).run();   }   @Override   public ChronicleChannel asInternalChannel(ChronicleContext context,                             // ===> (5)                                                                          ChronicleChannelCfg channelCfg) {       throw new UnsupportedOperationException("Internal Channel not supported");   }}
  1. 基类实现了通用的平台功能,子类将为我们的服务提供定制的逻辑。

  2. 对 Handler 实现类的引用。

  3. PersonOpsHandler 是通过构造函数注入的。

  4. 当发起一个新的通道连接时,就会启动一个 Handler,并初始化必要的 MethodReader 和 MethodWriter 对象。这些逻辑被封装在 run() 方法中,每个发起的通道连接都会执行这一步。

  5. 在这个示例类中,我们显式禁止创建在内部通道运行的 Handler。

Service Driver

完成了这些步骤后,编写服务的驱动类就简单了,与之前的例子或多或少相同,就是使用 ChronicleGatewayMain 类来创建配置通道。

public class PersonSvcMain {   static final int PORT = Integer.getInteger("port", 7771);   public static void main(String... args) throws IOException {       System.setProperty("port", "" + PORT);       ChronicleGatewayMain.main(args);   }}
Client

要实现一个简单的 Person 服务客户端,我们可以创建一个通道,然后向服务发出请求。

public class PersonClient {   private static final String URL = System.getProperty("url", "tcp://localhost:" + PersonSvcMain.PORT);                           // ===> (1)   public static void main(String[] args) {       try (ChronicleContext context = ChronicleContext.newContext(URL)) {           ChronicleChannel channel = context.newChannelSupplier(new PersonSvcHandler(new PersonOpsProcessor()))      // ===> (2)                                                               .get();           final PersonOps personOps = channel.methodWriter(PersonOps.class);                                                               // ===> (3)           Person thePerson = new Person()                                                   .name("George")                                                   .timestampNS(SystemTimeProvider.CLOCK.currentTimeNanos())                                                   .userName(Base85.INSTANCE.parse("georgeb")));;           personOps.addPerson(thePerson);           StringBuilder evtType = new StringBuilder();           ReqStatus response = channel.readOne(evtType, ReqStatus.class);           Jvm.startup().on(PersonClient.class, " >>> " + evtType + ": " + response);       }   }}
  1. 默认情况下,URL 的端口号与服务器中配置的端口号一样。

  2. 创建 Channel,注入自定义 Handler 实例。

  3. 在创建好以后,我们就可以使用 Channel 的 MethodWriter 方法来生成存根方法,这些方法将向服务发送序列化的事件。

总   结

Chronicle Wire 增加了一些新功能,允许通过 TCP/IP 与其他组件通信。本文介绍了 Wire 实现这些功能的基本思想,并提供了一些简单的示例。

这种快速高效的通信在分布式服务架构中还有很多应用场景。除了本文的示例之外,Chronicle Wire 的 GitHub 项目库中还提供了其他示例。

作者简介

George Ball 目前在 Chronicle 公司的技术文档组工作,致力于构建 Chronicle 低延迟 Java 库和框架文档。在此之前,他是摩根士丹利 Java 平台工程团队的一员,在公司的 Java 基础设施向云端迁移过程中增强 Java 库,并改善开发者体验。他在分布式系统方面有超过 35 年的经验,特别是在 JVM 生态系统方面。

原文链接

https://www.infoq.com/articles/billions-messages-minute/

相关阅读:

TCP 协议已不适用现今的数据中心 (https://www.infoq.cn/article/hGEXgDo8C2wPTok21NUf )

性能提升 57% ,SMC-R 透明加速 TCP 实战解析 (https://www.infoq.cn/article/0v4hIYUxAH6QQXB8LkBh )

声明:本文为 InfoQ 翻译,未经许可禁止转载。

点击底部阅读原文访问 InfoQ 官网,获取更多精彩内容!

今日好文推荐

《2023 大语言模型综合能力测评报告》出炉:以文心一言为代表的国内产品即将冲出重围

免费版“Github Copilot”,编程能力还翻倍?!谷歌硬刚微软,推出全新Colab编程平台

百度回应 Bing 成中国桌面搜索第一;阿里回应大裁员传闻;文心一言市场负责人怒怼科大讯飞|Q资讯

中国的“贝尔实验室”:我们的数据库从内核的第一行代码写起

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
失踪4年突然出现!14岁“高功能”自闭症少女留下纸条消失 如今……数十亿规模认知障碍筛查井喷在即,这四地模式值得一看101岁杨振宁最新露面,两分钟发言视频公布!信号通路的“导航”数据库!这样做分分钟发高分SCI……(快收藏)FBI警告:美国数十亿邮箱用户存在风险隐患 揭开血汗钱被骗黑幕……格力董明珠花数十亿收购后,银隆净资产去年缩水19亿元哈佛大学用这15张图,教育了数十亿人2022南美南极行(29)南极之旅SpaceX星舰发射推迟至4月20日,发射前9分钟发现压力阀故障;贫困是美国第四大死因|环球科学要闻震惊!澳国防军被指控浪费数十亿澳元!数十架战机停在美军基地吃灰!在雷暴中起飞苹果FBAR滤波器订单给了博通,价值数十亿美元加州黑人赔偿历史性投票通过!要求赔款数十亿!带你全方位了解非裔赔偿案小朋友尴尬的撤回了一条消息数十亿英镑!捷豹、路虎母公司将在英国打造电动汽车电池厂!英国4月零售销售额超预期增长!这条消息,王老师再也看不到了…六十一 减租减息意大利遭百年难遇洪灾,13人死亡,2万人转移,损失数十亿欧元!这条消息赶在端午前披露了,让全国中老年人直接受益!紧急寻人!墨尔本莫纳什大学一名中国女留学生失联,请帮忙扩散这条消息!谷歌用机器人大规模删除代码:二十多年积累了数十亿行,已删除5%C++代码FBI警告:数十亿邮箱用户存在风险隐患 揭开血汗钱被骗黑幕……趋势探究|日本失落三十年,最硬的两条消费赛道每分钟送8台苹果手机,广东夫妇618首日带货GMV超5亿;90后制香人“展羽”小红书涨粉11万 | 涨粉周榜数十家企业参编中国大模型标准;大模型创企获2.5亿美元投资;微软签署数十亿美元AI算力协议丨AIGC大事日报每分钟可处理8.14亿笔交易、腾讯云数据库TDSQL刷新TPC-C纪录对国产数据库行业意味着什么?六十二 战场归来忙活4年多,规划投资数十亿,电动MINI还没冰淇淋火?两架包机运送数十非法移民至加州首府疑为佛州州长德桑蒂斯手笔年均数十亿市场规模,认知障碍筛查或将迎来爆发增长苹果Vision Pro是下一个iPhone?来看看全球媒体怎么说丨两分钟发布会入门首选!小白走这个路子分分钟发表5分生信文章!(附工具推荐)马英九(2)澳洲房东的贷款还款金额竟然超过房租!负扣税优惠竟使澳洲政府每年损失数十亿澳元!加州黑人赔偿委员会历史性投票通过!要求赔款数十亿!带您全方位了解非裔赔偿案
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。