数据工程 101:编写你的第一个管道
批处理与流
ETL 工具选项
Airflow 中的管道
Luigi 中的管道
那么你选择哪一个?
数据工程师的主要职责之一可以概括为将数据从 A 点传输到 B 点。
我们经常需要将数据从一个系统提取出来并导入另一个系统。这可能出于各种目的,包括分析、集成和机器学习。
但为了让数据流动起来,我们需要使用所谓的 ETL/数据管道。
这些是将数据从一个数据系统传输到另一个数据系统的过程。
作为数据工程师,我们需要回答的一个问题是,我们需要多久更新一次这些数据。这就是批处理与流处理的问题所在。目前,ETL/ELT 主要分为两种类型。
批处理与流
长期以来,几乎所有数据管道都被认为是批处理管道。这意味着管道通常每天、每小时、每周等运行一次。虽然有一定的时间间隔,但数据并非实时的。
批处理作业是指数据以块或批次的形式加载,而不是立即加载。因此,术语“批处理作业”指的是数据以批量方式加载。
将其与流数据进行比较,一旦新行被添加到应用程序数据库中,它就会传递到分析系统中。
这通常使用各种形式的发布/订阅或事件总线模型来实现。所有这些系统都允许事务数据几乎在事务发生时立即传递。
有人可能会问,为什么我们不把所有事情都用流式传输呢?一直有实时数据不是更好吗?
从某种程度上来说确实如此。但创建流媒体系统往往在技术上更具挑战性,维护起来也很困难。
尽管以正常间隔运行的批处理作业可能会失败,但它们不需要立即修复,因为它们通常需要几个小时或几天的时间才能再次运行。
相比之下,流媒体系统始终处于在线状态。故障和错误需要尽快修复。
目前,我们将专注于开发传统上更多的批处理作业。
除了为您的 ETL 选择整体范例之外,您还需要决定您的 ETL 工具。
ETL 工具选项
如果您只想进入编码部分,请直接跳至下面的部分。
但是,如果不参考数据团队必须使用的几个选项,我们就无法在开发数据管道方面取得太大进展。
有大量的数据管道和工作流自动化工具。
让我们将它们分解为两个具体选项。
拖放与框架。
拖放选项使您几乎不需要了解任何代码——这就像 SSIS 和 Informatica。
对于几乎不需要实现自定义代码的人来说,这些非常有用。
尽管许多此类工具都提供了可添加的自定义代码,但这有点违背了目的。
如果您的团队能够编写代码,我们发现使用框架编写管道更有益,因为它们通常可以更好地进行调整。尽管 Informatica 功能强大,只要您负担得起,它就能完成很多繁重的工作。
即便如此,许多人仍然依赖基于代码的框架来实现 ETL(一些公司,如 Airbnb 和 Spotify,已经开发了自己的框架)。
这些框架通常用 Python 实现,分别称为 Airflow 和 Luigi。这两个框架都可以用作工作流,并提供各种优势。
但现在,让我们看看在 Airflow 和 Luigi 中构建基本管道是什么样的。
在后续的文章中,我们会更多地讨论设计。但目前,我们只是演示如何编写 ETL 管道。
Airflow 中的管道
为了在 Airflow 中创建管道,您需要设置几个特定的配置。
您需要设置一组参数,然后还需要使用这些默认参数调用您正在创建的实际 DAG。
请参阅下面的配置。
default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
#With this set to true, the pipeline won't run if the previous day failed
'email': ['info@example.com'],
'email_on_failure': True,
#upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}
dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
这只是您的 DAG 的基础。
您可以设置实际数据管道的运行频率等内容——例如,如果您想每天运行计划,请使用以下代码参数。例如,您可以使用schedule_interval='@daily'
。或者,您也可以改用 cron,如下所示: schedule_interval='0 0 * * *'
。
设置好基线配置后,即可开始组合 Airflow 的操作符。
操作符本质上是你想要完成的独立任务。例如提取数据、移动文件、运行某些数据转换等等。
例如,如果您查看下文,我们会用到几个运算符。其中包括 PythonOperator 和 BashOperator。
这允许您在 Python 或 bash 中运行命令并在所述任务之间创建依赖关系。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
#With this set to true, the pipeline won't run if the previous day failed
'email': ['info@example.com'],
'email_on_failure': True,
#upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}
dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
def my_func():
print('Hello from my_func')
bashtask = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
dummy_task = DummyOperator(task_id='dummy_task', retries=3)
python_task = PythonOperator(task_id='python_task', python_callable=my_func)
dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)
我们在这里对 Airflow 管道进行了更深入的探讨。但其总体要点如下。
您可以继续创建更多任务或开发抽象来帮助管理管道的复杂性。但上面对 Airflow 操作符的用法已经是一个很好的介绍。
现在来说说路易吉。
Luigi 中的管道
Luigi 是另一个可用于开发管道的工作流框架。在某些方面,我们觉得它更简单,而在其他方面,它很快就会变得更加复杂。
我们个人认为 Luigi 更简单的原因是它将主要任务分为三个主要步骤。
这些可以在 Luigi 定义的“任务”中看到。
任务
在 Luigi 任务中,最常用的三类函数是 require()、run() 和 output()。
这些功能在 Luigi 中分别起什么作用?
require() 与 airflow 中的依赖项类似。
您实际上是在引用先前的任务类、文件输出或其他输出。
例如:
import luigi
class BasicTask(luigi.Task):
def requires(self):
[FileExistsTask(self.input_filepath)]
在这种情况下,requires 函数正在等待文件到达。
但它也可以等待某个任务完成或其他输出。
并非每个任务都需要一个 require 函数。但它可以用来引用需要完成的先前任务,以便当前任务能够启动。
但是任务确实需要 run() 函数。run() 函数本质上就是实际的任务本身。你想完成什么?
例如:
class LoadTask(luigi.Task):
file_path = luigi.Parameter()
def run(self):
cnx = mysql.connector.connect(user='joe', database='test')
stmt = "insert into basic_date(col1,col2,col3) select distinct col1, col2, col3 from table1"
curs=cnx.cursor()
curs.execute(stmt)
curs.commit()
curs.close()
with self.output().open('w') as out_file:
print >> out_file, strDate1, strDate2
def output(self):
return luigi.file.LocalTarget(path='/tmp/123')
任务的输出是一个目标,可以是本地文件系统上的文件、Amazon S3 上的文件、数据库中的某些数据等。
您可以看到这两个管道框架之间的细微差别。Airflow 被包装在一个特定的操作符中,而 Luigi 则被开发为一个更大的类。
最终,这种细微的差别可能会导致管道设计发生很大变化。
那么你选择哪一个?
就我个人而言,我们喜欢 Airflow 是因为它的社区规模更大。然而,在很多方面,Luigi 的入门门槛略低。它没有太多可用的操作符。相反,你可以决定每个任务的具体功能。
这不仅可以提供更多的自由,还可以为设计和开发提供更多的思考。
所以最终,你必须选择自己想要处理的事情。无论你选择什么框架,你的代码中总会有 bug。
祝你好运,感谢阅读!
连接 DynamoDB 和 S3 中的数据以进行实时临时分析
鏂囩珷鏉ユ簮锛�https://dev.to/seattledataguy/data-engineering-101-writing-your-first-pipeline-2k23