🏆如何使用 Taipy 和 PySpark 掌握📊大数据管道🐍
本文将通过一个简单的示例来演示如何将PySpark 与 Taipy 集成,以将您的大数据处理需求与智能作业执行结合起来。
让我们开始吧!

将 PySpark 与 Taipy 结合使用
Taipy 是一款功能强大的工作流编排工具,它拥有易于使用的框架,可轻松应用于您现有的数据应用程序。Taipy
建立在坚实的概念基础之上—— 场景、任务和数据节点——这些概念非常强大,即使使用没有明确支持的第三方软件包,也 能让开发人员轻松地构建自己的流程模型。

我们感谢任何有助于我们发展社区的帮助🌱
如果您已经熟悉 PySpark 和 Taipy,可以直接跳到“2.
Taipy 配置 (*config.py )”。
该部分将深入探讨如何为 Taipy 任务定义一个函数来运行 PySpark 应用程序。否则,请继续阅读!
一个简单的例子:palmerpenguins
我们以palmerpenguins数据集为例:
>>> penguin_df
┌───────┬─────────┬───────────┬────────────────┬───────────────┬───────────────────┬─────────────┬────────┬──────┐
│ index │ species │ island │ bill_length_mm │ bill_depth_mm │ flipper_length_mm │ body_mass_g │ sex │ year │
├───────┼─────────┼───────────┼────────────────┼───────────────┼───────────────────┼─────────────┼────────┼──────┤
│ 0 │ Adelie │ Torgersen │ 39.1 │ 18.7 │ 181.0 │ 3750.0 │ male │ 2007 │
│ 1 │ Adelie │ Torgersen │ 39.5 │ 17.4 │ 186.0 │ 3800.0 │ female │ 2007 │
│ 2 │ Adelie │ Torgersen │ 40.3 │ 18.0 │ 195.0 │ 3250.0 │ female │ 2007 │
│ 3 │ Adelie │ Torgersen │ NaN │ NaN │ NaN │ NaN │ NaN │ 2007 │
│ 4 │ Adelie │ Torgersen │ 36.7 │ 19.3 │ 193.0 │ 3450.0 │ female │ 2007 │
│ ... │ ... │ ... │ ... │ ... │ ... │ ... │ ... │ ... │
└───────┴─────────┴───────────┴────────────────┴───────────────┴───────────────────┴─────────────┴────────┴──────┘
该数据集仅包含 344 条记录,几乎不需要 Spark 进行处理。
然而,该数据集易于访问,其大小与演示 Spark 与 Taipy 的集成无关。
如果您必须使用更大的数据集进行测试,可以根据需要复制数据多次。
我们将设计一个执行两个主要任务的工作流程:
1-Spark任务(spark_process):
- 加载数据;
- 按“物种”、“岛屿”和“性别”对数据进行分组;
- 找出其他列的平均值(“ bill_length_mm ”,“ bill_depth_mm ”,“ flipper_length_mm ”,“ body_mass_g ”);
- 保存数据。
2- Python 任务(过滤器):
- 加载Spark任务先前保存的输出数据;
- 给定“物种”、“岛屿”和“性别”,返回聚合值。
我们的小项目将包含 4 个文件:
app/
├─ penguin_spark_app.py # the spark application
├─ config.py # the configuration for our taipy workflow
├─ main.py # the main script (including our application gui)
├─ penguins.csv # the data as downloaded from the palmerpenguins git repo
您可以在本文的代码块中找到每个文件的内容(除了可以从palmerpenguins 存储库获取的penguins.csv )。
1. Spark 应用程序(penguin_spark_app.py)
通常,我们使用spark-submit命令行实用程序运行 PySpark 任务。您可以在此处
阅读其自己的文档,了解有关以这种方式提交 Spark 作业的原因和含义的更多信息。 当使用 Taipy 进行工作流编排时,我们可以继续执行相同的操作。 唯一的区别是,我们不是在命令行中运行命令,而是让工作流管道生成一个子进程,该子进程使用spark-submit运行 Spark 应用程序。 在深入研究之前,让我们先看一下我们的 Spark 应用程序。 只需浏览一下代码,然后继续阅读以简要了解此脚本的作用:
### app/penguin_spark_app.py
import argparse
import os
import sys
parser = argparse.ArgumentParser()
parser.add_argument("--input-csv-path", required=True, help="Path to the input penguin CSV file.")
parser.add_argument("--output-csv-path", required=True, help="Path to save the output CSV file.")
args = parser.parse_args()
import pyspark.pandas as ps
from pyspark.sql import SparkSession
def read_penguin_df(csv_path: str):
penguin_df = ps.read_csv(csv_path)
return penguin_df
def clean(df: ps.DataFrame) -> ps.DataFrame:
return df[df.sex.isin(["male", "female"])].dropna()
def process(df: ps.DataFrame) -> ps.DataFrame:
"""The mean of measured penguin values, grouped by island and sex."""
mean_df = df.groupby(by=["species", "island", "sex"]).agg("mean").drop(columns="year").reset_index()
return mean_df
if __name__ == "__main__":
spark = SparkSession.builder.appName("Mean Penguin").getOrCreate()
penguin_df = read_penguin_df(args.input_csv_path)
cleaned_penguin_df = clean(penguin_df)
processed_penguin_df = process(cleaned_penguin_df)
processed_penguin_df.to_pandas().to_csv(args.output_csv_path, index=False)
sys.exit(os.EX_OK)
我们可以通过在终端中输入以下命令来提交此 Spark 应用程序以供执行:
spark-submit --master local[8] app/penguin_spark_app.py \
--input-csv-path app/penguins.csv \
--output-csv-path app/output.csv
这将执行以下操作:
- 提交penguin_spark_app.py应用程序在 8 个 CPU 核心上本地执行;
- 从app/penguins.csv CSV 文件加载数据;
- 按“物种”、“岛屿”和“性别”分组,然后按平均值汇总剩余的列;
- 将结果 DataFrame 保存到app/output.csv。
此后, app/output.csv的内容应准确如下:
另请注意,我们已经对Spark 应用程序进行了编码以接收 2 个命令行参数:
- - input-csv-path :输入企鹅 CSV 文件的路径;和
- - output-csv-path :Spark 应用程序处理后保存输出 CSV 文件的路径。
2. Taipy 配置(config.py)
此时,我们有了penguin_spark_app.py PySpark 应用程序,并且需要创建一个Taipy 任务来运行这个 PySpark 应用程序。
再次快速浏览一下app/config.py脚本,然后继续阅读:
### app/config.py
import datetime as dt
import os
import subprocess
import sys
from pathlib import Path
import pandas as pd
import taipy as tp
from taipy import Config
SCRIPT_DIR = Path(__file__).parent
SPARK_APP_PATH = SCRIPT_DIR / "penguin_spark_app.py"
input_csv_path = str(SCRIPT_DIR / "penguins.csv")
# -------------------- Data Nodes --------------------
input_csv_path_cfg = Config.configure_data_node(id="input_csv_path", default_data=input_csv_path)
# Path to save the csv output of the spark app
output_csv_path_cfg = Config.configure_data_node(id="output_csv_path")
processed_penguin_df_cfg = Config.configure_parquet_data_node(
id="processed_penguin_df", validity_period=dt.timedelta(days=1)
)
species_cfg = Config.configure_data_node(id="species") # "Adelie", "Chinstrap", "Gentoo"
island_cfg = Config.configure_data_node(id="island") # "Biscoe", "Dream", "Torgersen"
sex_cfg = Config.configure_data_node(id="sex") # "male", "female"
output_cfg = Config.configure_json_data_node(
id="output",
)
# -------------------- Tasks --------------------
def spark_process(input_csv_path: str, output_csv_path: str) -> pd.DataFrame:
proc = subprocess.Popen(
[
str(Path(sys.executable).with_name("spark-submit")),
str(SPARK_APP_PATH),
"--input-csv-path",
input_csv_path,
"--output-csv-path",
output_csv_path,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
outs, errs = proc.communicate(timeout=15)
except subprocess.TimeoutExpired:
proc.kill()
outs, errs = proc.communicate()
if proc.returncode != os.EX_OK:
raise Exception("Spark training failed")
df = pd.read_csv(output_csv_path)
return df
def filter(penguin_df: pd.DataFrame, species: str, island: str, sex: str) -> dict:
df = penguin_df[(penguin_df.species == species) & (penguin_df.island == island) & (penguin_df.sex == sex)]
output = df[["bill_length_mm", "bill_depth_mm", "flipper_length_mm", "body_mass_g"]].to_dict(orient="records")
return output[0] if output else dict()
spark_process_task_cfg = Config.configure_task(
id="spark_process",
function=spark_process,
skippable=True,
input=[input_csv_path_cfg, output_csv_path_cfg],
output=processed_penguin_df_cfg,
)
filter_task_cfg = Config.configure_task(
id="filter",
function=filter,
skippable=True,
input=[processed_penguin_df_cfg, species_cfg, island_cfg, sex_cfg],
output=output_cfg,
)
scenario_cfg = Config.configure_scenario(
id="scenario", task_configs=[spark_process_task_cfg, filter_task_cfg]
)
您还可以使用Taipy Studio构建 Taipy 配置,Taipy Studio 是一个 Visual Studio Code 扩展,它提供了用于构建 Taipy .toml配置文件的图形编辑器。
Taipy 中的 PySpark 任务
我们对生成 DAG 这一部分的代码段特别感兴趣:
让我们提取并检查config.py脚本的相关部分,该脚本在 Taipy 中创建“ spark_process ”Spark 任务(及其 3 个相关数据节点),如上图所示:
### Code snippet: Spark task in Taipy
# -------------------- Data Nodes --------------------
input_csv_path_cfg = Config.configure_data_node(id="input_csv_path", default_data=input_csv_path)
# Path to save the csv output of the spark app
output_csv_path_cfg = Config.configure_data_node(id="output_csv_path")
processed_penguin_df_cfg = Config.configure_parquet_data_node(
id="processed_penguin_df", validity_period=dt.timedelta(days=1)
)
# -------------------- Tasks --------------------
def spark_process(input_csv_path: str, output_csv_path: str) -> pd.DataFrame:
proc = subprocess.Popen(
[
str(Path(sys.executable).with_name("spark-submit")),
str(SPARK_APP_PATH),
"--input-csv-path",
input_csv_path,
"--output-csv-path",
output_csv_path,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
outs, errs = proc.communicate(timeout=15)
except subprocess.TimeoutExpired:
proc.kill()
outs, errs = proc.communicate()
if proc.returncode != os.EX_OK:
raise Exception("Spark training failed")
df = pd.read_csv(output_csv_path)
return df
spark_process_task_cfg = Config.configure_task(
id="spark_process",
function=spark_process,
skippable=True,
input=[input_csv_path_cfg, output_csv_path_cfg],
output=processed_penguin_df_cfg,
)
由于我们设计了penguin_spark_app.py Spark 应用程序来接收两个参数(input_csv_path和output_csv_path),因此我们选择将这两个参数表示为 Taipy 数据节点。
请注意,您的用例可能有所不同,您可以(并且应该!)根据需要修改任务、函数和关联的数据节点
。 例如,您可以:
- 有一个 Spark 任务执行一些常规 ETL 但不返回任何内容;
- 倾向于对输入和输出路径进行硬编码,而不是将它们持久化为数据节点;或者
- 将额外的应用程序参数保存为数据节点并将其传递给 Spark 应用程序。
然后,我们将spark-submit作为 Python 子进程运行,如下所示:
subprocess.Popen(
[
str(Path(sys.executable).with_name("spark-submit")),
str(SPARK_APP_PATH),
"--input-csv-path",
input_csv_path,
"--output-csv-path",
output_csv_path,
],
)
回想一下,列表元素的顺序应该保留以下格式,就像它们是在命令行上执行的一样:
$ spark-submit [spark-arguments] <pyspark-app-path> [application-arguments]
同样,根据我们的用例,我们可以根据需要指定不同的 spark-submit 脚本路径、Spark 参数(我们在示例中未提供任何参数)或不同的应用程序参数。
读取并返回output_csv_path
请注意,spark_process函数的结束方式如下:
def spark_process(input_csv_path: str, output_csv_path: str) -> pd.DataFrame:
...
df = pd.read_csv(output_csv_path)
return df
在本例中,我们希望 Taipy 任务在 Spark 处理数据后将其输出,以便将其写入processed_penguin_df_cfg Parquet 数据节点。
一种方法是手动读取输出目标(在本例中为output_csv_path),然后将其作为 Pandas DataFrame 返回。
但是,如果您不需要 Spark 应用程序的返回数据,则只需让 Taipy 任务(通过spark_process函数)返回None即可。
缓存 Spark 任务
由于我们已将spark_process_task_cfg的skippable 属性设置为True,因此在重新执行该场景时,Taipy 将跳过spark_process任务的重新执行 ,并重用持久化的任务输出:processed_penguin_df_cfg Pandas DataFrame。 然而,我们还为processed_penguin_df_cfg数据节点定义了1 天的validity_period,因此即使 DataFrame 上次缓存的时间超过一天,Taipy 仍会重新运行该任务。
3.构建GUI(main.py)
我们将通过构建本文开头看到的GUI来完成我们的应用程序:
如果您不熟悉 Taipy 的 GUI 功能,可以在此处找到快速入门。
无论如何,您可以直接复制并粘贴以下app/main.py代码,因为这不是我们的重点:
### app/main.py
from pathlib import Path
from typing import Optional
import taipy as tp
from config import scenario_cfg
from taipy.gui import Gui, notify
valid_features: dict[str, list[str]] = {
"species": ["Adelie", "Chinstrap", "Gentoo"],
"island": ["Torgersen", "Biscoe", "Dream"],
"sex": ["Male", "Female"],
}
selected_species = valid_features["species"][0]
selected_island = valid_features["island"][0]
selected_sex = valid_features["sex"][0]
selected_scenario: Optional[tp.Scenario] = None
data_dir = Path(__file__).with_name("data")
data_dir.mkdir(exist_ok=True)
def scenario_on_creation(state, id, payload):
_ = payload["config"]
date = payload["date"]
label = payload["label"]
properties = payload["properties"]
# Create scenario with selected configuration
scenario = tp.create_scenario(scenario_cfg, creation_date=date, name=label)
scenario.properties.update(properties)
# Write the selected GUI values to the scenario
scenario.species.write(state.selected_species)
scenario.island.write(state.selected_island)
scenario.sex.write(state.selected_sex.lower())
output_csv_file = data_dir / f"{scenario.id}.csv"
scenario.output_csv_path.write(str(output_csv_file))
notify(state, "S", f"Created {scenario.id}")
return scenario
def scenario_on_submission_change(state, submittable, details):
"""When the selected_scenario's submission status changes, reassign selected_scenario to force a GUI refresh."""
state.selected_scenario = submittable
selected_data_node = None
main_md = """
<|layout|columns=1 4|gap=1.5rem|
<lhs|part|
# Spark with **Taipy**{: .color-primary}
## Scenario
<|{selected_scenario}|scenario_selector|on_creation=scenario_on_creation|>
----------
## Scenario info
<|{selected_scenario}|scenario|on_submission_change=scenario_on_submission_change|>
|lhs>
<rhs|part|render={selected_scenario}|
## Selections
<selections|layout|columns=1 1 1 2|gap=1.5rem|
<|{selected_species}|selector|lov={valid_features["species"]}|dropdown|label=Species|>
<|{selected_island}|selector|lov={valid_features["island"]}|dropdown|label=Island|>
<|{selected_sex}|selector|lov={valid_features["sex"]}|dropdown|label=Sex|>
|selections>
----------
## Output
**<|{str(selected_scenario.output.read()) if selected_scenario and selected_scenario.output.is_ready_for_reading else 'Submit the scenario using the left panel.'}|text|raw|class_name=color-primary|>**
## Data node inspector
<|{selected_data_node}|data_node_selector|display_cycles=False|>
**Data node value:**
<|{str(selected_data_node.read()) if selected_data_node and selected_data_node.is_ready_for_reading else None}|>
<br/>
----------
## DAG
<|Scenario DAG|expandable|
<|{selected_scenario}|scenario_dag|>
|>
|rhs>
|>
"""
def on_change(state, var_name: str, var_value):
if var_name == "selected_species":
state.selected_scenario.species.write(var_value)
elif var_name == "selected_island":
state.selected_scenario.island.write(var_value)
elif var_name == "selected_sex":
state.selected_scenario.sex.write(var_value.lower())
if __name__ == "__main__":
tp.Core().run()
gui = Gui(main_md)
gui.run(title="Spark with Taipy")
然后,从项目文件夹中,您可以像这样运行主脚本:
$ taipy run app/main.py
结论
现在您已经了解了如何将 PySpark 与 Taipy 结合使用的示例,不妨尝试使用这两个工具来增强您自己的数据应用程序!
如果您曾因其他工作流编排工具拖慢您的工作进度并造成阻碍而苦恼,请不要因此而放弃尝试 Taipy。Taipy
易于使用,并且致力于不限制您使用哪些第三方软件包—— 其强大而灵活的框架使其能够轻松适配任何数据应用程序。

希望您喜欢这篇文章!
您可以在此存储库中找到所有代码和数据。
文章来源:https://dev.to/taipy/how-to-master-big-data-pipelines-with-taipy-and-pyspark-14oe