Oracle でのカスタムプラグインの作成
次のサンプルでは、Oracle for Amazon MWAA を使用してカスタムプラグインを作成する手順を詳しく説明しており、これを plugins.zip ファイル内の他のカスタムプラグインやバイナリと組み合わせることができます。
バージョン
このページのコード例は、Python 3.10
前提条件
このページのサンプルコードを使用するには、以下が必要です。
-
ワーカーのログ記録は、環境に応じて、任意のログレベル (
CRITICALまたは前のセクション) で有効になります。Amazon MWAA ログタイプとロググループの管理方法の詳細については、Amazon CloudWatch の Airflow ログへのアクセス を参照してください
アクセス許可
このページのコード例を使用する場合、追加のアクセス許可は必要ありません。
要件
このページのサンプルコードを使用するには、次の依存関係を requirements.txt に追加してください。詳細については、Python 依存関係のインストール を参照してください。
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt cx_Oracle apache-airflow-providers-oracle
コードサンプル
次のステップでは、カスタムプラグインをテストする DAG コードを作成する方法について説明します。
-
コマンドプロンプトで、DAG コードが保存されているディレクトリに移動します。例:
cd dags -
以下のコードサンプルの内容をコピーし、ローカルに
oracle.pyとして保存します。from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago import os import cx_Oracle DAG_ID = os.path.basename(__file__).replace(".py", "") def testHook(**kwargs): cx_Oracle.init_oracle_client() version = cx_Oracle.clientversion() print("cx_Oracle.clientversion",version) return version with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hook_test = PythonOperator( task_id="hook_test", python_callable=testHook, provide_context=True )
カスタムプラグインを作成する
このセクションでは、依存関係のダウンロード、カスタムプラグインと plugins.zip の作成方法について説明します。
依存関係のダウンロード
Amazon MWAA は plugins.zip のコンテンツを各 Amazon MWAA スケジューラーとワーカーコンテナの /usr/local/airflow/plugins に抽出します。これはバイナリを環境に追加するために使用されます。次のステップでは、カスタムプラグインに必要なファイルを組み立てる方法について説明します。
Amazon Linux コンテナイメージをプルする
-
コマンドプロンプトで Amazon Linux コンテナイメージを取得し、コンテナをローカルで実行します。例:
docker pull amazonlinux docker run -it amazonlinux:latest /bin/bashコマンドプロンプトで bash コマンドラインを呼び出すことができます。例:
bash-4.2# -
Linux ネイティブの非同期 I/O 機能 (libaio) をインストールします。
yum -y install libaio -
以降の手順では、このウィンドウを開いたままにしておいてください。次のファイルをローカルにコピーします:
lib64/libaio.so.1、lib64/libaio.so.1.0.0、lib64/libaio.so.1.0.1。
クライアントフォルダをダウンロード
-
unzip パッケージをローカルにインストールします。例:
sudo yum install unzip -
oracle_pluginディレクトリを作成します。例:mkdir oracle_plugin cd oracle_plugin -
次の curl コマンドを使用して、Linux x86-64 (64 ビット) 用 Oracle インスタントクライアントダウンロード
から instantclient-basic-linux.x64-18.5.0.0.0dbru.zip をダウンロードします。 curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip -
client.zipファイルを解凍します。例:unzip *.zip
Docker からファイルを抽出します。
-
新しいコマンドプロンプトで、Docker コンテナ ID を表示して書き留めます。例:
docker container lsコマンドプロンプトでは、すべてのコンテナとその ID を返すことができます。例:
debc16fd6970 -
oracle_pluginディレクトリで、lib64/libaio.so.1、lib64/libaio.so.1.0.0、lib64/libaio.so.1.0.1ファイルをローカルinstantclient_18_5フォルダに抽出してください。例:docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
カスタムプラグイン
Apache Airflow は、起動時にプラグインフォルダにある Python ファイルの内容を実行します。これは環境変数の設定と変更に使用されます。次のステップでは、カスタムプラグインのサンプルコードを説明します。
-
以下のコードサンプルの内容をコピーし、ローカルに
env_var_plugin_oracle.pyとして保存します。from airflow.plugins_manager import AirflowPlugin import os os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5' os.environ["DPI_DEBUG_LEVEL"]="64" class EnvVarPlugin(AirflowPlugin): name = 'env_var_plugin'
Plugins.zip
以下のステップは、plugins.zip を作成する方法について説明します。この例の内容は、他のプラグインやバイナリと組み合わせて 1 つの plugins.zip ファイルにすることができます。
プラグインディレクトリの中身を Zip 圧縮する
-
コマンドプロンプトで、
oracle_pluginディレクトリに移動します。例:cd oracle_plugin -
instantclient_18_5ディレクトリを plugins.zip に圧縮します。例:zip -r ../plugins.zip ./コマンドプロンプトに次のように表示されます。
oracle_plugin$ ls client.zip instantclient_18_5 -
client.zipファイルを削除します。例:rm client.zip
env_var_plugin_oracle.py ファイルを圧縮します。
-
plugins.zip のルートに
env_var_plugin_oracle.pyファイルを追加します。例:zip plugins.zip env_var_plugin_oracle.py -
これで、plugins.zip には次のものが含まれるようになりました。
env_var_plugin_oracle.py instantclient_18_5/
Airflow 設定オプション
Apache Airflow v2 を使用している場合、core.lazy_load_plugins : False を Apache Airflow の設定オプションとして追加してください。詳細については、2 の設定オプションによるプラグインの読み込み を参照してください。
次のステップ
-
この例の
requirements.txtファイルを Amazon S3 バケットにアップロードする方法について詳しくは、Python 依存関係のインストール をご覧ください。 -
この例の DAG コードを Amazon S3 バケットの
dagsフォルダにアップロードする方法については、DAG の追加と更新 を参照してください。 -
この例の
plugins.zipファイルを Amazon S3 バケットにアップロードする方法について詳しくは、カスタムプラグインのインストール をご覧ください。