

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 `SSHOperator` 创建 SSH 连接
<a name="samples-ssh"></a>

以下示例介绍如何使用有向无环图（DAG）中的 `SSHOperator` 从 Amazon MWAA 环境连接到远程 Amazon EC2 实例。您可以使用类似的方法连接到任何具有 SSH 访问权限的远程实例。

在以下示例中，您将 SSH 密钥 (`.pem`) 上传到在 Amazon S3 上的环境的 `dags` 目录中。然后，您可以使用 `requirements.txt` 安装必要的依赖项，并在 UI 中创建新的 Apache Airflow 连接。最后，您编写一个 DAG 来创建与远程实例的 SSH 连接。

**Topics**
+ [版本](#samples-ssh-version)
+ [先决条件](#samples-ssh-prereqs)
+ [权限](#samples-ssh-permissions)
+ [要求](#samples-ssh-dependencies)
+ [将密钥复制到 Amazon S3](#samples-ssh-secret)
+ [创建新的 Apache Airflow 连接](#samples-ssh-connection)
+ [代码示例](#samples-ssh-code)

## 版本
<a name="samples-ssh-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-ssh-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境](get-started.md)。
+ SSH 密钥。该代码示例假设您有 Amazon EC2 实例，并且与 Amazon MWAA 环境位于同一区域的 `.pem`。如果您没有密钥，请参阅*《Amazon EC2 用户指南》*中的[创建或导入密钥对](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair)。

## 权限
<a name="samples-ssh-permissions"></a>

无需其他权限即可使用本页上的代码示例。

## 要求
<a name="samples-ssh-dependencies"></a>

将以下参数添加到 `requirements.txt`，以将 `apache-airflow-providers-ssh` 程序包安装到 Web 服务器上。当环境更新并且 Amazon MWAA 成功安装依赖项后，您将在 UI 中获得新的 **SSH** 连接类型。

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt
apache-airflow-providers-ssh
```

**注意**  
`-c` 定义了 `requirements.txt` 中的约束 URL。这样可以确保 Amazon MWAA 为环境安装了正确的程序包版本。

## 将密钥复制到 Amazon S3
<a name="samples-ssh-secret"></a>

使用以下 AWS Command Line Interface 命令将 `.pem` 密钥复制到 Amazon S3 中的环境 `dags` 目录中。

```
aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/
```

Amazon MWAA 将包括 `.pem` 密钥在内的 `dags` 中的内容复制到本地 `/usr/local/airflow/dags/` 目录，这样，Apache Airflow 就可以访问密钥了。

## 创建新的 Apache Airflow 连接
<a name="samples-ssh-connection"></a>

**使用 Apache Airflow UI 创建新的 SSH 连接**

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 从环境列表中，为环境选择**打开 Airflow UI**。

1. 在 Apache Airflow UI 页面上，从主导航栏中选择**管理员**以展开下拉列表，然后选择**连接**。

1. 在**列出连接**页面上，选择 **\$1** 或**添加新记录**按钮以添加新连接。

1. 在**连接到 AD** 页面上，提供以下信息：

   1. 在**连接名称**中，输入 **ssh\$1new**。

   1. 在**连接类型**中，从下拉列表中选择 **SSH**。
**注意**  
如果列表中没有 **SSH** 连接类型，则表示 Amazon MWAA 尚未安装所需的 `apache-airflow-providers-ssh` 程序包。请更新 `requirements.txt` 文件以包含此程序包，然后重试。

   1. 在**主机**中，请输入要连接的 Amazon EC2 实例的 IP 地址。例如 **12.345.67.89**。

   1. 在**用户名**中，请输入 **ec2-user**，以检查您是否正在连接到 Amazon EC2 实例。用户名可能会有所不同，具体取决于您希望 Apache Airflow 连接到的远程实例的类型。

   1. 在**附加**中，请以 JSON 格式输入以下键/值对：

      ```
      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }
      ```

      此键值对指示 Apache Airflow 在本地 `/dags` 目录中搜索密钥。

## 代码示例
<a name="samples-ssh-code"></a>

以下 DAG 使用 `SSHOperator` 连接到目标 Amazon EC2 实例，然后运行 `hostname` Linux 命令来打印该实例的名称。您可以修改 DAG 以在远程实例上运行任何命令或脚本。

1. 打开终端，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容，并在本地另存为 `ssh.py`。

   ```
   from airflow.decorators import dag
   from datetime import datetime
   from airflow.providers.ssh.operators.ssh import SSHOperator
   
   @dag(
       dag_id="ssh_operator_example",
       schedule_interval=None,     
       start_date=datetime(2022, 1, 1),
       catchup=False,
       )
   def ssh_dag():
       task_1=SSHOperator(
           task_id="ssh_task",
           ssh_conn_id='ssh_new',
           command='hostname',
       )
   
   my_ssh_dag = ssh_dag()
   ```

1.  运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶，然后使用 Apache Airflow UI 触发 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果成功，您将在 `ssh_operator_example` DAG 的任务日志中看到输出，类似于 `ssh_task` 的任务日志中的以下内容：

   ```
   [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
   Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
   [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
   [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
   [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
   [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916
   ```