翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
カスタムプラグインのインストール
Amazon Managed Workflows for Apache Airflow は Apache Airflow の組み込みプラグインマネージャーをサポートしているため、カスタム Apache Airflow オペレータ、フック、センサー、またはインターフェイスを使用できます。このページでは、plugins.zip
ファイルを使用して、Amazon MWAA 環境に Apache Airflow カスタムプラグインをインストールする手順について説明します。
前提条件
このページのステップを完了するには、以下のものが必要です。
-
アクセス許可 — 管理者から環境の AmazonMWAAFullConsoleAccess アクセスコントロールポリシーへのアクセス権が付与されている AWS アカウント 必要があります。さらに、Amazon MWAA 環境は、環境で使用される AWS リソースにアクセスするために実行ロールによって許可されている必要があります。
-
アクセス — ウェブサーバーに直接依存関係をインストールするためにパブリックリポジトリへのアクセスが必要な場合は、環境をパブリックネットワークウェブサーバーアクセスで設定する必要があります。詳細については、「Apache Airflow のアクセスモード」を参照してください。
-
Amazon S3 設定 — plugins.zip
で DAG、カスタムプラグイン、および requirements.txt
で Python の依存関係を保存するために使用される Amazon S3 バケットは、Public Access Blocked と Versioning Enabledで構成する必要があります。
仕組み
カスタムプラグインを環境で実行するには、次の 3 つのことを行う必要があります。
-
plugins.zip
ファイルをローカルに作成します。
-
plugins.zip
のファイルを Amazon S3 のバケットにアップロードしてください。
-
Amazon MWAA コンソールの [プラグインファイル] フィールドに、このファイルのバージョンを指定します。
これが初めて plugins.zip
を Amazon S3 バケットにアップロードする場合、Amazon MWAA コンソールでファイルへのパスも指定する必要があります。1 回だけこのステップを行ってください。
プラグインを使用するタイミング
プラグインは、Apache Airflow のドキュメントで説明されているように、Apache Airflow ユーザーインターフェイスを拡張する場合にのみ必要です。カスタム演算子は、DAG
コードとともに /dags
フォルダに直接配置できます。
外部システムと独自の統合を作成する必要がある場合は、/dags
フォルダまたはサブフォルダ内に配置しますが、plugins.zip
フォルダには配置しません。Apache Airflow 2.x では、プラグインは主に UI を拡張するために使用されます。
同様に、他の依存関係を に配置することはできませんplugins.zip
。代わりに、Apache Airflow の開始前に各 Amazon MWAA コンテナに同期される Amazon S3 /dags
フォルダ内の場所に保存できます。
Apache Airflow DAG オブジェクトを明示的に定義していない /dags
フォルダ内または plugins.zip
内のファイルは、.airflowignore
ファイルにリストする必要があります。
カスタムプラグイン数
Apache Airflow の組み込みプラグインマネージャは、単にファイルを$AIRFLOW_HOME/plugins
フォルダにドロップすることで外部の機能をコアに統合できます。これにより、カスタムの Apache Airflow オペレータ、フック、センサー、またはインターフェースを使用できます。次のセクションでは、ローカル開発環境におけるフラットでネストされたディレクトリ構造の例と、plugins.zip 内のディレクトリ構造を決定する import ステートメントの例を示します。
カスタムプラグインのディレクトリとサイズの制限
Apache Airflow スケジューラとワーカーは、 の環境の AWSマネージド Fargate コンテナで起動中にカスタムプラグインを検索します/usr/local/airflow/plugins/*
。
-
ディレクトリ構造。(/*
での) ディレクトリ構造は、plugins.zip
ファイルの内容に基づいています。たとえば、 に operators
ディレクトリがメインレベルのディレクトリとしてplugins.zip
含まれている場合、ディレクトリは環境/usr/local/airflow/plugins/operators
上の に抽出されます。
-
サイズ制限。1 GB 未満の plugins.zip
ファイルをお勧めします。plugins.zip
ファイルのサイズが大きいほど、環境でのスタートアップ時間が長くなります。Amazon MWAA は plugins.zip
ファイルのサイズを明示的に制限していませんが、10 分以内に依存関係をインストールできない場合、Fargate サービスはタイムアウトし、環境を安定した状態にロールバックしようとします。
Apache Airflow v2.0.2 を使用する環境では、Amazon MWAA は Apache Airflow ウェブサーバー上のアウトバウンドトラフィックを制限し、プラグインや Python 依存関係をウェブサーバーに直接インストールすることはできません。Apache Airflow v2.2.2 以降、Amazon MWAA はプラグインと依存関係をウェブサーバーに直接インストールできます。
カスタムプラグイン数
次のセクションでは、Apache Airflow リファレンスガイドのサンプルコードを使用して、ローカル開発環境を構築する方法について説明します。
plugins.zip でフラットなディレクトリ構造を使用する例
- Apache Airflow v3
-
次の例は、Apache Airflow v3 のフラットディレクトリ構造を持つplugins.zip
ファイルを示しています。
例 plugins/virtual_python_plugin.py
次の の例では、PythonVirtualenvOperator
カスタムプラグインを表示します。
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv
from typing import List
def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if system_site_packages:
cmd.append('--system-site-packages')
if python_bin is not None:
cmd.append(f'--python={python_bin}')
return cmd
airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
class VirtualPythonPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
- Apache Airflow v2
-
次の例は、Apache Airflow v2 のフラットディレクトリ構造を持つplugins.zip
ファイルを示しています。
例 plugins/virtual_python_plugin.py
次の の例では、PythonVirtualenvOperator
カスタムプラグインを表示します。
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv
from typing import List
def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if system_site_packages:
cmd.append('--system-site-packages')
if python_bin is not None:
cmd.append(f'--python={python_bin}')
return cmd
airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
class VirtualPythonPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
plugins.zip のネストされたディレクトリ構造を使用する例
- Apache Airflow v3
-
次の例では、、hooks
、operators
および ディレクトリの個別のsensors
ディレクトリを持つplugins.zip
ファイルを示します。
例 plugins.zip
__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py
次の の例では、カスタムプラグインを使用する DAG (DAGs フォルダ) のインポートステートメントを表示します。
例 dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor
from operators.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('customdag',
max_active_runs=3,
schedule_interval='@once',
default_args=default_args) as dag:
sens = MySensor(
task_id='taskA'
)
op = MyOperator(
task_id='taskB',
my_field='some text'
)
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
class PluginName(AirflowPlugin):
name = 'my_airflow_plugin'
hooks = [MyHook]
operators = [MyOperator]
sensors = [MySensor]
次の例は、カスタムプラグインファイルに必要な各インポートステートメントを示しています。
例 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook
class MyHook(BaseHook):
def my_method(self):
print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MySensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MySensor, self).__init__(*args, **kwargs)
def poke(self, context):
return True
例 operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_airflow_hook import MyHook
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
my_field,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.my_field = my_field
def execute(self, context):
hook = MyHook('my_conn')
hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
「Amazon MWAA CLI ユーティリティを使用したカスタムプラグインのテスト」の手順に従い、次に「plugins.zip ファイルの作成」で plugins
ディレクトリ内のコンテンツを圧縮してください。例えば、cd plugins
。
- Apache Airflow v2
-
次の例では、、hooks
、operators
および ディレクトリの個別のsensors
ディレクトリを持つplugins.zip
ファイルを示します。
例 plugins.zip
__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py
次の の例では、カスタムプラグインを使用する DAG (DAGs フォルダ) にインポートステートメントを表示します。
例 dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor
from operators.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('customdag',
max_active_runs=3,
schedule_interval='@once',
default_args=default_args) as dag:
sens = MySensor(
task_id='taskA'
)
op = MyOperator(
task_id='taskB',
my_field='some text'
)
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
class PluginName(AirflowPlugin):
name = 'my_airflow_plugin'
hooks = [MyHook]
operators = [MyOperator]
sensors = [MySensor]
次の例は、カスタムプラグインファイルに必要な各インポートステートメントを示しています。
例 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook
class MyHook(BaseHook):
def my_method(self):
print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MySensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MySensor, self).__init__(*args, **kwargs)
def poke(self, context):
return True
例 operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_airflow_hook import MyHook
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
my_field,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.my_field = my_field
def execute(self, context):
hook = MyHook('my_conn')
hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
「Amazon MWAA CLI ユーティリティを使用したカスタムプラグインのテスト」の手順に従い、次に「plugins.zip ファイルの作成」で plugins
ディレクトリ内のコンテンツを圧縮してください。例えば、cd plugins
。
plugins.zip ファイルの作成
以下の手順では、plugins.zip ファイルをローカルで作成する場合に推奨される手順について説明します。
ステップ 1: Amazon MWAA CLI ユーティリティを使用してカスタムプラグインをテストする
-
コマンドラインインターフェイス (CLI) ユーティリティは、Amazon Managed Workflows for Apache Airflow 環境をローカルに複製します。
-
CLI は、Amazon MWAA のプロダクションイメージに似た Docker コンテナイメージをローカルでビルドします。これにより、Amazon MWAA にデプロイする前に、ローカルの Apache Airflow 環境を実行して DAG、カスタムプラグイン、依存関係を開発およびテストできます。
-
CLI を実行するには、GitHub の aws-mwaa-docker-images を参照してください。
ステップ 2: plugins.zip ファイルを作成する
ビルトインの ZIP アーカイブユーティリティやその他の ZIP ユーティリティ (7zip など) を使用して.zip ファイルを作成できます。
Windows OS 用の組み込み zip ユーティリティは、.zip ファイルを作成するときにサブフォルダを追加する場合があります。Amazon S3 バケットにアップロードする前に plugins.zip ファイルの内容を確認して、ディレクトリが追加されていないことを確認することをお勧めします。
-
ディレクトリをローカルの Airflow プラグインディレクトリに変更します。例:
myproject$ cd plugins
-
次のコマンドを実行して、コンテンツに実行権限があることを確認します (macOS と Linux のみ)。
plugins$ chmod -R 755 .
-
plugins
フォルダ内のコンテンツを圧縮します。
plugins$ zip -r plugins.zip .
plugins.zip
を Amazon S3 にアップロードします。
Amazon S3 コンソールまたは AWS Command Line Interface (AWS CLI) を使用して、Amazon S3 バケットにplugins.zip
ファイルをアップロードできます。
の使用 AWS CLI
AWS Command Line Interface (AWS CLI) は、コマンドラインシェルのコマンドを使用して AWS サービスとやり取りできるようにするオープンソースツールです。このページのステップを完了するには、以下のものが必要です。
を使用してアップロードするには AWS CLI
-
コマンドプロンプトで、plugins.zip
ファイルが保存されているディレクトリに移動します。例:
cd plugins
-
以下のコマンドを使って、Amazon S3 バケットをすべてリストアップします
aws s3 ls
-
以下のコマンドを使用して、ご使用の環境の Amazon S3 バケット内のファイルとフォルダを一覧表示します。
aws s3 ls s3://YOUR_S3_BUCKET_NAME
-
環境の Amazon S3 バケットに plugins.zip
ファイルをアップロードするには、次のコマンドを使用します。
aws s3 cp plugins.zip s3://amzn-s3-demo-bucket
/plugins.zip
Amazon S3 コンソールの使用
Amazon S3 コンソールは、Amazon S3 バケット内のリソースを作成および管理できるウェブベースのユーザーインターフェイスです。
Amazon S3 コンソールを使ってアップロードするには
-
Amazon MWAA コンソールで、環境ページを開きます。
-
環境を選択します。
-
S3 ペインの DAG コードで S3 バケットリンクを選択して、コンソールでストレージバケットを開きます。 S3
-
[アップロード] を選択します。
-
[ファイルの追加] を選択します。
-
plugins.zip
のローカルコピーを選択し、[アップロード] を選択します。
環境へのカスタムプラグインのインストール
このセクションでは、plugins.zip ファイルへのパスを指定し、zip ファイルが更新されるたびに plugins.zip ファイルのバージョンを指定することで、Amazon S3 バケットにアップロードしたカスタムプラグインをインストールする方法について説明します。
Amazon MWAA コンソールで plugins.zip
へのパスを指定する(初回)
これが初めて plugins.zip
を Amazon S3 バケットにアップロードする場合、Amazon MWAA コンソールでファイルへのパスも指定する必要があります。1 回だけこのステップを行ってください。
-
Amazon MWAA コンソールで、環境ページを開きます。
-
環境を選択します。
-
[編集] を選択します。
-
Amazon S3 ペインの DAG コードで、プラグインファイル - オプションフィールドの横にある S3 を参照を選択します。
-
Amazon S3 バケットの plugins.zip
ファイルを選択します。
-
[選択] を選択します。
-
[次へ] → [環境の更新] を選択します。
Amazon MWAA コンソールで plugins.zip
のバージョンを指定する。
新しいバージョンの plugins.zip
を Amazon S3 バケットにアップロードするたびに、Amazon MWAA コンソールで plugins.zip
ファイルのバージョンを指定する必要があります。
-
Amazon MWAA コンソールで、環境ページを開きます。
-
環境を選択します。
-
[編集] を選択します。
-
[Amazon S3 の DAG コードペイン] で、ドロップダウンリストから plugins.zip
のバージョンを選択します。
-
[次へ] を選択します。
plugins.zip のユースケースの例
次のステップ
GitHub DAGs、カスタムプラグイン、Python 依存関係をローカルでテストします。 aws-mwaa-docker-images