数据工程 101:自动化您的第一个数据提取 创建 JSON 提取函数 使用 Airflow 将数据加载到 MySQL 设置您的 Airflow 管道 完成您的第一个数据管道

2025-06-10

数据工程 101:自动化您的第一次数据提取

创建 JSON 提取函数

使用 Airflow 将数据加载到 MySQL

设置气流管道

完成您的第一个数据管道

照片由 Dennis Kummer 在 Unsplash上拍摄。

在过去的几周里,我们讨论了 数据工程和自动化领域的几个重要话题。

我们已经为理解数据工程师使用的词汇和基本概念奠定了基础 。现在是时候开始构建您的第一组批处理作业了。

为此,我们将使用 Apache Airflow 库来帮助自动化我们的工作。

此外,我们将使用 sfgov 311 数据集作为实时数据源。您可以随时提取此信息,获取有关 311 报告的最新数据。这些数据通常涉及故意破坏、违规停车等情况。

对于这个管道,我们首先将数据提取到原始 CSV 中,然后将其加载到 MySQL 数据库中。

在本文中,我们将概述执行此操作所需的代码,并指出我们采取各个步骤的一些原因。


创建 JSON 提取函数

在开始之前,您需要设置一个 Airflow 环境,以便能够按照本文讨论的每个步骤进行操作。如果您还没有这样做,我们认为这篇 文章是我们个人最喜欢的文章之一

在创建数据管道时,尤其是那些更面向批处理的管道,将数据提取到原始数据层是有益的。这样做可以让你备份原始数据。

当数据出现错误时,原始文件可以帮助您确定数据错误是出在源中还是出在整体过程中。

在本例中,我们将从在线的 JSON 数据集中提取数据。我们可以使用名为 的 pandas 函数来实现 [read_json](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_json.html)。该函数可以读取文件或 URL。

我们将通过创建一个可调用的函数来实现这一点,该函数将从 URL 或文件中提取基于 JSON 的数据。

我们还可以给文件名添加时间戳,以确保我们知道何时提取了数据。这可以在以后尝试跟踪数据变化时使用,这些变化可能更像是快照而不是更新数据。

这可以通过使用 DateTime 对象来完成,如下所示:

该文件名稍后将与我们下面创建的提取功能一起使用:


# Step 1: Import all necessary packages.

# For scheduling
import datetime as dt

# For function jsonToCsv
import pandas as pd

# For function csvToSql
import csv
import pymysql

# Backwards compatibility of pymysql to mysqldb
pymysql.install_as_MySQLdb()

# Importing MySQLdb now
import MySQLdb

# For Apache Airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


# Step 2: Define functions for operators.

# A JSON string reader to .csv writer function.
def jsonToCsv(url, outputcsv):

    # Reads the JSON string into a pandas DataFrame object.
    data = pd.read_json(url)

    # Convert the object to a .csv file.
    # It is unnecessary to separate the JSON reading and the .csv writing.
    data.to_csv(outputcsv)

    return 'Read JSON and written to .csv'

这项任务仍需实现。不过,一旦加载功能设置完成,我们就会最终向您展示。


使用 Airflow 将数据加载到 MySQL

一旦您有了摘录,下一步就是将数据加载到数据仓库中的某种原始层中。

这一步的关键在于不要操纵数据。因为这样,即使你的数据存在某种数据问题,或者数据源本身存在问题,也更容易追溯。

您可以通过在每个步骤中进行数据质量检查来实现这一点。在原始检查中,您通常会检查数据类型是否合理。

例如,所有日期字段都是日期吗?所有州都是有效州吗?信不信由你,我们在这里遇到过问题。“WE”不是州的缩写。

这些是健全性检查,以确保您提取的数据是正确的。

理论上,你的应用程序应该仔细检查用户输入。然而,我们永远不会信任应用层。

除此之外,您可以使用下面的代码。您会注意到,我们首先使用 MySQL 建立数据库连接,然后逐行加载 CSV 文件:

def csvToSql():

    # Attempt connection to a database
    try:
        dbconnect = MySQLdb.connect(
                host='localhost',
                user='root',
                passwd='databasepwd',
                db='mydb'
                )
    except:
        print('Can\'t connect.')

    # Define a cursor iterator object to function and to traverse the database.
    cursor = dbconnect.cursor()
    # Open and read from the .csv file
    with open('./rogoben.csv') as csv_file:

        # Assign the .csv data that will be iterated by the cursor.
        csv_data = csv.reader(csv_file)

        # Insert data using SQL statements and Python
        for row in csv_data:
            cursor.execute(
            'INSERT INTO rogobenDB3(number, docusignid, publicurl, filingtype, \
                    cityagencyname, cityagencycontactname, \
                    cityagencycontacttelephone, cityagencycontactemail, \
                    bidrfpnumber, natureofcontract, datesigned, comments, \
                    filenumber, originalfilingdate, amendmentdescription, \
                    additionalnamesrequired, signername, signertitle) ' \
                    'VALUES("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", \
                    "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s")',
                    row
                    )

    # Commit the changes
    dbconnect.commit()

    '''
    # Print all rows - FOR DEBUGGING ONLY
    cursor.execute("SELECT * FROM rogobenDB3")
    rows = cursor.fetchall()

    print(cursor.rowcount)
    for row in rows:
        print(row)
    '''

    # Close the connection
    cursor.close()

    # Confirm completion
    return 'Read .csv and written to the MySQL database'

如果您正在构建一个更强大的系统,那么您可能会设置某种形式的数据库管理器类,该类只采用您导入的连接字符串。

但是,由于我们只是为了演示而构建它,所以我们将所有代码放在一个函数中。

设置完所有这些功能后,您现在可以正式设置您的 DAG 了。 正如我在之前的文章中提到的,DAG 就像一张流程图。它指导哪些任务首先运行,以及哪些任务依赖于其他任务。

在我们的例子中,我们有一个提取数据的任务,以及另一个将数据加载到 MySQL 表中的任务。这两个基本任务将帮助您启动管道,它看起来如下所示。


设置气流管道

现在有了所有这些功能,我们就可以设置管道了。

在 Airflow 中设置实际的管道需要设置一组默认参数。这允许您设置所有者、开始日期、管道重试频率以及其他几个参数:

# Step 3: Define the DAG, i.e. the workflow

# DAG's arguments
default_args = {
        'owner': 'rogoben',
        'start_date':dt.datetime(2020, 4, 16, 11, 00, 00),
        'concurrency': 1,
        'retries': 0
        }

# DAG's operators, or bones of the workflow
with DAG('parsing_govt_data',
        catchup=False, # To skip any intervals we didn't run
        default_args=default_args,
        schedule_interval='* 1 * * * *', # 's m h d mo y'; set to run every minute.
        ) as dag:

    opr_json_to_csv = PythonOperator(
            task_id='json_to_csv',
            python_callable=jsonToCsv,
            op_kwargs={
                'url':'https://data.sfgov.org/resource/pv99-gzft.json',
                'outputcsv':'./rogoben.csv'
                }
            )

    opr_csv_to_sql = PythonOperator(
            task_id='csv_to_sql',
            python_callable=csvToSql
            )

# The actual workflow
opr_json_to_csv >> opr_csv_to_sql

除了参数之外,您还需要实际设置具体的操作符。在本例中,我们有两个函数: jsonToCSV 和 csvToSql。它们将在 中使用 PythonOperator。这允许您创建我们所谓的任务。

为了确保任务按照合理的顺序运行,您需要定义依赖关系。

你可以使用位移运算符来定义依赖关系。对于不熟悉位移运算符的人来说,它看起来像 >> 或 <<

在这种情况下,您可以将其定义如下 opr_json_to_csv >> opr_csv_to_sql

这确保了 opr_json_to_csv 在之前运行 opr_csv_to_sql

说实话,通过这种方式你会得到重复的数据加载。

为了处理重复数据,您可以加载原始层,然后检查以确保在后续的暂存层中没有加载重复数据。因此,我们暂时不用担心这个问题。

这样,您基本上就完成了您的第一个管道。

那么现在您将管道放在哪里?

为了运行此管道,您需要将其保存在 airflow/dags 您设置的文件夹中。如果您尚未设置,请参阅我们 推荐的 Airflow 设置指南

airflow                  # airflow root directory.
├── dags                 # the dag root folder
│   ├── first_dag.py        # where you put your first task

一旦保存了此管道 --- 并且只要 Airflow 在后台运行 --- 您的 DAG 就会自动被 Airflow 拾取。

您可以通过访问 localhost:8080 进行检查,这是 Airflow 仪表板默认运行的地方。

从那里,您的 DAG 应该会出现。

一旦它出现,您就可以开始查看您的 DAG 以确保所有各种任务都已设置。

它看起来如下图所示:

现在您的管道已准备就绪。


完成您的第一个数据管道

恭喜您成功构建并自动化了您的第一个 Airflow 数据管道!现在,您可以将这个框架运用到其他 ETL 和数据管道中。

当然,这只是数据平台或数据仓库的第一层。从这里开始,您仍然需要创建生产层、指标层以及某种数据可视化或数据科学层。

然后您就可以真正开始利用您的数据产生影响。

如果您想了解更多有关数据科学、云计算和技术的信息,请查看以下文章!

数据工程 101:编写你的第一个管道

5 个管理大数据的优秀库

云计算有哪些不同类型

4 个简单的 Python 想法,实现工作流程自动化

数据科学家必须具备的 4 项技能

SQL最佳实践---设计ETL视频

5 个使用 Python 管理大数据的优秀库

连接 DynamoDB 和 S3 中的数据以进行实时临时分析

鏂囩珷鏉ユ簮锛�https://dev.to/seattledataguy/data-engineering-101-automating-your-first-data-extract-g6j
PREV
数据工程 101:编写您的第一个管道批处理与流 ETL 工具选项 Airflow 中的管道 Luigi 中的管道那么您选择哪一个?
NEXT
给数据科学家和数据分析师的 4 个 SQL 技巧