安裝自訂外掛程式 - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

安裝自訂外掛程式

Amazon Managed Workflows for Apache Airflow 支援 Apache Airflow 的內建外掛程式管理員,可讓您使用自訂 Apache Airflow 運算子、勾點、感應器或介面。此頁面說明使用 plugins.zip 檔案在 Amazon MWAA 環境上安裝 Apache Airflow 自訂外掛程式的步驟。

先決條件

您將需要下列項目,才能完成此頁面上的步驟。

  • 許可 — AWS 您的管理員必須已授予您的帳戶存取您環境的 AmazonMWAAFullConsoleAccess 存取控制政策。此外,您的執行角色必須允許 Amazon MWAA 環境存取您的環境所使用的 AWS 資源。

  • 存取 — 如果您需要存取公有儲存庫,才能直接在 Web 伺服器上安裝相依性,您的環境必須設定公有網路 Web 伺服器存取。如需詳細資訊,請參閱Apache Airflow 存取模式

  • Amazon S3 組態 — 用於在 中存放 DAGs、自訂外掛程式plugins.zip和 Python 相依性的 Amazon S3 儲存貯requirements.txt必須設定為啟用公開存取封鎖版本控制

運作方式

若要在您的環境中執行自訂外掛程式,您必須執行三件事:

  1. 在本機建立plugins.zip檔案。

  2. 將本機plugins.zip檔案上傳至您的 Amazon S3 儲存貯體。

  3. 在 Amazon MWAA 主控台的外掛程式檔案欄位中指定此檔案的版本。

注意

如果這是您第一次將 上傳至 Amazon S3 plugins.zip儲存貯體,您也需要在 Amazon MWAA 主控台上指定 檔案的路徑。您只需要完成此步驟一次。

何時使用外掛程式

外掛程式僅適用於擴展 Apache Airflow 使用者介面,如 Apache Airflow 文件中所述。自訂運算子可以直接與您的DAG程式碼放在 /dags 資料夾中。

如果您需要建立與外部系統的整合,請將它們放在 /dags 資料夾或其中的子資料夾,而不是資料夾中plugins.zip。在 Apache Airflow 2.x 中,外掛程式主要用於擴展 UI。

同樣地,其他相依性不應放置在 中plugins.zip。相反地,它們可以存放在 Amazon S3 /dags 資料夾下的位置,在 Apache Airflow 啟動之前,它們將同步到每個 Amazon MWAA 容器。

注意

/dags 資料夾中或 中plugins.zip未明確定義 Apache Airflow DAG 物件的任何檔案都必須列在 .airflowignore 檔案中。

自訂外掛程式概觀

Apache Airflow 的內建外掛程式管理員只需在 $AIRFLOW_HOME/plugins 資料夾中捨棄檔案,即可將外部功能整合至其核心。它可讓您使用自訂 Apache Airflow 運算子、勾點、感應器或界面。下一節提供本機開發環境中平面和巢狀目錄結構的範例,以及產生的匯入陳述式,其決定 plugins.zip 內的目錄結構。

自訂外掛程式目錄和大小限制

Apache Airflow 排程器工作者會在 環境的 AWS受管 Fargate 容器上,於啟動期間尋找自訂外掛程式/usr/local/airflow/plugins/*

  • 目錄結構。目錄結構 (位於 /*) 是以plugins.zip檔案的內容為基礎。例如,如果您的 plugins.zip包含 operators目錄做為頂層目錄,則會將目錄解壓縮至您環境中/usr/local/airflow/plugins/operators的 。

  • 大小限制。我們建議plugins.zip的檔案小於 1 GB。檔案的大小越大plugins.zip,環境的啟動時間就越長。雖然 Amazon MWAA 不會明確限制plugins.zip檔案大小,但如果無法在十分鐘內安裝相依性,則 Fargate 服務將會逾時,並嘗試將環境復原至穩定狀態。

注意

對於使用 Apache Airflow v1.10.12 或 Apache Airflow v2.0.2 的環境,Amazon MWAA 會限制 Apache Airflow Web 伺服器上的傳出流量,不允許您直接在 Web 伺服器上安裝外掛程式或 Python 相依性。從 Apache Airflow 2.2.2 版開始,Amazon MWAA 可以直接在 Web 伺服器上安裝外掛程式和相依性。

自訂外掛程式的範例

下一節使用 Apache Airflow 參考指南中的範例程式碼,示範如何建構您的本機開發環境。

在 plugins.zip 中使用平面目錄結構的範例

Apache Airflow v2

下列範例顯示 Apache Airflow v2 的平面目錄結構plugins.zip檔案。

範例 具有 PythonVirtualenvOperator plugins.zip 的平面目錄

下列範例顯示 中 PythonVirtualenvOperator 自訂外掛程式之 plugins.zip 檔案的頂層樹狀目錄為 Apache Airflow PythonVirtualenvOperator 建立自訂外掛程式

├── virtual_python_plugin.py
範例 plugins/virtual_python_plugin.py

下列範例顯示 PythonVirtualenvOperator 自訂外掛程式。

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

下列範例顯示 Apache Airflow v1 的平面目錄結構plugins.zip檔案。

範例 具有 PythonVirtualenvOperator plugins.zip 的平面目錄

下列範例顯示 中 PythonVirtualenvOperator 自訂外掛程式之 plugins.zip 檔案的頂層樹狀目錄為 Apache Airflow PythonVirtualenvOperator 建立自訂外掛程式

├── virtual_python_plugin.py
範例 plugins/virtual_python_plugin.py

下列範例顯示 PythonVirtualenvOperator 自訂外掛程式。

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

在 plugins.zip 中使用巢狀目錄結構的範例

Apache Airflow v2

下列範例顯示具有 hooksoperatorssensors Apache Airflow v2 目錄之個別目錄plugins.zip的檔案。

範例 plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

下列範例顯示使用自訂外掛程式的 DAG (DAGs 資料夾) 中的匯入陳述式。

範例 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
範例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

下列範例顯示自訂外掛程式檔案中所需的每個匯入陳述式。

範例 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
範例 sensor/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
範例 Operator/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
範例 Operator/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

請遵循使用 Amazon MWAA CLI 公用程式測試自訂外掛程式中的步驟,然後建立 plugins.zip 檔案以壓縮plugins目錄中的內容。 例如 cd plugins

Apache Airflow v1

下列範例顯示 plugins.zip 檔案,其中包含 hooksoperators和 Apache Airflow v1.10.12 sensors的個別目錄。

範例 plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

下列範例顯示使用自訂外掛程式的 DAG (DAGs 資料夾) 中的匯入陳述式。

範例 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
範例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

下列範例顯示自訂外掛程式檔案中所需的每個匯入陳述式。

範例 hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
範例 sensor/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
範例 Operator/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
範例 Operator/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

請遵循使用 Amazon MWAA CLI 公用程式測試自訂外掛程式中的步驟,然後建立 plugins.zip 檔案以壓縮plugins目錄中的內容。 例如 cd plugins

建立 plugins.zip 檔案

下列步驟說明我們建議在本機建立 plugins.zip 檔案的步驟。

步驟一:使用 Amazon MWAA CLI 公用程式測試自訂外掛程式

  • 命令列界面 (CLI) 公用程式會在本機複寫 Amazon Managed Workflows for Apache Airflow 環境。

  • CLI 會在本機建置類似於 Amazon MWAA 生產映像的 Docker 容器映像。這可讓您執行本機 Apache Airflow 環境,以在部署到 Amazon MWAA 之前開發和測試 DAGs、自訂外掛程式和相依性。

  • 若要執行 CLI,請參閱 GitHub 上的 aws-mwaa-local-runner

步驟二:建立 plugins.zip 檔案

您可以使用內建的 ZIP 封存公用程式,或任何其他 ZIP 公用程式 (例如 7zip) 來建立 .zip 檔案。

注意

當您建立 .zip 檔案時,Windows OS 的內建 zip 公用程式可能會新增子資料夾。建議您先驗證 plugins.zip 檔案的內容,再上傳至 Amazon S3 儲存貯體,以確保沒有新增其他目錄。

  1. 將目錄變更為本機 Airflow 外掛程式目錄。例如:

    myproject$ cd plugins
  2. 執行下列命令,以確保內容具有可執行的許可 (僅限 macOS 和 Linux)。

    plugins$ chmod -R 755 .
  3. 壓縮plugins資料夾中的內容。

    plugins$ zip -r plugins.zip .

plugins.zip 上傳至 Amazon S3

您可以使用 Amazon S3 主控台或 AWS Command Line Interface (AWS CLI) 將plugins.zip檔案上傳至 Amazon S3 儲存貯體。

使用 AWS CLI

AWS Command Line Interface (AWS CLI) 是一種開放原始碼工具,可讓您使用命令列 Shell 中的命令與 AWS 服務互動。若要完成此頁面上的步驟,您需要下列項目:

使用 上傳 AWS CLI
  1. 在命令提示中,導覽至儲存plugins.zip檔案的目錄。例如:

    cd plugins
  2. 使用下列命令列出所有 Amazon S3 儲存貯體。

    aws s3 ls
  3. 使用下列命令列出您環境的 Amazon S3 儲存貯體中的檔案和資料夾。

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. 使用下列命令,將plugins.zip檔案上傳至您環境的 Amazon S3 儲存貯體。

    aws s3 cp plugins.zip s3://amzn-s3-demo-bucket/plugins.zip

使用 Amazon S3 主控台

Amazon S3 主控台是一種 Web 型使用者介面,可讓您建立和管理 Amazon S3 儲存貯體中的資源。

使用 Amazon S3 主控台上傳
  1. 在 Amazon MWAA 主控台上開啟環境頁面

  2. 選擇環境。

  3. S3 窗格中的 DAG 程式碼中選取 S3 儲存貯體連結,以在 Amazon S3 主控台上開啟儲存貯體。

  4. 選擇上傳

  5. 選擇新增檔案

  6. 選取 的本機副本plugins.zip,然後選擇上傳

在您的環境上安裝自訂外掛程式

本節說明如何透過指定 plugins.zip 檔案的路徑,以及在每次更新 zip 檔案時指定 plugins.zip 檔案的版本,來安裝您上傳至 Amazon S3 儲存貯體的自訂外掛程式。

在 Amazon MWAA 主控台plugins.zip上指定 的路徑 (第一次)

如果這是您第一次將 上傳至 Amazon S3 plugins.zip儲存貯體,您也需要在 Amazon MWAA 主控台上指定 檔案的路徑。您只需要完成此步驟一次。

  1. 在 Amazon MWAA 主控台上開啟環境頁面

  2. 選擇環境。

  3. 選擇編輯

  4. Amazon S3 窗格中的 DAG 程式碼上,選擇外掛程式檔案 - 選用欄位旁的瀏覽 S3

  5. 選取 Amazon S3 儲存貯體上的 plugins.zip 檔案。

  6. 選擇 Choose (選擇)

  7. 選擇下一步更新環境

在 Amazon MWAA 主控台上指定plugins.zip版本

每次在 Amazon S3 儲存貯plugins.zip體中上傳新版本的 時,您需要在 Amazon MWAA 主控台上指定 plugins.zip 檔案的版本。

  1. 在 Amazon MWAA 主控台上開啟環境頁面

  2. 選擇環境。

  3. 選擇編輯

  4. Amazon S3 窗格中的 DAG 程式碼上,在下拉式清單中選擇plugins.zip版本。

  5. 選擇下一步

plugins.zip 的範例使用案例

後續步驟?

  • 使用 GitHub 上的 aws-mwaa-local-runner,在本機測試您的 DAGs、自訂外掛程式和 Python 相依性。