返回博客

Polars Cloud;随时随地运行 Polars 的分布式云架构

2025 年 2 月 10 日,星期一

1. DataFrame 扩展性差距

当我开始开发 Polars 时,我很惊讶 DataFrame 的实现与 SQL 和数据库有多么不同。SQL 可以无处不在地运行1。它可以嵌入式运行,也可以在客户端-服务器模型上运行,或者在完整的 OLAP 数据仓库上运行。

而对于 DataFrame,其 API 因用例而异,性能也大幅落后于 SQL 解决方案。在本地,pandas 占据主导地位;在远程/分布式场景,则是 PySpark。对终端用户而言,pandas 非常容易上手,但它似乎忽略了数据库几十年来积累的经验:它没有查询优化,数据类型实现不佳,存在许多不必要的物化,将内存处理卸载给了 NumPy,以及其他一些导致扩展性差和行为不一致的设计决策。PySpark 则更接近数据库,它遵循关系模型,具有优化功能、分布式查询引擎并能正确扩展。然而,PySpark 是用 Scala 编写的,需要在本地运行 JVM,用户体验非常不符合 Python 风格(例如 Java 回溯),并且对内存不足错误 (OOM) 非常敏感。它还是十年前为商用硬件设计的,并且基于行的 OLAP 执行已被证明不是最优的。

借助 Polars,我们希望在一个灵活的 DataFrame API 下统一这两个世界,并以高性能计算为支撑。我们最初(并已实现)的目标是为 pandas 提供一个替代方案,它拥有灵活的 API,能够实现查询优化和并行流式执行。其次,我们希望让远程运行 DataFrame 代码变得简单。就像 SQL 一样,Polars 的 LazyFrame 是对查询的描述,它可以发送到服务器进行远程执行。这应该非常简单。在云主导的时代,您不应受限于笔记本电脑的界限。

2. 随处运行 Polars

我们的目标是实现可扩展的数据处理,同时保留 Polars API 的所有灵活性和表达力。我们正在开发两项技术:Polars Cloud 和一种全新的流式引擎设计。我们将在后续文章中详细介绍流式引擎;今天我们想分享一下 Polars Cloud 的构建情况。

我们将提供的功能

  • 分布式 Polars;一个 API 满足所有高性能 DataFrame 需求;
  • 无服务器计算;
  • 可配置硬件,包括 GPU 和 CPU;
  • 对角线扩展;同时支持横向和纵向扩展;
  • 客户自有云;支持 AWS、Azure 和 GCP;
  • 本地部署许可;
  • 容错性;
  • 数据血缘;
  • 可观测性;

启动硬件并远程运行 Polars 查询将非常无缝,无论是用于生产 ETL 任务的批处理模式,还是用于数据探索的交互模式。在本文的其余部分,我们希望通过一些代码示例来探讨这一点。

3. 远程查询。

对我们而言,启动远程查询对终端用户来说应是原生且无缝的。远程运行查询将可通过 Polars 的原生 API 实现。

请注意,我们不限制您从何处调用此代码。您可以从本机上的 Jupyter Notebook、Airflow DAG、AWS Lambda 或您的服务器等启动远程查询。数据处理所需的计算资源通常远高于 Airflow 或 Prefect 中编排所需的计算资源。通过不将您限制在必须运行查询的平台上,我们为您提供了将 Polars Cloud 嵌入任何环境的灵活性。

在下面的查询中,我们启动了第一个查询。

import polars as pl
import polars_cloud as pc
from datetime import date

query = (pl.scan_parquet("s3://my-dataset/")
   .filter(pl.col("l_shipdate") <= date(1998, 9, 2))
      .group_by("l_returnflag", "l_linestatus")
       .agg(
            avg_price=pl.mean("l_extendedprice"),
            avg_disc=pl.mean("l_discount"),
            count_order=pl.len()
        )
 )

in_progress = (
    query
    .remote(pc.ComputeContext(cpus=16, memory=64))
    .sink_parquet("s3://my-dst/")
)

我们创建一个 LazyFrame,然后不将其在本地收集,而是调用 .remote(),这会告诉 Polars 使用给定的 pc.ComputeContext 远程运行此查询。ComputeContext 告诉我们应启动哪种硬件,而 sink_parquet 调用则触发查询。我们将收到一个 InProgressQueryRemote 对象,表明我们的查询正在远程运行。同时,我们可以异步处理其他事务,或者阻塞并等待结果。最后,InProgressQueryRemote 可以再次转换为 LazyFrame,以继续处理远程查询的结果。让我们来操作一下。

result = in_progress.await_result()
print(result)

new_lf: pl.LazyFrame = result.lazy()
shape: (4, 5)
┌──────────────┬──────────────┬──────────────┬──────────┬─────────────┐
│ l_returnflag ┆ l_linestatus ┆ avg_price    ┆ avg_disc ┆ count_order │
│ ---          ┆ ---          ┆ ---          ┆ ---      ┆ ---         │
│ str          ┆ str          ┆ f64          ┆ f64      ┆ u32         │
╞══════════════╪══════════════╪══════════════╪══════════╪═════════════╡
│ A            ┆ F            ┆ 38273.129735 ┆ 0.049985 ┆ 1478493     │
│ N            ┆ O            ┆ 38249.117989 ┆ 0.049997 ┆ 2920374     │
│ R            ┆ F            ┆ 38250.854626 ┆ 0.050009 ┆ 1478870     │
│ N            ┆ F            ┆ 38284.467761 ┆ 0.050093 ┆ 38854       │
└──────────────┴──────────────┴──────────────┴──────────┴─────────────┘

4. 扩展策略

Polars 远程运行这些查询的方式与您对 Python 库的预期有所不同。我们不需要通过 Pickle 进行序列化,因为我们在 Rust 中原生构建了 DSL 树。此 DSL 会发送到服务器进行分析和运行。在集群端,我们在 Rust 中构建了一个分布式调度器-工作器架构,它可以分析这些查询,运行一个经过调整的 Polars 优化器,并根据查询生成一个可横向或纵向扩展的物理计划。Polars 在纵向扩展方面非常强大,我们认识到许多查询在单台机器上运行成本效益最高。然而,查询通常以一系列减少数据集大小的操作开始(例如 group-by 和 filter)。当从云存储查询大量数据时,您会受到单个节点 IO 限制的约束。通过横向扩展,我们可以大幅提高这些下载限制,并在大小缩减完成后在单台机器上完成剩余操作。我们正在构建一种在横向和纵向扩展方面都表现出色的架构,称之为“对角线扩展”,它能动态选择最优策略。

5. 引擎(CPU 和 GPU)

除了多种扩展策略外,我们致力于在工作节点上将开源 Polars 作为我们的引擎运行。这确保了我们的激励措施一致,并且 Polars Cloud 的语义不会偏离。Polars 将允许您运行所有引擎。这意味着也将支持 GPU。您将能够启动一台配备高端 GPU 的机器,并在交互模式下进行本地连接。我们新的流式引擎采用内存外设计,能够高效地溢写到磁盘。结合分布式查询,这将真正地将 Polars 扩展到任何数据集。我们已经有了 PDS-H2 基准测试的初步结果,它们看起来非常有前景。我们已经将内存引擎的性能提高了约 3 倍(而且还有很多性能提升空间),并且内存特性也毋庸置疑地好得多。

6. 分发策略

分布式

如前所述,您可以启动一个机器集群。这使您能够在分布式模式下运行查询。

lf: LazyFrame

result = (
      lf.remote()
      .distributed()
      .collect()
      .await_result()
)

分布式查询的语义不变,它只是告诉 Polars Cloud 在需要时可以使用多个节点来完成查询。并非所有 Polars 操作都已支持,但这仍然会有益处,因为分布式查询可以减少数据集大小,直到单个节点能在流式引擎上完成它。目前支持的操作包括所有可流式处理的操作,例如 filterexplodemap,以及可分区操作,例如 group-byequi-joins

分区

此外,我们还提供了分区查询,它会在集群中的可用节点上按给定 key 对查询进行分区。这将从语义上改变查询,因为您将为每个唯一键获得一个结果,这意味着单个查询将有 n_unique(key) 个结果。

lf: LazyFrame

result = (
      lf.remote(pc.ComputeContext(cpus=16, memory=64, cluster_size=32))
      .partition_by("day")
      .collect()
      .await_result()
)

这对于时间序列数据非常有用,例如您希望按特定时间间隔(如每日、每周、每月等)运行查询时。

并行生成多个查询

最后,我们使远程运行多个查询变得简单。我们提供了一个 spawn_many 函数,它接受一个 LazyFrame 列表,并在集群上运行。

import polars_cloud as pc

lazy_frames: list[LazyFrame]
results = pc.spawn_many(lazy_frames, partition_by="day", dst="s3://result_dst/").await_result()

7. 容错性

一旦您开始处理多个工作器和硬件,就会出现故障,例如磁盘掉线、机器断开连接等。这一切都必须完全对用户隐藏,这些复杂性将由我们来处理。如果工作器失败,我们将重新调度任务,并确保查询能够独立于硬件故障完成。

申请早期访问

本月底,我们将接纳首批客户。此后不久,我们希望扩大规模,邀请那些在 AWS 上拥有云技术栈的个人用户。之后,我们将拓展到其他云服务商和 Kubernetes。如果您想获得早期访问权限,请联系我们!

注释

  1. 我并不是指您可以轻松切换后端。不同的解决方案在排序、空值处理、支持的函数、数据类型等方面具有略微不同的语义。

  2. 这项基准测试源自 TPC-H,但其结果与 TPC-H 的结果不具可比性。更多详情请参阅此处

1
2
4
3
5
6
7
8
9
10
11
12