

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 开始使用 MSK Connect
<a name="msk-connect-getting-started"></a>

本 step-by-step教程使用创建 MSK 集群和用于将数据从集群发送到 S3 存储桶的接收器连接器。 AWS 管理控制台 

**Topics**
+ [设置 MSK Connect 所需的资源](mkc-tutorial-setup.md)
+ [创建自定义插件](mkc-create-plugin.md)
+ [创建客户端计算机和 Apache Kafka 主题](mkc-create-topic.md)
+ [创建连接器](mkc-create-connector.md)
+ [向 MSK 集群发送数据](mkc-send-data.md)

# 设置 MSK Connect 所需的资源
<a name="mkc-tutorial-setup"></a>

在此步骤中，您需创建此入门场景所需的以下资源：
+ 一个 Amazon S3 存储桶，用作从连接器接收数据的目的地。
+ 一个 MSK 集群，您将向其发送数据。然后，连接器将从此集群读取数据并将其发送到目标 S3 存储桶。
+ 一项 IAM 策略，包含写入目标 S3 存储桶的权限。
+ 一个 IAM 角色，允许连接器写入目标 S3 存储桶。您将所创建的 IAM 策略添加至此角色。
+ 一个 Amazon VPC 端点，可以将数据从具有集群和连接器的 Amazon VPC 发送到 Amazon S3。

**创建 S3 存储桶**

1. 登录 AWS 管理控制台 并打开 Amazon S3 控制台，网址为[https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 选择 **创建存储桶 **。

1. 对于存储桶名称，输入一个描述性名称，例如 `amzn-s3-demo-bucket-mkc-tutorial`。

1. 向下滚动并选择**创建存储桶**。

1. 在存储桶列表中，选择您新创建的存储桶。

1. 请选择 **Create folder**（创建文件夹）。

1. 输入 `tutorial` 作为文件夹的名称，然后向下滚动并选择**创建文件夹**。

**创建集群**

1. 在[https://console.aws.amazon.com/msk/家打开亚马逊 MSK 控制台？ region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 在左侧窗格的 **MSK 集群**下，选择**集群**。

1. 选择**创建集群**。

1. 对于**创建方法**，请选择**自定义创建**。

1. 对于集群名称，请输入 **mkc-tutorial-cluster**。

1. 在**集群类型**中，选择**已预置**。

1. 选择**下一步**。

1. 在**网络**下，选择“Amazon VPC”。然后选择想要使用的可用区和子网。请记住您选择的 Amazon VPC 和子网，因为您将在本教程的后面部分中使用它们。 IDs 

1. 选择**下一步**。

1. 在**访问控制方法**下，确保仅选择**未经身份验证的访问**。

1. 在**加密**下，确保仅选择**明文**。

1. 继续执行向导，然后选择**创建集群**。这会将您引导至该集群的“详细信息”页面。在该页面的**已应用的安全组**下，找到安全组 ID。记住该 ID，因为您将在本教程的后面部分需要它。

**创建有权写入 S3 存储桶的 IAM 策略**

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 在导航窗格中，选择**策略**。

1. 选择**创建策略**。

1. 在**策略编辑器**中，选择 **JSON**，然后将编辑器窗口中的 JSON 替换为以下 JSON。

   在以下示例中，*<amzn-s3-demo-bucket-my-tutorial>*替换为您的 S3 存储桶的名称。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Sid": "AllowListBucket",
         "Effect": "Allow",
         "Action": [
           "s3:ListBucket",
           "s3:GetBucketLocation"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>"
       },
       {
         "Sid": "AllowObjectActions",
         "Effect": "Allow",
         "Action": [
           "s3:PutObject",
           "s3:GetObject",
           "s3:DeleteObject",
           "s3:AbortMultipartUpload",
           "s3:ListMultipartUploadParts",
           "s3:ListBucketMultipartUploads"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>/*"
       }
     ]
   }
   ```

------

   有关如何写入安全策略的说明，请参阅 [IAM 访问控制](iam-access-control.md)。

1. 选择**下一步**。

1. 在**查看和创建**页面中，请执行以下操作：

   1. 对于**策略名称**，输入一个描述性名称，例如 **mkc-tutorial-policy**。

   1. **在此策略中定义的权限**中，查看 and/or 编辑策略中定义的权限。

   1. （可选）为了帮助识别、组织或搜索策略，请选择**添加新标签**以键值对形式添加标签。例如，使用 **Environment** 和 **Test** 的键值对向策略添加标签。

      有关使用标签的更多信息，请参阅 *IAM 用户指南*中的[AWS Identity and Access Management 资源标签](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)。

1. 选择**创建策略**。

**创建可以写入目标存储桶的 IAM 角色**

1. 在 IAM 控制台的导航窗格中，选择**角色**，然后选择**创建角色**。

1. 在**选择受信任的实体**页面上，请执行以下操作：

   1. 对于 **Trusted entity type**（可信实体类型），选择 **AWS 服务**。

   1. 对于**服务或使用案例**，选择 **S3**。

   1. 在**使用案例**下，选择 **S3**。

1. 选择**下一步**。

1. 在 **Add permissions**（添加权限）页面上，请执行以下操作：

   1. 在搜索框的**权限策略**下，输入您之前为本教程创建的策略的名称。例如 **mkc-tutorial-policy**。然后，选中策略名称左侧的框。

   1. （可选）设置[权限边界](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_boundaries.html)。这是一项高级功能，可用于服务角色，但不可用于服务相关角色。有关设置权限边界的信息，请参阅《IAM 用户指南》**中的[创建角色和附加策略（控制台）](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions_create-policies.html)。

1. 选择**下一步**。

1. 在 **Name, review, and create**（命名、查看和创建）页面中，请执行以下操作：

   1. 对于**角色名称**，输入一个描述性名称，例如 **mkc-tutorial-role**。
**重要**  
命名角色时，请注意以下事项：  
角色名称在您内部必须是唯一的 AWS 账户，并且不能因大小写而变得唯一。  
例如，不要同时创建名为 **PRODROLE** 和 **prodrole** 的角色。当角色名称在策略中使用或者作为 ARN 的一部分时，角色名称区分大小写，但是当角色名称在控制台中向客户显示时（例如，在登录期间），角色名称不区分大小写。
创建角色后，您无法编辑该角色的名称，因为其他实体可能会引用该角色。

   1. （可选）对于**描述**，输入角色的描述。

   1. （可选）要编辑角色的使用案例和权限，请在**步骤 1：选择可信实体**或**步骤 2：添加权限**部分中选择**编辑**。

   1. （可选）为了帮助识别、组织或搜索角色，请选择**添加新标签**以键值对形式添加标签。例如，使用 **ProductManager** 和 **John** 的键值对向角色添加标签。

      有关使用标签的更多信息，请参阅 *IAM 用户指南*中的[AWS Identity and Access Management 资源标签](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)。

1. 检查该角色，然后选择**创建角色**。

**允许 MSK Connect 代入该角色**

1. 在 IAM 控制台的左侧窗格中，在**访问管理**下，选择**角色**。

1. 找到 `mkc-tutorial-role` 并将其选中。

1. 在角色的**摘要**下，选择**信任关系**选项卡。

1. 选择**编辑信任关系**。

1. 将现有信任策略替换为以下 JSON。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "kafkaconnect.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }
   ```

------

1. 选择**更新信任策略**。

**创建从集群的 VPC 到 Amazon S3 的 Amazon VPC 端点**

1. 打开位于 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 的 Amazon VPC 控制台。

1. 在左侧窗格中，选择**端点**。

1. 选择**创建端点**。

1. 在**服务名称**下，选择 **com.amazonaws.us-east-1.s3** 服务和**网关**类型。

1. 选择集群的 VPC，然后选中与集群子网关联的路由表左侧的复选框。

1. 选择**创建端点**。

**下一步**

[创建自定义插件](mkc-create-plugin.md)

# 创建自定义插件
<a name="mkc-create-plugin"></a>

插件包含定义连接器逻辑的代码。在此步骤中，您需创建一个包含 Lenses Amazon S3 接收器连接器代码的自定义插件。在后面的步骤中，当您创建 MSK 连接器时，您可以指定其代码位于此自定义插件中。您可以使用同一插件来创建多个具有不同配置的 MSK 连接器。

**创建自定义插件**

1. 下载 [S3 连接器](https://www.confluent.io/hub/confluentinc/kafka-connect-s3)。

1. 将 ZIP 文件上传到您有权访问的 S3 存储桶。有关如何将文件上传到 Amazon S3 的信息，请参阅《Amazon S3 用户指南》中的[上传对象](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)。

1. 在 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 打开 Amazon MSK 控制台。

1. 在左侧窗格中展开 **MSK Connect**，然后选择**自定义插件**。

1. 选择**创建自定义插件**。

1. 选择**浏览 S3**。

1. 在存储桶列表中，找到您上传 ZIP 文件的存储桶，然后选择该存储桶。

1. 在存储桶的对象列表中，选择 ZIP 文件左侧的单选按钮，然后选择标有**选择**的按钮。

1. 输入 `mkc-tutorial-plugin` 作为自定义插件名称，然后选择**创建自定义插件**。

可能需要 AWS 几分钟才能完成自定义插件的创建。创建过程完成后，您会在浏览器窗口顶部的横幅中看到以下消息。

```
Custom plugin mkc-tutorial-plugin was successfully created
The custom plugin was created. You can now create a connector using this custom plugin.
```

**下一步**

[创建客户端计算机和 Apache Kafka 主题](mkc-create-topic.md)

# 创建客户端计算机和 Apache Kafka 主题
<a name="mkc-create-topic"></a>

在此步骤中，您需创建 Amazon EC2 实例以用作 Apache Kafka 客户端实例。然后，您可以使用此实例在集群上创建主题。

**创建客户端计算机**

1. 打开位于 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 的 Amazon EC2 控制台。

1. 选择 **Launch instances**。

1. 输入客户端计算机的**名称**，例如 **mkc-tutorial-client**。

1. 对于**亚马逊机器映像（AMI）类型**，始终选中 **Amazon Linux 2 AMI（HVM）– 内核 5.10，SSD 卷类型**。

1. 选择 **t2.xlarge** 实例类型。

1. 在**密钥对（登录）**下，选择**创建新密钥对**。为**密钥对名称**输入 **mkc-tutorial-key-pair**，然后选择**下载密钥对**。此外，您还可使用现有密钥对。

1. 选择**启动实例**。

1. 选择**查看实例**。然后，在**安全组**列中，选择与新的实例关联的安全组。复制并保存安全组的 ID，以供稍后使用。

**允许新创建的客户端向集群发送数据**

1. 打开位于 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 的 Amazon VPC 控制台。

1. 在左侧窗格的**安全性**下，选择**安全组**。在**安全组 ID** 列中，找到集群的安全组。您在 [设置 MSK Connect 所需的资源](mkc-tutorial-setup.md) 中创建集群时保存了该安全组的 ID。通过选中该安全组行左侧的复选框来选择该安全组。确保没有同时选择其他安全组。

1. 在屏幕的下半部分，选择**入站规则**选项卡。

1. 选择**编辑入站规则**。

1. 在屏幕的左下角，选择**添加规则**。

1. 在新规则中，选择**类型**列中的**所有流量**。在**源**列右侧的字段中，输入客户端计算机的安全组 ID。这是您在创建客户端计算机后保存的安全组 ID。

1. 选择**保存规则**。您的 MSK 集群现在将接受来自您在上一程序中创建的客户端的所有流量。

**要创建主题，请执行以下操作**

1. 打开位于 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 的 Amazon EC2 控制台。

1. 在实例表中选择 `mkc-tutorial-client`。

1. 在屏幕顶部附近，选择**连接**，然后按照说明连接到实例。

1. 通过运行以下命令在客户端实例上安装 Java：

   ```
   sudo yum install java-1.8.0
   ```

1. 运行以下命令以下载 Apache Kafka。

   ```
   wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
   ```
**注意**  
如果您希望使用此命令中使用的镜像站点之外的镜像站点，则可在 [Apache](https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz) 网站上选择其他镜像站点。

1. 在上一步中将 TAR 文件下载到的目录中运行以下命令。

   ```
   tar -xzf kafka_2.12-2.2.1.tgz
   ```

1. 转到 **kafka\$12.12-2.2.1** 目录。

1. 在[https://console.aws.amazon.com/msk/家打开亚马逊 MSK 控制台？ region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 在左侧窗格中，选择**集群**，然后选择名称 `mkc-tutorial-cluster`。

1. 选择**查看客户端信息**。

1. 复制**明文**连接字符串。

1. 选择**完成**。

1. 在客户端实例 (`mkc-tutorial-client`) 上运行以下命令，*bootstrapServerString*替换为您在查看集群的客户机信息时保存的值。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server bootstrapServerString --replication-factor 2 --partitions 1 --topic mkc-tutorial-topic
   ```

   如果此命令成功，您将看到以下消息：`Created topic mkc-tutorial-topic.`

**下一步**

[创建连接器](mkc-create-connector.md)

# 创建连接器
<a name="mkc-create-connector"></a>

此过程介绍了如何使用 AWS 管理控制台创建连接器。

**创建连接器**

1. 登录并在[https://console.aws.amazon.com/msk/家中打开 Amazon MSK 控制台？ AWS 管理控制台 region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 在左侧窗格中，展开 **MSK Connect**，然后选择**连接器**。

1. 选择 **Create connector (创建连接器)**。

1. 在插件列表中，选择 `mkc-tutorial-plugin`，然后选择**下一步**。

1. 对于连接器名称，请输入 `mkc-tutorial-connector`。

1. 在集群列表中，选择 `mkc-tutorial-cluster`。

1. 在**连接器网络设置**部分，为网络类型选择以下选项之一：
   + **IPv4**（默认）- IPv4 仅用于通过连接目的地
   + **双栈**-用于通过 IPv4 和连接到目的地 IPv6 （仅当您的子网具有 IPv4 IPv6 CIDR 块与之关联时才可用）

1. 复制以下配置，并将其粘贴到连接器配置字段中。

   确保将区域替换为创建连接器 AWS 区域 所在位置的代码。此外，在以下示例中，将 Amazon S3 存储桶名称替换为存储桶的名称。*<amzn-s3-demo-bucket-my-tutorial>*

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   s3.region=us-east-1
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   flush.size=1
   schema.compatibility=NONE
   tasks.max=2
   topics=mkc-tutorial-topic
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   storage.class=io.confluent.connect.s3.storage.S3Storage
   s3.bucket.name=<amzn-s3-demo-bucket-my-tutorial>
   topics.dir=tutorial
   ```

1. 在**访问权限**下，选择 `mkc-tutorial-role`。

1. 选择**下一步**。在**安全性**页面上，再次选择**下一步**。

1. 在**日志**页面上，选择**下一步**。

1. 在**查看并创建**页面上，查看您的连接器配置，然后选择**创建连接器**。

**下一步**

[向 MSK 集群发送数据](mkc-send-data.md)

# 向 MSK 集群发送数据
<a name="mkc-send-data"></a>

在此步骤中，您将数据发送到之前创建的 Apache Kafka 主题，然后在目标 S3 存储桶中查找相同的数据。

**向 MSK 集群发送数据**

1. 在客户端实例上的 Apache Kafka 安装 `bin` 文件夹中，创建一个名为 `client.properties` 的文本文件，该文件包含以下内容。

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=AWS_MSK_IAM
   ```

1. 运行以下命令以创建控制台生成器。*BootstrapBrokerString*替换为运行上一个命令时获得的值。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerString --producer.config client.properties --topic mkc-tutorial-topic
   ```

1. 输入所需的任何消息，然后按 **Enter**。重复执行此步骤两次或三次。每次输入一行并按 **Enter** 时，该行会作为单独的消息发送到您的 Apache Kafka 集群。

1. 查看目标 Amazon S3 存储桶，查找您在上一步中发送的消息。