Amazon MWAA での dbt の使用 - Amazon Managed Workflows for Apache Airflow

Amazon MWAA での dbt の使用

このトピックでは、Amazon MWAA で dbt と Postgres を使用する方法を示します。次のステップでは、必要な依存関係を requirements.txt に追加し、サンプルの dbt プロジェクトを環境の Amazon S3 バケットにアップロードします。次に、サンプル DAG を使用して Amazon MWAA が依存関係をインストールしたことを確認し、最後に BashOperator を使用して dbt プロジェクトを実行します。

バージョン

このページのコード例は、Python 3.10Apache Airflow v2 および Python 3.11Apache Airflow v3 で使用可能です。

前提条件

次の手順を完了するには、以下のものが必要です。

  • Apache Airflow v2.2.2 を使用する Amazon MWAA 環境。このサンプルは v2.2.2 で作成され、テストされています。他の Apache Airflow バージョンで使用するためには、サンプルを変更する必要がある場合があります。

  • dbt プロジェクトのサンプル。Amazon MWAA で dbt を使い始めるには、フォークを作成し、dbt-labs GitHub リポジトリから dbt スタータープロジェクトをクローンすることができます。

依存関係

dbt で Amazon MWAA を使用するには、次のスタートアップスクリプトを環境に追加します。詳細については、Amazon MWAA でのスタートアップスクリプトの使用 を参照してください。

#!/bin/bash if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]] then exit 0 fi echo "------------------------------" echo "Installing virtual Python env" echo "------------------------------" pip3 install --upgrade pip echo "Current Python version:" python3 --version echo "..." sudo pip3 install --user virtualenv sudo mkdir python3-virtualenv cd python3-virtualenv sudo python3 -m venv dbt-env sudo chmod -R 777 * echo "------------------------------" echo "Activating venv in" $DBT_ENV_PATH echo "------------------------------" source dbt-env/bin/activate pip3 list echo "------------------------------" echo "Installing libraries..." echo "------------------------------" # do not use sudo, as it will install outside the venv pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1 echo "------------------------------" echo "Venv libraries..." echo "------------------------------" pip3 list dbt --version echo "------------------------------" echo "Deactivating venv..." echo "------------------------------" deactivate

以下のセクションでは、dbt プロジェクトディレクトリを Amazon S3 にアップロードし、Amazon MWAA が必要な dbt 依存関係を正常にインストールしたかどうかを検証する DAG を実行します。

DBT プロジェクトを Amazon S3 にアップロードする

Amazon MWAA 環境で dbt プロジェクトを使用できるようにするには、プロジェクトディレクトリ全体を環境の dags フォルダにアップロードできます。環境が更新されると、Amazon MWAA は dbt ディレクトリをローカル usr/local/airflow/dags/ フォルダにダウンロードします。

DBT プロジェクトを Amazon S3 にアップロードするには
  1. dbt スタータープロジェクトをクローンしたディレクトリに移動します。

  2. 次の Amazon S3 AWS CLI コマンドを実行して、dags パラメータを使用してプロジェクトの内容を再帰的に環境の --recursive にコピーします。このコマンドは、dbt と呼ばれるサブディレクトリを作成し、これをすべてのdbtプロジェクトに使用できます。サブディレクトリが既に存在する場合、プロジェクトファイルは既存のディレクトリにコピーされ、新しいディレクトリは作成されません。このコマンドは、この特定のスターターのために dbt ディレクトリ内にサブディレクトリも作成します。

    aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive

    プロジェクトのサブディレクトリに異なる名前を使用して、親 dbt ディレクトリ内の複数の dbt プロジェクトを整理できます。

DAG を使用して dbt 依存関係のインストールを検証します。

次の DAGは、BashOperator を使用し、requirements.txt で指定されたdbtの依存関係を正常にインストールしたかどうかを Amazon MWAA が確認します。

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version" )

以下を実行してタスクログにアクセスし、dbt とその依存関係がインストールされていることを確認します。

  1. Amazon MWAA コンソールに移動し、使用可能な環境のリストから Airflow UI を開く を選択します。

  2. Apache Airflow UI のリストから dbt-installation-test DAG を探し、Last Run 列にある日付を選択して、最後に成功したタスクを開きます。

  3. グラフビュー を使用してタスクを選択し、bash_commandタスクインスタンスの詳細を開きます。

  4. [Log] を選択してタスクログを開き、次に、requirements.txt で指定された dbt のバージョンをログが正常にリストしていることを確認してください。

DAG を使用して dbt プロジェクトを実行します。

次の DAG は、BashOperator を使用して Amazon S3 にアップロードした dbt プロジェクトをローカル usr/local/airflow/dags/ ディレクトリから書き込み可能な/tmpディレクトリにコピーし、dbt プロジェクトを実行します。bash コマンドは、dbt-starter-projectというタイトルのスターター dbt プロジェクトを想定しています。プロジェクトディレクトリの名前に従ってディレクトリ名を変更します。

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") # assumes all files are in a subfolder of DAGs called dbt with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\ cp -R /usr/local/airflow/dags/dbt /tmp;\ echo 'listing project files:';\ ls -R /tmp;\ cd /tmp/dbt/mwaa_dbt_test_project;\ /usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\ cat /tmp/dbt_logs/dbt.log;\ rm -rf /tmp/dbt/mwaa_dbt_test_project" )