使用 AWS Secrets Manager 中的密钥进行 Apache Airflow 连接
以下示例调用 AWS Secrets Manager 在 Amazon MWAA 上获取 Apache Airflow 连接的密钥。它假设您已完成 使用 AWS Secrets Manager 密钥配置 Apache Airflow 连接 中的步骤。
版本
您可以在 Python 3.10
先决条件
要使用本页上的示例代码,您需要以下内容:
-
创建 Secrets Manager 后端作为 Apache Airflow 配置选项,如 使用 AWS Secrets Manager 密钥配置 Apache Airflow 连接 所列。
-
Secrets Manager 中的 Apache Airflow 连接字符串,如 使用 AWS Secrets Manager 密钥配置 Apache Airflow 连接 所列。
权限
-
Secrets Manager 权限,如 使用 AWS Secrets Manager 密钥配置 Apache Airflow 连接 所列。
要求
要在 Apache Airflow v2 和更高版本中使用此代码示例,无需附加依赖项。使用 aws-mwaa-docker-images
代码示例
以下步骤描述了如何创建 DAG 代码,以便调用 Secrets Manager 来获取密钥。
-
在命令提示符下,导航到存储 DAG 代码的目录。例如:
cd dags -
复制以下代码示例的内容,并在本地另存为
secrets-manager.py。""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG, settings, secrets from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from datetime import timedelta import os ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html sm_secretId_name = 'airflow/connections/myconn' default_args = { 'owner': 'airflow', 'start_date': days_ago(1), 'depends_on_past': False } ### Gets the secret myconn from Secrets Manager def read_from_aws_sm_fn(**kwargs): ### set up Secrets Manager hook = AwsBaseHook(client_type='secretsmanager') client = hook.get_client_type(region_name='us-east-1') response = client.get_secret_value(SecretId=sm_secretId_name) myConnSecretString = response["SecretString"] return myConnSecretString ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), default_args=default_args, dagrun_timeout=timedelta(hours=2), start_date=days_ago(1), schedule_interval=None ) as dag: write_all_to_aws_sm = PythonOperator( task_id="read_from_aws_sm", python_callable=read_from_aws_sm_fn, provide_context=True )
接下来做什么?
-
要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的
dags文件夹,请参阅 添加或更新 DAG。