DAG の追加と更新 - Amazon Managed Workflows for Apache Airflow

DAG の追加と更新

有向非巡回グラフ (DAG) は、DAG の構造をコードとして定義する Python ファイル内で定義されます。AWS CLI および Amazon S3 コンソールを使用して、DAG をご利用の環境にアップロードできます。このトピックでは、Amazon S3 バケット内の dags フォルダを使用して、Amazon Managed Workflows for Apache Airflow 環境に Apache Airflow DAG を追加または更新する手順について説明します。

前提条件

このページのステップを完了するには、以下のものが必要です。

  • アクセス許可 — AWS アカウント には、管理者から、ご使用の環境の AmazonMWAAFullConsoleAccess アクセスコントロールポリシーへのアクセス権限が付与されている必要があります。さらに、Amazon MWAA 環境には、その環境で使用される AWS のリソースへのアクセスを 実行ロール で許可されている必要があります。

  • アクセス — 依存関係をウェブサーバーに直接インストールするためにパブリックリポジトリにアクセスする必要がある場合は、パブリックネットワーク のウェブサーバーアクセスが環境に設定されている必要があります。詳細については、Apache Airflow のアクセスモード を参照してください。

  • Amazon S3 設定plugins.zip で DAG、カスタムプラグイン、および requirements.txt で Python の依存関係を保存するために使用される Amazon S3 バケット は、Public Access BlockedVersioning Enabled で構成する必要があります。

仕組み

有向非巡回グラフ (DAG) は、DAG の構造をコードとして定義する単一の Python ファイル内で定義されます。その構成は以下の通りである:

Amazon MWAA 環境で Apache Airflow プラットフォームを実行するには、DAG 定義をストレージバケット内の dags フォルダにコピーする必要があります。例えば、ストレージバケットの DAG フォルダは次のようになります。

例 DAG フォルダ
dags/ └ dag_def.py

Amazon MWAA は、Amazon S3 バケットの新規オブジェクトと変更されたオブジェクトを 30 秒ごとに Amazon MWAA スケジューラとワーカーコンテナの /usr/local/airflow/dags フォルダに自動的に同期し、ファイルタイプに関係なく Amazon S3 ソースのファイル階層を維持します。新しい DAG が Apache Airflow UI にリストされるまでの時間は、scheduler.dag_dir_list_interval によって制御されます。既存の DAG への変更は、次の DAG 処理ループ で取り込まれます。

注記

DAG フォルダには airflow.cfg 設定ファイルを含める必要はありません。Amazon MWAA コンソールからデフォルトの Apache Airflow 設定を上書きできます。詳細については、Amazon MWAA での Apache Airflow 設定オプションの使用 を参照してください。

何が変わったのですか?

特定の Apache Airflow リリースの変更を確認するには、リリースノート ページを参照してください。

Amazon MWAA CLI ユーティリティを使用した DAG のテスト

  • コマンドラインインターフェイス (CLI) ユーティリティは、Amazon Managed Workflows for Apache Airflow 環境をローカルに複製します。

  • CLI は、Amazon MWAA のプロダクションイメージに似た Docker コンテナイメージをローカルでビルドします。これを使用すると、Amazon MWAA にデプロイする前に、ローカルの Apache Airflow 環境を実行して DAG、カスタムプラグイン、依存関係を開発およびテストできます。

  • CLI を実行するには、GitHub の aws-mwaa-docker-images を参照してください。

Amazon S3 への DAG コードのアップロード

Amazon S3 コンソールまたは AWS Command Line Interface (AWS CLI) を使用して、Amazon S3 バケットに DAG コードをアップロードできます。以下の手順では、コード(.py)をAmazon S3バケット内の dags という名前のフォルダにアップロードすることを前提としています。

AWS CLI の使用

AWS Command Line Interface (AWS CLI) は、コマンドラインシェルでコマンドを使用して AWS サービスとやり取りできるオープンソースツールです。このページのステップを完了するには、以下のものが必要です。

AWS CLI を使用してアップロードするには
  1. 以下のコマンドを使って、Amazon S3 バケットをすべてリストアップします

    aws s3 ls
  2. 以下のコマンドを使用して、ご使用の環境の Amazon S3 バケット内のファイルとフォルダを一覧表示します。

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  3. 以下のコマンドは、dag_def.py ファイルを dags フォルダにアップロードします。

    aws s3 cp dag_def.py s3://amzn-s3-demo-bucket/dags/

    Amazon S3 バケットに dags という名前のフォルダがまだ存在しない場合、このコマンドは dags フォルダを作成し、新しいフォルダに名前が dag_def.py のファイルをアップロードします。

Amazon S3 コンソールの使用

Amazon S3 コンソールは、Amazon S3 バケット内のリソースを作成および管理するために使用できるウェブベースのユーザーインターフェイスです。以下の手順では、DAG フォルダーが dags という名前であると仮定しています。

Amazon S3 コンソールを使ってアップロードするには
  1. Amazon MWAA コンソールで、環境ページ を開きます。

  2. 環境を選択します。

  3. コンソールの DAG コード in S3 ペインで S3 バケット リンクを選択して、ストレージバケットを開きます。

  4. dags フォルダを選択します。

  5. アップロード を選択します。

  6. ファイルの追加 を選択します。

  7. dag_def.py のローカルコピーを選択し、アップロード を選択します。

Amazon MWAA コンソールで DAG フォルダへのパスを指定する (初回)

以下の手順では、dags という名前の Amazon S3 バケット上のフォルダへのパスを指定していると仮定します。

  1. Amazon MWAA コンソールで、環境ページ を開きます。

  2. DAG を実行する環境を選択します。

  3. 編集 を選択します。

  4. Amazon S3 ペインの DAG コード で、DAG フォルダ のフィールドに隣接する S3 を参照 を選択します。

  5. dags フォルダを選択します。

  6. 選択 を選択します。

  7. 次へ環境の更新 を選択します。

Apache Airflow UI での変更にアクセスする

Apache Airflow UI にアクセスするためには、AWS Identity and Access Management (IAM) で AWS アカウント に対して Apache Airflow UI アクセスポリシー: AmazonMWAAWebServerAccess のアクセス許可が必要です。

Apache Airflow UI にアクセスするには
  1. Amazon MWAA コンソールで、環境ページ を開きます。

  2. 環境を選択します。

  3. Airflow UI を開く を選択します。

次のステップ

GitHub の aws-mwaa-docker-images を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。