数据工程 101:编写您的第一个管道批处理与流 ETL 工具选项 Airflow 中的管道 Luigi 中的管道那么您选择哪一个?

2025-06-10

数据工程 101:编写你的第一个管道

批处理与流

ETL 工具选项

Airflow 中的管道

Luigi 中的管道

那么你选择哪一个?

数据工程师的主要职责之一可以概括为将数据从 A 点传输到 B 点。

我们经常需要将数据从一个系统提取出来并导入另一个系统。这可能出于各种目的,包括分析、集成和机器学习。

但为了让数据流动起来,我们需要使用所谓的 ETL/数据管道。

这些是将数据从一个数据系统传输到另一个数据系统的过程。

作为数据工程师,我们需要回答的一个问题是,我们需要多久更新一次这些数据。这就是批处理与流处理的问题所在。目前,ETL/ELT 主要分为两种类型。


批处理与流

长期以来,几乎所有数据管道都被认为是批处理管道。这意味着管道通常每天、每小时、每周等运行一次。虽然有一定的时间间隔,但数据并非实时的。

批处理作业是指数据以块或批次的形式加载,而不是立即加载。因此,术语“批处理作业”指的是数据以批量方式加载。

将其与流数据进行比较,一旦新行被添加到应用程序数据库中,它就会传递到分析系统中。

这通常使用各种形式的发布/订阅或事件总线模型来实现。所有这些系统都允许事务数据几乎在事务发生时立即传递。

有人可能会问,为什么我们不把所有事情都用流式传输呢?一直有实时数据不是更好吗?

从某种程度上来说确实如此。但创建流媒体系统往往在技术上更具挑战性,维护起来也很困难。

尽管以正常间隔运行的批处理作业可能会失败,但它们不需要立即修复,因为它们通常需要几个小时或几天的时间才能再次运行。

相比之下,流媒体系统始终处于在线状态。故障和错误需要尽快修复。

目前,我们将专注于开发传统上更多的批处理作业。

除了为您的 ETL 选择整体范例之外,您还需要决定您的 ETL 工具。


ETL 工具选项

如果您只想进入编码部分,请直接跳至下面的部分。

但是,如果不参考数据团队必须使用的几个选项,我们就无法在开发数据管道方面取得太大进展。

有大量的数据管道和工作流自动化工具。

让我们将它们分解为两个具体选项。

拖放与框架。

拖放选项使您几乎不需要了解任何代码——这就像 SSIS 和 Informatica。

对于几乎不需要实现自定义代码的人来说,这些非常有用。

尽管许多此类工具都提供了可添加的自定义代码,但这有点违背了目的。

如果您的团队能够编写代码,我们发现使用框架编写管道更有益,因为它们通常可以更好地进行调整。尽管 Informatica 功能强大,只要您负担得起,它就能完成很多繁重的工作。

即便如此,许多人仍然依赖基于代码的框架来实现 ETL(一些公司,如 Airbnb 和 Spotify,已经开发了自己的框架)。

这些框架通常用 Python 实现,分别称为 Airflow 和 Luigi。这两个框架都可以用作工作流,并提供各种优势。

但现在,让我们看看在 Airflow 和 Luigi 中构建基本管道是什么样的。

在后续的文章中,我们会更多地讨论设计。但目前,我们只是演示如何编写 ETL 管道。


Airflow 中的管道

为了在 Airflow 中创建管道,您需要设置几个特定的​​配置。

您需要设置一组参数,然后还需要使用这些默认参数调用您正在创建的实际 DAG。

请参阅下面的配置。

default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
  #With this set to true, the pipeline won't run if the previous day failed
'email': ['info@example.com'],
'email_on_failure': True,
 #upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}


dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)

这只是您的 DAG 的基础。

您可以设置实际数据管道的运行频率等内容——例如,如果您想每天运行计划,请使用以下代码参数。例如,您可以使用schedule_interval='@daily'。或者,您也可以改用 cron,如下所示:  schedule_interval='0 0 * * *'

设置好基线配置后,即可开始组合 Airflow 的操作符。

操作符本质上是你想要完成的独立任务。例如提取数据、移动文件、运行某些数据转换等等。

例如,如果您查看下文,我们会用到几个运算符。其中包括 PythonOperator 和 BashOperator。

这允许您在 Python 或 bash 中运行命令并在所述任务之间创建依赖关系。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
  #With this set to true, the pipeline won't run if the previous day failed
'email': ['info@example.com'],
'email_on_failure': True,
 #upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}

dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)

def my_func():
    print('Hello from my_func')


bashtask = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)

dummy_task  = DummyOperator(task_id='dummy_task', retries=3)

python_task = PythonOperator(task_id='python_task', python_callable=my_func)

dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)

我们在这里对 Airflow 管道进行了更深入的探讨。但其总体要点如下。

您可以继续创建更多任务或开发抽象来帮助管理管道的复杂性。但上面对 Airflow 操作符的用法已经是一个很好的介绍。

现在来说说路易吉。


Luigi 中的管道

Luigi 是另一个可用于开发管道的工作流框架。在某些方面,我们觉得它更简单,而在其他方面,它很快就会变得更加复杂。

我们个人认为 Luigi 更简单的原因是它将主要任务分为三个主要步骤。

这些可以在 Luigi 定义的“任务”中看到。

任务

在 Luigi 任务中,最常用的三类函数是 require()、run() 和 output()。

这些功能在 Luigi 中分别起什么作用?

require() 与 airflow 中的依赖项类似。

您实际上是在引用先前的任务类、文件输出或其他输出。

例如:


import luigi

class BasicTask(luigi.Task):

  def requires(self): 
    [FileExistsTask(self.input_filepath)]

在这种情况下,requires 函数正在等待文件到达。

但它也可以等待某个任务完成或其他输出。
并非每个任务都需要一个 require 函数。但它可以用来引用需要完成的先前任务,以便当前任务能够启动。

但是任务确实需要 run() 函数。run() 函数本质上就是实际的任务本身。你想完成什么?
例如:

class LoadTask(luigi.Task):

file_path = luigi.Parameter()
    def run(self):
        cnx = mysql.connector.connect(user='joe', database='test')
        stmt = "insert into basic_date(col1,col2,col3)  select distinct col1, col2, col3 from table1" 
        curs=cnx.cursor()
        curs.execute(stmt)
        curs.commit()
        curs.close()
        with self.output().open('w') as out_file:
            print >> out_file, strDate1, strDate2
    def output(self):
            return luigi.file.LocalTarget(path='/tmp/123')

任务的输出是一个目标,可以是本地文件系统上的文件、Amazon S3 上的文件、数据库中的某些数据等。

您可以看到这两个管道框架之间的细微差别。Airflow 被包装在一个特定的操作符中,而 Luigi 则被开发为一个更大的类。

最终,这种细微的差别可能会导致管道设计发生很大变化。


那么你选择哪一个?

就我个人而言,我们喜欢 Airflow 是因为它的社区规模更大。然而,在很多方面,Luigi 的入门门槛略低。它没有太多可用的操作符。相反,你可以决定每个任务的具体功能。

这不仅可以提供更多的自由,还可以为设计和开发提供更多的思考。

所以最终,你必须选择自己想要处理的事情。无论你选择什么框架,你的代码中总会有 bug。

祝你好运,感谢阅读!

数据工程 101:数据工程简介

云计算有哪些不同类型

4 个简单的 Python 想法,实现工作流程自动化

数据科学家必须具备的 4 项技能

SQL最佳实践---设计ETL视频

5 个使用 Python 管理大数据的优秀库

连接 DynamoDB 和 S3 中的数据以进行实时临时分析

鏂囩珷鏉ユ簮锛�https://dev.to/seattledataguy/data-engineering-101-writing-your-first-pipeline-2k23
PREV
10 行 Vanilla JS 代码实现屏幕录制完整示例创建视频流录制视频流将录制内容转换为 Blob
NEXT
数据工程 101:自动化您的第一个数据提取 创建 JSON 提取函数 使用 Airflow 将数据加载到 MySQL 设置您的 Airflow 管道 完成您的第一个数据管道