数据工程 101:自动化您的第一次数据提取
创建 JSON 提取函数
使用 Airflow 将数据加载到 MySQL
设置气流管道
完成您的第一个数据管道
照片由 Dennis Kummer 在 Unsplash上拍摄。
在过去的几周里,我们讨论了 数据工程和自动化领域的几个重要话题。
我们已经为理解数据工程师使用的词汇和基本概念奠定了基础 。现在是时候开始构建您的第一组批处理作业了。
为此,我们将使用 Apache Airflow 库来帮助自动化我们的工作。
此外,我们将使用 sfgov 311 数据集作为实时数据源。您可以随时提取此信息,获取有关 311 报告的最新数据。这些数据通常涉及故意破坏、违规停车等情况。
对于这个管道,我们首先将数据提取到原始 CSV 中,然后将其加载到 MySQL 数据库中。
在本文中,我们将概述执行此操作所需的代码,并指出我们采取各个步骤的一些原因。
创建 JSON 提取函数
在开始之前,您需要设置一个 Airflow 环境,以便能够按照本文讨论的每个步骤进行操作。如果您还没有这样做,我们认为这篇 文章是我们个人最喜欢的文章之一。
在创建数据管道时,尤其是那些更面向批处理的管道,将数据提取到原始数据层是有益的。这样做可以让你备份原始数据。
当数据出现错误时,原始文件可以帮助您确定数据错误是出在源中还是出在整体过程中。
在本例中,我们将从在线的 JSON 数据集中提取数据。我们可以使用名为 的 pandas 函数来实现 [read_json](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_json.html)
。该函数可以读取文件或 URL。
我们将通过创建一个可调用的函数来实现这一点,该函数将从 URL 或文件中提取基于 JSON 的数据。
我们还可以给文件名添加时间戳,以确保我们知道何时提取了数据。这可以在以后尝试跟踪数据变化时使用,这些变化可能更像是快照而不是更新数据。
这可以通过使用 DateTime
对象来完成,如下所示:
该文件名稍后将与我们下面创建的提取功能一起使用:
# Step 1: Import all necessary packages.
# For scheduling
import datetime as dt
# For function jsonToCsv
import pandas as pd
# For function csvToSql
import csv
import pymysql
# Backwards compatibility of pymysql to mysqldb
pymysql.install_as_MySQLdb()
# Importing MySQLdb now
import MySQLdb
# For Apache Airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# Step 2: Define functions for operators.
# A JSON string reader to .csv writer function.
def jsonToCsv(url, outputcsv):
# Reads the JSON string into a pandas DataFrame object.
data = pd.read_json(url)
# Convert the object to a .csv file.
# It is unnecessary to separate the JSON reading and the .csv writing.
data.to_csv(outputcsv)
return 'Read JSON and written to .csv'
这项任务仍需实现。不过,一旦加载功能设置完成,我们就会最终向您展示。
使用 Airflow 将数据加载到 MySQL
一旦您有了摘录,下一步就是将数据加载到数据仓库中的某种原始层中。
这一步的关键在于不要操纵数据。因为这样,即使你的数据存在某种数据问题,或者数据源本身存在问题,也更容易追溯。
您可以通过在每个步骤中进行数据质量检查来实现这一点。在原始检查中,您通常会检查数据类型是否合理。
例如,所有日期字段都是日期吗?所有州都是有效州吗?信不信由你,我们在这里遇到过问题。“WE”不是州的缩写。
这些是健全性检查,以确保您提取的数据是正确的。
理论上,你的应用程序应该仔细检查用户输入。然而,我们永远不会信任应用层。
除此之外,您可以使用下面的代码。您会注意到,我们首先使用 MySQL 建立数据库连接,然后逐行加载 CSV 文件:
def csvToSql():
# Attempt connection to a database
try:
dbconnect = MySQLdb.connect(
host='localhost',
user='root',
passwd='databasepwd',
db='mydb'
)
except:
print('Can\'t connect.')
# Define a cursor iterator object to function and to traverse the database.
cursor = dbconnect.cursor()
# Open and read from the .csv file
with open('./rogoben.csv') as csv_file:
# Assign the .csv data that will be iterated by the cursor.
csv_data = csv.reader(csv_file)
# Insert data using SQL statements and Python
for row in csv_data:
cursor.execute(
'INSERT INTO rogobenDB3(number, docusignid, publicurl, filingtype, \
cityagencyname, cityagencycontactname, \
cityagencycontacttelephone, cityagencycontactemail, \
bidrfpnumber, natureofcontract, datesigned, comments, \
filenumber, originalfilingdate, amendmentdescription, \
additionalnamesrequired, signername, signertitle) ' \
'VALUES("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", \
"%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s")',
row
)
# Commit the changes
dbconnect.commit()
'''
# Print all rows - FOR DEBUGGING ONLY
cursor.execute("SELECT * FROM rogobenDB3")
rows = cursor.fetchall()
print(cursor.rowcount)
for row in rows:
print(row)
'''
# Close the connection
cursor.close()
# Confirm completion
return 'Read .csv and written to the MySQL database'
如果您正在构建一个更强大的系统,那么您可能会设置某种形式的数据库管理器类,该类只采用您导入的连接字符串。
但是,由于我们只是为了演示而构建它,所以我们将所有代码放在一个函数中。
设置完所有这些功能后,您现在可以正式设置您的 DAG 了。 正如我在之前的文章中提到的,DAG 就像一张流程图。它指导哪些任务首先运行,以及哪些任务依赖于其他任务。
在我们的例子中,我们有一个提取数据的任务,以及另一个将数据加载到 MySQL 表中的任务。这两个基本任务将帮助您启动管道,它看起来如下所示。
设置气流管道
现在有了所有这些功能,我们就可以设置管道了。
在 Airflow 中设置实际的管道需要设置一组默认参数。这允许您设置所有者、开始日期、管道重试频率以及其他几个参数:
# Step 3: Define the DAG, i.e. the workflow
# DAG's arguments
default_args = {
'owner': 'rogoben',
'start_date':dt.datetime(2020, 4, 16, 11, 00, 00),
'concurrency': 1,
'retries': 0
}
# DAG's operators, or bones of the workflow
with DAG('parsing_govt_data',
catchup=False, # To skip any intervals we didn't run
default_args=default_args,
schedule_interval='* 1 * * * *', # 's m h d mo y'; set to run every minute.
) as dag:
opr_json_to_csv = PythonOperator(
task_id='json_to_csv',
python_callable=jsonToCsv,
op_kwargs={
'url':'https://data.sfgov.org/resource/pv99-gzft.json',
'outputcsv':'./rogoben.csv'
}
)
opr_csv_to_sql = PythonOperator(
task_id='csv_to_sql',
python_callable=csvToSql
)
# The actual workflow
opr_json_to_csv >> opr_csv_to_sql
除了参数之外,您还需要实际设置具体的操作符。在本例中,我们有两个函数: jsonToCSV
和 csvToSql
。它们将在 中使用 PythonOperator
。这允许您创建我们所谓的任务。
为了确保任务按照合理的顺序运行,您需要定义依赖关系。
你可以使用位移运算符来定义依赖关系。对于不熟悉位移运算符的人来说,它看起来像 >>
或 <<
。
在这种情况下,您可以将其定义如下 opr_json_to_csv >> opr_csv_to_sql
。
这确保了 opr_json_to_csv
在之前运行 opr_csv_to_sql
。
说实话,通过这种方式你会得到重复的数据加载。
为了处理重复数据,您可以加载原始层,然后检查以确保在后续的暂存层中没有加载重复数据。因此,我们暂时不用担心这个问题。
这样,您基本上就完成了您的第一个管道。
那么现在您将管道放在哪里?
为了运行此管道,您需要将其保存在 airflow/dags
您设置的文件夹中。如果您尚未设置,请参阅我们 推荐的 Airflow 设置指南。
airflow # airflow root directory.
├── dags # the dag root folder
│ ├── first_dag.py # where you put your first task
一旦保存了此管道 --- 并且只要 Airflow 在后台运行 --- 您的 DAG 就会自动被 Airflow 拾取。
您可以通过访问 localhost:8080 进行检查,这是 Airflow 仪表板默认运行的地方。
从那里,您的 DAG 应该会出现。
一旦它出现,您就可以开始查看您的 DAG 以确保所有各种任务都已设置。
它看起来如下图所示:
现在您的管道已准备就绪。
完成您的第一个数据管道
恭喜您成功构建并自动化了您的第一个 Airflow 数据管道!现在,您可以将这个框架运用到其他 ETL 和数据管道中。
当然,这只是数据平台或数据仓库的第一层。从这里开始,您仍然需要创建生产层、指标层以及某种数据可视化或数据科学层。
然后您就可以真正开始利用您的数据产生影响。
如果您想了解更多有关数据科学、云计算和技术的信息,请查看以下文章!
连接 DynamoDB 和 S3 中的数据以进行实时临时分析
鏂囩珷鏉ユ簮锛�https://dev.to/seattledataguy/data-engineering-101-automating-your-first-data-extract-g6j