Redian新闻
>
自动增量计算:构建高性能数据分析系统的任务编排

自动增量计算:构建高性能数据分析系统的任务编排

科技

在起始的那篇《金融 Python 即服务:业务自助的数据服务模式》,我们介绍了:使用 Python 如何使用作为数据系统的 wrapper 层?在这一篇文章里,我们将继续之前的话题,介绍如何使用 Python 作为计算引擎核心的胶水层,即:如何使用 Python 构建 DAG(有向无环图,Directed Acyclic Graph) 任务?

除此,还可以了解一下,如何设计增量 DAG 计算?先看一下增量计算的概念:

增量计算(Incremental computing),是一种软件功能,每当一条数据发生更改时,它都会尝试通过仅重新计算依赖于更改数据的输出来节省时间。

常见的领域有:

  • GUI 应用, 诸如于 React 的 Dom Diff

  • 不断变化的大型计算,诸如于金融计算、电子表格、大数据系统

  • 构建系统,诸如于 Gradle、Bazel、Rustc 等

所以,在开始之前,让我们先看一个简单的例子,Excel 如何实现增量计算。

引子 1:Excel 的增量计算

众所周知,Excel 是使用最广泛的数据分析工具。当我们使用了 Excel 中的公式之后,当我们修改了 A 单元格的值,对应的结果会自动发生变化。而如果在这时,还有其它依赖于此单元格的值时,对应的结果也会发生变化。如下图所示:

出自 《How to Recalculate a Spreadsheet》

在 Microsoft 官方的文档里(Excel 重新计算),可以看到对应的触发重新计算场景:输入新数据、删除或插入行或列等等。在 Excel 中,工作表的计算可视为包含三个阶段的过程:

  1. 构造依赖关系树

  2. 构造计算链

  3. 重新计算单元格

一旦触发了重新计算,Excel 会重新构造依赖关系树和计算链,并依赖于此的所有单元格标记为 ”脏单元格“。随后,根据计算链指定的顺序重新计算。通常来说,在我们设计依赖分析时,假定的是函数是不可变的。但是呢,还存在一些特殊的函数类型,诸如于文档中提到的:

  • 异步函数 (UDF)

  • 可变函数。即哪怕参数没有变化时,值也可能修改。诸如于 Now、Today 等。

这意味着,我们在设计增量计算时,需要考虑到这个场景的问题。从原理和实现来说,它一点并不算太复杂,有诸如于

从注解 DAG 到增量 DAG 设计

DAG (有向无环图,Directed Acyclic Graph)是一种常用数据结构,仅就 DAG 而言,它已经在我们日常的各种工具中存在:

  • 依赖系统。诸如如 NPM、Yarn、Gradle、Cargo 等

  • 人工智能。如机器学习等

  • 数据流系统。如编译器、Apache Spark、Apache Airflow 等。

  • 数据可视化。常用的 Graphviz,又或者是各个语言里的 Network 相关的库,诸如于 Python 的 NetworkX。

当我们从任务编排和数据等的角度来看,DAG 的面向普通人术语是叫工作流(Workflow)。

常规 DAG 到函数式 DAG

通常情况下,实现一个 DAG 非常的简单 —— 只是数据结构。在使用时,也比较简单,如下是 Cytoscape 的 API 示例:

  1. cy.add([

  2.   { group: 'nodes', data: { id: 'n0' }, position: { x: 100, y: 100 } },

  3.   { group: 'nodes', data: { id: 'n1' }, position: { x: 200, y: 200 } },

  4.   { group: 'edges', data: { id: 'e0', source: 'n0', target: 'n1' } }

  5. ]);

而这一类 DAG 是静态的,当我们需要结合些任务时,就会需要添加函数。由此便会稍微复杂一些,再现看个示例:

  1. comp = Computation()

  2. comp.add_node('a')

  3. comp.add_node('b', lambda a: a+1)

  4. comp.add_node('c', lambda a, b: 2*a)

  5. comp.add_node('d', lambda b, c: b + c)

  6. comp.add_node('e', lambda c: c + 1)

  7. comp.compute('d')

  8. comp.get_value_dict()

上述的代码中,是 Loman 框架的示例,其中的 lambda a: a+1 是 Python 的 Lambda 表达式。Loman 会在运行时,分析这个 Lambda,获得 Lambda 中的参数,随后添加对应的计算依赖。

Loman 示例

而在多数场景之下,往往是采用注解的形式,诸如于 Airflow、Gradle 等。

基于注解与条件的 DAG 函数

回到研究的开始,如美银证券的 Quartz 的 DSL 扩展(Little languages),便是在 Loman 的形式上进行了一步扩展。使用注解代替了 Lambda:

  1. class C:


  2.   @dag

  3.   def f1(self, x, y):

  4.     return self.f2(x) + y


  5.   @dag

  6.   def f2(self, x):

  7.     return x * x

围绕于这个注解,Quartz 在这一层的实现上,包含了四个特性:DAG、记忆化(memoization)、持久化、时间旅行调试(time travel)。考虑到 Quartz 并不是一个开源的实现,社区上的材料不一定靠谱,所以我们还是再看看 Apache Ariflow 的实现。引用官网的示例:

  1. from datetime import datetime


  2. from airflow import DAG

  3. from airflow.decorators import task

  4. from airflow.operators.bash import BashOperator


  5. # A DAG represents a workflow, a collection of tasks

  6. with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:


  7.   # Tasks are represented as operators

  8.   hello = BashOperator(task_id="hello", bash_command="echo hello")


  9.   @task()

  10.   def airflow():

  11.     print("airflow")


  12.   # Set dependencies between tasks

  13.   hello >> airflow()

从实现上来说,Apache Airflow 的 DAG 实现本着 “工作流即代码” 的思想设计的。上面代码中,比较有意思的是 >> 语法,其是在任务之间定义了一个依赖关系并控制任务的执行顺序。

增量 DAG 注解:Gradle —— 监听输入与输出

在编译上,Gradle 也是支持增量编译(也是一种增量计算)的,我们可以先看个简单的示例:

  1. abstract class IncrementalReverseTask extends DefaultTask {

  2. @Incremental

  3. @InputDirectory

  4. abstract DirectoryProperty getInputDir()


  5. @OutputDirectory

  6. abstract DirectoryProperty getOutputDir()


  7. @TaskAction

  8. void execute(InputChanges inputChanges) {

  9.   inputChanges.getFileChanges(inputDir).each { change ->

  10.     if (change.fileType == FileType.DIRECTORY) return


  11.     def targetFile = outputDir.file(change.normalizedPath).get().asFile

  12.     if (change.changeType == ChangeType.REMOVED) {

  13.       targetFile.delete()

  14.     } else {

  15.       targetFile.text = change.file.text.reverse()

  16.     }

  17.   }

  18. }

  19. }

对于 Gradle 的增量任务来说,通常只需要关注输入和输出,只要 InputDirectory 和 OutputDirectory 不变,那么就认为 Task 不需要再执行。因为在实现处理逻辑时,只关注于这两个值是否发生变化。

Rust 语言:Salsa 框架的增量 DAG 设计

Rust 编译器的文档上也包含了相关的介绍:Incremental compilation,而这里我们是一个相关的实现 —— 增量编译的设计者之一(Niko Matsakis)编写的库 Salsa。Salsa 是一个用于编写增量 (incremental) 、按需 (on-demand) 程序的 Rust 框架,其采用的是 “红-绿”算法。与 Gradle 相似的,Salsa 结构体(Structs)是使用一种 Salsa 属性宏进行了标注的结构体:

  • #[salsa::input]:用于指定计算的“基本输入”

  • #[salsa::tracked]:用于指定在计算过程中创建的中间值

  • #[salsa::interned]:用于指定易于进行相等比较的小型值

由于 Salsa 相比于 Gradle 是位于更底层的基础设施,所以需要手动构建存储层,即 Jar 和数据库)。数据库是一个结构体,它最终存储 Salsa 的所有中间状态,例如来自跟踪函数的被记忆的 (memoized) 返回值。数据库本身是以一些中间结构 (intermediate structure) 的形式定义的,这些中间结构被称为 jars,并包含每个函数的数据。

缓存计算与存储计算

既然,我们已经通过注解将输入、输出、函数等内容标注出来,下一步就是缓存结果。如此一来,我们就可以通过缓存来提升计算性能。对于计算的缓存来说,至少需要包含这三个部分:

  • 函数表达式(Fn 类型)。

  • 零个或多个参数。

  • 一个可选名称。

由此,我们才能获得缓存后的结果。在一些框架的设计里,诸如于 Python 语言

内存:Memoization —— 函数式编程的记忆

Memoization(记忆化)是函数式语言的一种特性,使用一组参数初次调用函数时,缓存参数和计算结果,当再次使用相同的参数调用该函数时,直接返回相应的缓存结果。在一些不支持 memoization 的语言里,需要手动引入这种设计,如 Java:

  1. Map<Integer, Integer> cache = new ConcurrentHashMap<>();


  2. Integer addOne(Integer x) {

  3.   return cache.computeIfAbsent(-> x + 1);

  4. }

上述只是一个加法的示例,万能的 StackOverflow 上有更多的示例:Java memoization method

当然了,缓存是有负作用的 —— 第一次计算时存储结果会花费一定的时间,不过大部分情况下可以忽略不计。

数据库存储

对于耗时更长的 AI 或者是金融计算场景时,需要采用分布式的任务调度器,才能更快的得到计算结果。于是乎,采用分布式键值存储来对结果进行缓存就是更好的选择。在 Salsa 框架里,由于考虑到不同的类型(input、output、tracked 等),对于数据结构函数等来说,其对应的 Index 由三部分组成:

  1. #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]

  2. pub struct DatabaseKeyIndex {

  3.   group_index: u16,

  4.   query_index: u16,

  5.   key_index: u32,

  6. }

大抵是

增量计算框架与算法

由于时间与精力限制(主要是我看不懂一些用英语写的公式,还有暂时没打算学 OCaml),这里就没有展开对于各类计算框架论文的讨论。诸如于 IncrementalAdapton 就是相关的论文与实现,就包含了非常不错的资料。

除此:https://lord.io/spreadsheets/ 一文也给了非常好的介绍。

这里,我就不展开了。

有了增量计算,然后呢?

后续的计算部分,可以参考 Apache Airflow 来实现。它是一个支持开源分布式任务调度框架,其架构

  • 调度程序,它处理触发计划的工作流,并将任务提交给执行程序以运行。

  • 执行器,它处理正在运行的任务。在默认的 Airflow 安装中,这会在调度程序中运行所有内容,但大多数适合生产的执行程序实际上会将任务执行推送给工作人员。

  • Web 服务器,它提供了一个方便的用户界面来检查、触发和调试 DAG 和任务的行为。

  • DAG 文件的文件夹,由调度程序和执行程序(以及执行程序拥有的任何工作人员)读取

  • 元数据数据库,由调度程序、执行程序和网络服务器用来存储状态。

其架构图如下:

Apache Airflow 架构

不过、过了、还是不过,考虑到 Airflow 的 DAG 实现是 Python,在分布式任务调度并不是那么流行。但是,作为一个参考还是非常不错的。

其他

相关参考资料:

  • How to Recalculate a Spreadsheet》一篇非常不错的文章,介绍了不同的算法是如何重新计算电子表格的。当然了,也包含作者自己写的新方案 Anchors。对于写库来说,是一个非常不错的参考。

  • Excel 重新计算》介绍了 Excel 重新计算的逻辑。

  • Salsa 文档:https://salsa-rs.netlify.app/ (中文版翻译:https://rust-chinese-translation.github.io/salsa-book/ )

  • Adapton 提供了一个增量计算的编程语言抽象,官网:http://adapton.org/ 提供了非常不错的参考资料

除此,在构建工具方面,在这一方面微软研究院的《Build Systems à la Carte》提供了一个非常不错的介绍,如果你可以参考这一篇《【工业聚看论文】第一期:《Build Systems à la Carte: Theory and Practice

(PS:因微信限制,链接请阅读原文使用)

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
2.4K star,一个高性能、无侵入的Java性能监控和统计工具,有点东西!求职干货 | 华为、DELL等2023秋招已开!海归求职:数据(数据分析、数据科学、工程)视频来了!习近平:构建高水平社会主义市场经济体制模型越大,表现越差?谷歌收集了让大模型折戟的任务,还打造了一个新基准生成扩散模型漫谈:构建ODE的一般步骤(上)求职干货|拼多多 2023秋招补录已开!海归求职:数据(数据分析、数据科学、工程)五十年代初期的原上海地下党, 老江是个命好的典型, 转折点是去一汽大陆集团:构建人才护城河,引领组织转型升级2023 年开年应该完成的任务清单他只是一名眼科医生——绝不是什么英雄!停了电就像要没了命乌克兰要干一件不可能完成的任务案例 | Fireblocks:构建数字资产的金融体系5G/6G:构建一个更加互联智能的世界高性能计算:RoCE技术分析及应用云计算:芯片大战必争之地​ ​| 经济学人商业数据如何助力企业发展?从高性能计算、商业智能、数据库三个领域分享赶紧的,赶紧把你的肚子收一收!老二的任务:不让爸爸妈妈一起睡觉中国银行手机银行:构建开放生态,共创美好生活重磅首发|2022全域数据驱动增长指南:用户数据主权争夺战,品牌该出手了田丰、严飞对谈直播 | 欲望与计算:我们的消费经验史再议交易与决策:构建“四维度”脚手架(一、二级市场通用框架)人类首个行星防御演示成功,史无前例的任务完成了!性能提升 2.5 倍!字节开源高性能 C++ JSON 库 sonic-cpp原来count(*)就是我们系统的接口性能变差100倍的真凶…独家视频丨习近平:构建高水平社会主义市场经济体制Treg的功能鉴定:构建Treg和Responder T共培养体系躁动图计算:蚂蚁和字节们想找到“幻视”额头上那颗宝石餐厅的女当家人储能器件也可以计算:一种新型的神经形态器件 | NSR用图计算引擎进行数据分析,智能BI厂商「欧拉认知智能」完成千万级PreA+轮融资|36氪首发求职干货 | 华为等企业2023秋招已开!海归求职:数据(数据分析、数据科学、工程)求职干货|Amazon 2023 暑期实习已开!海外求职:数据(数据分析、数据科学、工程)​财政部:构建引导基金投资新体系,支持国家级基金与深圳加强合作
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。