

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 在 Amazon MWAA 上使用 DAG
<a name="working-dags"></a>

要在 Amazon MWAA 环境中运行有向无环图（DAG），请将文件复制到与环境相连的 Amazon S3 存储桶中，然后让 Amazon MWAA 知道您的 DAG 和支持文件在 Amazon MWAA 控制台上的位置。Amazon MWAA 负责在工作线程、计划程序和 Web 服务器之间同步 DAG。本指南介绍如何在 Amazon MWAA 环境中添加或更新 DAG，以及如何安装自定义插件和 Python 依赖项。

**Topics**
+ [

## Amazon S3 存储桶概述
](#working-dags-s3-about)
+ [

# 添加或更新 DAG
](configuring-dag-folder.md)
+ [

# 安装自定义插件
](configuring-dag-import-plugins.md)
+ [

# 安装 Python 依赖项
](working-dags-dependencies.md)
+ [

# 删除 Amazon S3 上的文件
](working-dags-delete.md)

## Amazon S3 存储桶概述
<a name="working-dags-s3-about"></a>

适用于 Amazon MWAA 环境的 Amazon S3 存储桶必须*已阻止公共访问权限*。默认情况下，所有 Amazon S3 资源都是私有的，包括桶、对象和相关子资源（例如，生命周期配置）。
+ 只有资源拥有者，即创建该存储桶的 AWS 账户 可以访问该资源。资源拥有者（例如管理员）可以写入访问控制策略来授予他人访问权限。
+ 您设置的访问策略必须有权将 DAG、`plugins.zip` 中的自定义插件和 `requirements.txt` 中的 Python 依赖项添加到 Amazon S3 存储桶中。有关包含所需权限的策略示例，请参阅 [AmazonMWAAFullConsoleAccess](access-policies.md#console-full-access)。

Amazon MWAA 环境的 Amazon S3 存储桶必须*启用版本控制*。启用 Amazon S3 存储桶版本控制后，每当创建新版本时，都会创建一个新副本。
+ 在 Amazon S3 存储桶上，为 `plugins.zip` 中的自定义插件和 `requirements.txt` 中的 Python 依赖项启用了版本控制。
+ 每次在 Amazon S3 存储桶上更新文件时，都必须在 Amazon MWAA 控制台上指定 `plugins.zip` 和 `requirements.txt` 的版本。

# 添加或更新 DAG
<a name="configuring-dag-folder"></a>

有向无环图（DAG）在 Python 文件中定义，该文件将 DAG 的结构定义为代码。您可以使用 AWS CLI 或 Amazon S3 控制台将 DAG 上传到环境。本主题介绍使用 Amazon S3 存储桶中的 `dags` 文件夹，在 Amazon MWAA 环境中添加或更新 Apache Airflow DAG 的步骤。

**Topics**
+ [

## 先决条件
](#configuring-dag-folder-prereqs)
+ [

## 工作方式
](#configuring-dag-folder-how)
+ [

## 更改了哪些内容？
](#configuring-dag-folder-changed)
+ [

## 使用 Amazon MWAA CLI 实用工具测试 DAG
](#working-dag-folder-cli-utility)
+ [

## 将 DAG 代码上传到 Amazon S3
](#configuring-dag-folder-uploading)
+ [

## 在 Amazon MWAA 控制台上指定 DAG 文件夹的路径（第一次）
](#configuring-dag-folder-mwaaconsole)
+ [

## 在 Apache Airflow UI 上访问更改
](#configuring-dag-folder-mwaaconsole-view)
+ [

## 接下来做什么？
](#configuring-dag-folder-next-up)

## 先决条件
<a name="configuring-dag-folder-prereqs"></a>

在完成本页上的步骤之前，您需要具备以下条件。
+ **权限** — 您的 AWS 账户 必须已获得管理员授权，访问适用于环境的 [AmazonMWAAFullConsoleAccess](access-policies.md#console-full-access) 访问控制策略。此外，[执行角色](mwaa-create-role.md)必须允许 Amazon MWAA 环境访问环境所使用的 AWS 资源。
+ **访问权限** — 如果您需要访问公共存储库以便直接在 Web 服务器上安装依赖项，则必须将环境配置为具有**公共网络** Web 服务器访问权限。有关更多信息，请参阅[Apache Airflow 访问模式](configuring-networking.md)。
+ **Amazon S3 配置** — 用于存储 DAG 的 [Amazon S3 存储桶](mwaa-s3-bucket.md)、在 `plugins.zip` 中的自定义插件和在 `requirements.txt` 中的 Python 依赖项必须配置为*已阻止公共访问*和*已启用版本控制*。

## 工作方式
<a name="configuring-dag-folder-how"></a>

有向无环图（DAG）在单个 Python 文件中定义，该文件将 DAG 的结构定义为代码。它包含以下各项：
+ [DAG](https://airflow.apache.org/docs/stable/concepts.html#dags) 定义。
+ 描述如何运行 DAG 和要运行的[任务](https://airflow.apache.org/docs/stable/concepts.html#tasks)的[运算符](https://airflow.apache.org/concepts.html#operators)。
+ 描述任务运行顺序的[运算符关系](https://airflow.apache.org/concepts.html#bitshift-composition)。

要在 Amazon MWAA 环境中运行 Apache Airflow 平台，您需要将 DAG 定义复制到存储桶中的 `dags` 文件夹。例如，存储桶中的 DAG 文件夹应如下所示：

**Example DAG 文件夹**  

```
dags/
└ dag_def.py
```

Amazon MWAA 每 30 秒自动将新建和更改的对象从 Amazon S3 存储桶同步到 Amazon MWAA 计划程序和工作线程容器的 `/usr/local/airflow/dags` 文件夹，从而保留 Amazon S3 源的文件层次结构，无论文件类型如何。新 DAG 在 Apache Airflow UI 中列出所需的时间由 `scheduler.dag\$1dir\$1list\$1interval` 控制。对现有 DAG 的更改将在下一个 [DAG 处理循环](best-practices-tuning.md#best-practices-tuning-scheduler)中获取。

**注意**  
您不需要在 DAG 文件夹中包含 `airflow.cfg` 配置文件。您可以从Amazon MWAA 控制台覆盖默认 Apache Airflow 配置。有关更多信息，请参阅[在 Amazon MWAA 上使用 Apache Airflow 配置选项](configuring-env-variables.md)。

## 更改了哪些内容？
<a name="configuring-dag-folder-changed"></a>

要查看特定 Apache Airflow 版本的更改，请参阅[发布说明](https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#release-notes)页面。
+ Apache Airflow v3 配置：[配置参考](https://airflow.apache.org/docs/apache-airflow/3.0.6/configurations-ref.html)
+ Apache Airflow v2 公共接口信息：[Airflow 的公共接口](https://airflow.apache.org/docs/apache-airflow/2.10.3/public-airflow-interface.html)

## 使用 Amazon MWAA CLI 实用工具测试 DAG
<a name="working-dag-folder-cli-utility"></a>
+ 命令行界面 (CLI) 实用工具可在本地复制 Amazon MWAA 环境。
+ CLI 在本地构建 Docker 容器镜像，类似于 Amazon MWAA 生产镜像。您可以使用它运行本地 Apache Airflow 环境来开发和测试 DAG、自定义插件和依赖项，然后部署到 Amazon MWAA。
+ 要运行 CLI，请参阅 GitHub 上的 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)。

## 将 DAG 代码上传到 Amazon S3
<a name="configuring-dag-folder-uploading"></a>

您可以使用 Amazon S3 控制台或 AWS Command Line Interface (AWS CLI) 将 DAG 代码上传到 Amazon S3 存储桶中。以下步骤假设您正在将代码 (`.py`) 上传到 Amazon S3 存储桶中名为 `dags` 的文件夹。

### 使用 AWS CLI
<a name="configuring-dag-folder-cli"></a>

AWS Command Line Interface (AWS CLI) 是一种开源工具，您可以用来在命令行 Shell 中使用命令与 AWS 服务进行交互。要完成本节中的步骤，您需要以下满足以下条件：
+ [AWS CLI – 安装版本 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)。
+ [AWS CLI – 使用 `aws configure` 进行快速配置](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html)。

**要使用 AWS CLI 上传，请执行以下操作**

1. 以下示例列出所有 Amazon S3 存储桶。

   ```
   aws s3 ls
   ```

1. 使用以下命令列出 Amazon S3 存储桶中适合环境的文件和文件夹。

   ```
   aws s3 ls s3://YOUR_S3_BUCKET_NAME
   ```

1. 以下命令将 `dag_def.py` 文件上传到 `dags` 文件夹。

   ```
   aws s3 cp dag_def.py s3://amzn-s3-demo-bucket/dags/
   ```

   如果 Amazon S3 存储桶中尚不存在名为 `dags` 的文件夹，则此命令会创建 `dags` 文件夹，并将名为 `dag_def.py` 的文件上传到新文件夹。

### 使用 Amazon S3 控制台
<a name="configuring-dag-folder-console"></a>

Amazon S3 控制台是一个基于 Web 的用户界面，可用来创建和管理 Amazon S3 存储桶中的资源。以下步骤假设您有一个名为 `dags` 的 DAG 文件夹。

**要使用 Amazon S3 控制台上传，请执行以下操作**

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 在 **S3 中的 DAG 代码**窗格中选择 **S3 存储桶**链接，在控制台上打开存储桶。

1. 选择 `dags` 文件夹。

1. 选择**上传**。

1. 选择 **添加文件**。

1. 选择 `dag_def.py` 的本地副本，选择**上传**。

## 在 Amazon MWAA 控制台上指定 DAG 文件夹的路径（第一次）
<a name="configuring-dag-folder-mwaaconsole"></a>

以下步骤假设您要在 Amazon S3 桶中指定名为 `dags` 文件夹的路径。

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择要在其中运行 DAG 的环境。

1. 选择**编辑**。

1. 在 **Amazon S3 中的 DAG 代码**窗格上，选择 **DAG 文件夹**字段旁边的**浏览 S3**。

1. 选择 `dags` 文件夹。

1. 选择**选择**。

1. 选择**下一步**、**更新环境**。

## 在 Apache Airflow UI 上访问更改
<a name="configuring-dag-folder-mwaaconsole-view"></a>

您需要在 AWS Identity and Access Management (IAM) 中拥有 AWS 账户 的 [Apache Airflow 用户界面访问策略：亚马逊 MWAAWeb ServerAccess](access-policies.md#web-ui-access) 权限才能访问 Apache Airflow UI。

**要访问 Apache Airflow UI，请执行以下操作**

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 选择**打开 Airflow UI**。

## 接下来做什么？
<a name="configuring-dag-folder-next-up"></a>

使用 GitHub 上的 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 在本地测试 DAG、自定义插件和 Python 依赖项。

# 安装自定义插件
<a name="configuring-dag-import-plugins"></a>

Amazon MWAA 支持 Apache Airflow 的内置插件管理器，允许您使用自定义 Apache Airflow 运算符、挂钩、传感器或接口。本页介绍使用 `plugins.zip` 文件在 Amazon MWAA 环境中安装 [Apache Airflow 自定义插件](https://airflow.incubator.apache.org/plugins.html)的步骤。

**Contents**
+ [

## 先决条件
](#configuring-dag-plugins-prereqs)
+ [

## 工作方式
](#configuring-dag-plugins-how)
+ [

## 何时使用插件
](#configuring-dag-plugins-changed)
+ [

## 自定义插件概述
](#configuring-dag-plugins-overview)
  + [

### 自定义插件目录和大小限制
](#configuring-dag-plugins-quota)
+ [

## 自定义插件示例
](#configuring-dag-plugins-airflow-ex)
  + [

### 在 plugins.zip 中使用平面目录结构的示例
](#configuring-dag-plugins-overview-simple)
  + [

### 在 plugins.zip 中使用平面目录结构的示例
](#configuring-dag-plugins-overview-complex)
+ [

## 创建 plugins.zip 文件
](#configuring-dag-plugins-test-create)
  + [

### 步骤 1：使用 Amazon MWAA CLI 实用工具测试自定义插件
](#configuring-dag-plugins-cli-utility)
  + [

### 步骤 2：创建 plugins.zip 文件
](#configuring-dag-plugins-zip)
+ [

## 上传 `plugins.zip` 到 Amazon S3
](#configuring-dag-plugins-upload)
  + [

### 使用 AWS CLI
](#configuring-dag-plugins-upload-cli)
  + [

### 使用 Amazon S3 控制台
](#configuring-dag-plugins-upload-console)
+ [

## 在环境中安装自定义插件
](#configuring-dag-plugins-mwaa-installing)
  + [

### 在 Amazon MWAA 控制台上指定 `plugins.zip` 的路径（第一次）
](#configuring-dag-plugins-mwaa-first)
  + [

### 在 Amazon MWAA 控制台上指定 `plugins.zip` 的版本
](#configuring-dag-plugins-s3-mwaaconsole)
+ [

## plugins.zip 的用例示例
](#configuring-dag-plugins-examples)
+ [

## 接下来做什么？
](#configuring-dag-plugins-next-up)

## 先决条件
<a name="configuring-dag-plugins-prereqs"></a>

在完成本页上的步骤之前，您需要具备以下条件。
+ **权限** — 您的 AWS 账户 必须已获得管理员授权，访问适用于环境的 [AmazonMWAAFullConsoleAccess](access-policies.md#console-full-access) 访问控制策略。此外，[执行角色](mwaa-create-role.md)必须允许 Amazon MWAA 环境访问环境所使用的 AWS 资源。
+ **访问权限** — 如果您需要访问公共存储库以便直接在 Web 服务器上安装依赖项，则必须将环境配置为具有**公共网络** Web 服务器访问权限。有关更多信息，请参阅[Apache Airflow 访问模式](configuring-networking.md)。
+ **Amazon S3 配置** — 用于存储 DAG 的 [Amazon S3 存储桶](mwaa-s3-bucket.md)、在 `plugins.zip` 中的自定义插件和在 `requirements.txt` 中的 Python 依赖项必须配置为*已阻止公共访问*和*已启用版本控制*。

## 工作方式
<a name="configuring-dag-plugins-how"></a>

要在环境中运行自定义插件，您必须做三件事：

1. 在本地创建 `plugins.zip` 文件。

1. 将 `plugins.zip` 文件上传到 Amazon S3 中的存储桶。

1. 在 Amazon MWAA 控制台的**插件文件**字段中指定此文件的版本。

**注意**  
如果这是您首次将 `plugins.zip` 上传到 Amazon S3 存储桶，则还需要在 Amazon MWAA 控制台上指定文件路径。您只需要完成此步骤一次。

## 何时使用插件
<a name="configuring-dag-plugins-changed"></a>

正如 [Apache Airflow 文档](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.html#plugins)中所述，仅在扩展 Apache Airflow 用户界面时才需要插件。可以直接将自定义操作符与 `DAG` 代码一起放入 `/dags` 文件夹中。

如果需要自行创建与外部系统的集成，请将其放在 /`dags` 文件夹或其中的子文件夹中，而不是放在 `plugins.zip` 文件夹中。在 Apache Airflow 2.x 中，插件主要用于扩展 UI。

同样，不能将其他依赖项放入 `plugins.zip` 中。而可以将其存储在 Amazon S3 `/dags` 文件夹中的某个位置，在 Apache Airflow 启动之前，这些依赖项将在该位置同步到每个 Amazon MWAA 容器。

**注意**  
`/dags` 文件夹或 `plugins.zip` 中任何未显式定义 Apache Airflow DAG 对象的文件都必须在 `.airflowignore` 文件中列出。

## 自定义插件概述
<a name="configuring-dag-plugins-overview"></a>

Apache Airflow 的内置插件管理器只需将文件拖放到 `$AIRFLOW_HOME/plugins` 文件夹中即可将外部功能集成到其核心中。它允许您使用自定义 Apache Airflow 运算符、钩子、传感器或接口。下一节提供了本地开发环境中平面和嵌套目录结构的示例，以及生成的 import 语句，这些语句决定了 plugins.zip 中的目录结构。

### 自定义插件目录和大小限制
<a name="configuring-dag-plugins-quota"></a>

在启动期间，Apache Airflow 计划程序和工作线程在 `/usr/local/airflow/plugins/*` 中环境 AWS 托管的 Fargate 容器中搜索自定义插件。
+ **目录结构**。目录结构（在 `/*` 中）基于您 `plugins.zip` 文件的内容。例如，如果 `plugins.zip` 包含 `operators` 目录作为主级目录，则该目录将被解压缩到环境的 `/usr/local/airflow/plugins/operators` 中。
+ **大小限制**。我们建议使用小于 1 GB 的 `plugins.zip` 文件。`plugins.zip` 文件大小越大，环境的启动时间就越长。尽管 Amazon MWAA 没有明确限制 `plugins.zip` 文件的大小，但如果无法在十分钟内安装依赖项，Fargate 服务将超时并尝试将环境回滚到稳定状态。

**注意**  
对于使用 Apache Airflow v2.0.2 的环境，Amazon MWAA 会限制 Apache Airflow Web 服务器上的出站流量，并且不允许直接在 Web 服务器上安装插件或 Python 依赖项。从 Apache Airflow v2.2.2 开始，Amazon MWAA 可以直接在 Web 服务器上安装插件和依赖项。

## 自定义插件示例
<a name="configuring-dag-plugins-airflow-ex"></a>

下一节使用*《Apache Airflow 参考指南》*中的示例代码来说明如何构建本地开发环境。

### 在 plugins.zip 中使用平面目录结构的示例
<a name="configuring-dag-plugins-overview-simple"></a>

------
#### [ Apache Airflow v3 ]

以下示例显示了 Apache Airflow v3 中一个采用扁平目录结构的 `plugins.zip` 文件。

**Example 带 `PythonVirtualenvOperator` plugins.zip 的扁平目录**  
以下示例显示 [为 Apache Airflow PythonVirtualenvOperator 创建自定义插件](samples-virtualenv.md) 中 `PythonVirtualenvOperator` 自定义插件的 plugins.zip 文件的主级树。  

```
├── virtual_python_plugin.py
```

**Example plugins/virtual\$1python\$1plugin.py**  
以下示例显示 `PythonVirtualenvOperator` 自定义插件。  

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
 
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv 
from typing import List

def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
    cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
    if system_site_packages:
        cmd.append('--system-site-packages')
    if python_bin is not None:
        cmd.append(f'--python={python_bin}')
    return cmd

airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd

class VirtualPythonPlugin(AirflowPlugin):                
    name = 'virtual_python_plugin'
```

------
#### [ Apache Airflow v2 ]

以下示例显示了 Apache Airflow v2 中一个采用扁平目录结构的 `plugins.zip` 文件。

**Example 带 `PythonVirtualenvOperator` plugins.zip 的扁平目录**  
以下示例显示 [为 Apache Airflow PythonVirtualenvOperator 创建自定义插件](samples-virtualenv.md) 中 `PythonVirtualenvOperator` 自定义插件的 plugins.zip 文件的主级树。  

```
├── virtual_python_plugin.py
```

**Example plugins/virtual\$1python\$1plugin.py**  
以下示例显示 `PythonVirtualenvOperator` 自定义插件。  

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
 
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv 
from typing import List

def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
    cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
    if system_site_packages:
        cmd.append('--system-site-packages')
    if python_bin is not None:
        cmd.append(f'--python={python_bin}')
    return cmd

airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd

class VirtualPythonPlugin(AirflowPlugin):                
    name = 'virtual_python_plugin'
```

------

### 在 plugins.zip 中使用平面目录结构的示例
<a name="configuring-dag-plugins-overview-complex"></a>

------
#### [ Apache Airflow v3 ]

以下示例显示一个 `plugins.zip` 文件，其中包含 `hooks`、`operators` 的单独目录和 `sensors` 目录。

**Example Plugins.zip**  

```
__init__.py
my_airflow_plugin.py
 hooks/
  |-- __init__.py
  |-- my_airflow_hook.py
 operators/
  |-- __init__.py
  |-- my_airflow_operator.py
  |-- hello_operator.py
 sensors/
  |-- __init__.py
  |-- my_airflow_sensor.py
```

以下示例显示使用自定义插件的 DAG（[DAG 文件夹](https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-dag-folder.html#configuring-dag-folder-how)）中的导入语句。

**Example dags/your\$1dag.py**  

```
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor
from operators.hello_operator import HelloOperator

default_args = {
	'owner': 'airflow',
	'depends_on_past': False,
	'start_date': datetime(2018, 1, 1),
	'email_on_failure': False,
	'email_on_retry': False,
	'retries': 1,
	'retry_delay': timedelta(minutes=5),
}


with DAG('customdag',
		 max_active_runs=3,
		 schedule_interval='@once',
		 default_args=default_args) as dag:

	sens = MySensor(
		task_id='taskA'
	)

	op = MyOperator(
		task_id='taskB',
		my_field='some text'
	)

	hello_task = HelloOperator(task_id='sample-task', name='foo_bar')



	sens >> op >> hello_task
```

**Example plugins/my\$1airflow\$1plugin.py**  

```
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
                    
class PluginName(AirflowPlugin):
                    
    name = 'my_airflow_plugin'
                    
    hooks = [MyHook]
    operators = [MyOperator]
    sensors = [MySensor]
```

以下示例显示自定义插件文件中所需的每条导入语句。

**Example hooks/my\$1airflow\$1hook.py**  

```
from airflow.hooks.base import BaseHook


class MyHook(BaseHook):

    def my_method(self):
        print("Hello World")
```

**Example sensors/my\$1airflow\$1sensor.py**  

```
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class MySensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self,
                 *args,
                 **kwargs):
        super(MySensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        return True
```

**Example operators/my\$1airflow\$1operator.py**  

```
from airflow.operators.bash import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_airflow_hook import MyHook


class MyOperator(BaseOperator):

    @apply_defaults
    def __init__(self,
                 my_field,
                 *args,
                 **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
        self.my_field = my_field

    def execute(self, context):
        hook = MyHook('my_conn')
        hook.my_method()
```

**Example operators/hello\$1operator.py**  

```
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            name: str,
            **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = "Hello {}".format(self.name)
        print(message)
        return message
```

按照[使用 Amazon MWAA CLI 实用工具测试自定义插件](#configuring-dag-plugins-cli-utility)中的步骤进行操作，然后[创建 plugins.zip 文件](#configuring-dag-plugins-zip)来将内容压缩到 `plugins` 目录**之内**。例如 `cd plugins`。

------
#### [ Apache Airflow v2 ]

以下示例显示一个 `plugins.zip` 文件，其中包含 `hooks`、`operators` 的单独目录和 `sensors` 目录。

**Example Plugins.zip**  

```
__init__.py
 my_airflow_plugin.py
 hooks/
  |-- __init__.py
  |-- my_airflow_hook.py
 operators/
  |-- __init__.py
  |-- my_airflow_operator.py
  |-- hello_operator.py
 sensors/
  |-- __init__.py
  |-- my_airflow_sensor.py
```

以下示例显示使用自定义插件的 DAG（[DAG 文件夹](https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-dag-folder.html#configuring-dag-folder-how)）中的导入语句。

**Example dags/your\$1dag.py**  

```
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor
from operators.hello_operator import HelloOperator

default_args = {
	'owner': 'airflow',
	'depends_on_past': False,
	'start_date': datetime(2018, 1, 1),
	'email_on_failure': False,
	'email_on_retry': False,
	'retries': 1,
	'retry_delay': timedelta(minutes=5),
}


with DAG('customdag',
		 max_active_runs=3,
		 schedule_interval='@once',
		 default_args=default_args) as dag:

	sens = MySensor(
		task_id='taskA'
	)

	op = MyOperator(
		task_id='taskB',
		my_field='some text'
	)

	hello_task = HelloOperator(task_id='sample-task', name='foo_bar')



	sens >> op >> hello_task
```

**Example plugins/my\$1airflow\$1plugin.py**  

```
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
                    
class PluginName(AirflowPlugin):
                    
    name = 'my_airflow_plugin'
                    
    hooks = [MyHook]
    operators = [MyOperator]
    sensors = [MySensor]
```

以下示例显示自定义插件文件中所需的每条导入语句。

**Example hooks/my\$1airflow\$1hook.py**  

```
from airflow.hooks.base import BaseHook


class MyHook(BaseHook):

    def my_method(self):
        print("Hello World")
```

**Example sensors/my\$1airflow\$1sensor.py**  

```
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class MySensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self,
                 *args,
                 **kwargs):
        super(MySensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        return True
```

**Example operators/my\$1airflow\$1operator.py**  

```
from airflow.operators.bash import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_airflow_hook import MyHook


class MyOperator(BaseOperator):

    @apply_defaults
    def __init__(self,
                 my_field,
                 *args,
                 **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
        self.my_field = my_field

    def execute(self, context):
        hook = MyHook('my_conn')
        hook.my_method()
```

**Example operators/hello\$1operator.py**  

```
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            name: str,
            **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = "Hello {}".format(self.name)
        print(message)
        return message
```

按照[使用 Amazon MWAA CLI 实用工具测试自定义插件](#configuring-dag-plugins-cli-utility)中的步骤进行操作，然后[创建 plugins.zip 文件](#configuring-dag-plugins-zip)来将内容压缩到 `plugins` 目录**之内**。例如 `cd plugins`。

------

## 创建 plugins.zip 文件
<a name="configuring-dag-plugins-test-create"></a>

以下步骤描述了我们建议在本地创建 plugins.zip 文件的步骤。

### 步骤 1：使用 Amazon MWAA CLI 实用工具测试自定义插件
<a name="configuring-dag-plugins-cli-utility"></a>
+ 命令行界面 (CLI) 实用工具可在本地复制 Amazon MWAA 环境。
+ CLI 在本地构建 Docker 容器镜像，类似于 Amazon MWAA 生产镜像。您可以使用它运行本地 Apache Airflow 环境来开发和测试 DAG、自定义插件和依赖项，然后部署到 Amazon MWAA。
+ 要运行 CLI，请参阅 GitHub 上的 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)。

### 步骤 2：创建 plugins.zip 文件
<a name="configuring-dag-plugins-zip"></a>

您可以使用内置的 ZIP 存档实用工具或任何其他 ZIP 实用工具（例如 [7zip](https://www.7-zip.org/download.html)）来创建.zip 文件。

**注意**  
当您创建 .zip 文件时，Windows 操作系统的内置 ZIP 实用工具可能会添加子文件夹。我们建议您验证 plugins.zip 文件的内容，然后再上传到 Amazon S3 存储桶，以确保没有添加其他目录。

1. 将目录更改为本地 Airflow 插件目录。例如：

   ```
   myproject$ cd plugins
   ```

1. 运行以下命令以确保内容具有可执行权限（仅限 macOS 和 Linux）。

   ```
   plugins$ chmod -R 755 .
   ```

1. 将内容压缩到 `plugins` 文件夹中****。

   ```
   plugins$ zip -r plugins.zip .
   ```

## 上传 `plugins.zip` 到 Amazon S3
<a name="configuring-dag-plugins-upload"></a>

您可以使用 Amazon S3 控制台或 AWS Command Line Interface (AWS CLI) 将 `plugins.zip` 文件上传到 Amazon S3 存储桶中。

### 使用 AWS CLI
<a name="configuring-dag-plugins-upload-cli"></a>

AWS Command Line Interface (AWS CLI) 是一种开源工具，您可以用来在命令行 Shell 中使用命令与 AWS 服务进行交互。要完成本节中的步骤，您需要以下满足以下条件：
+ [AWS CLI – 安装版本 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)。
+ [AWS CLI – 使用 `aws configure` 进行快速配置](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html)。

**要使用 AWS CLI 上传，请执行以下操作**

1. 在命令提示符下，导航到存储 `plugins.zip` 文件的目录。例如：

   ```
   cd plugins
   ```

1. 以下示例列出所有 Amazon S3 存储桶。

   ```
   aws s3 ls
   ```

1. 使用以下命令列出 Amazon S3 存储桶中适合环境的文件和文件夹。

   ```
   aws s3 ls s3://YOUR_S3_BUCKET_NAME
   ```

1. 使用以下命令将 `plugins.zip` 文件上传到环境的 Amazon S3 存储桶。

   ```
   aws s3 cp plugins.zip s3://amzn-s3-demo-bucket/plugins.zip
   ```

### 使用 Amazon S3 控制台
<a name="configuring-dag-plugins-upload-console"></a>

Amazon S3 控制台是一个基于 Web 的用户界面，可用来创建和管理 Amazon S3 存储桶中的资源。

**要使用 Amazon S3 控制台上传，请执行以下操作**

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 在 **S3 中的 DAG 代码**窗格中选择 **S3 存储桶**链接，在控制台上打开存储桶。

1. 选择**上传**。

1. 选择 **添加文件**。

1. 选择 `plugins.zip` 的本地副本，选择**上传**。

## 在环境中安装自定义插件
<a name="configuring-dag-plugins-mwaa-installing"></a>

本节介绍如何安装您上传到 Amazon S3 存储桶的自定义插件，方法是指定 plugins.zip 文件的路径，并在每次更新 zip 文件时指定 plugins.zip 文件的版本。

### 在 Amazon MWAA 控制台上指定 `plugins.zip` 的路径（第一次）
<a name="configuring-dag-plugins-mwaa-first"></a>

如果这是您首次将 `plugins.zip` 上传到 Amazon S3 存储桶，则还需要在 Amazon MWAA 控制台上指定文件路径。您只需要完成此步骤一次。

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 选择**编辑**。

1. 在 **Amazon S3 中的 DAG 代码**窗格上，选择**要求文件 - 可选**字段旁边的**浏览 S3**。

1. 选择 Amazon S3 存储桶中的 `plugins.zip` 文件。

1. 选择**选择**。

1. 选择**下一步**、**更新环境**。

### 在 Amazon MWAA 控制台上指定 `plugins.zip` 的版本
<a name="configuring-dag-plugins-s3-mwaaconsole"></a>

每次在 Amazon S3 存储桶中上传 `plugins.zip` 的新版本时，都需要在 Amazon MWAA 控制台上指定 `plugins.zip` 文件的版本。

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 选择**编辑**。

1. 在 **Amazon S3 中的 DAG 代码**窗格中，从下拉列表中选择 `plugins.zip` 的版本。

1. 选择**下一步**。

## plugins.zip 的用例示例
<a name="configuring-dag-plugins-examples"></a>
+ 要了解如何创建自定义插件，请参阅 [使用 Apache Hive 和 Hadoop 的自定义插件](samples-hive.md)。
+ 要了解如何创建自定义插件，请参阅 [修补 PythonVirtualenvOperator 的自定义插件 ](samples-virtualenv.md)。
+ 要了解如何创建自定义插件，请参阅 [使用 Oracle 定制插件](samples-oracle.md)。
+ 要了解如何创建自定义插件，请参阅 [在 Amazon MWAA 上更改 DAG 的时区](samples-plugins-timezone.md)。

## 接下来做什么？
<a name="configuring-dag-plugins-next-up"></a>

使用 GitHub 上的 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 在本地测试 DAG、自定义插件和 Python 依赖项。

# 安装 Python 依赖项
<a name="working-dags-dependencies"></a>

Python 依赖项是指在 Amazon MWAA 环境中的 Apache Airflow 版本的 Apache Airflow 基础版安装中未包含的任何程序包或发行版。本主题介绍使用 Amazon S3 存储桶中的 `requirements.txt` 文件在 Amazon MWAA 环境中安装 Apache Airflow Python 依赖项的步骤。

**Contents**
+ [

## 先决条件
](#working-dags-dependencies-prereqs)
+ [

## 工作原理
](#working-dags-dependencies-how)
+ [

## Python 依赖项概述
](#working-dags-dependencies-overview)
  + [

### Python 依赖项位置和大小限制
](#working-dags-dependencies-quota)
+ [

## 创建 requirements.txt 文件
](#working-dags-dependencies-test-create)
  + [

### 步骤 1：使用 Amazon MWAA CLI 实用工具测试 Python 依赖项
](#working-dags-dependencies-cli-utility)
  + [

### 步骤 2：创建 `requirements.txt`
](#working-dags-dependencies-syntax-create)
+ [

## 上传 `requirements.txt` 到 Amazon S3
](#configuring-dag-dependencies-upload)
  + [

### 使用 AWS CLI
](#configuring-dag-dependencies-upload-cli)
  + [

### 使用 Amazon S3 控制台
](#configuring-dag-dependencies-upload-console)
+ [

## 在环境中安装 Python 依赖项
](#configuring-dag-dependencies-installing)
  + [

### 在 Amazon MWAA 控制台上指定 `requirements.txt` 的路径（第一次）
](#configuring-dag-dependencies-first)
  + [

### 在 Amazon MWAA 控制台上指定 `requirements.txt` 的版本
](#working-dags-dependencies-mwaaconsole-version)
+ [

## 访问 `requirements.txt` 的日志
](#working-dags-dependencies-logs)
+ [

## 接下来做什么？
](#working-dags-dependencies-next-up)

## 先决条件
<a name="working-dags-dependencies-prereqs"></a>

在完成本页上的步骤之前，您需要具备以下条件。
+ **权限** — 您的管理员 AWS 账户 必须已授予您访问您环境的 [Amazon MWAAFull ConsoleAccess](access-policies.md#console-full-access) 访问控制策略的权限。此外，您的[执行角色](mwaa-create-role.md)必须允许您的 Amazon MWAA 环境访问您的环境所使用的 AWS 资源。
+ **访问权限** — 如果您需要访问公共存储库以便直接在 Web 服务器上安装依赖项，则必须将环境配置为具有**公共网络** Web 服务器访问权限。有关更多信息，请参阅[Apache Airflow 访问模式](configuring-networking.md)。
+ **Amazon S3 配置** — 用于[存储您的 DAGs自定义插件和 Python 依赖项的 Amazon S3 存储桶](mwaa-s3-bucket.md)`requirements.txt`必须配置为已*阻止公共访问*和*启用版本控制*。`plugins.zip`

## 工作原理
<a name="working-dags-dependencies-how"></a>

在 Amazon MWAA 上，您可以安装所有 Python 依赖项，方法是将 `requirements.txt` 文件上传到 Amazon S3 存储桶，然后在每次更新文件时在 Amazon MWAA 控制台上指定该文件的版本。Amazon MWAA 运行 `pip3 install -r requirements.txt`，以在 Apache Airflow 计划程序和每个工作线程上安装 Python 依赖项。

要在环境中运行 Python 依赖项，您必须做三件事：

1. 在本地创建 `requirements.txt` 文件。

1. 将本地 `requirements.txt` 上传到 Amazon S3 中的存储桶。

1. 在 Amazon MWAA 控制台上的**要求文件**字段中指定此文件的版本。

**注意**  
如果这是您首次创建 `requirements.txt` 并将其上传到 Amazon S3 存储桶，则还需要在 Amazon MWAA 控制台上指定文件路径。您只需要完成此步骤一次。

## Python 依赖项概述
<a name="working-dags-dependencies-overview"></a>

你可以从 Python 包索引 PyPi (.org)、Python wheels () 或 Python whe `.whl` els () 或环境中兼容私有的 /Pep-503 存储库上 PyPi托管的 Python 依赖项中安装 Apache Airflow 额外内容和其他 Python 依赖项。

### Python 依赖项位置和大小限制
<a name="working-dags-dependencies-quota"></a>

Apache Airflow 计划程序和工作线程会在 `requirements.txt` 文件中搜索软件包，然后在环境中将该软件包安装到 `/usr/local/airflow/.local/bin` 位置。
+ **大小限制**。我们建议使用 `requirements.txt` 文件，以引用组合大小小于 1 GB 的库。Amazon MWAA 需要安装的库越多，环境上的*启动*时间就越长。尽管 Amazon MWAA 没有明确限制安装的库的大小，但如果无法在十分钟内安装依赖项，Fargate 服务将超时并尝试将环境回滚到稳定状态。

## 创建 requirements.txt 文件
<a name="working-dags-dependencies-test-create"></a>

以下步骤描述了在本地创建 plugins.zip 文件时我们建议的步骤。

### 步骤 1：使用 Amazon MWAA CLI 实用工具测试 Python 依赖项
<a name="working-dags-dependencies-cli-utility"></a>
+ 命令行界面 (CLI) 实用工具可在本地复制 Amazon MWAA 环境。
+ CLI 在本地构建 Docker 容器镜像，类似于 Amazon MWAA 生产镜像。在部署到 Amazon MWAA 之前，您可以使用它来运行本地 Apache Airflow 环境来开发和 DAGs测试自定义插件和依赖项。
+ 要运行 CLI，请参阅[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)上的 GitHub。

### 步骤 2：创建 `requirements.txt`
<a name="working-dags-dependencies-syntax-create"></a>

下一节介绍如何在 `requirements.txt` 文件中指定 [Python 程序包索引](https://pypi.org/)中的 Python 依赖项。

------
#### [ Apache Airflow v3 ]

1. **本地测试**。在创建 `requirements.txt` 文件之前，以迭代方式添加其他库以找到程序包及其版本的正确组合。要运行 Amazon MWAA CLI 实用程序，请参阅上的。[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) GitHub

1. **查看 Apache Airflow 程序包的 Extras**。要访问亚马逊 MWAA 上为 Apache Airflow v3 安装的软件包列表，请参阅网站。[aws-mwaa-docker-images `requirements.txt`](https://github.com/aws/amazon-mwaa-docker-images/blob/main/requirements.txt) GitHub 

1. **添加约束语句**。在文件顶部添加 Apache Airflow v3 环境的约束文件。`requirements.txt`Apache Airflow 约束文件指定了 Apache Airflow 发布时可用的提供程序版本。

    在以下示例中，用环境的版本号替换 *\$1environment-version\$1* 以及用与环境兼容的 Python 版本替换 *\$1Python-version\$1*。

   有关与 Apache Airflow 环境兼容的 Python 版本的信息，请参阅 [Apache Airflow 版本](airflow-versions.md#airflow-versions-official)。

   ```
   --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
   ```

    如果约束文件确定该 `xyz==1.0` 程序包与环境中的其他程序包不兼容，则 `pip3 install` 将无法阻止不兼容的库安装到环境中。如果任何软件包的安装失败，则可以在日志的相应日志流中访问每个 Apache Airflow 组件（调度程序、工作程序和 Web 服务器）的错误日志。 CloudWatch 有关日志类型的更多信息，请参阅 [访问 Amazon 中的 Airflow 日志 CloudWatch](monitoring-airflow.md)。

1. **Apache Airflow 程序包**。添加[程序包 Extras](http://airflow.apache.org/docs/apache-airflow/2.5.1/extra-packages-ref.html)及其版本 (`==`)。这有助于防止在环境中安装同名但版本不同的程序包。

   ```
   apache-airflow[package-extra]==2.5.1
   ```

1. **Python 库**。在 `requirements.txt` 文件中添加程序包名称和版本 (`==`)。这有助于防止自动应用来自 [PyPi.org](https://pypi.org) 的 future 更新。

   ```
   library == version
   ```  
**Example boto3 和 psycopg2-binary**  

   此示例仅用于演示目的。boto 和 psycopg2-binary 库包含在 Apache Airflow v3 基础版安装中，无需在 `requirements.txt` 文件中指定。

   ```
   boto3==1.17.54
   boto==2.49.0
   botocore==1.20.54
   psycopg2-binary==2.8.6
   ```

   [如果指定的包没有版本，则 Amazon MWAA 会安装.org 中最新版本的PyPi软件包。](https://pypi.org)此版本可能与您 `requirements.txt` 中的其他程序包冲突。

------
#### [ Apache Airflow v2 ]

1. **本地测试**。在创建 `requirements.txt` 文件之前，以迭代方式添加其他库以找到程序包及其版本的正确组合。要运行 Amazon MWAA CLI 实用程序，请参阅上的。[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) GitHub

1. **查看 Apache Airflow 程序包的 Extras**。要访问亚马逊 MWAA 上为 Apache Airflow v2 安装的软件包列表，请访问网站。[aws-mwaa-docker-images `requirements.txt`](https://github.com/aws/amazon-mwaa-docker-images/blob/main/requirements.txt) GitHub 

1. **添加约束语句**。在 `requirements.txt` 文件顶部添加 Apache Airflow v2 环境的约束文件。Apache Airflow 约束文件指定了 Apache Airflow 发布时可用的提供程序版本。

    从 Apache Airflow v2.7.2 开始，要求文件必须包含一条 `--constraint` 语句。如果您未提供约束条件，Amazon MWAA 将为您指定一个约束条件，以确保您的要求中列出的程序包与您正在使用的 Apache Airflow 版本兼容。

   在以下示例中，用环境的版本号替换 *\$1environment-version\$1* 以及用与环境兼容的 Python 版本替换 *\$1Python-version\$1*。

   有关与 Apache Airflow 环境兼容的 Python 版本的信息，请参阅 [Apache Airflow 版本](airflow-versions.md#airflow-versions-official)。

   ```
   --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
   ```

   如果约束文件确定该 `xyz==1.0` 程序包与环境中的其他程序包不兼容，则 `pip3 install` 将无法阻止不兼容的库安装到环境中。如果任何软件包的安装失败，则可以在日志的相应日志流中访问每个 Apache Airflow 组件（调度程序、工作程序和 Web 服务器）的错误日志。CloudWatch 有关日志类型的更多信息，请参阅 [访问 Amazon 中的 Airflow 日志 CloudWatch](monitoring-airflow.md)。

1. **Apache Airflow 程序包**。添加[程序包 Extras](http://airflow.apache.org/docs/apache-airflow/2.5.1/extra-packages-ref.html)及其版本 (`==`)。这有助于防止在环境中安装同名但版本不同的程序包。

   ```
   apache-airflow[package-extra]==2.5.1
   ```

1. **Python 库**。在 `requirements.txt` 文件中添加程序包名称和版本 (`==`)。这有助于防止自动应用来自 [PyPi.org](https://pypi.org) 的 future 更新。

   ```
   library == version
   ```  
**Example boto3 和 psycopg2-binary**  

   此示例仅用于演示目的。boto 和 psycopg2-binary 库包含在 Apache Airflow v2 基础版安装中，无需在 `requirements.txt` 文件中指定。

   ```
   boto3==1.17.54
   boto==2.49.0
   botocore==1.20.54
   psycopg2-binary==2.8.6
   ```

   [如果指定的包没有版本，则 Amazon MWAA 会安装.org 中最新版本的PyPi软件包。](https://pypi.org)此版本可能与您 `requirements.txt` 中的其他程序包冲突。

------

## 上传 `requirements.txt` 到 Amazon S3
<a name="configuring-dag-dependencies-upload"></a>

您可以使用 Amazon S3 控制台或 AWS Command Line Interface (AWS CLI) 将`requirements.txt`文件上传到您的 Amazon S3 存储桶。

### 使用 AWS CLI
<a name="configuring-dag-dependencies-upload-cli"></a>

 AWS Command Line Interface (AWS CLI) 是一个开源工具，您可以使用命令行 shell 中的命令与 AWS 服务进行交互。要完成本节中的步骤，您需要以下满足以下条件：
+ [AWS CLI — 安装版本 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)。
+ [AWS CLI — 使用快速配置`aws configure`](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html)。

**要使用上传 AWS CLI**

1. 以下示例列出所有 Amazon S3 存储桶。

   ```
   aws s3 ls
   ```

1. 使用以下命令列出 Amazon S3 存储桶中适合环境的文件和文件夹。

   ```
   aws s3 ls s3://YOUR_S3_BUCKET_NAME
   ```

1. 以下命令将 `requirements.txt` 文件上传到 Amazon S3 存储桶。

   ```
   aws s3 cp requirements.txt s3://amzn-s3-demo-bucket/requirements.txt
   ```

### 使用 Amazon S3 控制台
<a name="configuring-dag-dependencies-upload-console"></a>

Amazon S3 控制台是一个基于 Web 的用户界面，可用来创建和管理 Amazon S3 存储桶中的资源。

**要使用 Amazon S3 控制台上传，请执行以下操作**

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 在 **S3 中的 DAG 代码**窗格中选择 **S3 存储桶**链接，在控制台上打开存储桶。

1. 选择**上传**。

1. 选择 **添加文件**。

1. 选择 `requirements.txt` 的本地副本，选择**上传**。

## 在环境中安装 Python 依赖项
<a name="configuring-dag-dependencies-installing"></a>

本节介绍如何安装您上传到 Amazon S3 存储桶的依赖项，方法是指定 requirements.txt 文件的路径，并在每次更新时指定 requirements.txt 文件的版本。

### 在 Amazon MWAA 控制台上指定 `requirements.txt` 的路径（第一次）
<a name="configuring-dag-dependencies-first"></a>

如果这是您首次创建 `requirements.txt` 并将其上传到 Amazon S3 存储桶，则还需要在 Amazon MWAA 控制台上指定文件路径。您只需要完成此步骤一次。

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 选择**编辑**。

1. 在 **Amazon S3 中的 DAG 代码**窗格上，选择**要求文件 - 可选**字段旁边的**浏览 S3**。

1. 选择 Amazon S3 存储桶中的 `requirements.txt` 文件。

1. 选择**选择**。

1. 选择**下一步**、**更新环境**。

您可以在环境完成更新后立即开始使用新程序包。

### 在 Amazon MWAA 控制台上指定 `requirements.txt` 的版本
<a name="working-dags-dependencies-mwaaconsole-version"></a>

每次在 Amazon S3 存储桶中上传 `requirements.txt` 的新版本时，都需要在 Amazon MWAA 控制台上指定 `requirements.txt` 文件的版本。

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 选择**编辑**。

1. 在 **Amazon S3 中的 DAG 代码**窗格中，从下拉列表中选择 `requirements.txt` 的版本。

1. 选择**下一步**、**更新环境**。

您可以在环境完成更新后立即开始使用新程序包。

## 访问 `requirements.txt` 的日志
<a name="working-dags-dependencies-logs"></a>

您可以查看调度工作流程并解析 `dags` 文件夹的计划程序的 Apache Airflow 日志。以下步骤介绍如何在 Amazon MWAA 控制台上打开计划程序的日志组，以及如何在日志控制台上访问 Apache Airflow 日志。 CloudWatch 

**访问 `requirements.txt` 的日志**

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 在**监控**窗格上选择 **Airflow 计划程序日志组**。

1. 在**日志流**中选择 `requirements_install_ip` 日志。

1. 请参阅 `/usr/local/airflow/.local/bin` 上环境中安装的程序包列表。例如：

   ```
   Collecting appdirs==1.4.4 (from -r /usr/local/airflow/.local/bin (line 1))
   Downloading https://files.pythonhosted.org/packages/3b/00/2344469e2084fb28kjdsfiuyweb47389789vxbmnbjhsdgf5463acd6cf5e3db69324/appdirs-1.4.4-py2.py3-none-any.whl  
   Collecting astroid==2.4.2 (from -r /usr/local/airflow/.local/bin (line 2))
   ```

1. 查看程序包列表以及其中任何程序包在安装过程中是否遇到错误。如果出现问题，您可能会看到类似以下的错误：

   ```
   2021-03-05T14:34:42.731-07:00
   No matching distribution found for LibraryName==1.0.0 (from -r /usr/local/airflow/.local/bin (line 4))
   No matching distribution found for LibraryName==1.0.0 (from -r /usr/local/airflow/.local/bin (line 4))
   ```

## 接下来做什么？
<a name="working-dags-dependencies-next-up"></a>

使用[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)在本地测试你的 DAGs自定义插件和 Python 依赖关系 GitHub。

# 删除 Amazon S3 上的文件
<a name="working-dags-delete"></a>

本页介绍如何在 Amazon MWAA 环境的 Amazon S3 存储桶中进行版本控制，以及删除 DAG、`plugins.zip` 或 `requirements.txt` 文件的步骤。

**Contents**
+ [

## 先决条件
](#working-dags-delete-prereqs)
+ [

## 版本控制概述
](#working-dags-delete-overview)
+ [

## 工作方式
](#working-dags-delete-how)
+ [

## 删除 Amazon S3 上的 DAG
](#working-dags-s3-dag-delete)
+ [

## 从环境中移除 “当前”的 requirements.txt 或 plugins.zip
](#working-dags-s3-delete-version-c)
+ [

## 删除“非当前”（之前）的 requirements.txt 或 plugins.zip 版本
](#working-dags-s3-delete-version-p)
+ [

## 使用生命周期删除所有“非当前”（先前）版本并自动删除标记
](#working-dags-s3-delete-lifecycle)
+ [

## 删除 requirements.txt 所有“非当前”版本并自动删除标记的生命周期策略示例
](#working-dags-s3-delete-lifecycle-ex)
+ [

## 接下来做什么？
](#working-dags-s3-delete-next-up)

## 先决条件
<a name="working-dags-delete-prereqs"></a>

在完成本页上的步骤之前，您需要具备以下条件。
+ **权限** — 您的 AWS 账户 必须已获得管理员授权，访问适用于环境的 [AmazonMWAAFullConsoleAccess](access-policies.md#console-full-access) 访问控制策略。此外，[执行角色](mwaa-create-role.md)必须允许 Amazon MWAA 环境访问环境所使用的 AWS 资源。
+ **访问权限** — 如果您需要访问公共存储库以便直接在 Web 服务器上安装依赖项，则必须将环境配置为具有**公共网络** Web 服务器访问权限。有关更多信息，请参阅[Apache Airflow 访问模式](configuring-networking.md)。
+ **Amazon S3 配置** — 用于存储 DAG 的 [Amazon S3 存储桶](mwaa-s3-bucket.md)、在 `plugins.zip` 中的自定义插件和在 `requirements.txt` 中的 Python 依赖项必须配置为*已阻止公共访问*和*已启用版本控制*。

## 版本控制概述
<a name="working-dags-delete-overview"></a>

您 Amazon S3 存储桶中的 `plugins.zip` 和 `requirements.txt` 已进行版本控制。当对象启用 Amazon S3 存储桶版本控制并且从 Amazon S3 存储桶中删除构件（例如 plugins.zip）时，该文件不会被完全删除。每当在 Amazon S3 上删除构件时，都会创建该文件的新副本，该副本是 404（未找到对象）错误/“`I'm not here`”的 0k 文件。Amazon S3 称此为*删除标记*。与任何其他对象一样，删除标记是带有键名（或键）和版本 ID 的“空”文本。

我们建议定期删除文件版本并删除标记，以降低 Amazon S3 存储桶的存储成本。要完全删除“非当前”（以前的）文件版本，必须删除（这些）文件的所有版本，然后删除该版本的*删除标记*。

## 工作方式
<a name="working-dags-delete-how"></a>

Amazon MWAA 每三十秒钟对 Amazon S3 存储桶运行一次同步操作。这会导致 Amazon S3 存储桶中删除的任何 DAG 同步到 Fargate 容器的 Airflow 镜像。

只有当 Amazon MWAA 使用自定义插件和 Python 依赖项为 Fargate 容器构建新的 Airflow 映像时，在环境更新之后，`plugins.zip` 和 `requirements.txt` 文件才会发生更改。如果您删除了 `requirements.txt` 或 `plugins.zip` 文件的*当前*版本，然后更新环境但不为已删除文件提供新版本，则更新将失败，并显示一条错误消息，例如“`Unable to read version {version number} of file {file name}`”。

## 删除 Amazon S3 上的 DAG
<a name="working-dags-s3-dag-delete"></a>

DAG 文件 (`.py`) 没有版本控制，可以直接在 Amazon S3 控制台上删除。以下步骤描述如何删除 Amazon S3 桶中的 DAG。

**要删除 DAG，请执行以下操作**

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 在 **S3 中的 DAG 代码**窗格中选择 **S3 存储桶**链接，在控制台上打开存储桶。

1. 选择 `dags` 文件夹。

1. 选择 DAG，点击**删除**。

1. 在**删除对象？**下 ，键入 `delete`。

1. 选择**删除对象**。

**注意**  
Apache Airflow 保留了历史上的 DAG 运行。在 Apache Airflow 中运行 DAG 后，无论文件状态如何，它都会保留在 Apache Airflow 列表中，直到您在 Apache Airflow 中将其删除。要删除 Apache Airflow 中的 DAG，请选择**链接**列中的红色“删除”按钮。

## 从环境中移除 “当前”的 requirements.txt 或 plugins.zip
<a name="working-dags-s3-delete-version-c"></a>

目前，没有办法在添加 plugins.zip 或 requirements.txt 后将其从环境中删除，但我们正在努力解决这个问题。在此期间，变通方法是分别指向空文本或 zip 文件。

## 删除“非当前”（之前）的 requirements.txt 或 plugins.zip 版本
<a name="working-dags-s3-delete-version-p"></a>

在 Amazon MWAA 上，Amazon S3 存储桶中的 `requirements.txt` 和 `plugins.zip` 文件受到版本控制。如果您想完全删除 Amazon S3 存储桶中的这些文件，则必须检索对象（例如 plugins.zip）的当前版本 (121212)，删除该版本，然后移除文件版本的*删除标记*。

您也可以在 Amazon S3 控制台上删除所有“非当前”（先前）文件版本；但是，您仍需要使用以下选项之一删除*删除标记*。
+ 要检索对象版本，请参阅*《Amazon S3 指南》中的*[从启用版本控制的存储桶检索对象版本](https://docs.aws.amazon.com/AmazonS3/latest/userguide/RetrievingObjectVersions.html)。
+ 要删除对象版本，请参阅*《Amazon S3 指南》中的*[从启用版本控制的存储桶删除对象版本](https://docs.aws.amazon.com/AmazonS3/latest/userguide/DeletingObjectVersions.html)。
+ 要移除删除标记，请参阅*《Amazon S3 指南》中的*[管理删除标记](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ManagingDelMarkers.html)。

## 使用生命周期删除所有“非当前”（先前）版本并自动删除标记
<a name="working-dags-s3-delete-lifecycle"></a>

您可以为 Amazon S3 存储桶配置生命周期策略，以便在一定天数后删除 Amazon S3 存储桶中的 plugins.zip 和 requirements.txt 文件的所有“非当前”（先前）版本，或者移除过期对象的删除标记。

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 在 **Amazon S3 的 DAG 代码**下，选择 Amazon S3 存储桶。

1. 选择**创建生命周期规则**。

## 删除 requirements.txt 所有“非当前”版本并自动删除标记的生命周期策略示例
<a name="working-dags-s3-delete-lifecycle-ex"></a>

使用以下示例创建生命周期规则，该规则将在三十天后永久删除 requirements.txt 文件的“非当前”版本及其删除标记。

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 选择环境。

1. 在 **Amazon S3 的 DAG 代码**下，选择 Amazon S3 存储桶。

1. 选择**创建生命周期规则**。

1. 在**生命周期规则名称**中，键入 `Delete previous requirements.txt versions and delete markers after thirty days`。

1. 在**前缀**中，选择**要求**。

1. 在**生命周期规则操作**中，选择**永久删除对象的所有先前版本**和**删除过期的删除标记或未完成的分段上传**。

1. 在**对象变为先前版本后的天数**中，键入 `30`。

1. 在**过期对象删除标记**中，选择**删除过期对象删除标记，对象将在 30 天后永久删除**。

## 接下来做什么？
<a name="working-dags-s3-delete-next-up"></a>
+ 要详细了解 Amazon S3 删除标记，请参阅[管理删除标记](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/create-lifecycle.html)。
+ 要了解有关 Amazon S3 生命周期的更多信息，请参阅[过期对象](https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-expire-general-considerations.html)。