在 Amazon MWAA 中使用 dbt - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon MWAA 中使用 dbt

本主题演示了如何在 Amazon MWAA 中使用 dbt 和 Postgres。在以下步骤中,您将所需的依赖项添加到 requirements.txt 中,并将示例 dbt 项目上传到环境的 Amazon S3 存储桶。然后,您将使用示例 DAG 来验证 Amazon MWAA 是否已安装依赖项,最后使用 BashOperator 来运行 dbt 项目。

版本

您可以在 Python 3.10 中将本页上的代码示例与 Apache Airflow v2 一起使用,在 Python 3.11 中与 Apache Airflow v3 一起使用。

先决条件

在完成以下步骤之前,您需要具备以下条件:

  • 使用 Apache Airflow v2.2.2 的 Amazon MWAA 环境。此示例已编写,并使用 v2.2.2 进行了测试。您可能需要修改示例以与其他 Apache Airflow 版本一起使用。

  • dbt 项目示例。要开始在 Amazon MWAA 中使用 dbt,您可以创建一个分支并从 dbt-labs GitHub 存储库中克隆 dbt 入门项目

依赖项

要将 Amazon MWAA 与 dbt 配合使用,请将以下启动脚本添加到环境中。要了解更多信息,请参阅在 Amazon MWAA 中使用启动脚本

#!/bin/bash if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]] then exit 0 fi echo "------------------------------" echo "Installing virtual Python env" echo "------------------------------" pip3 install --upgrade pip echo "Current Python version:" python3 --version echo "..." sudo pip3 install --user virtualenv sudo mkdir python3-virtualenv cd python3-virtualenv sudo python3 -m venv dbt-env sudo chmod -R 777 * echo "------------------------------" echo "Activating venv in" $DBT_ENV_PATH echo "------------------------------" source dbt-env/bin/activate pip3 list echo "------------------------------" echo "Installing libraries..." echo "------------------------------" # do not use sudo, as it will install outside the venv pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1 echo "------------------------------" echo "Venv libraries..." echo "------------------------------" pip3 list dbt --version echo "------------------------------" echo "Deactivating venv..." echo "------------------------------" deactivate

在以下章节中,您可将 dbt 项目目录上传到 Amazon S3 并运行 DAG 来验证 Amazon MWAA 是否已成功安装所需的 dbt 依赖项。

将 dbt 项目上传到 Amazon S3

为了能够在 Amazon MWAA 环境中使用 dbt 项目,您可以将整个项目目录上传到环境的 dags 文件夹中。当环境更新时,Amazon MWAA 会将 dbt 目录下载到本地 usr/local/airflow/dags/ 文件夹。

要将 dbt 项目上传到 Amazon S3,请执行以下操作
  1. 导航到您克隆 dbt 入门项目的目录。

  2. 运行以下 Amazon S3 AWS CLI 命令,使用 --recursive 参数以递归方式将项目内容复制到环境的 dags 文件夹。该命令会创建一个名为 dbt 的子目录,您可以将其用于所有 dbt 项目。如果子目录已经存在,则项目文件将被复制到现有目录中,并且不会创建新目录。该命令还会为该特定入门项目在 dbt 目录中创建一个子目录。

    aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive

    您可以为项目子目录使用不同的名称,以便在 dbt 父目录中组织多个 dbt 项目。

使用 DAG 验证 dbt 依赖项的安装

以下 DAG 使用 BashOperator 和 bash 命令来验证 Amazon MWAA 是否已成功安装 requirements.txt 中指定的 dbt 依赖项。

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version" )

执行以下操作以访问任务日志并验证是否已安装 dbt 及其依赖项。

  1. 导航到 Amazon MWAA 控制台,然后从可用环境列表中选择打开 Airflow UI

  2. 在 Apache Airflow UI 上,从列表中找到 dbt-installation-test DAG,然后在 Last Run 列中选择日期以打开上一个成功的任务。

  3. 使用图表视图,选择 bash_command 任务以打开任务实例的详细信息。

  4. 选择日志来打开任务日志,然后验证日志是否成功列出了我们在 requirements.txt 中指定的 dbt 版本。

使用 DAG 来运行 dbt 项目

以下 DAG 使用 BashOperator 将您从本地 usr/local/airflow/dags/ 目录上传到 Amazon S3 的 dbt 项目复制到可写入的 /tmp 目录,然后运行 dbt 项目。bash 命令假设一个名为 dbt-starter-project 的 入门 dbt 项目。根据您项目目录的名称修改目录名称。

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") # assumes all files are in a subfolder of DAGs called dbt with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\ cp -R /usr/local/airflow/dags/dbt /tmp;\ echo 'listing project files:';\ ls -R /tmp;\ cd /tmp/dbt/mwaa_dbt_test_project;\ /usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\ cat /tmp/dbt_logs/dbt.log;\ rm -rf /tmp/dbt/mwaa_dbt_test_project" )