1. DataFrame 扩展性差距
当我开始开发 Polars 时,我很惊讶 DataFrame 的实现与 SQL 和数据库有多么不同。SQL 可以无处不在地运行1。它可以嵌入式运行,也可以在客户端-服务器模型上运行,或者在完整的 OLAP 数据仓库上运行。
而对于 DataFrame,其 API 因用例而异,性能也大幅落后于 SQL 解决方案。在本地,pandas 占据主导地位;在远程/分布式场景,则是 PySpark。对终端用户而言,pandas 非常容易上手,但它似乎忽略了数据库几十年来积累的经验:它没有查询优化,数据类型实现不佳,存在许多不必要的物化,将内存处理卸载给了 NumPy,以及其他一些导致扩展性差和行为不一致的设计决策。PySpark 则更接近数据库,它遵循关系模型,具有优化功能、分布式查询引擎并能正确扩展。然而,PySpark 是用 Scala 编写的,需要在本地运行 JVM,用户体验非常不符合 Python 风格(例如 Java 回溯),并且对内存不足错误 (OOM) 非常敏感。它还是十年前为商用硬件设计的,并且基于行的 OLAP 执行已被证明不是最优的。
借助 Polars,我们希望在一个灵活的 DataFrame API 下统一这两个世界,并以高性能计算为支撑。我们最初(并已实现)的目标是为 pandas 提供一个替代方案,它拥有灵活的 API,能够实现查询优化和并行流式执行。其次,我们希望让远程运行 DataFrame 代码变得简单。就像 SQL 一样,Polars 的 LazyFrame
是对查询的描述,它可以发送到服务器进行远程执行。这应该非常简单。在云主导的时代,您不应受限于笔记本电脑的界限。
2. 随处运行 Polars
我们的目标是实现可扩展的数据处理,同时保留 Polars API 的所有灵活性和表达力。我们正在开发两项技术:Polars Cloud 和一种全新的流式引擎设计。我们将在后续文章中详细介绍流式引擎;今天我们想分享一下 Polars Cloud 的构建情况。
我们将提供的功能
- 分布式 Polars;一个 API 满足所有高性能 DataFrame 需求;
- 无服务器计算;
- 可配置硬件,包括 GPU 和 CPU;
- 对角线扩展;同时支持横向和纵向扩展;
- 客户自有云;支持 AWS、Azure 和 GCP;
- 本地部署许可;
- 容错性;
- 数据血缘;
- 可观测性;
启动硬件并远程运行 Polars 查询将非常无缝,无论是用于生产 ETL 任务的批处理模式,还是用于数据探索的交互模式。在本文的其余部分,我们希望通过一些代码示例来探讨这一点。
3. 远程查询。
对我们而言,启动远程查询对终端用户来说应是原生且无缝的。远程运行查询将可通过 Polars 的原生 API 实现。
请注意,我们不限制您从何处调用此代码。您可以从本机上的 Jupyter Notebook、Airflow DAG、AWS Lambda 或您的服务器等启动远程查询。数据处理所需的计算资源通常远高于 Airflow 或 Prefect 中编排所需的计算资源。通过不将您限制在必须运行查询的平台上,我们为您提供了将 Polars Cloud 嵌入任何环境的灵活性。
在下面的查询中,我们启动了第一个查询。
import polars as pl
import polars_cloud as pc
from datetime import date
query = (pl.scan_parquet("s3://my-dataset/")
.filter(pl.col("l_shipdate") <= date(1998, 9, 2))
.group_by("l_returnflag", "l_linestatus")
.agg(
avg_price=pl.mean("l_extendedprice"),
avg_disc=pl.mean("l_discount"),
count_order=pl.len()
)
)
in_progress = (
query
.remote(pc.ComputeContext(cpus=16, memory=64))
.sink_parquet("s3://my-dst/")
)
我们创建一个 LazyFrame
,然后不将其在本地收集,而是调用 .remote()
,这会告诉 Polars 使用给定的 pc.ComputeContext
远程运行此查询。ComputeContext
告诉我们应启动哪种硬件,而 sink_parquet
调用则触发查询。我们将收到一个 InProgressQueryRemote
对象,表明我们的查询正在远程运行。同时,我们可以异步处理其他事务,或者阻塞并等待结果。最后,InProgressQueryRemote
可以再次转换为 LazyFrame
,以继续处理远程查询的结果。让我们来操作一下。
result = in_progress.await_result()
print(result)
new_lf: pl.LazyFrame = result.lazy()
shape: (4, 5)
┌──────────────┬──────────────┬──────────────┬──────────┬─────────────┐
│ l_returnflag ┆ l_linestatus ┆ avg_price ┆ avg_disc ┆ count_order │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ f64 ┆ f64 ┆ u32 │
╞══════════════╪══════════════╪══════════════╪══════════╪═════════════╡
│ A ┆ F ┆ 38273.129735 ┆ 0.049985 ┆ 1478493 │
│ N ┆ O ┆ 38249.117989 ┆ 0.049997 ┆ 2920374 │
│ R ┆ F ┆ 38250.854626 ┆ 0.050009 ┆ 1478870 │
│ N ┆ F ┆ 38284.467761 ┆ 0.050093 ┆ 38854 │
└──────────────┴──────────────┴──────────────┴──────────┴─────────────┘
4. 扩展策略
Polars 远程运行这些查询的方式与您对 Python 库的预期有所不同。我们不需要通过 Pickle 进行序列化,因为我们在 Rust 中原生构建了 DSL 树。此 DSL 会发送到服务器进行分析和运行。在集群端,我们在 Rust 中构建了一个分布式调度器-工作器架构,它可以分析这些查询,运行一个经过调整的 Polars 优化器,并根据查询生成一个可横向或纵向扩展的物理计划。Polars 在纵向扩展方面非常强大,我们认识到许多查询在单台机器上运行成本效益最高。然而,查询通常以一系列减少数据集大小的操作开始(例如 group-by 和 filter)。当从云存储查询大量数据时,您会受到单个节点 IO 限制的约束。通过横向扩展,我们可以大幅提高这些下载限制,并在大小缩减完成后在单台机器上完成剩余操作。我们正在构建一种在横向和纵向扩展方面都表现出色的架构,称之为“对角线扩展”,它能动态选择最优策略。
5. 引擎(CPU 和 GPU)
除了多种扩展策略外,我们致力于在工作节点上将开源 Polars 作为我们的引擎运行。这确保了我们的激励措施一致,并且 Polars Cloud 的语义不会偏离。Polars 将允许您运行所有引擎。这意味着也将支持 GPU。您将能够启动一台配备高端 GPU 的机器,并在交互模式下进行本地连接。我们新的流式引擎采用内存外设计,能够高效地溢写到磁盘。结合分布式查询,这将真正地将 Polars 扩展到任何数据集。我们已经有了 PDS-H2 基准测试的初步结果,它们看起来非常有前景。我们已经将内存引擎的性能提高了约 3 倍(而且还有很多性能提升空间),并且内存特性也毋庸置疑地好得多。
6. 分发策略
分布式
如前所述,您可以启动一个机器集群。这使您能够在分布式模式下运行查询。
lf: LazyFrame
result = (
lf.remote()
.distributed()
.collect()
.await_result()
)
分布式查询的语义不变,它只是告诉 Polars Cloud 在需要时可以使用多个节点来完成查询。并非所有 Polars 操作都已支持,但这仍然会有益处,因为分布式查询可以减少数据集大小,直到单个节点能在流式引擎上完成它。目前支持的操作包括所有可流式处理的操作,例如 filter
、explode
、map
,以及可分区操作,例如 group-by
和 equi-joins
。
分区
此外,我们还提供了分区查询,它会在集群中的可用节点上按给定 key
对查询进行分区。这将从语义上改变查询,因为您将为每个唯一键获得一个结果,这意味着单个查询将有 n_unique(key)
个结果。
lf: LazyFrame
result = (
lf.remote(pc.ComputeContext(cpus=16, memory=64, cluster_size=32))
.partition_by("day")
.collect()
.await_result()
)
这对于时间序列数据非常有用,例如您希望按特定时间间隔(如每日、每周、每月等)运行查询时。
并行生成多个查询
最后,我们使远程运行多个查询变得简单。我们提供了一个 spawn_many
函数,它接受一个 LazyFrame
列表,并在集群上运行。
import polars_cloud as pc
lazy_frames: list[LazyFrame]
results = pc.spawn_many(lazy_frames, partition_by="day", dst="s3://result_dst/").await_result()
7. 容错性
一旦您开始处理多个工作器和硬件,就会出现故障,例如磁盘掉线、机器断开连接等。这一切都必须完全对用户隐藏,这些复杂性将由我们来处理。如果工作器失败,我们将重新调度任务,并确保查询能够独立于硬件故障完成。
申请早期访问
本月底,我们将接纳首批客户。此后不久,我们希望扩大规模,邀请那些在 AWS 上拥有云技术栈的个人用户。之后,我们将拓展到其他云服务商和 Kubernetes。如果您想获得早期访问权限,请联系我们!