4

自动增量计算:构建高性能数据分析系统的任务编排

 1 year ago
source link: https://www.phodal.com/blog/incremental-computing-for-financial-python/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

自动增量计算:构建高性能数据分析系统的任务编排

Posted by: Phodal Huang Nov. 5, 2022, 8:24 p.m.

在起始的那篇《金融 Python 即服务:业务自助的数据服务模式》,我们介绍了:使用 Python 如何使用作为数据系统的 wrapper 层?在这一篇文章里,我们将继续之前的话题,介绍如何使用 Python 作为计算引擎核心的胶水层,即:如何使用 Python 构建 DAG(有向无环图,Directed Acyclic Graph) 任务?

除此,还可以了解一下,如何设计增量 DAG 计算?先看一下增量计算的概念:

增量计算(Incremental computing),是一种软件功能,每当一条数据发生更改时,它都会尝试通过仅重新计算依赖于更改数据的输出来节省时间。

常见的领域有:

  • GUI 应用, 诸如于 React 的 Dom Diff
  • 不断变化的大型计算,诸如于金融计算、电子表格、大数据系统
  • 构建系统,诸如于 Gradle、Bazel、Rustc 等

所以,在开始之前,让我们先看一个简单的例子,Excel 如何实现增量计算。

引子 1:Excel 的增量计算

众所周知,Excel 是使用最广泛的数据分析工具。当我们使用了 Excel 中的公式之后,当我们修改了 A 单元格的值,对应的结果会自动发生变化。而如果在这时,还有其它依赖于此单元格的值时,对应的结果也会发生变化。如下图所示:

出自 《How to Recalculate a Spreadsheet》

在 Microsoft 官方的文档里(Excel 重新计算),可以看到对应的触发重新计算场景:输入新数据、删除或插入行或列等等。在 Excel 中,工作表的计算可视为包含三个阶段的过程:

  1. 构造依赖关系树
  2. 构造计算链
  3. 重新计算单元格

一旦触发了重新计算,Excel 会重新构造依赖关系树和计算链,并依赖于此的所有单元格标记为 ”脏单元格“。随后,根据计算链指定的顺序重新计算。通常来说,在我们设计依赖分析时,假定的是函数是不可变的。但是呢,还存在一些特殊的函数类型,诸如于文档中提到的:

  • 异步函数 (UDF)。
  • 可变函数。即哪怕参数没有变化时,值也可能修改。诸如于 Now、Today 等。

这意味着,我们在设计增量计算时,需要考虑到这个场景的问题。从原理和实现来说,它一点并不算太复杂,有诸如于

从注解 DAG 到增量 DAG 设计

DAG (有向无环图,Directed Acyclic Graph)是一种常用数据结构,仅就 DAG 而言,它已经在我们日常的各种工具中存在:

  • 依赖系统。诸如如 NPM、Yarn、Gradle、Cargo 等
  • 人工智能。如机器学习等
  • 数据流系统。如编译器、Apache Spark、Apache Airflow 等。
  • 数据可视化。常用的 Graphviz,又或者是各个语言里的 Network 相关的库,诸如于 Python 的 NetworkX

当我们从任务编排和数据等的角度来看,DAG 的面向普通人术语是叫工作流(Workflow)。

常规 DAG 到函数式 DAG

通常情况下,实现一个 DAG 非常的简单 —— 只是数据结构。在使用时,也比较简单,如下是 Cytoscape 的 API 示例:

cy.add([
  { group: 'nodes', data: { id: 'n0' }, position: { x: 100, y: 100 } },
  { group: 'nodes', data: { id: 'n1' }, position: { x: 200, y: 200 } },
  { group: 'edges', data: { id: 'e0', source: 'n0', target: 'n1' } }
]);

而这一类 DAG 是静态的,当我们需要结合些任务时,就会需要添加函数。由此便会稍微复杂一些,再现看个示例:

comp = Computation()
comp.add_node('a')
comp.add_node('b', lambda a: a+1)
comp.add_node('c', lambda a, b: 2*a)
comp.add_node('d', lambda b, c: b + c)
comp.add_node('e', lambda c: c + 1)
comp.compute('d')
comp.get_value_dict()

上述的代码中,是 Loman 框架的示例,其中的 lambda a: a+1 是 Python 的 Lambda 表达式。Loman 会在运行时,分析这个 Lambda,获得 Lambda 中的参数,随后添加对应的计算依赖。

Loman 示例

而在多数场景之下,往往是采用注解的形式,诸如于 Airflow、Gradle 等。

基于注解与条件的 DAG 函数

回到研究的开始,如美银证券的 Quartz 的 DSL 扩展(Little languages),便是在 Loman 的形式上进行了一步扩展。使用注解代替了 Lambda:

class C:
  @dag
  def f1(self, x, y):
    return self.f2(x) + y
  @dag
  def f2(self, x):
    return x * x

围绕于这个注解,Quartz 在这一层的实现上,包含了四个特性: DAG、记忆化(memoization)、持久化、时间旅行调试(time travel)。考虑到 Quartz 并不是一个开源的实现,社区上的材料不一定靠谱,所以我们还是再看看 Apache Ariflow 的实现。引用官网的示例:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:

    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    hello >> airflow()

从实现上来说,Apache Airflow 的 DAG 实现本着 “工作流即代码” 的思想设计的。上面代码中,比较有意思的是 >> 语法,其是在任务之间定义了一个依赖关系并控制任务的执行顺序。

增量 DAG 注解:Gradle —— 监听输入与输出

在编译上,Gradle 也是支持增量编译(也是一种增量计算)的,我们可以先看个简单的示例:

abstract class IncrementalReverseTask extends DefaultTask {
    @Incremental
    @InputDirectory
    abstract DirectoryProperty getInputDir()

    @OutputDirectory
    abstract DirectoryProperty getOutputDir()

    @TaskAction
    void execute(InputChanges inputChanges) {
        inputChanges.getFileChanges(inputDir).each { change ->
            if (change.fileType == FileType.DIRECTORY) return

            def targetFile = outputDir.file(change.normalizedPath).get().asFile
            if (change.changeType == ChangeType.REMOVED) {
                targetFile.delete()
            } else {
                targetFile.text = change.file.text.reverse()
            }
        }
    }
}

对于 Gradle 的增量任务来说,通常只需要关注输入和输出,只要 InputDirectoryOutputDirectory 不变,那么就认为 Task 不需要再执行。因为在实现处理逻辑时,只关注于这两个值是否发生变化。

Rust 语言:Salsa 框架的增量 DAG 设计

Rust 编译器的文档上也包含了相关的介绍:Incremental compilation,而这里我们是一个相关的实现 —— 增量编译的设计者之一(Niko Matsakis)编写的库 Salsa。Salsa 是一个用于编写增量 (incremental) 、按需 (on-demand) 程序的 Rust 框架,其采用的是 “红-绿”算法。与 Gradle 相似的,Salsa 结构体(Structs)是使用一种 Salsa 属性宏进行了标注的结构体:

  • #[salsa::input]:用于指定计算的“基本输入”
  • #[salsa::tracked]:用于指定在计算过程中创建的中间值
  • #[salsa::interned]:用于指定易于进行相等比较的小型值

由于 Salsa 相比于 Gradle 是位于更底层的基础设施,所以需要手动构建存储层,即 Jar 和数据库)。数据库是一个结构体,它最终存储 Salsa 的所有中间状态,例如来自跟踪函数的被记忆的 (memoized) 返回值。数据库本身是以一些中间结构 (intermediate structure) 的形式定义的,这些中间结构被称为 jars,并包含每个函数的数据。

缓存计算与存储计算

既然,我们已经通过注解将输入、输出、函数等内容标注出来,下一步就是缓存结果。如此一来,我们就可以通过缓存来提升计算性能。对于计算的缓存来说,至少需要包含这三个部分:

  • 函数表达式(Fn 类型)。
  • 零个或多个参数。
  • 一个可选名称。

由此,我们才能获得缓存后的结果。在一些框架的设计里,诸如于 Python 语言

内存:Memoization —— 函数式编程的记忆

Memoization(记忆化)是函数式语言的一种特性,使用一组参数初次调用函数时,缓存参数和计算结果,当再次使用相同的参数调用该函数时,直接返回相应的缓存结果。在一些不支持 memoization 的语言里,需要手动引入这种设计,如 Java:

Map<Integer, Integer> cache = new ConcurrentHashMap<>();

Integer addOne(Integer x) {
    return cache.computeIfAbsent(x -> x + 1);
}

上述只是一个加法的示例,万能的 StackOverflow 上有更多的示例:Java memoization method

当然了,缓存是有负作用的 —— 第一次计算时存储结果会花费一定的时间,不过大部分情况下可以忽略不计。

数据库存储

对于耗时更长的 AI 或者是金融计算场景时,需要采用分布式的任务调度器,才能更快的得到计算结果。于是乎,采用分布式键值存储来对结果进行缓存就是更好的选择。在 Salsa 框架里,由于考虑到不同的类型(input、output、tracked 等),对于数据结构函数等来说,其对应的 Index 由三部分组成:

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct DatabaseKeyIndex {
    group_index: u16,
    query_index: u16,
    key_index: u32,
}

增量计算框架与算法

由于时间与精力限制(主要是我看不懂一些用英语写的公式,还有暂时没打算学 OCaml),这里就没有展开对于各类计算框架论文的讨论。诸如于 IncrementalAdapton 就是相关的论文与实现,就包含了非常不错的资料。

除此:https://lord.io/spreadsheets/ 一文也给了非常好的介绍。

这里,我就不展开了。

有了增量计算,然后呢?

后续的计算部分,可以参考 Apache Airflow 来实现。它是一个支持开源分布式任务调度框架,其架构

  • 调度程序,它处理触发计划的工作流,并将任务提交给执行程序以运行。
  • 执行器,它处理正在运行的任务。 在默认的 Airflow 安装中,这会在调度程序中运行所有内容,但大多数适合生产的执行程序实际上会将任务执行推送给工作人员。
  • Web 服务器,它提供了一个方便的用户界面来检查、触发和调试 DAG 和任务的行为。
  • DAG 文件的文件夹,由调度程序和执行程序(以及执行程序拥有的任何工作人员)读取
  • 元数据数据库,由调度程序、执行程序和网络服务器用来存储状态。

其架构图如下:

Apache Airflow 架构

不过、过了、还是不过,考虑到 Airflow 的 DAG 实现是 Python,在分布式任务调度并不是那么流行。但是,作为一个参考还是非常不错的。

相关参考资料:

除此,在构建工具方面,在这一方面微软研究院的《Build Systems à la Carte》提供了一个非常不错的介绍,如果你可以参考这一篇《【工业聚看论文】第一期:《Build Systems à la Carte: Theory and Practice》


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK