Pathway是一个面向实时AI应用的数据流处理框架


Pathway 实时数据框架

Pathway 是一个用于流处理、实时分析、LLM 流水线和 RAG 的 Python ETL 框架。

Pathway 附带一个易于使用的 Python API,可让您无缝集成您常用的 Python 机器学习库。
Pathway 代码功能强大且功能强大:您可以在开发和生产环境中使用它,高效处理批量数据和流数据
相同的代码可用于本地开发、CI/CD 测试、运行批量作业、处理流回放以及处理数据流。

Pathway 由基于 Differential Dataflow 的可扩展 Rust 引擎提供支持,并执行增量计算。
您的 Pathway 代码尽管是用 Python 编写的,但由 Rust 引擎运行,从而支持多线程、多处理和分布式计算。
所有流水线都保存在内存中,并且可以通过Docker 和 Kubernetes轻松部署。

您可以使用 pip 安装 Pathway:

复制代码
pip install -U pathway

如有任何疑问,您可以在 Discord 上联系该项目背后的社区和团队。

用例和模板

准备好了解 Pathway 的功能了吗?

试用我们易于运行的示例

这些示例提供 Notebook 和 Docker 两种格式,只需点击几下即可启动。选择一种格式,立即开始使用 Pathway 进行实践体验!

事件处理和实时分析管道

凭借其统一的批处理和流处理引擎以及完全的 Python 兼容性,Pathway 让数据处理变得尽可能简单。它是各种数据处理管道的理想解决方案,包括:

AI 管道

Pathway 提供专用的 LLM 工具,用于构建实时 LLM 和 RAG 管道。其中包含大多数常用 LLM 服务和实用程序的封装器,使 LLM 和 RAG 管道的使用变得异常简单。请查看我们的 LLM xpack 文档

欢迎试用我们包含 LLM 工具的可运行示例。
您可以在此处 找到这些示例。

功能

  • 丰富的连接器:Pathway 附带可连接到外部数据源(例如 Kafka、GDrive、PostgreSQL 或 SharePoint)的连接器。其 Airbyte 连接器允许您连接到 300 多个不同的数据源。如果您所需的连接器不可用,您可以使用 Pathway Python 连接器构建您自己的自定义连接器。
  • 无状态和有状态转换:Pathway 支持有状态转换,例如连接、窗口化和排序。它提供了许多直接用 Rust 实现的转换。除了提供的转换之外,您还可以使用任何 Python 函数。您可以实现自己的函数,也可以使用任何 Python 库来处理数据。
  • 持久性:Pathway 提供持久性来保存计算状态。这允许您在更新或崩溃后重新启动管道。有了 Pathway,您的管道将得到妥善管理!
  • 一致性:Pathway 会为您处理时间问题,确保所有计算的一致性。特别是,每当新的(在本例中为迟到的)数据点进入系统时,Pathway 都会更新其结果,从而管理迟到点和乱序点。Pathway 的免费版本提供“至少一次”一致性,而企业版本提供“恰好一次”一致性。
  • 可扩展的 Rust 引擎:使用 Pathway Rust 引擎,您可以摆脱 Python 常见的限制。您可以轻松实现多线程、多处理和分布式计算。
  • LLM 助手:Pathway 提供了一个 LLM 扩展,其中包含所有实用程序,可将 LLM 与您的数据管道(LLM 包装器、解析器、嵌入器、拆分器)集成,包括内存实时向量索引以及与 LLamaIndex 和 LangChain 的集成。您可以使用实时文档快速构建和部署 RAG 应用程序。

入门

安装

Pathway 需要 Python 3.10 或更高版本。

您可以使用 pip 安装当前版本的 Pathway:

复制代码
$ pip install -U pathway

⚠️ Pathway 可在 MacOS 和 Linux 上使用。其他系统用户请在虚拟机上运行 Pathway。

示例:实时计算正值之和。

python 复制代码
import pathway as pw

# Define the schema of your data (Optional)
class InputSchema(pw.Schema):
  value: int

# Connect to your data using connectors
input_table = pw.io.csv.read(
  "./input/",
  schema=InputSchema
)

#Define your operations on the data
filtered_table = input_table.filter(input_table.value>=0)
result_table = filtered_table.reduce(
  sum_value = pw.reducers.sum(filtered_table.value)
)

# Load your results to external systems
pw.io.jsonlines.write(result_table, "output.jsonl")

# Run the computation
pw.run()

在 Google Colab 中运行 Pathway (https://colab.research.google.com/drive/1aBIJ2HCng-YEUOMrr0qtj0NeZMEyRz55?usp=sharing)。

更多示例请见此处 (https://github.com/pathwaycom/pathway/tree/main/examples)。

部署

本地部署

要使用 Pathway,您只需导入它:

python 复制代码
import pathway as pw

现在,您可以轻松创建处理管道,并让 Pathway 处理更新。创建管道后,您可以使用一行命令启动流数据计算:

python 复制代码
pw.run()

然后,您可以像运行普通 Python 脚本一样运行 Pathway 项目(例如 main.py):$ python main.py
Pathway 附带一个监控仪表板,可让您跟踪每个连接器发送的消息数量和系统延迟。仪表板还包含日志消息。

或者,您也可以使用类似 Pathway 的版本:

复制代码
$ pathway spawn python main.py

Pathway 原生支持多线程。
要使用 3 个线程启动您的应用程序,您可以执行以下操作:

复制代码
$ pathway spawn --threads 3 python main.py

要快速启动 Pathway 项目,您可以使用我们的 cookiecutter 模板

Docker

您可以使用 docker 轻松运行 Pathway。

Pathway 镜像

您可以使用 Pathway docker 镜像,并创建一个 Dockerfile:

dockerfile 复制代码
FROM pathwaycom/pathway:latest

WORKDIR /app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD [ "python", "./your-script.py" ]

然后,您可以构建并运行 Docker 镜像:

console 复制代码
docker build -t my-pathway-app .
docker run -it --rm --name my-pathway-app my-pathway-app

运行单个 Python 脚本

处理单文件项目时,创建完整的 Dockerfile似乎没有必要。在这种情况下,您可以直接使用 Pathway Docker 镜像执行 Python 脚本。
例如:

控制台 复制代码
docker run -it --rm --name my-pathway-app -v "$PWD":/app pathwaycom/pathway:latest python my-pathway-app.py

Python docker 镜像

您也可以使用标准 Python 镜像,并使用 pip 和 Dockerfile 安装 Pathway:

dockerfile 复制代码
FROM --platform=linux/x86_64 python:3.10

RUN pip install -U pathway
COPY ./pathway-script.py pathway-script.py

CMD ["python", "-u", "pathway-script.py"]

Kubernetes 和云

Docker 容器非常适合使用 Kubernetes 在云端部署。
如果您想扩展 Pathway 应用,您可能会对我们的企业版 Pathway 感兴趣。
企业版 Pathway 专为端到端数据处理和实时智能分析量身定制。
它使用云端分布式计算进行扩展,并支持分布式 Kubernetes 部署,并具有外部持久化设置。

您可以使用 Render 等服务轻松部署 Pathway:请参阅如何通过几次点击部署 Pathway

如果您感兴趣,请随时联系我们 了解更多信息。

性能

Pathway 旨在超越专为流式和批量数据处理任务而设计的先进技术,包括:Flink、Spark 和 Kafka Streaming。它还能在流式模式下实现许多其他流式框架难以支持的算法/UDF(尤其是:时序连接、迭代图算法和机器学习例程)。

如果您对此感兴趣,这里有一些基准测试 供您参考。

文档和支持

Pathway 的完整文档可在 pathway.com/developers/ 获取,包括 API 文档

如有任何疑问,请随时 在 GitHub 上创建问题、加入我们的 Discord 或发送电子邮件至 contact@pathway.com

许可证

Pathway 基于 BSL 1.1 许可证 分发,该许可证允许无限制的非商业用途,以及免费使用 Pathway 软件包用于大多数商业用途。此代码库中的代码将在 4 年后自动转换为开源(Apache 2.0 许可证)。一些与此代码库互补的公共代码库(示例、库、连接器等)根据 MIT 许可证以开源形式发布。

贡献指南

如果您开发了一个库或连接器并希望将其集成到此代码库中,我们建议您先将其作为单独的代码库以 MIT/Apache 2.0 许可证发布。

如果您对 Pathway 核心功能有任何疑问,欢迎提出问题。如需了解更多信息,请随时联系 Pathway 的 Discord 社区

关于项目

Pathway 是一个用于流处理、实时分析、LLM 管道和 RAG 的 Python ETL 框架。Pathway 附带易于使用的 Python API,允许您无缝集成您最喜欢的 Python ML 库。
Other
Python
46,022
1395
90
2022-11-27
2025-10-10

增长趋势 - stars