我如何使用 Airflow 将每月 10,000 美元的 AWS Glue 账单减少到 400 美元
在我担任Vance的 DevOps 工程师期间,我们在AWS Glue上运行了大约 80 条 ETL 管道,但随着工作负载的增加,成本也随之增加,每月高达10,000 美元。这种成本是不可持续的。在分析了我们的管道之后,我们意识到 AWS Glue 的无服务器特性导致我们在空闲时间和不必要的计算上花费了大量成本。
为了解决这个问题,我将我们的 ETL 工作负载迁移到了Apache Airflow,运行在基于 ECS 的 EC2 实例上,并使用 Terraform 进行编排。结果如何?成本降低了 96% ,每月账单仅需400 美元,而且性能丝毫不受影响。
虽然 Airflow 是 Glue 的绝佳替代方案,但关于如何利用 Terraform 和 Celery Executor 进行正确设置 Airflow 的文档却很少,尤其是在成本优化方面。本篇博客将带您了解我们的操作方法、面临的挑战,以及如何通过同样的方法降低 AWS Glue 成本。
不用说,这确实是一场战争,感谢我的经纪人Rishabh Lakhotia与我一起经历了这场闹剧,先生,您确实是一位神。
介绍
我是 Akash Singh,来自班加罗尔,工程专业三年级学生,也是一位开源贡献者。我的LinkedIn、GitHub和Twitter 账号
如下:
我在网上使用的名字是 SkySingh04。
疼痛的三个部分
从AWS Glue迁移到Apache Airflow涉及设置三个核心组件:
- Web 服务器– 用于管理 DAG(有向无环图)和监控作业执行情况的 UI。
- 调度程序——负责触发和调度 DAG 运行。
- 工作者——执行 DAG 中的实际任务。
使用Terraform,我们配置了 ECS 来并行运行这三个系统并使它们能够相互通信,接下来我们将讨论这一点。
一旦 Airflow 启动并运行,下一步就是迁移我们的 ETL 工作流。我们会将 Glue 作业集成到Airflow DAG中,然后删除 Glue 作业,这标志着降低 AWS 成本的最后一步。
神奇的 Dockerfile
您可以使用以下 Dockerfile 并将其推送到 ECR 并在即将到来的配置中引用它:
FROM apache/airflow:latest-python3.9
USER root
RUN apt-get update && \
apt-get install -y --no-install-recommends \
git \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN mkdir -p /opt/airflow/dags /opt/airflow/logs && \
chown -R airflow:root /opt/airflow && \
chmod -R 755 /opt/airflow/logs
USER airflow
RUN pip install --no-cache-dir \
apache-airflow-providers-github \
apache-airflow-providers-amazon \
apache-airflow-providers-mysql \
apache-airflow-providers-mongo \
apache-airflow[celery,redis] \
pandas
COPY --chown=airflow:root dags/* /opt/airflow/dags/
ENV AIRFLOW__LOGGING__BASE_LOG_FOLDER=/opt/airflow/logs \
AIRFLOW__LOGGING__WORKER_LOG_SERVER_PORT=8793 \
AIRFLOW__LOGGING__LOGGING_LEVEL=INFO \
AIRFLOW__LOGGING__LOG_FORMAT='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s' \
AIRFLOW__LOGGING__SIMPLE_LOG_FORMAT='%(asctime)s %(levelname)s - %(message)s' \
AIRFLOW__LOGGING__DAG_PROCESSOR_LOG_TARGET=file \
AIRFLOW__LOGGING__TASK_LOG_READER=task \
AIRFLOW__LOGGING__DAG_FILE_PROCESSOR_LOG_TARGET=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log \
AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
ENV AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
RUN mkdir -p /opt/airflow/logs/scheduler \
/opt/airflow/logs/web \
/opt/airflow/logs/worker \
/opt/airflow/logs/dag_processor_manager \
/opt/airflow/logs/task_logs
USER root
RUN chown -R airflow:root /opt/airflow && \
chmod -R 755 /opt/airflow
USER airflow
这个 Dockerfile 将用于我们所有三个组件,并且还能很好地设置日志记录。DAG 直接嵌入到 Docker 镜像中,我们稍后会讲到。构建镜像、添加标签、推送到 ECR,然后进入下一步!
Airflow Web 服务器
可以编写一个 Terraform 脚本,使用ECS(弹性容器服务)和 EC2 启动类型在 AWS 上设置Apache Airflow。我们需要确保添加:
-
CloudWatch 日志记录:
/ecs/airflow
创建一个保留期为 3 天的日志组 ( )。
-
安全组:
- 允许应用程序负载均衡器 (ALB) 的入站 HTTP(端口 80)和 HTTPS(端口 443)流量。
- 启用不受限制的出站流量。
-
带有 ACM 和 Route 53 的 TLS/SSL:
- 使用 DNS 验证为airflow.internal.example.com提供 ACM(AWS 证书管理器)证书。
- 配置 Route 53 DNS 记录以将 Airflow URL 解析到 ALB。
-
应用程序负载均衡器(ALB):
- 为 Airflow 网络服务器创建内部 ALB
dualstack
,支持 IPv4 和 IPv6( )。 - 配置 HTTP 侦听器(端口 80)以将流量重定向到 HTTPS(端口 443)。
- 设置 HTTPS 侦听器(端口 443)以将请求转发到 ECS 目标组。
- 为 Airflow 网络服务器创建内部 ALB
-
Airflow Web 服务器的 ECS 任务定义:
- 为在EC2 支持的 ECS 集群上运行的Airflow web 服务器定义一个 ECS 任务。
- 使用存储在 AWS ECR ( ) 中的Docker 镜像
aws_ecr_repository.airflow.repository_url:latest
。 - 分配2GB 内存(
2048MB
)。 - 将容器的8080端口映射到主机上,以供Web访问。
- 定义健康检查(
http://localhost:8080/health
)。
-
Airflow 的 ECS 服务:
- 创建一个名为“airflow-webserver”的 ECS 服务,其中包含 1 个所需任务。
- 将 ECS 服务与ALB 目标组关联以实现负载平衡。
- 允许
execute-command
通过 AWS SSM 进行调试。 - 使用容量提供商策略进行 ECS 资源管理。
-
DNS配置:
- 配置指向 ALB 的Route 53 A 记录( )。
airflow.internal.example.com
- 配置指向 ALB 的Route 53 A 记录( )。
Terraform 脚本在 ECS 任务定义中包含几个环境变量:
-
数据库连接(
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
):- 指定Airflow 元数据数据库的PostgreSQL数据库连接字符串。
- 使用 AWS KMS 加密密钥安全地存储数据库密码。
-
用户管理:
_AIRFLOW_WWW_USER_CREATE
:确保创建默认的 Airflow Web 用户。_AIRFLOW_WWW_USER_USERNAME
:设置用户名(默认值airflow
:)。_AIRFLOW_WWW_USER_PASSWORD
:通过 AWS KMS 机密安全地存储密码。
-
安全和 Web 配置:
AIRFLOW__WEBSERVER__EXPOSE_CONFIG
:允许通过 Web UI 公开 Airflow 配置。AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK
:启用内置调度程序健康检查。
-
数据库迁移和初始化:
_AIRFLOW_DB_MIGRATE
:确保 Airflow 在启动时运行必要的数据库迁移。
现在继续运行terraform plan
,terraform apply
你应该会看到很多资源被创建。如果一切顺利,你将在你指定的 URL 上看到 Airflow UI:
Airflow 调度器
Airflow 调度程序负责编排 DAG 执行并确保计划任务在正确的时间运行。您可以编写 Terraform 脚本,将调度程序配置为 ECS 服务,配置CloudWatch 日志记录,并启用自动扩展功能,以有效管理资源使用情况。
虽然其中大部分与网络服务器类似,但总而言之,我们需要添加:
- 在 CloudWatch 中记录调度程序执行情况(
/ecs/airflow-scheduler/
)。 - 通过 StatsD Metrics (命名空间)监控性能
airflow-metrics
。 - 在具有自动扩展功能的 ECS 集群中运行,确保高效的资源分配。
- 使用CloudWatch代理进行监控,帮助分析任务执行时间。
- 由受限安全组保护,阻止不必要的流量。
现在,继续运行terraform plan
和terraform apply
,Airflow Scheduler将成功配置!🚀
气流工作者
Airflow 工作服务作为 ECS 服务部署在 EC2 实例上,并根据内存利用率自动伸缩。它运行Celery 工作器,执行来自 DAG 的任务,并且需要Redis,我们将在下文中进行设置。
需要注意的重要事项是:
- 使用CeleryExecutor,这意味着任务在工作人员之间分配。
- 日志被发送到CloudWatch进行监控。
- 根据内存利用率,工作者数量在0 到 5之间动态缩放。
- 每个工作进程都作为ECS内部的容器运行,由自动缩放策略管理,目标内存利用率为 60%。
DUMB_INIT_SETSID=0
设置为处理 Celery 关闭的正确信号传播。
整个设置过程让我哭了,因为在 ECS 中调试自动伸缩、日志管理和任务执行简直是一场噩梦。而且,Redis 还没上线,所以痛苦还远未结束。
Redis 和 RDS
设置 redis 并不是那么糟糕,您可以使用以下 terraform 文件:
resource "aws_elasticache_subnet_group" "airflow" {
name = "airflow-redis-subnet-group"
subnet_ids = aws_subnet.airflow[*].id
tags = merge(
{
name = "airflow-redis-subnet-group"
},
local.common_tags
)
}
resource "aws_security_group" "airflow_redis" {
name_prefix = "airflow-redis"
vpc_id = data.aws_vpc.this.id
tags = merge(
{
Name = "airflow-redis"
},
local.common_tags
)
}
resource "aws_security_group_rule" "airflow_redis_inbound" {
type = "ingress"
from_port = 6379
to_port = 6379
protocol = "tcp"
cidr_blocks = [data.aws_vpc.this.cidr_block]
security_group_id = aws_security_group.airflow_redis.id
description = "Allow Redis from internal network"
}
resource "aws_elasticache_cluster" "airflow" {
cluster_id = "airflow"
engine = "redis"
node_type = "cache.t4g.small"
num_cache_nodes = 1
parameter_group_name = "default.redis5.0"
engine_version = "5.0.6"
port = 6379
subnet_group_name = aws_elasticache_subnet_group.airflow.name
security_group_ids = [aws_security_group.airflow_redis.id]
tags = merge(
{
name = "airflow-redis-server"
},
local.common_tags
)
}
resource "aws_security_group_rule" "airflow_redis_outbound" {
type = "egress"
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
security_group_id = aws_security_group.airflow_redis.id
}
同样,我们也将为气流设置 RDS:
# Security Groups
resource "aws_security_group" "airflow_rds" {
lifecycle {
create_before_destroy = true
}
name_prefix = "airflow-rds-default-"
description = "Allow TLS inbound traffic and all outbound traffic for airflow"
vpc_id = data.aws_vpc.this.id
tags = {
Name = "airflow-rds-default"
}
}
resource "aws_security_group_rule" "airflow_rds_inbound" {
type = "ingress"
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = [data.aws_vpc.this.cidr_block]
security_group_id = aws_security_group.airflow_rds.id
description = "Allow all from internal network"
}
resource "aws_db_subnet_group" "airflow" {
name = "postgres-airflow"
subnet_ids = aws_subnet.airflow[*].id
}
resource "aws_db_instance" "airflow" {
db_name = "any db name"
apply_immediately = true
allocated_storage = "100"
storage_type = "gp3"
engine = "postgres"
engine_version = "17.2"
auto_minor_version_upgrade = true
instance_class = "db.t4g.micro"
username = "airflow"
password = data.aws_kms_secrets.airflow.plaintext["db_password"]
multi_az = false
publicly_accessible = false
deletion_protection = false
skip_final_snapshot = true
identifier = "airflow"
vpc_security_group_ids = [aws_security_group.airflow_rds.id]
db_subnet_group_name = aws_db_subnet_group.airflow.name
}
继续使用 terraform 创建所有这些资源!
使所有这些工作顺利进行的 ENV 配置
为了使Airflow 在 ECS 中与 CeleryExecutor 正常工作,需要几个环境变量用于日志记录、任务执行、数据库连接、Redis 作为消息代理以及外部集成。这些在Terraform 本地变量中定义并传递到 Airflow 容器中。
1️⃣ 核心气流配置
-
实例名称:
"AIRFLOW__WEBSERVER__INSTANCE_NAME" = "airflow-webserver"
- 帮助识别网络服务器实例。
-
执行人:
"AIRFLOW__CORE__EXECUTOR" = "CeleryExecutor"
- 使用CeleryExecutor将任务分配给多个工作程序,而不是在单个实例中按顺序运行它们。
-
数据库连接:
"AIRFLOW__CORE__SQL_ALCHEMY_CONN"
- 使用存储在 AWS KMS 机密中的凭证连接到PostgreSQL。
-
负载示例:
"AIRFLOW__CORE__LOAD_EXAMPLES" = "True"
- 控制是否应加载示例 DAG。
2️⃣ 日志配置(AWS CloudWatch 和 S3)
-
日志级别:
"AIRFLOW__LOGGING__LOGGING_LEVEL" = "DEBUG"
- 启用详细日志记录以进行调试。
-
远程记录到 CloudWatch:
"AIRFLOW__LOGGING__REMOTE_LOGGING" = "True"
"AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID" = "aws_conn"
"AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER" = "s3://abc"
- 将日志存储在S3 和 CloudWatch中,即使容器重启也能访问它们。
3️⃣ Celery & Redis 配置(消息队列 & 任务结果存储)
-
消息队列(Redis):
"AIRFLOW__CELERY__BROKER_URL" = "redis://${aws_elasticache_cluster.airflow.cache_nodes[0].address}:6379/0"
- Celery 使用Redis进行任务排队(尚未设置,另一个痛苦来源)。
-
任务结果存储(PostgreSQL):
"AIRFLOW__CELERY__RESULT_BACKEND" = "db+postgresql://airflow:${data.aws_kms_secrets.airflow.plaintext["db_password"]}@${aws_db_instance.airflow.endpoint}/airflow"
- 任务执行结果存储于PostgreSQL中,保证持久化。
-
Celery 运输选项:
"AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT" = "1800"
- 确保任务不会过早被标记为失败。
4️⃣ SMTP(DAG 故障和通知的电子邮件警报)
- SMTP 配置:
"AIRFLOW__SMTP__SMTP_HOST" = "d"
"AIRFLOW__SMTP__SMTP_MAIL_FROM" = "abc@email.com"
"AIRFLOW__SMTP__SMTP_PORT" = "587"
"AIRFLOW__SMTP__SMTP_SSL" = "True"
- 用于通过电子邮件发送失败通知。
5️⃣ AWS 特定配置
-
区域设置:
"AWS_DEFAULT_REGION" = local.region
- 确保 Terraform 和 Airflow 组件在正确的 AWS 区域运行。
-
流畅的位记录以实现可观察性:
- 使用Fluent Bit (
aws-for-fluent-bit:stable
) 进行日志收集。
- 使用Fluent Bit (
6️⃣ 外部集成(GitHub 和 AWS Secrets Manager)
- GitHub 连接(Airflow 提供商):
"AIRFLOW__PROVIDERS__GITHUB__GITHUB_CONN_ID" = "github_default"
"AIRFLOW__PROVIDERS__GITHUB__ACCESS_TOKEN" = data.aws_kms_secrets.airflow.plaintext["github_token"]
- 使 Airflow DAG 能够与 GitHub API 交互。
痛点😭
- 由于网络和 IAM 角色问题,为Celery 设置 Redis非常麻烦。
- 在处理权限的同时调试S3 和 CloudWatch 中的日志存储令人沮丧。
- 管理AWS Secrets Manager 和 KMS凭证解密增加了复杂性。
- 基于Redis 队列深度和 CPU/内存使用情况的自动扩展工作器需要进行微调。
好的,现在让我们最终移动 DAG!
现在 Airflow 的基础架构基本搭建完毕(省去了Redis 的麻烦😭),是时候迁移我们的 DAG 了。我们不再动态挂载它们,而是直接将它们烘焙到 Docker 镜像中。这样可以确保每个运行 Airflow 调度程序或工作器的容器都预加载了DAG,而无需依赖外部存储。
1️⃣ 我们如何将 DAG 烘焙到 Docker 镜像中
在我们的Dockerfile(我们之前编写的)中,我们通过将 DAG 复制到/dags
容器内的目录中来添加它们
2️⃣为什么采用这种方法?
- ✅无需外部 DAG 存储(如 S3、EFS 或 Git 同步)。
- ✅确保版本控制— DAG 是Docker 构建过程的一部分,因此每个部署都会获得一个已知的DAG 版本。
- ✅简化部署— 运行时无需额外步骤复制 DAG。
3️⃣ 构建和推送 Docker 镜像
添加 DAG 后,我们构建并推送镜像:
docker build -t airflow-custom:latest .
docker tag airflow-custom:latest <AWS_ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/airflow:latest
docker push <AWS_ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/airflow:latest
4️⃣ 更新 ECS 以使用新图像
由于我们将 DAG 烘焙到图像中,我们只需更新 ECS 以提取最新图像,DAG 就会在那里。
aws ecs update-service \
--cluster airflow-cluster \
--service airflow-scheduler \
--force-new-deployment
这会触发调度程序的滚动重启,确保加载新的 DAG。
痛苦与下一步😭
- DAG 调试:如果 DAG 有语法错误,ECS 将循环重新启动调度程序,直到问题修复。
- 热重载?烘焙 DAG 意味着每次 DAG 更新时都要重新部署—— 目前还好,但我们以后可能会添加已挂载的卷或Git 同步。
- 烘焙前测试 DAG:为了避免错误的部署,我们应该在将 DAG添加到图像之前在本地对其进行测试。
最后冲刺:DAG 正式入驻! 🏠
DAG 现在位于容器内,这意味着无需运行时复制,不会出现 DAG 丢失的问题,而且不用担心任何事情— — 直到下一次火灾发生。🔥
现在我躺在一堆鲜血和尸体上⚰️
花了两天时间,我们像狙击手一样,一个接一个地慢慢关闭了所有 Glue 任务。每次关闭都充满期待——Airflow 能否顺利接管,还是会再次陷入另一场调试噩梦?
每次转换,我们都看着 DAG 旋转起来,监控任务执行情况,祈祷 Celery 不会背叛我们。我们梳理了日志,调整了重试,喝了无数杯咖啡。
最后,在这场战争结束时,数字说明了一切:
🚀 Glue 作业成本降低 96% 。🔥 Airflow 全面投入运行,任务在 ECS 工作器之间高效运行。💀 Redis 几乎让我们失去理智,但最终勉强存活 了下来
。
这次迁徙不仅仅是一次部署,更是一场意志的较量。不知何故,我们克服了重重困难,最终取得了胜利。如今,尘埃落定,我松了一口气,稍事休息——不是因为战争结束了,而是因为下一场战斗即将到来。
文章来源:https://dev.to/skysingh04/how-i-reduced-10000-monthly-aws-glue-bill-to-400-using-airflow-147k