

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 了解 MSK Connect
<a name="msk-connect"></a>

MSK Connect 是 Amazon MSK 的一项功能，它让开发人员可以轻松地将数据流入和流出其 Apache Kafka 集群。MSK Connect 使用 Kafka Connect 版本 2.7.1 或 3.7.x，这是一个开源框架，用于将 Apache Kafka 集群与数据库、搜索索引和文件系统等外部系统连接起来。借助 MSK Connect，您可以部署专为 Kafka Connect 构建的完全托管的连接器，用于将数据移入亚马逊 S3 和亚马逊服务等热门数据存储或从中提取数据。 OpenSearch 您可以部署由 Debezium 等第三方开发的连接器，用于将变更日志从数据库流式传输到 Apache Kafka 集群，或者无需更改代码即可部署现有连接器。连接器会自动扩缩以适应负载变化，您只需为使用的资源付费。

使用源连接器将数据从外部系统导入到您的主题中。您可以使用接收器连接器，将主题中的数据导出到外部系统。

MSK Connect 支持任何连接到 Amazon VPC 的 Apache Kafka 集群的连接器，无论是 MSK 集群还是独立托管的 Apache Kafka 集群。

MSK Connect 持续监控连接器的运行状况和交付状态、修补和管理底层硬件，并自动扩缩连接器以适应吞吐量的变化。

要开始使用 MSK Connect，请参阅 [开始使用 MSK Connect](msk-connect-getting-started.md)。

要了解您可以使用 MSK Connect 创建的 AWS 资源，请参阅[了解连接器](msk-connect-connectors.md)[创建自定义插件](msk-connect-plugins.md)、和。[了解 MSK Connect 工作程序](msk-connect-workers.md)

有关 MSK Connect API 的信息，请参阅 [Amazon MSK Connect API Reference](https://docs.aws.amazon.com/MSKC/latest/mskc/Welcome.html)。

## 使用 Amazon MSK Connect 的好处
<a name="msk-connect-benefits"></a>

Apache Kafka 是用于提取和处理实时数据流的最广泛采用的开源流平台之一。借助 Apache Kafka，您可以分离和独立扩展数据生成和数据消费应用程序。

Kafka Connect 是使用 Apache Kafka 构建和运行流应用程序的重要组成部分。Kafka Connect 提供了一种在 Kafka 和外部系统之间移动数据的标准化方式。Kafka Connect 具有高度可扩展性，可以处理大量数据。Kafka Connect 提供了一组强大的 API 操作和工具，用于配置、部署和监控在 Kafka 主题和外部系统之间移动数据的连接器。您可以使用这些工具来自定义和扩展 Kafka Connect 的功能，以满足您的流应用程序的特定需求。

当您自行操作 Apache Kafka Connect 集群或尝试将开源 Apache Kafka Connect 应用程序迁移到 AWS时，可能会遇到挑战。这些挑战包括设置基础设施和部署应用程序所需的时间、设置自托管 Apache Kafka Connect 集群时的工程障碍以及管理运营开销。

为了应对这些挑战，我们建议使用 Amazon Managed Streaming for Apache Kafka Connect（Amazon MSK Connect）将您的开源 Apache Kafka Connect 应用程序迁移到 AWS。Amazon MSK Connect 简化了使用 Kafka Connect 在 Apache Kafka 集群和外部系统（例如数据库、搜索索引和文件系统）之间传输数据的过程。

以下是迁移到 Amazon MSK Connect 的一些好处：
+ **消除运营开销** — Amazon MSK Connect 消除了与 Apache Kafka Connect 集群的修补、预置和扩展相关的运营负担。Amazon MSK Connect 持续监控您的 Connect 集群的运行状况并自动进行修补和版本升级，而不会对您的工作负载造成任何中断。
+ **自动重启 Connect 任务** — Amazon MSK Connect 可以自动恢复失败的任务以减少生产中断。任务失败可能是由临时错误引起的，例如超出 Kafka 的 TCP 连接限制，以及新工作程序加入接收器连接器的消费者组时的任务重新平衡。
+ **自动水平和垂直扩缩** — Amazon MSK Connect 使连接器应用程序能够自动扩展以支持更高的吞吐量。Amazon MSK Connect 为您管理扩展。您只需指定自动扩缩组中的工作程序数量和利用率阈值。您可以使用 Amazon MSK Connect `UpdateConnector` API 操作在 1 到 8 v CPUs 之间垂直放大或缩小 v CPUs 以支持可变吞吐量。
+ **私有网络连接** — Amazon MSK Connect 使用私有 DNS 名称私密连接到源系统 AWS PrivateLink 和接收系统。

# 开始使用 MSK Connect
<a name="msk-connect-getting-started"></a>

本 step-by-step教程使用创建 MSK 集群和用于将数据从集群发送到 S3 存储桶的接收器连接器。 AWS 管理控制台 

**Topics**
+ [设置 MSK Connect 所需的资源](mkc-tutorial-setup.md)
+ [创建自定义插件](mkc-create-plugin.md)
+ [创建客户端计算机和 Apache Kafka 主题](mkc-create-topic.md)
+ [创建连接器](mkc-create-connector.md)
+ [向 MSK 集群发送数据](mkc-send-data.md)

# 设置 MSK Connect 所需的资源
<a name="mkc-tutorial-setup"></a>

在此步骤中，您需创建此入门场景所需的以下资源：
+ 一个 Amazon S3 存储桶，用作从连接器接收数据的目的地。
+ 一个 MSK 集群，您将向其发送数据。然后，连接器将从此集群读取数据并将其发送到目标 S3 存储桶。
+ 一项 IAM 策略，包含写入目标 S3 存储桶的权限。
+ 一个 IAM 角色，允许连接器写入目标 S3 存储桶。您将所创建的 IAM 策略添加至此角色。
+ 一个 Amazon VPC 端点，可以将数据从具有集群和连接器的 Amazon VPC 发送到 Amazon S3。

**创建 S3 存储桶**

1. 登录 AWS 管理控制台 并打开 Amazon S3 控制台，网址为[https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 选择 **创建存储桶 **。

1. 对于存储桶名称，输入一个描述性名称，例如 `amzn-s3-demo-bucket-mkc-tutorial`。

1. 向下滚动并选择**创建存储桶**。

1. 在存储桶列表中，选择您新创建的存储桶。

1. 请选择 **Create folder**（创建文件夹）。

1. 输入 `tutorial` 作为文件夹的名称，然后向下滚动并选择**创建文件夹**。

**创建集群**

1. 在[https://console.aws.amazon.com/msk/家打开亚马逊 MSK 控制台？ region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 在左侧窗格的 **MSK 集群**下，选择**集群**。

1. 选择**创建集群**。

1. 对于**创建方法**，请选择**自定义创建**。

1. 对于集群名称，请输入 **mkc-tutorial-cluster**。

1. 在**集群类型**中，选择**已预置**。

1. 选择**下一步**。

1. 在**网络**下，选择“Amazon VPC”。然后选择想要使用的可用区和子网。请记住您选择的 Amazon VPC 和子网，因为您将在本教程的后面部分中使用它们。 IDs 

1. 选择**下一步**。

1. 在**访问控制方法**下，确保仅选择**未经身份验证的访问**。

1. 在**加密**下，确保仅选择**明文**。

1. 继续执行向导，然后选择**创建集群**。这会将您引导至该集群的“详细信息”页面。在该页面的**已应用的安全组**下，找到安全组 ID。记住该 ID，因为您将在本教程的后面部分需要它。

**创建有权写入 S3 存储桶的 IAM 策略**

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 在导航窗格中，选择**策略**。

1. 选择**创建策略**。

1. 在**策略编辑器**中，选择 **JSON**，然后将编辑器窗口中的 JSON 替换为以下 JSON。

   在以下示例中，*<amzn-s3-demo-bucket-my-tutorial>*替换为您的 S3 存储桶的名称。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Sid": "AllowListBucket",
         "Effect": "Allow",
         "Action": [
           "s3:ListBucket",
           "s3:GetBucketLocation"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>"
       },
       {
         "Sid": "AllowObjectActions",
         "Effect": "Allow",
         "Action": [
           "s3:PutObject",
           "s3:GetObject",
           "s3:DeleteObject",
           "s3:AbortMultipartUpload",
           "s3:ListMultipartUploadParts",
           "s3:ListBucketMultipartUploads"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>/*"
       }
     ]
   }
   ```

------

   有关如何写入安全策略的说明，请参阅 [IAM 访问控制](iam-access-control.md)。

1. 选择**下一步**。

1. 在**查看和创建**页面中，请执行以下操作：

   1. 对于**策略名称**，输入一个描述性名称，例如 **mkc-tutorial-policy**。

   1. **在此策略中定义的权限**中，查看 and/or 编辑策略中定义的权限。

   1. （可选）为了帮助识别、组织或搜索策略，请选择**添加新标签**以键值对形式添加标签。例如，使用 **Environment** 和 **Test** 的键值对向策略添加标签。

      有关使用标签的更多信息，请参阅 *IAM 用户指南*中的[AWS Identity and Access Management 资源标签](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)。

1. 选择**创建策略**。

**创建可以写入目标存储桶的 IAM 角色**

1. 在 IAM 控制台的导航窗格中，选择**角色**，然后选择**创建角色**。

1. 在**选择受信任的实体**页面上，请执行以下操作：

   1. 对于 **Trusted entity type**（可信实体类型），选择 **AWS 服务**。

   1. 对于**服务或使用案例**，选择 **S3**。

   1. 在**使用案例**下，选择 **S3**。

1. 选择**下一步**。

1. 在 **Add permissions**（添加权限）页面上，请执行以下操作：

   1. 在搜索框的**权限策略**下，输入您之前为本教程创建的策略的名称。例如 **mkc-tutorial-policy**。然后，选中策略名称左侧的框。

   1. （可选）设置[权限边界](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_boundaries.html)。这是一项高级功能，可用于服务角色，但不可用于服务相关角色。有关设置权限边界的信息，请参阅《IAM 用户指南》**中的[创建角色和附加策略（控制台）](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions_create-policies.html)。

1. 选择**下一步**。

1. 在 **Name, review, and create**（命名、查看和创建）页面中，请执行以下操作：

   1. 对于**角色名称**，输入一个描述性名称，例如 **mkc-tutorial-role**。
**重要**  
命名角色时，请注意以下事项：  
角色名称在您内部必须是唯一的 AWS 账户，并且不能因大小写而变得唯一。  
例如，不要同时创建名为 **PRODROLE** 和 **prodrole** 的角色。当角色名称在策略中使用或者作为 ARN 的一部分时，角色名称区分大小写，但是当角色名称在控制台中向客户显示时（例如，在登录期间），角色名称不区分大小写。
创建角色后，您无法编辑该角色的名称，因为其他实体可能会引用该角色。

   1. （可选）对于**描述**，输入角色的描述。

   1. （可选）要编辑角色的使用案例和权限，请在**步骤 1：选择可信实体**或**步骤 2：添加权限**部分中选择**编辑**。

   1. （可选）为了帮助识别、组织或搜索角色，请选择**添加新标签**以键值对形式添加标签。例如，使用 **ProductManager** 和 **John** 的键值对向角色添加标签。

      有关使用标签的更多信息，请参阅 *IAM 用户指南*中的[AWS Identity and Access Management 资源标签](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)。

1. 检查该角色，然后选择**创建角色**。

**允许 MSK Connect 代入该角色**

1. 在 IAM 控制台的左侧窗格中，在**访问管理**下，选择**角色**。

1. 找到 `mkc-tutorial-role` 并将其选中。

1. 在角色的**摘要**下，选择**信任关系**选项卡。

1. 选择**编辑信任关系**。

1. 将现有信任策略替换为以下 JSON。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "kafkaconnect.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }
   ```

------

1. 选择**更新信任策略**。

**创建从集群的 VPC 到 Amazon S3 的 Amazon VPC 端点**

1. 打开位于 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 的 Amazon VPC 控制台。

1. 在左侧窗格中，选择**端点**。

1. 选择**创建端点**。

1. 在**服务名称**下，选择 **com.amazonaws.us-east-1.s3** 服务和**网关**类型。

1. 选择集群的 VPC，然后选中与集群子网关联的路由表左侧的复选框。

1. 选择**创建端点**。

**下一步**

[创建自定义插件](mkc-create-plugin.md)

# 创建自定义插件
<a name="mkc-create-plugin"></a>

插件包含定义连接器逻辑的代码。在此步骤中，您需创建一个包含 Lenses Amazon S3 接收器连接器代码的自定义插件。在后面的步骤中，当您创建 MSK 连接器时，您可以指定其代码位于此自定义插件中。您可以使用同一插件来创建多个具有不同配置的 MSK 连接器。

**创建自定义插件**

1. 下载 [S3 连接器](https://www.confluent.io/hub/confluentinc/kafka-connect-s3)。

1. 将 ZIP 文件上传到您有权访问的 S3 存储桶。有关如何将文件上传到 Amazon S3 的信息，请参阅《Amazon S3 用户指南》中的[上传对象](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)。

1. 在 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 打开 Amazon MSK 控制台。

1. 在左侧窗格中展开 **MSK Connect**，然后选择**自定义插件**。

1. 选择**创建自定义插件**。

1. 选择**浏览 S3**。

1. 在存储桶列表中，找到您上传 ZIP 文件的存储桶，然后选择该存储桶。

1. 在存储桶的对象列表中，选择 ZIP 文件左侧的单选按钮，然后选择标有**选择**的按钮。

1. 输入 `mkc-tutorial-plugin` 作为自定义插件名称，然后选择**创建自定义插件**。

可能需要 AWS 几分钟才能完成自定义插件的创建。创建过程完成后，您会在浏览器窗口顶部的横幅中看到以下消息。

```
Custom plugin mkc-tutorial-plugin was successfully created
The custom plugin was created. You can now create a connector using this custom plugin.
```

**下一步**

[创建客户端计算机和 Apache Kafka 主题](mkc-create-topic.md)

# 创建客户端计算机和 Apache Kafka 主题
<a name="mkc-create-topic"></a>

在此步骤中，您需创建 Amazon EC2 实例以用作 Apache Kafka 客户端实例。然后，您可以使用此实例在集群上创建主题。

**创建客户端计算机**

1. 打开位于 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 的 Amazon EC2 控制台。

1. 选择 **Launch instances**。

1. 输入客户端计算机的**名称**，例如 **mkc-tutorial-client**。

1. 对于**亚马逊机器映像（AMI）类型**，始终选中 **Amazon Linux 2 AMI（HVM）– 内核 5.10，SSD 卷类型**。

1. 选择 **t2.xlarge** 实例类型。

1. 在**密钥对（登录）**下，选择**创建新密钥对**。为**密钥对名称**输入 **mkc-tutorial-key-pair**，然后选择**下载密钥对**。此外，您还可使用现有密钥对。

1. 选择**启动实例**。

1. 选择**查看实例**。然后，在**安全组**列中，选择与新的实例关联的安全组。复制并保存安全组的 ID，以供稍后使用。

**允许新创建的客户端向集群发送数据**

1. 打开位于 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 的 Amazon VPC 控制台。

1. 在左侧窗格的**安全性**下，选择**安全组**。在**安全组 ID** 列中，找到集群的安全组。您在 [设置 MSK Connect 所需的资源](mkc-tutorial-setup.md) 中创建集群时保存了该安全组的 ID。通过选中该安全组行左侧的复选框来选择该安全组。确保没有同时选择其他安全组。

1. 在屏幕的下半部分，选择**入站规则**选项卡。

1. 选择**编辑入站规则**。

1. 在屏幕的左下角，选择**添加规则**。

1. 在新规则中，选择**类型**列中的**所有流量**。在**源**列右侧的字段中，输入客户端计算机的安全组 ID。这是您在创建客户端计算机后保存的安全组 ID。

1. 选择**保存规则**。您的 MSK 集群现在将接受来自您在上一程序中创建的客户端的所有流量。

**要创建主题，请执行以下操作**

1. 打开位于 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 的 Amazon EC2 控制台。

1. 在实例表中选择 `mkc-tutorial-client`。

1. 在屏幕顶部附近，选择**连接**，然后按照说明连接到实例。

1. 通过运行以下命令在客户端实例上安装 Java：

   ```
   sudo yum install java-1.8.0
   ```

1. 运行以下命令以下载 Apache Kafka。

   ```
   wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
   ```
**注意**  
如果您希望使用此命令中使用的镜像站点之外的镜像站点，则可在 [Apache](https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz) 网站上选择其他镜像站点。

1. 在上一步中将 TAR 文件下载到的目录中运行以下命令。

   ```
   tar -xzf kafka_2.12-2.2.1.tgz
   ```

1. 转到 **kafka\$12.12-2.2.1** 目录。

1. 在[https://console.aws.amazon.com/msk/家打开亚马逊 MSK 控制台？ region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 在左侧窗格中，选择**集群**，然后选择名称 `mkc-tutorial-cluster`。

1. 选择**查看客户端信息**。

1. 复制**明文**连接字符串。

1. 选择**完成**。

1. 在客户端实例 (`mkc-tutorial-client`) 上运行以下命令，*bootstrapServerString*替换为您在查看集群的客户机信息时保存的值。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server bootstrapServerString --replication-factor 2 --partitions 1 --topic mkc-tutorial-topic
   ```

   如果此命令成功，您将看到以下消息：`Created topic mkc-tutorial-topic.`

**下一步**

[创建连接器](mkc-create-connector.md)

# 创建连接器
<a name="mkc-create-connector"></a>

此过程介绍了如何使用 AWS 管理控制台创建连接器。

**创建连接器**

1. 登录并在[https://console.aws.amazon.com/msk/家中打开 Amazon MSK 控制台？ AWS 管理控制台 region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 在左侧窗格中，展开 **MSK Connect**，然后选择**连接器**。

1. 选择 **Create connector (创建连接器)**。

1. 在插件列表中，选择 `mkc-tutorial-plugin`，然后选择**下一步**。

1. 对于连接器名称，请输入 `mkc-tutorial-connector`。

1. 在集群列表中，选择 `mkc-tutorial-cluster`。

1. 在**连接器网络设置**部分，为网络类型选择以下选项之一：
   + **IPv4**（默认）- IPv4 仅用于通过连接目的地
   + **双栈**-用于通过 IPv4 和连接到目的地 IPv6 （仅当您的子网具有 IPv4 IPv6 CIDR 块与之关联时才可用）

1. 复制以下配置，并将其粘贴到连接器配置字段中。

   确保将区域替换为创建连接器 AWS 区域 所在位置的代码。此外，在以下示例中，将 Amazon S3 存储桶名称替换为存储桶的名称。*<amzn-s3-demo-bucket-my-tutorial>*

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   s3.region=us-east-1
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   flush.size=1
   schema.compatibility=NONE
   tasks.max=2
   topics=mkc-tutorial-topic
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   storage.class=io.confluent.connect.s3.storage.S3Storage
   s3.bucket.name=<amzn-s3-demo-bucket-my-tutorial>
   topics.dir=tutorial
   ```

1. 在**访问权限**下，选择 `mkc-tutorial-role`。

1. 选择**下一步**。在**安全性**页面上，再次选择**下一步**。

1. 在**日志**页面上，选择**下一步**。

1. 在**查看并创建**页面上，查看您的连接器配置，然后选择**创建连接器**。

**下一步**

[向 MSK 集群发送数据](mkc-send-data.md)

# 向 MSK 集群发送数据
<a name="mkc-send-data"></a>

在此步骤中，您将数据发送到之前创建的 Apache Kafka 主题，然后在目标 S3 存储桶中查找相同的数据。

**向 MSK 集群发送数据**

1. 在客户端实例上的 Apache Kafka 安装 `bin` 文件夹中，创建一个名为 `client.properties` 的文本文件，该文件包含以下内容。

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=AWS_MSK_IAM
   ```

1. 运行以下命令以创建控制台生成器。*BootstrapBrokerString*替换为运行上一个命令时获得的值。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerString --producer.config client.properties --topic mkc-tutorial-topic
   ```

1. 输入所需的任何消息，然后按 **Enter**。重复执行此步骤两次或三次。每次输入一行并按 **Enter** 时，该行会作为单独的消息发送到您的 Apache Kafka 集群。

1. 查看目标 Amazon S3 存储桶，查找您在上一步中发送的消息。

# 了解连接器
<a name="msk-connect-connectors"></a>

连接器会持续将数据来源中的流数据复制到您的 Apache Kafka 集群，或者持续将数据从集群复制到数据接收器中，从而将外部系统和 Amazon 服务与 Apache Kafka 集群相集成。连接器还可以执行轻量级逻辑，例如在将数据传送到目标之前进行转换、格式转换或数据筛选。源连接器从数据来源提取数据，并将这些数据推送到集群中，而接收器连接器则从集群中提取数据，并将这些数据推送到数据接收器中。

下图显示了连接器的架构。工作程序是运行连接器逻辑的 Java 虚拟机（JVM）进程。每个工作程序都会创建一组任务，这些任务在并行线程中运行并执行复制数据的工作。任务不存储状态，因此可以随时启动、停止或重新启动，以提供弹性且可扩展的数据管道。

![\[显示连接器集群架构的示意图。\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/images/mkc-worker-architecture.png)


# 了解连接器容量
<a name="msk-connect-capacity"></a>

连接器的总容量取决于该连接器拥有的工作器数量以及每个工作人员的 MSK Connect 单元数 (MCUs)。每个 MCU 代表 1 个 vCPU 的计算能力和 4GiB 的内存。MCU 内存与工作程序实例的总内存有关，而不是正在使用的堆内存。

MSK Connect 工作程序使用客户提供的子网中的 IP 地址。每个工作程序都使用客户提供的子网中的一个 IP 地址。您应确保在提供给 CreateConnector 请求的子网中有足够的可用的 IP 地址来考虑其指定容量，尤其是在自动缩放连接器时，工作人员数量可能会波动。

要创建连接器，必须选择以下两种容量模式之一。
+ *已预置* – 如果您知道连接器的容量要求，请选择此模式。指定两个值：
  + 工作程序数量。
  +  MCUs 每个工作人员的人数。
+ *自动扩缩* – 如果连接器的容量要求各不相同，或者您事先不知道连接器的容量要求，请选择此模式。当您使用自动缩放模式时，Amazon MSK Connect 会使用与连接器中运行的工作线程数量和每个工作线程的数量成比例的值来覆盖连接器的`tasks.max`属性。 MCUs 

  指定三组值：
  + 最小和最大工作程序数量。
  + CPU 利用率的横向缩减百分比和横向扩展百分比，该百分比由 `CpuUtilization` 指标确定。当连接器的 `CpuUtilization` 指标超过横向扩展百分比时，MSK Connect 会增加连接器中运行的工作程序数量。当 `CpuUtilization` 指标低于横向缩减百分比时，MSK Connect 会减少工作程序数量。工作程序的数量将始终保持在创建连接器时指定的最小和最大数量之间。
  +  MCUs 每个工作人员的人数。
  + （可选）*最大自动缩放任务数*-自动缩放操作期间分配给连接器的最大任务数。此参数允许您设置任务创建的上限，从而更好地控制与 Kafka 主题分区相关的资源利用率和并行度。

有关工作人员的更多信息[了解 MSK Connect 工作程序](msk-connect-workers.md)，请参阅；有关最大自动缩放任务数的更多信息，请参阅[了解最大自动缩放任务数](msk-connect-max-autoscaling-task-count.md)。要了解有关 MSK Connect 指标的信息，请参阅[监控 Amazon MSK Connect](mkc-monitoring-overview.md)。

# 了解最大自动缩放任务数
<a name="msk-connect-max-autoscaling-task-count"></a>

该`maxAutoscalingTaskCount`参数是一个可选容量字段，可用于 Amazon MSK Connect 中的自动缩放连接器。此参数允许您设置连接器自动缩放操作期间可以创建的最大任务数的上限，从而更好地控制资源利用率和性能。

当您使用自动扩展容量模式时，Amazon MSK Connect 会自动使用与工作人员数量和 MCUs 每个工作人员的数量成比例的值来覆盖连接器的`tasks.max`属性。该`maxAutoscalingTaskCount`参数提供了一个额外的可配置选项，用于限制为连接器创建的最大任务数。

当您想要控制与 Kafka 集群中主题分区数量相关的并行度时，此功能特别有用。通过设置此限制，您可以优化性能并防止在自动计算的任务计数超过工作负载要求时可能出现的任务分配效率低下。

## 配置要求
<a name="msk-connect-max-autoscaling-task-count-requirements"></a>

该`maxAutoscalingTaskCount`参数必须满足以下要求：

```
maxAutoscalingTaskCount ≥ maxWorkerCount
```

此要求通过为每位工作人员维护至少一项任务来确保资源的有效利用。系统强制执行此最低限度以优化连接器功能。

如果指定`maxAutoscalingTaskCount`，则该限制将在连接器创建后以及所有后续扩展事件中立即应用。在自动缩放操作期间，随着工作人员数量的增加或减少，系统将继续遵守此限制。该`tasks.max`值根据工作人员数量和 MCUs 每个工作人员的数量按比例调整，但永远不会超过配置`maxAutoscalingTaskCount`的值。

如果您未指定此参数，则连接器将使用没有任何限制的标准计算：`tasks.max = workerCount × mcuCount × tasksPerMcu`（其中 tasksPerMcu 为 2）。

## 何时使用 C maxAutoscalingTask ount
<a name="msk-connect-max-autoscaling-task-count-when-to-use"></a>

考虑`maxAutoscalingTaskCount`在以下场景中使用：
+ *分区数有限*：当你的 Kafka 主题的固定分区数低于自动计算的任务数时，设置限制可以防止创建没有工作可执行的空闲任务。
+ *性能优化*：当您确定特定任务计数可为您的工作负载提供最佳吞吐量时，您可以限制最大任务数以保持稳定的性能。
+ *资源管理*：无论有多少工作线程在运行，都要控制连接器的最大并行度和资源消耗。

## 示例
<a name="msk-connect-max-autoscaling-task-count-example"></a>

对于具有以下配置的连接器：

```
minWorkerCount: 1
maxWorkerCount: 4
mcuCount: 8
maxAutoscalingTaskCount: 15
```

否则`maxAutoscalingTaskCount`，当扩展到 4 个工作器时，连接器将创建 64 个任务（每个 MCU 4 个工作人员 MCUs × 8 × 2 个任务）。`maxAutoscalingTaskCount`设置为 15 时，连接器仅创建 15 个任务，如果您的 Kafka 主题有 15 个或更少的分区，这可能更合适。

# 配置双栈网络类型
<a name="msk-connect-dual-stack"></a>

Amazon MSK Connect 支持新连接器的双栈网络类型。使用双栈网络，您的连接器可以通过 IPv4 和 IPv6连接到目的地。请注意， IPv6 连接仅在双栈模式 (IPv4 \$1 IPv6) 下可用，不支持 IPv6仅联网。

默认情况下，新连接器使用 IPv4 网络类型。要创建双栈网络类型的连接器，请确保您已满足下一节中描述的先决条件。请注意，使用双栈网络类型创建连接器后，就无法修改其网络类型。要更改网络类型，必须删除并重新创建连接器。

Amazon MSK Connect 还支持通过 IPv6 和 IPv4进行服务 API 终端节点连接。要使用 IPv6 连接进行 API 调用，您需要使用双堆栈终端节点。有关 MSK Connect 服务终端节点的更多信息，请参阅[亚马逊 MSK Connect 终端节点和](https://docs.aws.amazon.com/general/latest/gr/msk-connect.html)配额。

## 使用双栈网络类型的先决条件
<a name="dual-stack-prerequisites"></a>

在为连接器配置双栈网络类型之前，请确保在创建连接器期间提供的所有子网均已分配双栈网络 IPv6 和 IPv4 CIDR 块。

## 使用双栈网络类型的注意事项
<a name="dual-stack-considerations"></a>
+ IPv6 支持目前仅在双堆栈模式 (IPv4 \$1 IPv6) 下可用，不支持仅在双栈模式 (\$1) 下可用 IPv6
+ 启用双堆栈的连接器可以同时连接到 MSK IPv4 和 IPv6 Sink 或源数据系统
+ 创建连接器后无法修改网络类型-必须删除并重新创建连接器才能更改网络类型
+ 连接器创建期间指定的所有子网都必须支持双堆栈，才能成功创建双栈网络类型的连接器
+ 如果使用双栈子网但未指定网络类型，则为了向后兼容，连接器将默认为 IPv4-only
+ 对于现有连接器，您无法更新网络类型-必须删除并重新创建连接器才能更改网络类型
+ 使用双栈联网不会产生额外费用

# 创建 连接器
<a name="mkc-create-connector-intro"></a>

此过程介绍了如何使用 AWS 管理控制台创建连接器。

**使用创建连接器 AWS 管理控制台**

1. 在 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 打开 Amazon MSK 控制台。

1. 在左侧窗格的 **MSK Connect** 下，选择**连接器**。

1. 选择 **Create connector (创建连接器)**。

1. 您可以选择使用现有的自定义插件来创建连接器，也可以先创建新的自定义插件。有关自定义插件以及如何创建这些插件的信息，请参阅[创建自定义插件](msk-connect-plugins.md)。在此过程中，假设您有一个要使用的自定义插件。在自定义插件列表中，找到要使用的插件，选中其左侧的复选框，然后选择**下一步**。

1. 输入名称和描述（可选）。

1. 选择您想要连接到的集群。

1. 在 “**连接器网络设置”** 部分，为网络类型选择以下选项之一：
   + **IPv4**（默认）- IPv4 仅用于通过连接目的地
   + **双栈**-用于通过 IPv4 和连接到目的地 IPv6 （仅当您的子网具有 IPv4 IPv6 CIDR 块与之关联时才可用）

1. 指定连接器配置。您需要指定的配置参数取决于要创建的连接器类型。但是，部分参数是所有连接器通用的参数，例如 `connector.class` 和 `tasks.max` 参数。以下是 [Confluent Amazon S3 Sink Connector](https://www.confluent.io/hub/confluentinc/kafka-connect-s3) 的配置示例。

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. 接下来，配置您的连接器容量。您可以在两种容量模式之间选择：已预置和自动扩缩。有关这两个选项的信息，请参阅[了解连接器容量](msk-connect-capacity.md)。

1. （可选）在 “**最大自动扩缩任务数**” 部分中，使用 “最大自动扩缩任务数” 字段输入要在自动缩放操作期间分配给连接器的最大任务数。该值必须至少等于您的最大工作人员人数。如果您未指定值，则连接器将使用没有任何限制的标准计算。有关更多信息，请参阅 [了解最大自动缩放任务数](msk-connect-max-autoscaling-task-count.md)。

1. 选择默认工作程序配置或自定义工作程序配置。有关创建自定义工作程序配置的信息，请参阅[了解 MSK Connect 工作程序](msk-connect-workers.md)。

1. 接下来，指定服务执行角色。这必须是 MSK Connect 可以担任的 IAM 角色，该角色向连接器授予访问必要 AWS 资源所需的所有权限。这些权限取决于连接器的逻辑。有关如何创建此角色的信息，请参阅[了解服务执行角色](msk-connect-service-execution-role.md)。

1. 选择**下一步**，查看安全信息，然后再次选择**下一步**。

1. 指定所需的日志记录选项，然后选择**下一步**。有关日志记录的信息，请参阅[为 MSK Connect 进行日志记录](msk-connect-logging.md)。

1. 在**查看并创建**页面上，查看您的连接器配置，然后选择**创建连接器**。

要使用 MSK Connect API 创建连接器，请参阅[CreateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateConnector.html)。

可使用 `UpdateConnector` API 修改连接器配置。有关更多信息，请参阅 [更新连接器](mkc-update-connector.md)。

# 更新连接器
<a name="mkc-update-connector"></a>

此过程介绍了如何使用 AWS 管理控制台更新现有 MSK Connect 连接器的配置。

**使用更新连接器配置 AWS 管理控制台**

1. 在 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 打开 Amazon MSK 控制台。

1. 在左侧窗格的 **MSK Connect** 下，选择**连接器**。

1. 选择现有的连接器。

1. 选择**编辑连接器配置**。

1. 更新连接器配置。您无法`connector.class`使用进行覆盖 UpdateConnector。以下是 Confluent Amazon S3 Sink Connector 的配置示例。

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. 选择**提交**。

1. 然后，可以在连接器的**操作**选项卡中监视操作的当前状态。

要使用 MSK Connect API 更新连接器的配置，请参阅[UpdateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_UpdateConnector.html)。

# 通过连接器连接
<a name="msk-connect-from-connectors"></a>

以下最佳实践可以提高您与 Amazon MSK Connect 的连接性能。

## 请勿与 Amazon VPC IPs 对等互连或 Transit Gateway 重叠
<a name="CIDR-ip-ranges"></a>

如果您使用的是带有 Amazon MSK Connect 的 Amazon VPC 对等连接或 Transit Gateway，请不要将连接器配置为访问 CIDR 范围内的对等 VPC 资源 IPs ：
+ “10.99.0.0/16”
+ “192.168.0.0/16”
+ “172.21.0.0/16”

# 创建自定义插件
<a name="msk-connect-plugins"></a>

插件是一种 AWS 资源，其中包含定义连接器逻辑的代码。您可以将 JAR 文件（或包含一个或多个 JAR 文件的 ZIP 文件）上传到 S3 存储桶，并在创建插件时指定存储桶的位置。创建插件后，MSK Connect 会复制该时间点的 S3 对象的内容。它不维护指向 S3 对象的链接，因此对该对象的任何后续修改都不会影响插件或其连接器。创建连接器时，需要指定您想要 MSK Connect 用于该连接器的插件。插件与连接器的关系是 one-to-many：你可以从同一个插件创建一个或多个连接器。

**注意**  
自定义插件无法在原地更新。要使用插件代码的新版本，请删除所有引用该插件的连接器，删除该插件，然后重新创建它。

**自定义插件的依赖包装**  
我们建议您为插件添加所有必需的 JAR 文件和依赖项。Package 将您的连接器打包为以下之一：  
一个 ZIP 文件，其中包含插件所需的所有必需的 JAR 文件和依赖关系。
一个 uber JAR，其中包含插件及其依赖项的所有类文件。
不捆绑插件依赖关系可能会影响运行时环境中的可用性或兼容性，并导致意外错误。

有关如何开发连接器代码的信息，请参阅 Apache Kafka 文档中的[连接器开发指南](https://kafka.apache.org/documentation/#connect_development)。

**使用创建自定义插件 AWS 管理控制台**

1. 在 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 打开 Amazon MSK 控制台。

1. 在左侧窗格的 **MSK Connect** 下，选择**自定义插件**。

1. 选择**创建自定义插件**。

1. 选择**浏览 S3**。

1. 在 S3 存储桶列表中，选择包含插件的 JAR 或 ZIP 文件的存储桶。

1. 在对象列表中，选中插件的 JAR 或 ZIP 文件左侧的复选框，然后选择**选择**。

1. 选择**创建自定义插件**。

要使用 MSK Connect API 创建自定义插件，请参阅[CreateCustomPlugin](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateCustomPlugin.html)。

# 了解 MSK Connect 工作程序
<a name="msk-connect-workers"></a>

工作程序是运行连接器逻辑的 Java 虚拟机（JVM）进程。每个工作程序都会创建一组任务，这些任务在并行线程中运行并执行复制数据的工作。任务不存储状态，因此可以随时启动、停止或重新启动，以提供弹性且可扩展的数据管道。剩余工作程序会自动检测到工作程序数量的变化，无论是由于扩展事件还是意外故障所致。它们会进行协调，以重新平衡剩余工作程序集合的任务。Connect 工作程序使用 Apache Kafka 的使用器组来协调和重新平衡。

如果您的连接器容量要求变化不定或难以估计，则可以让 MSK Connect 根据需要在您指定的下限和上限之间扩展工作程序数量。或者，您可以指定要运行连接器逻辑的确切工作程序数量。有关更多信息，请参阅 [了解连接器容量](msk-connect-capacity.md)。

**MSK Connect 工作程序使用 IP 地址**  
MSK Connect 工作程序使用客户提供的子网中的 IP 地址。每个工作程序都使用客户提供的子网中的一个 IP 地址。您应确保在提供给 CreateConnector 请求的子网中有足够的可用的 IP 地址来考虑其指定容量，尤其是在自动缩放连接器时，工作人员数量可能会波动。

## 默认工作程序配置
<a name="msk-connect-default-worker-config"></a>

MSK Connect 提供以下默认工作程序配置：

```
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

# 支持的工作程序配置属性
<a name="msk-connect-supported-worker-config-properties"></a>

MSK Connect 提供默认的工作程序配置。您还可以选择创建用于连接器的自定义工作程序配置。以下列表包含有关 Amazon MSK Connect 支持或不支持的工作程序配置属性的信息。
+ 只有 `key.converter` 和 `value.converter` 属性为必需。
+ MSK Connect 支持以下 `producer.` 配置属性。

  ```
  producer.acks
  producer.batch.size
  producer.buffer.memory
  producer.compression.type
  producer.enable.idempotence
  producer.key.serializer
  producer.linger.ms
  producer.max.request.size
  producer.metadata.max.age.ms
  producer.metadata.max.idle.ms
  producer.partitioner.class
  producer.reconnect.backoff.max.ms
  producer.reconnect.backoff.ms
  producer.request.timeout.ms
  producer.retry.backoff.ms
  producer.value.serializer
  ```
+ MSK Connect 支持以下 `consumer.` 配置属性。

  ```
  consumer.allow.auto.create.topics
  consumer.auto.offset.reset
  consumer.check.crcs
  consumer.fetch.max.bytes
  consumer.fetch.max.wait.ms
  consumer.fetch.min.bytes
  consumer.heartbeat.interval.ms
  consumer.key.deserializer
  consumer.max.partition.fetch.bytes
  consumer.max.poll.interval.ms
  consumer.max.poll.records
  consumer.metadata.max.age.ms
  consumer.partition.assignment.strategy
  consumer.reconnect.backoff.max.ms
  consumer.reconnect.backoff.ms
  consumer.request.timeout.ms
  consumer.retry.backoff.ms
  consumer.session.timeout.ms
  consumer.value.deserializer
  ```
+ 支持所有其他不以 `producer.` 或 `consumer.` 前缀开头的配置属性，但以下属性*除外*。

  ```
  access.control.
  admin.
  admin.listeners.https.
  client.
  connect.
  inter.worker.
  internal.
  listeners.https.
  metrics.
  metrics.context.
  rest.
  sasl.
  security.
  socket.
  ssl.
  topic.tracking.
  worker.
  bootstrap.servers
  config.storage.topic
  connections.max.idle.ms
  connector.client.config.override.policy
  group.id
  listeners
  metric.reporters
  plugin.path
  receive.buffer.bytes
  response.http.headers.config
  scheduled.rebalance.max.delay.ms
  send.buffer.bytes
  status.storage.topic
  ```

有关工作程序配置属性及其表示的更多信息，请参阅 Apache Kafka 文档中的 [Kafka Connect Configs](https://kafka.apache.org/documentation/#connectconfigs)。

# 创建自定义工作程序配置
<a name="msk-connect-create-custom-worker-config"></a>

此过程介绍了如何使用 AWS 管理控制台创建自定义工作程序配置。

**使用创建自定义工作器配置 AWS 管理控制台**

1. 在 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 打开 Amazon MSK 控制台。

1. 在左侧窗格的 **MSK Connect** 下，选择**工作程序配置**。

1. 选择**创建工作程序配置**。

1. 输入名称和可选描述，然后添加属性和要将属性设置为的值。

1. 选择**创建工作程序配置**。

要使用 MSK Connect API 创建工作器配置，请参阅[CreateWorkerConfiguration](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateWorkerConfiguration.html)。

# 使用 `offset.storage.topic` 管理源连接器偏移
<a name="msk-connect-manage-connector-offsets"></a>

本节提供的信息可帮助您使用*偏移存储主题*管理源连接器偏移。偏移存储主题是 Kafka Connect 用来存储连接器和任务配置偏移的内部主题。

## 注意事项
<a name="msk-connect-manage-connector-offsets-considerations"></a>

在管理源连接器偏移时，请考虑以下几点。
+ 要指定偏移存储主题，请提供将连接器偏移作为工作程序配置中 `offset.storage.topic` 的值进行存储的 Kafka 主题名称。
+ 更改连接器配置时要谨慎行事。如果源连接器将配置中的值用于键偏移记录，则更改配置值可能会导致连接器出现意想不到的行为。我们建议您参考插件的文档以获取指导。
+ **自定义默认分区数** – 除了通过添加 `offset.storage.topic` 来自定义工作程序配置外，您还可以为偏移和状态存储主题自定义分区数量。内部主题的默认分区如下。
  + `config.storage.topic`：1，不可配置，必须是单分区主题
  + `offset.storage.topic`：25，可通过提供 `offset.storage.partitions` 进行配置
  + `status.storage.topic`：5，可通过提供 `status.storage.partitions` 进行配置
+ **手动删除主题** – Amazon MSK Connect 在每次部署连接器时都会创建新的 Kafka 连接内部主题（主题名称以 `__amazon_msk_connect` 开头）。附加到已删除连接器的旧主题不会自动删除，因为内部主题（例如 `offset.storage.topic`）可以在连接器之间重复使用。但是，您可以手动删除 MSK Connect 创建的未使用的内部主题。内部主题按照 `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id` 格式命名。

  正则表达式 `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id` 可用于删除内部主题。您不应删除正在运行的连接器当前正在使用的内部主题。
+ **对 MSK Connect 创建的内部主题使用相同名称** – 如果要重复使用偏移存储主题来消耗先前创建的连接器的偏移，则必须为新连接器指定与旧连接器相同的名称。可以使用工作程序配置来设置 `offset.storage.topic` 属性，以便将相同的名称分配到 `offset.storage.topic`，并在不同的连接器之间重复使用。[管理连接器偏移](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config)中描述了此配置。MSK Connect 不允许不同的连接器共享 `config.storage.topic` 和 `status.storage.topic`。每次在 MSKC 中创建新连接器时都会创建这些主题。它们会按照 `__amazon_msk_connect_<status|configs>_connector_name_connector_id` 格式自动命名，因此在您创建的不同连接器中会有所不同。

# 使用默认偏移存储主题
<a name="msk-connect-default-offset-storage-topic"></a>

默认情况下，Amazon MSK Connect 会在 Kafka 集群上为您创建的每个连接器生成一个新的偏移存储主题。MSK 使用部分连接器 ARN 构造默认主题名称。例如 `__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2`。

# 使用自定义偏移存储主题
<a name="msk-connect-set-offset-storage-topic"></a>

要在源连接器之间提供偏移连续性，您可以使用自己选择的偏移存储主题来代替默认主题。指定偏移存储主题可以帮助您完成创建源连接器之类的任务，该连接器可从上一个连接器的最后一个偏移恢复读取。

要指定偏移存储主题，请在创建连接器之前在工作程序配置中为 `offset.storage.topic` 属性提供一个值。如果要重复使用偏移存储主题来消耗先前创建的连接器的偏移，则必须为新连接器指定与旧连接器相同的名称。如果您创建自定义偏移存储主题，则必须在主题配置中将 [https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy](https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy) 设置为 `compact`。

**注意**  
如果您在创建*接收器*连接器时指定了偏移存储主题，若该主题尚不存在，则 MSK Connect 会创建该主题。但是，该主题不会用于存储连接器偏移，  
而是使用 Kafka 使用器组协议来管理接收器连接器偏移。每个接收器连接器都会创建一个名为 `connect-{CONNECTOR_NAME}` 的组。只要使用器组存在，您创建的任何具有相同 `CONNECTOR_NAME` 值的连续接收器连接器都将从上次提交的偏移继续。

**重要**  
如果您想在保持偏移连续性的同时更新现有连接器配置，请使用 UpdateConnector API。有关更多信息，请参阅 [更新连接器](mkc-update-connector.md)。

**Example ：在重新创建源连接器时指定偏移存储主题**  
如果您需要在保持偏移连续性的同时删除和重新创建连接器，则可以在工作器配置中指定偏移存储主题。例如，假设您有一个变更数据捕获 (CDC) 连接器，并且您想在不丢失 CDC 数据流中的位置的情况下重新创建它。以下步骤演示如何完成此任务。  

1. 在您的客户端计算机上，运行以下命令以查找连接器偏移存储主题的名称。将 `<bootstrapBrokerString>` 替换为集群的引导代理字符串。有关获取引导代理字符串的说明，请参阅[获取 Amazon MSK 集群的引导代理](msk-get-bootstrap-brokers.md)。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>
   ```

   以下输出显示了所有集群主题的列表，包括所有默认的内部连接器主题。在此示例中，现有 CDC 连接器使用由 MSK Connect 创建的[默认偏移存储主题](msk-connect-default-offset-storage-topic.md)。这就是偏移存储主题名为 `__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2` 的原因。

   ```
   __consumer_offsets
   __amazon_msk_canary
   __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   my-msk-topic-1
   my-msk-topic-2
   ```

1. 在 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk) 打开 Amazon MSK 控制台。

1. 从**连接器**列表中选择您的连接器。复制并保存**连接器配置**字段的内容，以便您可以对其进行修改并使用它来创建新连接器。

1. 要删除连接器，请选择**删除**。然后在文本输入字段中输入连接器名称，以确认删除。

1. 使用适合您场景的值创建自定义工作程序配置。有关说明，请参阅[创建自定义工作程序配置](msk-connect-create-custom-worker-config.md)。

   在工作程序配置中，必须将之前检索到的偏移存储主题的名称指定为类似于以下配置中 `offset.storage.topic` 的值。

   ```
   config.providers.secretManager.param.aws.region=eu-west-3
   key.converter=<org.apache.kafka.connect.storage.StringConverter>
   value.converter=<org.apache.kafka.connect.storage.StringConverter>
   config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
   config.providers=secretManager
   offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   ```

1. 
**重要**  
必须为新连接器指定与旧连接器相同的名称。

   使用在上一步中设置的工作程序配置创建新连接器。有关说明，请参阅[创建 连接器](mkc-create-connector-intro.md)。

# 教程：使用配置提供程序将敏感信息外部化
<a name="msk-connect-config-provider"></a>

此示例演示了如何使用开源配置提供程序将 Amazon MSK Connect 的敏感信息外部化。配置提供程序允许您在连接器或工作程序配置中指定变量而不是明文，在连接器中运行的工作程序会在运行时系统解析这些变量。这样可以防止凭证和其他密钥以明文形式存储。示例中的配置提供程序支持从 S AWS ecrets Manager、Amazon S3 和 Systems Manager (SSM) 检索配置参数。在[步骤 2](#msk-connect-config-providers) 中，您可以看到如何为要配置的服务设置敏感信息的存储和检索。

## 注意事项
<a name="msk-connect-config-providers-considerations"></a>

将 MSK 配置提供程序与 Amazon MSK Connect 配合使用时，请考虑以下事项：
+ 向 IAM 服务执行角色分配使用配置提供程序时的适当权限。
+ 在工作程序配置中定义配置提供程序及其在连接器配置中的实现。
+ 如果插件未将敏感配置值定义为秘密，则这些值可能会出现在连接器日志中。Kafka Connect 对未定义的配置值的处理方式与任何其他明文值相同。要了解更多信息，请参阅[防止连接器日志中出现秘密](msk-connect-logging.md#msk-connect-logging-secrets)。
+ 默认情况下，当连接器使用配置提供程序时，MSK Connect 会经常重新启动该连接器。要关闭此重启行为，可以在连接器配置中将 `config.action.reload` 值设置为 `none`。

## 创建自定义插件并上传到 S3
<a name="msk-connect-config-providers-create-custom-plugin"></a>

 要创建自定义插件，请在本地计算机上运行以下命令 msk-config-provider来创建一个包含连接器的 zip 文件。

**使用终端窗口和 Debezium 作为连接器创建自定义插件**

使用 AWS CLI 以拥有允许您访问 AWS S3 存储桶的凭据的超级用户身份运行命令。有关安装和设置 AWS CLI 的信息，请参阅《*AWS Command Line Interface 用户指南》*[中的 AWS CLI 入门](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)。有关在 Amazon S3 中使用 AWS CLI 的信息，请参阅*AWS Command Line Interface 用户指南*中的[将 Amazon S3 与 AWS CLI 配合使用](https://docs.aws.amazon.com/cli/latest/userguide/cli-services-s3.html)。

1. 在终端窗口中，使用以下命令在工作区中创建一个名为 `custom-plugin` 的文件夹。

   ```
   mkdir custom-plugin && cd custom-plugin
   ```

1. 使用以下命令从 [Debezium 网站](https://debezium.io/releases/)下载最新稳定版本的 **MySQL Connector 插件**。

   ```
   wget https://repo1.maven.org/maven2/io/debezium/debezium-connectormysql/
   2.2.0.Final/debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

   使用以下命令将下载的 gzip 文件提取到 `custom-plugin` 文件夹中。

   ```
   tar xzf debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

1. 使用以下命令下载 [MSK 配置提供程序 zip 文件](https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip)。

   ```
   wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip
   ```

   使用以下命令将下载的 zip 文件提取到 `custom-plugin` 文件夹中。

   ```
   unzip msk-config-providers-0.4.0-with-dependencies.zip
   ```

1. 将上述步骤中的 MSK 配置提供程序和自定义连接器的内容压缩到名为 `custom-plugin.zip` 的单个文件中。

   ```
   zip -r ../custom-plugin.zip * 
   ```

1. 将文件上传到 S3 以供日后参考。

   ```
   aws s3 cp ../custom-plugin.zip s3:<S3_URI_BUCKET_LOCATION>
   ```

1. 在 Amazon MSK 控制台的 **MSK Connect** 部分下，选择**自定义插件**，然后选择**创建自定义插件**并浏览 **s3: < *S3\$1URI\$1BUCKET\$1LOCATION* >** S3 存储桶以选择您刚刚上传的自定义插件 ZIP 文件。  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/images/s3-object-browser.png)

1. 对于插件名称，输入 **debezium-custom-plugin**。或者，输入描述并选择**创建自定义插件**。  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/images/create-custom-plugin.png)

## 为不同的提供程序配置参数和权限
<a name="msk-connect-config-providers"></a>

您可以在以下三个服务中配置参数值：
+ Secrets Manager 
+ Systems Manager Parameter Store
+ S3 – Simple Storage Service

选择以下选项卡之一，获取有关为该服务设置参数和相关权限的说明。

------
#### [ Configure in Secrets Manager ]

**在 Secrets Manager 中配置参数值**

1. 打开 [Secrets Manager 控制台](https://console.aws.amazon.com/secretsmanager/)。

1. 创建新密钥来存储凭证或密钥。有关说明，请参阅《*AWS Secrets Manager 用户指南》*中的[创建 AWS Secrets Manager 密钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html)。

1. 复制密钥的 ARN。

1. 将以下示例策略中的 Secrets Manager 权限添加到您的[服务执行角色](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html)。将示例 ARN `arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234` 替换为密钥的 ARN。

1. 添加工作程序配置和连接器说明。  
****  

   ```
   {
           "Version":"2012-10-17",		 	 	 
           "Statement": [
               {
                   "Effect": "Allow",
                   "Action": [
                       "secretsmanager:GetResourcePolicy",
                       "secretsmanager:GetSecretValue",
                       "secretsmanager:DescribeSecret",
                       "secretsmanager:ListSecretVersionIds"
                   ],
                   "Resource": [
                   "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
                   ]
               }
           ]
       }
   ```

1. 要使用 Secrets Manager 配置提供程序，请在步骤 3 中将以下几行代码复制到工作程序配置文本框中：

   ```
   # define name of config provider:
   
   config.providers = secretsmanager
   
   # provide implementation classes for secrets manager:
   
   config.providers.secretsmanager.class = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.secretsmanager.param.region = us-east-1
   ```

1. 对于 Secrets Manager 配置提供程序，请在步骤 4 中复制连接器配置的以下几行代码。

   ```
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   ```

您也可以将上述步骤与更多配置提供程序一起使用。

------
#### [ Configure in Systems Manager Parameter Store ]

**在 Systems Manager Parameter Store 中配置参数值**

1. 打开 [Systems Manager 控制台](https://console.aws.amazon.com/systems-manager/)。

1. 在导航窗格中，选择 **Parameter Store**。

1. 创建要存储在 Systems Manager 中的新参数。有关说明，请参阅《 AWS Systems Manager 用户指南》中的 “[创建 Systems Manager 参数（控制台）](https://docs.aws.amazon.com/systems-manager/latest/userguide/parameter-create-console.html)”。

1. 复制参数的 ARN。

1. 将以下示例策略中的 Systems Manager 权限添加到您的[服务执行角色](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html)。*<arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName>*替换为参数的 ARN。  
****  

   ```
   {
           "Version":"2012-10-17",		 	 	 
           "Statement": [
               {
                   "Sid": "VisualEditor0",
                   "Effect": "Allow",
                   "Action": [
                       "ssm:GetParameterHistory",
                       "ssm:GetParametersByPath",
                       "ssm:GetParameters",
                       "ssm:GetParameter"
                   ],
                   "Resource": "arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName"
               }
           ]
       }
   ```

1. 要使用 Parameter Store 配置提供程序，请在步骤 3 中将以下几行代码复制到工作程序配置文本框中：

   ```
   # define name of config provider:
   
   config.providers = ssm
   
   # provide implementation classes for parameter store:
   
   config.providers.ssm.class = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.ssm.param.region = us-east-1
   ```

1. 对于 Parameter Store 配置提供程序，请在步骤 5 中复制连接器配置的以下几行代码。

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   ```

   您也可以将上述两个步骤与更多配置提供程序捆绑使用。

------
#### [ Configure in Amazon S3 ]

**要在 Amazon S3 objects/files 中进行配置**

1. 打开 [Amazon S3 控制台](https://console.aws.amazon.com/s3/)。

1. 将对象上传到 S3 中的存储桶。有关说明，请参阅[上传对象](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)。

1. 复制对象的 ARN。

1. 将以下示例策略中的 Amazon S3 对象读取权限添加到您的[服务执行角色](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html)。将示例 ARN `arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>` 替换为对象的 ARN。  
****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
               {
                   "Sid": "VisualEditor0",
                   "Effect": "Allow",
                   "Action": "s3:GetObject",
                   "Resource": "arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>"
               }
           ]
       }
   ```

1. 要使用 Amazon S3 配置提供程序，请在步骤 3 中将以下几行代码复制到工作程序配置文本框中：

   ```
   # define name of config provider:
   
   config.providers = s3import
   # provide implementation classes for S3:
   
   config.providers.s3import.class = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   ```

1. 对于 Amazon S3 配置提供程序，请在步骤 4 中将以下几行代码复制到连接器配置。

   ```
   #Example implementation for S3 object
   
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

   您也可以将上述两个步骤与更多配置提供程序捆绑使用。

------

## 使用与配置提供程序有关的信息创建自定义工作程序配置
<a name="msk-connect-config-providers-create-custom-config"></a>

1. 在 **Amazon MSK Connect** 部分下选择**工作程序配置**。

1. 选择**创建工作程序配置**。

1. 在“工作程序配置名称”文本框中输入 `SourceDebeziumCustomConfig`。“描述”是选填项。

1. 根据所需的提供程序复制相关的配置代码，然后将其粘贴到**工作程序配置**文本框中。

1. 以下是所有三个提供程序的工作程序配置示例：

   ```
   key.converter=org.apache.kafka.connect.storage.StringConverter
   key.converter.schemas.enable=false
   value.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter.schemas.enable=false
   offset.storage.topic=offsets_my_debezium_source_connector
   
   # define names of config providers:
   
   config.providers=secretsmanager,ssm,s3import
   
   # provide implementation classes for each provider:
   
   config.providers.secretsmanager.class    = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   config.providers.ssm.class               = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   config.providers.s3import.class          = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   
   config.providers.secretsmanager.param.region = us-east-1
   config.providers.ssm.param.region = us-east-1
   ```

1. 单击“创建工作程序配置”。

## 创建连接器
<a name="msk-connect-config-providers-create-connector"></a>

1. 按照[创建新连接器](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html)中的说明，创建新连接器。

1. 选择您在 [创建自定义插件并上传到 S3](#msk-connect-config-providers-create-custom-plugin) 中上传到 S3 存储桶中的 `custom-plugin.zip` 文件作为自定义插件的来源。

1. 根据所需的提供程序复制相关的配置代码，然后将其粘贴到“工作程序配置”字段中。

1. 以下是所有三个提供程序的连接器配置示例：

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   
   #Example implementation for Amazon S3 file/object
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

1. 选择 “**使用自定义配置”，**然后**SourceDebeziumCustomConfig**从 “**工作器配置**” 下拉列表中进行选择。

1. 按照[创建连接器](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html)中说明的其余步骤进行操作。

# MSK Connect 的 IAM 角色和策略
<a name="msk-connect-iam"></a>

本部分帮助您设置相应的 IAM 策略和角色，以便在您的 AWS 环境中安全地部署和管理 Amazon MSK Connect。以下部分介绍了必须与 MSK Connect 一起使用的服务执行角色，包括连接至经 IAM 身份验证的 MSK 集群时所需的信任策略和额外权限。该页面还提供了授予对 MSK Connect 功能完全访问权限的全面 IAM 策略的示例，以及该服务可用的 AWS 托管策略的详细信息。

**Topics**
+ [了解服务执行角色](msk-connect-service-execution-role.md)
+ [MSK Connect 的 IAM 策略示例](mkc-iam-policy-examples.md)
+ [防范跨服务混淆代理问题](cross-service-confused-deputy-prevention.md)
+ [AWS MSK Connect 的托管策略](mkc-security-iam-awsmanpol.md)
+ [使用 MSK Connect 的服务相关角色](mkc-using-service-linked-roles.md)

# 了解服务执行角色
<a name="msk-connect-service-execution-role"></a>

**注意**  
Amazon MSK Connect 不支持使用[服务相关角色](mkc-using-service-linked-roles.md)作为服务执行角色。您必须创建单独的服务执行角色。有关如何创建自定义 IAM 角色的说明，请参阅 I [A *M 用户指南*中的创建角色以向 AWS 服务委派权限](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html)。

在使用 MSK Connect 创建连接器时，您需要指定要与之配合使用的 AWS Identity and Access Management (IAM) 角色。您的服务执行角色必须具有以下信任策略，以便 MSK Connect 可以代入该角色。有关此策略中条件上下文键的说明，请参阅 [防范跨服务混淆代理问题](cross-service-confused-deputy-prevention.md)。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/myConnector/abc12345-abcd-4444-a8b9-123456f513ed-2"
        }
      }
    }   
  ]
}
```

------

如果您想要与连接器一起使用的 Amazon MSK 集群使用 IAM 身份验证，则必须向连接器的服务执行角色添加以下权限策略。有关如何查找集群的 UUID 以及如何构造主题的信息 ARNs，请参阅[授权策略资源](kafka-actions.md#msk-iam-resources)。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:000000000001:cluster/testClusterName/300d0000-0000-0005-000f-00000000000b-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/myCluster/300a0000-0000-0003-000a-00000000000b-6/__amazon_msk_connect_read"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:WriteData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/testCluster/300f0000-0000-0008-000d-00000000000m-7/__amazon_msk_connect_write"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:CreateTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/testCluster/300f0000-0000-0008-000d-00000000000m-7/__amazon_msk_connect_*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:group/testCluster/300d0000-0000-0005-000f-00000000000b-1/__amazon_msk_connect_*",
                "arn:aws:kafka:us-east-1:123456789012:group/testCluster/300d0000-0000-0005-000f-00000000000b-1/connect-*"
            ]
        }
    ]
}
```

------

根据连接器的类型，您可能还需要为服务执行角色附加允许其访问 AWS 资源的权限策略。例如，如果您的连接器需要向 S3 存储桶发送数据，则服务执行角色必须具有授予写入该存储桶之权限的权限策略。出于测试目的，您可以使用其中一个预构建 IAM policy 来授予完全访问权限，例如 `arn:aws:iam::aws:policy/AmazonS3FullAccess`。但是，出于安全考虑，我们建议您使用最严格的策略，允许您的连接器从 AWS 源读取数据或写入 AWS 接收器。

# MSK Connect 的 IAM 策略示例
<a name="mkc-iam-policy-examples"></a>

要向非管理员用户授予对所有 MSK Connect 功能的完全访问权限，请将如下策略附加到该用户的 IAM 角色。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Sid": "MSKConnectFullAccess",
      "Effect": "Allow",
      "Action": [
        "kafkaconnect:CreateConnector",
        "kafkaconnect:DeleteConnector",
        "kafkaconnect:DescribeConnector",
        "kafkaconnect:ListConnectors",
        "kafkaconnect:UpdateConnector",
        "kafkaconnect:CreateCustomPlugin",
        "kafkaconnect:DeleteCustomPlugin",
        "kafkaconnect:DescribeCustomPlugin",
        "kafkaconnect:ListCustomPlugins",
        "kafkaconnect:CreateWorkerConfiguration",
        "kafkaconnect:DeleteWorkerConfiguration",
        "kafkaconnect:DescribeWorkerConfiguration",
        "kafkaconnect:ListWorkerConfigurations"
      ],
      "Resource": "*"
    },
    {
      "Sid": "IAMPassRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789012:role/MSKConnectServiceRole",
      "Condition": {
        "StringEquals": {
          "iam:PassedToService": "kafkaconnect.amazonaws.com"
        }
      }
    },
    {
      "Sid": "EC2NetworkAccess",
      "Effect": "Allow",
      "Action": [
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DeleteNetworkInterface",
        "ec2:DescribeVpcs",
        "ec2:DescribeSubnets",
        "ec2:DescribeSecurityGroups"
      ],
      "Resource": "*"
    },
    {
      "Sid": "MSKClusterAccess",
      "Effect": "Allow",
      "Action": [
        "kafka:DescribeCluster",
        "kafka:DescribeClusterV2",
        "kafka:GetBootstrapBrokers"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/"
    },
    {
      "Sid": "MSKLogGroupAccess",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogStreams",
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:us-east-1:123456789012:log-group:/aws/msk-connect/*"
      ]
    },
    {
      "Sid": "S3PluginAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins",
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins/*"
      ]
    }
  ]
}
```

------

# 防范跨服务混淆代理问题
<a name="cross-service-confused-deputy-prevention"></a>

混淆代理问题是一个安全性问题，即不具有某操作执行权限的实体可能会迫使具有更高权限的实体执行该操作。在中 AWS，跨服务模仿可能会导致混乱的副手问题。一个服务（*呼叫服务*）调用另一项服务（*所谓的服务*）时，可能会发生跨服务模拟。可以操纵调用服务，使用其权限以在其他情况下该服务不应有访问权限的方式对另一个客户的资源进行操作。为防止这种情况， AWS 提供可帮助您保护所有服务的数据的工具，而这些服务中的服务主体有权限访问账户中的资源。

我们建议在资源策略中使用 [https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcearn](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcearn) 和 [https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourceaccount](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourceaccount) 全局条件上下文键，以限制 MSK Connect 为其他服务提供的资源访问权限。如果 `aws:SourceArn` 值不包含账户 ID，例如 Amazon S3 存储桶 ARN 不包含账户 ID，您必须使用两个全局条件上下文键来限制权限。如果同时使用全局条件上下文密钥和包含账户 ID 的 `aws:SourceArn` 值，则 `aws:SourceAccount` 值和 `aws:SourceArn` 值中的账户在同一策略语句中使用时，必须使用相同的账户 ID。如果您只希望将一个资源与跨服务访问相关联，请使用 `aws:SourceArn`。如果您想允许该账户中的任何资源与跨服务使用操作相关联，请使用。`aws:SourceAccount`

对于 MSK Connect，`aws:SourceArn` 的值必须是 MSK 连接器。

防范混淆代理问题最有效的方法是使用 `aws:SourceArn` 全局条件上下文键和资源的完整 ARN。如果不知道资源的完整 ARN，或者正在指定多个资源，请针对 ARN 未知部分使用带有通配符（`*`）的 `aws:SourceArn` 全局上下文条件键。例如，*arn:aws:kafkaconnect:us-east-1:123456789012:connector/\$1*表示属于美国东部（弗吉尼亚北部）区域中编号为 123456789012 的账户的所有连接器。

以下示例演示了如何使用 MSK Connect 中的 `aws:SourceArn` 和 `aws:SourceAccount` 全局条件上下文键来防范混淆代理问题。用你的*123456789012*和连接器信息替换并 arn: aws: kafkaconnect::: connector/ *us-east-1*。*123456789012* *my-S3-Sink-Connector* *abcd1234-5678-90ab-cdef-1234567890ab* AWS 账户 

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": " kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
        "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/my-S3-Sink-Connector/abcd1234-5678-90ab-cdef-1234567890ab"
        }
      }
    }   
  ]
}
```

------

# AWS MSK Connect 的托管策略
<a name="mkc-security-iam-awsmanpol"></a>

 AWS 托管策略是由创建和管理的独立策略 AWS。 AWS 托管策略旨在为许多常见用例提供权限，以便您可以开始为用户、组和角色分配权限。

请记住， AWS 托管策略可能不会为您的特定用例授予最低权限权限，因为它们可供所有 AWS 客户使用。我们建议通过定义特定于使用案例的[客户管理型策略](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#customer-managed-policies)来进一步减少权限。

您无法更改 AWS 托管策略中定义的权限。如果 AWS 更新 AWS 托管策略中定义的权限，则更新会影响该策略所关联的所有委托人身份（用户、组和角色）。 AWS 最有可能在启动新的 API 或现有服务可以使用新 AWS 服务 的 API 操作时更新 AWS 托管策略。

有关更多信息，请参阅《IAM 用户指南》**中的 [AWS 托管式策略](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#aws-managed-policies)。

## AWS 托管策略：Amazon MSKConnect ReadOnlyAccess
<a name="security-iam-awsmanpol-AmazonMSKConnectReadOnlyAccess"></a>

此策略向用户授予列出和描述 MSK Connect 资源所需的权限。

您可以将 `AmazonMSKConnectReadOnlyAccess` 策略附加到 IAM 身份。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:ListConnectors",
                "kafkaconnect:ListCustomPlugins",
                "kafkaconnect:ListWorkerConfigurations"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeConnector"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:connector/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeCustomPlugin"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:custom-plugin/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeWorkerConfiguration"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:worker-configuration/*"
            ]
        }
    ]
}
```

------

## AWS 托管策略： KafkaConnectServiceRolePolicy
<a name="security-iam-awsmanpol-KafkaConnectServiceRolePolicy"></a>

此策略向 MSK Connect 服务授予创建和管理带有 `AmazonMSKConnectManaged:true` 标签的网络接口所需的权限。这些网络接口允许 MSK Connect 通过网络访问 Amazon VPC 中的资源，例如 Apache Kafka 集群、源或接收器。

您无法附加 KafkaConnectServiceRolePolicy 到您的 IAM 实体。此策略附加到服务相关角色，允许 MSK Connect 代表您执行操作。

------
#### [ JSON ]

****  

```
{
	"Version":"2012-10-17",		 	 	 
	"Statement": [
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"aws:RequestTag/AmazonMSKConnectManaged": "true"
				},
				"ForAllValues:StringEquals": {
					"aws:TagKeys": "AmazonMSKConnectManaged"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": [
				"arn:aws:ec2:*:*:subnet/*",
				"arn:aws:ec2:*:*:security-group/*"
			]
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateTags"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:CreateAction": "CreateNetworkInterface"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:DescribeNetworkInterfaces",
				"ec2:CreateNetworkInterfacePermission",
				"ec2:AttachNetworkInterface",
				"ec2:DetachNetworkInterface",
				"ec2:DeleteNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:ResourceTag/AmazonMSKConnectManaged": "true"
				}
			}
		}
	]
}
```

------

## MSK Connect 对 AWS 托管策略的更新
<a name="security-iam-awsmanpol-updates"></a>

查看自该服务开始跟踪这些更改以来，MSK Connect AWS 托管策略更新的详细信息。


| 更改 | 描述 | 日期 | 
| --- | --- | --- | 
|  MSK Connect 更新了只读策略  |  MSK Connect 更新了亚马逊MSKConnectReadOnlyAccess 政策，取消了对上架操作的限制。  | 2021 年 10 月 13 日 | 
|  MSK Connect 开启了跟踪更改  |  MSK Connect 开始跟踪其 AWS 托管策略的更改。  | 2021 年 9 月 14 日 | 

# 使用 MSK Connect 的服务相关角色
<a name="mkc-using-service-linked-roles"></a>

Amazon MSK Connect 使用 AWS Identity and Access Management (IAM) [服务相关](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_terms-and-concepts.html#iam-term-service-linked-role)角色。服务相关角色是一种独特类型的 IAM 角色，它直接链接到 MSK Connect。服务相关角色由 MSK Connect 预定义，包括该服务代表您调用其他 AWS 服务所需的所有权限。

服务相关角色可让您更轻松地设置 MSK Connect，因为您不必手动添加所需权限。MSK Connect 定义其服务相关角色的权限，除非另外定义，否则只有 MSK Connect 可以代入其角色。定义的权限包括信任策略和权限策略，而且权限策略不能附加到任何其他 IAM 实体。

有关支持服务相关角色的其他服务的信息，请参阅[使用 IAM 的AWS 服务](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-services-that-work-with-iam.html)并查找 **Service-Linked Role**（服务相关角色）列中显示为 **Yes**（是）的服务。选择**是**和链接，查看该服务的服务关联角色文档。

## MSK Connect 的服务相关角色权限
<a name="slr-permissions"></a>

MSK Connect 使用名为 **AWSServiceRoleForKafkaConnect**— 允许亚马逊 MSK Connect 代表您访问亚马逊资源的服务相关角色。

 AWSServiceRoleForKafkaConnect 服务相关角色信任`kafkaconnect.amazonaws.com`服务来代替该角色。

有关该角色使用的权限策略的信息，请参阅 [AWS 托管策略： KafkaConnectServiceRolePolicy](mkc-security-iam-awsmanpol.md#security-iam-awsmanpol-KafkaConnectServiceRolePolicy)。

您必须配置权限，允许 IAM 实体（如用户、组或角色）创建、编辑或删除服务关联角色。有关更多信息，请参阅《IAM 用户指南》**中的[服务关联角色权限](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#service-linked-role-permissions)。

## 创建 MSK Connect 的服务相关角色
<a name="create-slr"></a>

您无需手动创建服务关联角色。当您在 AWS 管理控制台、或 AWS API 中创建连接器时 AWS CLI，MSK Connect 会为您创建服务相关角色。

如果您删除该服务关联角色，然后需要再次创建，您可以使用相同流程在账户中重新创建此角色。当您创建连接器时，MSK Connect 将再次为您创建服务相关角色。

## 编辑 MSK Connect 的服务相关角色
<a name="edit-slr"></a>

MSK Connect 不允许您编辑 AWSServiceRoleForKafkaConnect 服务相关角色。在创建服务相关角色后，您将无法更改角色的名称，因为可能有多种实体引用该角色。不过，您可以使用 IAM 编辑角色的说明。有关更多信息，请参阅《IAM 用户指南》**中的[编辑服务关联角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#edit-service-linked-role)。

## 删除 MSK Connect 的服务相关角色
<a name="delete-slr"></a>

您可以使用 IAM 控制台、 AWS CLI 或 AWS API 手动删除服务相关角色。为执行此操作，您必须先手动删除所有 MSK Connect 连接器，然后才能手动删除该角色。有关更多信息，请参见《IAM 用户指南》**中的[删除服务相关角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#delete-service-linked-role)。

## MSK Connect 服务相关角色的受支持区域
<a name="slr-regions"></a>

MSK Connect 支持在该服务可用的所有区域中使用服务相关角色。有关更多信息，请参阅 [AWS 区域和端点](https://docs.aws.amazon.com/general/latest/gr/rande.html)。

# 为 Amazon MSK Connect 启用互联网访问
<a name="msk-connect-internet-access"></a>

如果您的 Amazon MSK Connect 连接器需要访问互联网，我们建议您使用以下 Amazon Virtual Private Cloud (VPC) 设置来启用该访问权限。
+ 使用私有子网配置连接器。
+ 在公有子网中为您的 VPC 创建公有 [NAT 网关](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)或 [NAT 实例](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_NAT_Instance.html)。有关更多信息，请参阅《*Amazon Virtual Private Cloud**用户指南*》中的 “[ VPCs 使用 NAT 设备将子网连接到 Internet 或其他设备](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)” 页面。
+ 允许从私有子网到 NAT 网关或实例的出站流量。

# 为 Amazon MSK Connect 设置 NAT 网关
<a name="msk-connect-internet-access-private-subnets-example"></a>

以下步骤显示如何设置 NAT 网关，以便为连接器启用互联网访问。在私有子网中创建连接器之前，必须完成这些步骤。

## 设置 NAT 网关的完整先决条件
<a name="msk-connect-internet-access-private-subnets-prereq"></a>

确保您已具有以下项目。
+ 与您的集群关联的 Amazon Virtual Private Cloud (VPC) 的 ID。例如 *vpc-123456ab*。
+ 您 IDs 的 VPC 中的私有子网。例如 *subnet-a1b2c3de*、*subnet-f4g5h6ij* 等。您必须使用私有子网配置连接器。

## 为连接器启用互联网访问的步骤
<a name="msk-connect-internet-access-private-subnets-steps"></a>

**为连接器启用互联网访问**

1. 打开 Amazon Virtual Private Cloud 控制台，网址为[https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)。

1. 使用描述性名称为您的 NAT 网关创建一个公有子网，并记下子网 ID。有关详细说明，请参阅[在 VPC 中创建子网](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet)。

1. 创建互联网网关以便您的 VPC 可以与互联网通信，并记下网关 ID。将互联网网关附加到 VPC。有关说明，请参阅[创建并附加互联网网关](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway)。

1. 预置公有 NAT 网关，以便私有子网中的主机可以访问您的公有子网。创建 NAT 网关时，请选择之前创建的公有子网。有关说明，请参阅[创建 NAT 网关](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating)。

1. 配置路由表。您总共必须有两个路由表才能完成此设置。您应该已经有一个与您的 VPC 同时自动创建的主路由表。在此步骤中，您需为公有子网创建额外的路由表。

   1. 使用以下设置修改 VPC 的主路由表，以便私有子网将流量路由到您的 NAT 网关。有关说明，请参阅《Amazon Virtual Private Cloud用户指南》****中的[使用路由表](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html)。  
**私有 MSKC 路由表**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

   1. 按照[创建自定义路由表](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing)中的说明，为公有子网创建路由表。创建表时，在**名称标签**字段中输入描述性名称，以帮助您识别该表与哪个子网关联。例如**公有 MSKC**。

   1. 使用以下设置配置您的**公有 MSKC** 路由表。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

# 了解私有 DNS 主机名
<a name="msk-connect-dns"></a>

借助 MSK Connect 中的私有 DNS 主机名支持，您可以配置连接器以参考公有或私有域名。支持取决于 VPC *DHCP 选项集中*指定的 DNS 服务器。

DHCP 选项集是一组网络配置，可供 VPC 中的 EC2 实例用于通过 VPC 网络进行通信。每个 VPC 都有一个默认 DHCP 选项集，但如果您希望 VPC 中的实例使用不同的 DNS 服务器来进行域名解析，而不使用 Amazon 提供的 DNS 服务器，您也可以创建自定义 DHCP 选项集。参阅 [Amazon VPC 中的 DHCP 选项集](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_DHCP_Options.html)。

在 MSK Connect 中包含私有 DNS 解析 capability/feature 之前，连接器使用服务 VPC DNS 解析器进行来自客户连接器的 DNS 查询。连接器未使用客户 VPC DHCP 选项集中定义的 DNS 服务器进行 DNS 解析。

连接器只能参考在客户连接器配置或插件中可公开解析的主机名。它们无法解析在私有托管区中定义的私有主机名，也无法在其他客户网络中使用 DNS 服务器。

如果没有私有 DNS，那些选择让自己的数据库、数据仓库和系统（例如自己 VPC 中的 Secrets Manager）无法访问互联网的客户就无法使用 MSK 连接器。客户经常使用私有 DNS 主机名来遵循企业安全状况要求。

# 配置连接器的 VPC DHCP 选项集
<a name="msk-connect-dns-config-dhcp"></a>

创建连接器时，连接器会自动使用在其 VPC DHCP 选项集中定义的 DNS 服务器。在创建连接器之前，请确保已为连接器的 DNS 主机名解析要求配置 VPC DHCP 选项集。

在 MSK Connect 中提供私有 DNS 主机名功能之前创建的连接器将继续使用之前的 DNS 解析配置，无需进行任何修改。

如果您只需要在连接器中进行可公开解析的 DNS 主机名解析，为了更容易设置，建议您在创建连接器时使用账户的默认 VPC。有关 Amazon 提供的 DNS 服务器或 Amazon Route 53 Resolver 的更多信息，请参阅《Amazon VPC 用户指南》**中的 [Amazon DNS 服务器](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#AmazonDNS)。

如果您需要解析私有 DNS 主机名，请确保在创建连接器过程中传递的 VPC 的 DHCP 选项集已正确配置。有关更多信息，请参阅《Amazon VPC 用户指南》**中的[使用 DHCP 选项集](https://docs.aws.amazon.com/vpc/latest/userguide/DHCPOptionSet.html)。

在配置私有 DNS 主机名解析的 DHCP 选项集时，请确保连接器可以访问您在 DHCP 选项集中配置的自定义 DNS 服务器。否则，连接器将创建失败。

自定义 VPC DHCP 选项集后，随后在该 VPC 中创建的连接器将使用您在选项集中指定的 DNS 服务器。如果在创建连接器后更改选项集，则该连接器将在几分钟内采用新选项集中的设置。

# 配置 VPC 的 DNS 属性
<a name="msk-connect-dns-attributes"></a>

确保已按照《Amazon VPC 用户指南》**的[您 VPC 中的 DNS 属性](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-support)和 [DNS 主机名](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-hostnames)中所述正确配置 VPC DNS 属性。

有关使用入站[ VPCs 和出站解析器终端节点将其他网络连接到您的 VPC 以使用您的连接器的信息，请参阅 *Amazon Route 53 开发者指南*中的解决与您的网络之间的 DNS 查询](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resolver.html)。

# 处理连接器创建失败
<a name="msk-connect-dns-failure-handling"></a>

本节介绍与 DNS 解析相关的可能的连接器创建失败以及解决问题的建议操作。


| Failure | 建议采取的措施 | 
| --- | --- | 
|  如果 DNS 解析查询失败，或者无法从连接器访问 DNS 服务器，则连接器创建失败。  |  如果您已为连接器配置了这些日志，则可以在 CloudWatch 日志中看到由于 DNS 解析查询失败而导致的连接器创建失败。 检查 DNS 服务器配置，并确保从连接器到 DNS 服务器的网络连接可用。  | 
|  如果在连接器运行时更改 VPC DHCP 选项集中的 DNS 服务器配置，则来自连接器的 DNS 解析查询可能会失败。如果 DNS 解析失败，某些连接器任务可能会进入失败状态。  |  如果您已为连接器配置了这些日志，则可以在 CloudWatch 日志中看到由于 DNS 解析查询失败而导致的连接器创建失败。 失败的任务应自动重启以使连接器恢复正常。如果没有发生这种情况，您可以联系支持人员为其连接器重启失败的任务，也可以重新创建连接器。  | 

# MSK Connect 的安全性
<a name="msk-connect-security"></a>

您可以使用由 AWS PrivateLink提供支持的接口 VPC 终端节点来防止您的亚马逊 VPC 和兼容 Amazon MSK-Connect 之间的流量 APIs 离开亚马逊网络。接口 VPC 终端节点不需要互联网网关、NAT 设备、VPN 连接或 AWS Direct Connect 连接。有关更多信息，请参阅 [将 Amazon MSK APIs 与接口 VPC 终端节点配合使用](privatelink-vpc-endpoints.md)。

# 为 MSK Connect 进行日志记录
<a name="msk-connect-logging"></a>

MSK Connect 可以写入可用于调试连接器的日志事件。创建连接器时，您可以指定零个或多个以下日志目标：
+ Amazon CloudWatch 日志：您可以指定希望 MSK Connect 将连接器的日志事件发送到哪个日志组。有关如何创建日志组的信息，请参阅《[日志*用户指南》中的创建CloudWatch 日志*组](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html#Create-Log-Group)。
+ Amazon S3：您可以指定希望 MSK Connect 向其发送连接器日志事件的 S3 存储桶。有关如何创建 S3 存储桶的信息，请参阅《Amazon S3 用户指南》**中的[创建存储桶](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)。
+ Amazon Data Firehose：您可以指定希望 MSK Connect 向其发送连接器日志事件的传输流。有关如何创建传输流的信息，请参阅《*Firehose 用户指南*》中的 [Creating an Amazon Data Firehose delivery stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)。

要了解有关设置日志记录的更多信息，请参阅《Amazon CloudWatch Logs 用户指南》**中的[启用从某些 AWS 服务进行日志记录](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AWS-logs-and-resource-policy.html)。

MSK Connect 会发出以下类型的日志事件：


****  

| 级别 | 说明 | 
| --- | --- | 
| INFO | 启动和关闭时感兴趣的运行时系统事件。 | 
| WARN | 不是错误但不希望出现或意外的运行时系统情况。 | 
| FATAL | 导致过早终止的严重错误。 | 
| ERROR | 非致命的意外情况和运行时系统错误。 | 

以下是发送到 Log CloudWatch s 的日志事件的示例：

```
[Worker-0bb8afa0b01391c41] [2021-09-06 16:02:54,151] WARN [Producer clientId=producer-1] Connection to node 1 (b-1.my-test-cluster.twwhtj.c2.kafka.us-east-1.amazonaws.com/INTERNAL_IP) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:782)
```

## 防止连接器日志中出现秘密
<a name="msk-connect-logging-secrets"></a>

**注意**  
如果插件未将敏感配置值定义为秘密，则这些值可能会出现在连接器日志中。Kafka Connect 对未定义的配置值的处理方式与任何其他明文值相同。

如果您的插件将某个属性定义为秘密，则 Kafka Connect 会从连接器日志中编辑该属性的值。例如，以下连接器日志表明，如果插件将 `aws.secret.key` 定义为 `PASSWORD` 类型，则其值将替换为 `[hidden]`。

```
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] [2022-01-11 15:18:55,150] INFO SecretsManagerConfigProviderConfig values:
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.access.key = my_access_key
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.region = us-east-1
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.secret.key = [hidden]
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.prefix =
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.ttl.ms = 300000
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] (com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProviderConfig:361)
```

为防止连接器日志文件中出现秘密，插件开发人员必须使用 Kafka Connect 枚举常量 [https://kafka.apache.org/27/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#PASSWORD](https://kafka.apache.org/27/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#PASSWORD) 来定义敏感属性。当属性为类型 `ConfigDef.Type.PASSWORD` 时，Kafka Connect 会将其值从连接器日志中排除，即使该值以明文形式发送也一样。

# 监控 Amazon MSK Connect
<a name="mkc-monitoring-overview"></a>

监控是维护 MSK Connect 和其他 AWS 解决方案的可靠性、可用性和性能的重要组成部分。Amazon 会实时 CloudWatch 监控您的 AWS 资源和您运行 AWS 的应用程序。您可以收集和跟踪指标，创建自定义的控制面板，以及设置警报以在指定的指标达到您指定的阈值时通知您或采取措施。例如，您可以 CloudWatch 跟踪连接器的 CPU 使用率或其他指标，以便在需要时增加其容量。有关更多信息，请参阅 [Amazon CloudWatch 用户指南](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/)。

您可以使用以下 API 操作：
+ `DescribeConnectorOperation`：监控连接器更新操作的状态。
+ `ListConnectorOperations`：跟踪以前在连接器上运行的更新。

下表显示了 MSK Connect 向该`ConnectorName`维度 CloudWatch 下发送的指标。默认情况下，MSK Connect 提供这些指标，不收取任何额外费用。 CloudWatch 将这些指标保留 15 个月，这样您就可以访问历史信息并更好地了解连接器的性能。还可以设置特定阈值监视警报，在达到对应阈值时发送通知或采取行动。有关更多信息，请参阅 [Amazon CloudWatch 用户指南](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/)。


| 指标名称 | 说明 | 
| --- | --- | 
| CpuUtilization | 系统和用户的 CPU 消耗百分比。 | 
| ErroredTaskCount | 已出错的任务数量。 | 
| MemoryUtilization | 工作程序实例上总内存的百分比，而不仅仅是当前正在使用的 Java 虚拟机（JVM）堆内存。JVM 通常不会将内存释放回操作系统。因此，JVM 堆大小 (MemoryUtilization) 通常从最小堆大小开始，该堆大小逐渐增加到稳定的最大值，约为 80-90%。随着连接器实际内存使用量的变化，JVM 堆使用量可能会增加或减少。 | 
| RebalanceCompletedTotal | 此连接器完成的重新平衡总数。 | 
| RebalanceTimeAvg | 连接器在重新平衡上花费的平均时间（以毫秒为单位）。 | 
| RebalanceTimeMax | 连接器在重新平衡上花费的最长时间（以毫秒为单位）。 | 
| RebalanceTimeSinceLast |  自此连接器完成最近一次重新平衡以来的时间（以毫秒为单位）。  | 
| RunningTaskCount | 连接器中正在运行的任务数量。 | 
| SinkConsumerByteRate | 在对数据应用任何转换之前，Kafka Connect 框架的接收器使用者每秒使用的平均字节数。 | 
| SinkRecordReadRate | 平均每秒从 Apache Kafka 或 Amazon MSK 集群读取的记录数量。 | 
| SinkRecordSendRate | 平均每秒从转换中输出并发送到目标的记录数量。此数量不包含筛选后的记录。 | 
| SourceRecordPollRate | 平均每秒生成或轮询的记录数量。 | 
| SourceProducerByteRate | 在对数据应用任何转换之前，Kafka Connect 框架的源创建者每秒生成的平均字节数。 | 
| SourceRecordWriteRate | 平均每秒从转换中输出并写入 Apache Kafka 或 Amazon MSK 集群的记录数量。 | 
| TaskStartupAttemptsTotal | 连接器已尝试的任务启动总数。您可以使用此指标来识别任务启动尝试中的异常情况。 | 
| TaskStartupSuccessPercentage | 连接器成功启动任务的平均百分比。您可以使用此指标来识别任务启动尝试中的异常情况。 | 
| WorkerCount | 在连接器中运行的工作程序数量。 | 
| BytesInPerSec | 传输至 Kafka Connect 框架用于工作程序之间通信的元数据字节数。 | 
| BytesOutPerSec | 从 Kafka Connect 框架传输的用于工作程序之间通信的元数据字节数。 | 

# 设置 Amazon MSK Connect 资源的示例
<a name="msk-connect-examples"></a>

本节包含一些示例，可帮助您设置 Amazon MSK Connect 资源，例如常见的第三方连接器和配置提供程序。

**Topics**
+ [设置 Amazon S3 接收器连接器](mkc-S3sink-connector-example.md)
+ [为 MSK Connect 设置 EventBridge Kafka 水槽连接器](mkc-eventbridge-kafka-connector.md)
+ [使用带有配置提供程序的 Debezium 源连接器](mkc-debeziumsource-connector-example.md)

# 设置 Amazon S3 接收器连接器
<a name="mkc-S3sink-connector-example"></a>

此示例说明如何使用 Confluent [Amazon S3 接收器连接器和](https://www.confluent.io/hub/confluentinc/kafka-connect-s3)在 MSK Conn AWS CLI ect 中创建 Amazon S3 接收器连接器。

1. 复制以下 JSON 并将其粘贴到新文件中。将占位符字符串替换为与 Amazon MSK 集群的引导服务器连接字符串以及集群的子网和安全组相对应的值。 IDs有关如何设置服务执行角色的信息，请参阅 [MSK Connect 的 IAM 角色和策略](msk-connect-iam.md)。

   ```
   {
       "connectorConfiguration": {
           "connector.class": "io.confluent.connect.s3.S3SinkConnector",
           "s3.region": "us-east-1",
           "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
           "flush.size": "1",
           "schema.compatibility": "NONE",
           "topics": "my-test-topic",
           "tasks.max": "2",
           "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
           "storage.class": "io.confluent.connect.s3.storage.S3Storage",
           "s3.bucket.name": "amzn-s3-demo-bucket"
       },
       "connectorName": "example-S3-sink-connector",
       "kafkaCluster": {
           "apacheKafkaCluster": {
               "bootstrapServers": "<cluster-bootstrap-servers-string>",
               "vpc": {
                   "subnets": [
                       "<cluster-subnet-1>",
                       "<cluster-subnet-2>",
                       "<cluster-subnet-3>"
                   ],
                   "securityGroups": ["<cluster-security-group-id>"]
               }
           }
       },
       "capacity": {
           "provisionedCapacity": {
               "mcuCount": 2,
               "workerCount": 4
           }
       },
       "kafkaConnectVersion": "2.7.1",
       "serviceExecutionRoleArn": "<arn-of-a-role-that-msk-connect-can-assume>",
       "plugins": [
           {
               "customPlugin": {
                   "customPluginArn": "<arn-of-custom-plugin-that-contains-connector-code>",
                   "revision": 1
               }
           }
       ],
       "kafkaClusterEncryptionInTransit": {"encryptionType": "PLAINTEXT"},
       "kafkaClusterClientAuthentication": {"authenticationType": "NONE"}
   }
   ```

1. 在上一步中保存 JSON 文件的文件夹中运行以下 AWS CLI 命令。

   ```
   aws kafkaconnect create-connector --cli-input-json file://connector-info.json
   ```

   以下是您在成功运行命令后获得的输出示例。

   ```
   {
       "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-S3-sink-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
       "ConnectorState": "CREATING", 
       "ConnectorName": "example-S3-sink-connector"
   }
   ```

# 为 MSK Connect 设置 EventBridge Kafka 水槽连接器
<a name="mkc-eventbridge-kafka-connector"></a>

本主题向您展示如何为 MSK Connect 设置 [EventBridge Kafka 接收器连接器](https://github.com/awslabs/eventbridge-kafka-connector)。此连接器允许您将事件从 MSK 集群发送到 EventBridge [事件总线](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-bus.html)。本主题介绍创建所需资源和配置连接器以实现 Kafka 和 EventBridge之间的无缝数据流的过程。

**Topics**
+ [先决条件](#mkc-eb-kafka-prerequisites)
+ [设置 MSK Connect 所需的资源](#mkc-eb-kafka-set-up-resources)
+ [创建连接器](#mkc-eb-kafka-create-connector)
+ [向 Kafka 发送消息](#mkc-eb-kafka-send-json-encoded-messages)

## 先决条件
<a name="mkc-eb-kafka-prerequisites"></a>

部署连接器之前，请确保拥有以下资源：
+ **Amazon MSK 集群**：用于生成和使用 Kafka 消息的活动 MSK 集群。
+ **Amazon EventBridge 活动总线**：用于接收来自 Kafka 主题的事件的事件的活动总线。 EventBridge 
+ **IAM 角色**：创建具有 MSK Connect 和连接 EventBridge 器所需权限的 IAM 角色。
+ 通过 MSK Connect 或在 MSK 集群的 [VPC 和子网中 EventBridge 创建的 VPC 接口终端节点](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-related-service-vpc.html)[访问公共互联](msk-connect-internet-access.md)网。这有助于避免直接穿越公共互联网，而不需要 NAT 网关。
+ [客户端计算机](create-serverless-cluster-client.md)，例如 Amazon EC2 实例或 [AWS CloudShell](https://aws.amazon.com/cloudshell/)，用于创建主题并向 Kafka 发送记录。

## 设置 MSK Connect 所需的资源
<a name="mkc-eb-kafka-set-up-resources"></a>

为连接器创建 IAM 角色，然后创建连接器。您还可以创建 EventBridge 规则来筛选发送到事件总线的 Kafka EventBridge 事件。

**Topics**
+ [连接器的 IAM 角色](#mkc-eb-kafka-iam-role-connector)
+ [传入事件的 EventBridge 规则](#mkc-eb-kafka-create-rule)

### 连接器的 IAM 角色
<a name="mkc-eb-kafka-iam-role-connector"></a>

您与连接器关联的 IAM 角色必须具有允许向其发送事件的[PutEvents](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-permissions-reference.html)权限 EventBridge。以下 IAM 策略示例授予了向名为 `example-event-bus` 的事件总线发送事件的权限。确保将以下示例中的资源 ARN 替换为事件总线的 ARN。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents"
      ],
      "Resource": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus"
    }
  ]
}
```

------

此外，还必须确保连接器的 IAM 角色包含以下信任策略。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

------

### 传入事件的 EventBridge 规则
<a name="mkc-eb-kafka-create-rule"></a>

可以创建使传入事件与事件数据标准（称为[https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html)）相匹配的[规则](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html)。使用事件模式，可以定义传入事件筛选标准，并确定哪些事件应触发特定规则，然后将事件路由到指定[目标](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html)。以下事件模式示例与发送到事件总线的 Kafka 事件相匹配。 EventBridge 

```
{
  "detail": {
    "topic": ["msk-eventbridge-tutorial"]
  }
}
```

以下是 EventBridge 使用 Kafka 接收器连接器从 Kafka 发送到的事件的示例。

```
{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
  "account": "123456789012",
  "time": "2025-03-26T10:15:00Z",
  "region": "us-east-1",
  "detail-type": "msk-eventbridge-tutorial",
  "source": "kafka-connect.msk-eventbridge-tutorial",
  "resources": [],
  "detail": {
    "topic": "msk-eventbridge-tutorial",
    "partition": 0,
    "offset": 0,
    "timestamp": 1742984100000,
    "timestampType": "CreateTime",
    "headers": [],
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Wed Mar 26 10:15:00 UTC 2025"
    }
  }
}
```

在 EventBridge 控制台中，使用此示例模式在事件总线上[创建规则](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule.html)并指定目标，例如 CloudWatch 日志组。 EventBridge 控制台将自动为 CloudWatch 日志组配置必要的访问策略。

## 创建连接器
<a name="mkc-eb-kafka-create-connector"></a>

在下一节中，您将使用创建和部署 [EventBridge Kafka 接收器连接器](https://github.com/awslabs/eventbridge-kafka-connector)。 AWS 管理控制台

**Topics**
+ [步骤 1：下载连接器](#mkc-eb-kafka-download-connector)
+ [步骤 2：创建 Amazon S3 存储桶](#mkc-eb-kafka-s3-bucket-create)
+ [步骤 3：在 MSK Connect 中创建插件](#mkc-eb-kafka-create-plugin)
+ [步骤 4：创建连接器](#mkc-eb-kafka-create-connector)

### 步骤 1：下载连接器
<a name="mkc-eb-kafka-download-connector"></a>

从 Ka EventBridge fka EventBridge 连接器的[GitHub 版本页面](https://github.com/awslabs/eventbridge-kafka-connector/releases)下载最新的连接器接收器 JAR。例如，要下载版本 v1.4.1，请选择 JAR 文件链接 `kafka-eventbridge-sink-with-dependencies.jar` 以下载连接器。然后，将文件保存到计算机上的首选位置。

### 步骤 2：创建 Amazon S3 存储桶
<a name="mkc-eb-kafka-s3-bucket-create"></a>

1. 要将 JAR 文件存储在 Amazon S3 中以用于 MSK Connect，请打开 AWS 管理控制台，然后选择 Amazon S3。

1. 在 Amazon S3 控制台中，选择**创建存储桶**，输入唯一存储桶名称。例如 **amzn-s3-demo-bucket1-eb-connector**。

1. 为 Amazon S3 存储桶选择合适的区域。确保与部署 MSK 集群的区域相匹配。

1. 对于**存储桶设置**，请保留默认选择或根据需要调整。

1. 选择**创建存储桶**。

1. 将 JAR 文件上传到 Amazon S3 存储桶中。

### 步骤 3：在 MSK Connect 中创建插件
<a name="mkc-eb-kafka-create-plugin"></a>

1. 打开 AWS 管理控制台，然后导航到 **MSK Connect**。

1. 在左侧导航窗格中，选择**自定义插件**。

1. 选择**创建插件**，然后输入**插件名称**。例如 **eventbridge-sink-plugin**。

1. 对于**自定义插件位置**，请粘贴 **S3 对象 URL**。

1. 为插件添加可选描述。

1. 选择**创建插件**。

创建插件后，您可以使用它在 MSK Connect 中配置和部署 EventBridge Kafka 连接器。

### 步骤 4：创建连接器
<a name="mkc-eb-kafka-create-connector"></a>

在创建连接器之前，建议创建所需的 Kafka 主题以避免连接器错误。要创建主题，请使用客户端计算机。

1. 在 MSK 控制台的左侧窗格中，选择**连接器**，然后选择**创建连接器**。

1. 在插件列表中，选择 **eventbridge-sink-plugin**，然后选择**下一步**。

1. 对于连接器名称，请输入 **EventBridgeSink**。

1. 在集群列表中，选择 MSK 集群。

1. <a name="connector-ex"></a>复制以下连接器配置，并将其粘贴到**连接器配置**字段中

   根据需要替换以下配置中的占位符。
   + 如果 MSK 集群可访问公共互联网，请将 `aws.eventbridge.endpoint.uri` 删除。
   + 如果您使用 PrivateLink 安全地从 MSK 连接到 EventBridge，请将后`https://`面的 DNS 部分替换为您之前创建的（可选）VPC 接口终端节点的 EventBridge 正确私有 DNS 名称。
   + 将以下配置中的 EventBridge 事件总线 ARN 替换为事件总线的 ARN。
   + 更新任何特定区域的值。

   ```
   {
     "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
     "aws.eventbridge.connector.id": "msk-eventbridge-tutorial",
     "topics": "msk-eventbridge-tutorial",
     "tasks.max": "1",
     "aws.eventbridge.endpoint.uri": "https://events.us-east-1.amazonaws.com",
     "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus",
     "value.converter.schemas.enable": "false",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "aws.eventbridge.region": "us-east-1",
     "auto.offset.reset": "earliest",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
   ```

   有关连接器配置的更多信息，请参见[eventbridge-kafka-connector](https://github.com/awslabs/eventbridge-kafka-connector)。

   如果需要，请更改工作程序和自动缩放的设置。我们还建议从下拉列表中使用最新可用（推荐）的 Apache Kafka Connect 版本。在**访问权限**下，使用之前创建的角色。我们还建议启用日志功能，以实现可 CloudWatch 观察性和故障排除。根据需要调整其他可选设置，例如标签。然后，部署连接器并等待其状态进入运行状态。

## 向 Kafka 发送消息
<a name="mkc-eb-kafka-send-json-encoded-messages"></a>

可使用 Kafka Connect 中提供的 `value.converter` 设置，也可以选择使用 `key.converter` 设置来指定不同的转换器，从而配置消息编码，例如 Apache Avro 和 JSON。

如使用 `org.apache.kafka.connect.json.JsonConverter` 作为 `value converter` 所示，本主题中的 [connector example](#connector-ex) 配置为使用 JSON 编码的消息。当连接器处于“运行”状态时，从客户端计算机向 `msk-eventbridge-tutorial` Kafka 主题发送记录。

# 使用带有配置提供程序的 Debezium 源连接器
<a name="mkc-debeziumsource-connector-example"></a>

此示例演示了如何将 Debezium MySQL 连接器插件与兼容 MySQL 的 [Amazon Aurora](https://aws.amazon.com/rds/aurora/) 数据库一起用作来源。在此示例中，我们还设置了开源 [AWS Secrets Manager 配置提供程序](https://github.com/jcustenborder/kafka-config-provider-aws)来对 AWS Secrets Manager中的数据库凭证进行外部化。要了解有关配置提供程序的更多信息，请参阅[教程：使用配置提供程序将敏感信息外部化](msk-connect-config-provider.md)。

**重要**  
Debezium MySQL 连接器插件[仅支持一项任务](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-tasks-max)，不使用 Amazon MSK Connect 的自动扩缩容量模式。您应该改为使用预置容量模式，并在连接器配置中将 `workerCount` 设置为 1。要了解有关 MSK Connect 容量模式的更多信息，请参阅[了解连接器容量](msk-connect-capacity.md)。

# 使用 Debezium 源连接器的完整先决条件
<a name="mkc-debeziumsource-connector-example-prereqs"></a>

您的连接器必须能够访问互联网，这样它才能与诸如 AWS Secrets Manager 您之外的服务进行交互 Amazon Virtual Private Cloud。本节中的步骤可帮助您完成以下任务以启用互联网访问。
+ 设置托管 NAT 网关并将流量路由到 VPC 中互联网网关的公有子网。
+ 创建将私有子网流量定向到 NAT 网关的默认路由。

有关更多信息，请参阅 [为 Amazon MSK Connect 启用互联网访问](msk-connect-internet-access.md)。

**先决条件**

在启用互联网访问之前，您需要以下项目：
+ 与您的集群关联的 Amazon Virtual Private Cloud (VPC) 的 ID。例如 *vpc-123456ab*。
+ 您 IDs 的 VPC 中的私有子网。例如 *subnet-a1b2c3de*、*subnet-f4g5h6ij* 等。您必须使用私有子网配置连接器。

**为连接器启用互联网访问**

1. 打开 Amazon Virtual Private Cloud 控制台，网址为[https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)。

1. 使用描述性名称为您的 NAT 网关创建一个公有子网，并记下子网 ID。有关详细说明，请参阅[在 VPC 中创建子网](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet)。

1. 创建互联网网关以便您的 VPC 可以与互联网通信，并记下网关 ID。将互联网网关附加到 VPC。有关说明，请参阅[创建并附加互联网网关](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway)。

1. 预置公有 NAT 网关，以便私有子网中的主机可以访问您的公有子网。创建 NAT 网关时，请选择之前创建的公有子网。有关说明，请参阅[创建 NAT 网关](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating)。

1. 配置路由表。您总共必须有两个路由表才能完成此设置。您应该已经有一个与您的 VPC 同时自动创建的主路由表。在此步骤中，您需为公有子网创建额外的路由表。

   1. 使用以下设置修改 VPC 的主路由表，以便私有子网将流量路由到您的 NAT 网关。有关说明，请参阅《Amazon Virtual Private Cloud用户指南》****中的[使用路由表](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html)。  
**私有 MSKC 路由表**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

   1. 按照[创建自定义路由表](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing)中的说明，为公有子网创建路由表。创建表时，在**名称标签**字段中输入描述性名称，以帮助您识别该表与哪个子网关联。例如**公有 MSKC**。

   1. 使用以下设置配置您的**公有 MSKC** 路由表。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

现在，您已经为 Amazon MSK Connect 启用互联网访问，可以创建连接器了。

# 创建 Debezium 源连接器
<a name="msk-connect-debeziumsource-connector-example-steps"></a>

此过程介绍了如何创建 Debezium 源连接器。

1. 

**创建自定义插件**

   1. 从 [Debezium](https://debezium.io/releases/) 网站下载 MySQL 连接器插件的最新稳定发行版。记下您下载的 Debezium 发行版（版本 2.x 或较旧的 1.x 系列）。在此程序的后面部分，您需根据您的 Debezium 版本创建连接器。

   1. 下载并解压缩 [AWS Secrets Manager 配置提供程序](https://www.confluent.io/hub/jcustenborder/kafka-config-provider-aws)。

   1. 将以下档案文件放在同一个目录中：
      + `debezium-connector-mysql` 文件夹
      + `jcusten-border-kafka-config-provider-aws-0.1.1` 文件夹

   1. 将您在上一步中创建的目录压缩为 ZIP 文件，然后将该 ZIP 文件上传到 S3 存储桶。有关说明，请参阅《Amazon S3 用户指南》**中的[上传对象](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)。

   1. 复制以下 JSON 并将其粘贴到文件中。例如 `debezium-source-custom-plugin.json`。*<example-custom-plugin-name>*替换为您想要的插件名称、*<amzn-s3-demo-bucket-arn>*上传 ZIP 文件的 Amazon S3 存储桶的 ARN 以及`<file-key-of-ZIP-object>`上传到 S3 的 ZIP 对象的文件密钥。

      ```
      {
          "name": "<example-custom-plugin-name>",
          "contentType": "ZIP",
          "location": {
              "s3Location": {
                  "bucketArn": "<amzn-s3-demo-bucket-arn>",
                  "fileKey": "<file-key-of-ZIP-object>"
              }
          }
      }
      ```

   1. 从保存 JSON 文件的文件夹中运行以下 AWS CLI 命令来创建插件。

      ```
      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>
      ```

      您应该可以看到类似于以下示例的输出内容。

      ```
      {
          "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1",
          "CustomPluginState": "CREATING",
          "Name": "example-custom-plugin-name",
          "Revision": 1
      }
      ```

   1. 运行以下命令以检查插件状态。状态应从 `CREATING` 更改为 `ACTIVE`。将 ARN 占位符替换为您在上一条命令的输出中获得的 ARN。

      ```
      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
      ```

1. 

**为您的数据库凭证配置 AWS Secrets Manager 和创建密钥**

   1. 打开 Secrets Manager 控制台，网址为[https://console.aws.amazon.com/secretsmanager/](https://console.aws.amazon.com/secretsmanager/)。

   1. 创建新密钥来存储您的数据库登录凭证。有关说明，请参阅《AWS Secrets Manager用户指南》**中的[创建密钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_create-basic-secret.html)。

   1. 复制密钥的 ARN。

   1. 将以下示例策略中的 Secrets Manager 权限添加到您的 [了解服务执行角色](msk-connect-service-execution-role.md)。*<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>*替换为你的密钥的 ARN。

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "secretsmanager:GetResourcePolicy",
              "secretsmanager:GetSecretValue",
              "secretsmanager:DescribeSecret",
              "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": [
            "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
            ]
          }
        ]
      }
      ```

------

      有关如何添加 IAM 权限的说明，请参阅《IAM 用户指南》**中的[添加和删除 IAM 身份权限](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

1. 

**使用与配置提供程序有关的信息创建自定义工作程序配置**

   1. 将以下工作程序配置属性复制到文件中，将占位符字符串替换为与您的场景对应的值。要了解有关 S AWS ecrets Manager Config Provider 配置属性的更多信息，请参阅[SecretsManagerConfigProvider](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-config-provider-aws/configProviders/SecretsManagerConfigProvider.html)插件的文档。

      ```
      key.converter=<org.apache.kafka.connect.storage.StringConverter>
      value.converter=<org.apache.kafka.connect.storage.StringConverter>
      config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
      config.providers=secretManager
      config.providers.secretManager.param.aws.region=<us-east-1>
      ```

   1. 运行以下 AWS CLI 命令来创建您的自定义工作器配置。

      替换以下值：
      + *<my-worker-config-name>*-自定义工作器配置的描述性名称
      + *<encoded-properties-file-content-string>*-您在上一步中复制的纯文本属性的基于 base64 编码版本

      ```
      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
      ```

1. 

**创建连接器**

   1. 复制与 Debezium 版本（2.x 或 1.x）相对应的以下 JSON，并将其粘贴到新文件中。将 `<placeholder>` 字符串替换为与您的场景对应的值。有关如何设置服务执行角色的信息，请参阅 [MSK Connect 的 IAM 角色和策略](msk-connect-iam.md)。

      请注意，该配置使用诸如 `${secretManager:MySecret-1234:dbusername}` 之类的变量而不是明文来指定数据库凭证。将 `MySecret-1234` 替换为密钥名称，然后加入您想要检索的密钥名称。您还必须将 `<arn-of-config-provider-worker-configuration>` 替换为自定义工作程序配置的 ARN。

------
#### [ Debezium 2.x ]

      对于 Debezium 2.x 版本，请复制以下 JSON 并将其粘贴到新文件中。将 *<placeholder>* 字符串替换为与您的场景对应的值。

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"topic.prefix": "<logical-name-of-database-server>",
      		"schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"schema.history.internal.consumer.security.protocol": "SASL_SSL",
      		"schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"schema.history.internal.producer.security.protocol": "SASL_SSL",
      		"schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------
#### [ Debezium 1.x ]

      对于 Debezium 1.x 版本，请复制以下 JSON 并将其粘贴到新文件中。将 *<placeholder>* 字符串替换为与您的场景对应的值。

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.server.name": "<logical-name-of-database-server>",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"database.history.consumer.security.protocol": "SASL_SSL",
      		"database.history.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"database.history.producer.security.protocol": "SASL_SSL",
      		"database.history.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------

   1. 在上一步中保存 JSON 文件的文件夹中运行以下 AWS CLI 命令。

      ```
      aws kafkaconnect create-connector --cli-input-json file://connector-info.json
      ```

      以下是您在成功运行命令后获得的输出示例。

      ```
      {
          "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
          "ConnectorState": "CREATING", 
          "ConnectorName": "example-Debezium-source-connector"
      }
      ```

# 更新 Debezium 连接器配置
<a name="mkc-debeziumsource-connector-update"></a>

要更新 Debezium 连接器配置，请执行以下步骤：

1. 复制以下 JSON 并将其粘贴到新文件中。将 `<placeholder>` 字符串替换为与您的场景对应的值。

   ```
   {
      "connectorArn": <connector_arn>,
      "connectorConfiguration": <new_configuration_in_json>,
      "currentVersion": <current_version>
   }
   ```

1. 在上一步中保存 JSON 文件的文件夹中运行以下 AWS CLI 命令。

   ```
   aws kafkaconnect update-connector --cli-input-json file://connector-info.json
   ```

   以下是您在成功运行命令后的输出示例。

   ```
   {
       "connectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2",
       "connectorOperationArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector-operation/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2/41b6ad56-3184-479b-850a-a8bedd5a02f3",
       "connectorState": "UPDATING"
   }
   ```

1. 现在，可以运行以下命令来监控操作的当前状态：

   ```
   aws kafkaconnect describe-connector-operation --connector-operation-arn <operation_arn>
   ```

有关包含详细步骤的 Debezium 连接器示例，请参阅 [Introducing Amazon MSK Connect - Stream Data to and from Your Apache Kafka Clusters Using Managed Connectors](https://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/)。

# 迁移到 Amazon MSK Connect
<a name="msk-connect-migrating"></a>

本节介绍如何将 Apache Kafka 连接器应用程序迁移到 Amazon Managed Streaming for Apache Kafka Connect（Amazon MSK Connect）。要详细了解迁移到 Amazon MSK Connect 的好处，请参阅[使用 Amazon MSK Connect 的好处](msk-connect.md#msk-connect-benefits)。

本节还介绍了 Kafka Connect 和 Amazon MSK Connect 使用的状态管理主题，并介绍了迁移源和接收器连接器的过程。

# 了解 Kafka Connect 使用的内部主题
<a name="msk-connect-kafka-connect-topics"></a>

在分布式模式下运行的 Apache Kafka Connect 应用程序使用 Kafka 集群中的内部主题和组成员资格来存储其状态。以下是与用于 Kafka Connect 应用程序的内部主题相对应的配置值：
+ 配置主题，通过 `config.storage.topic` 指定

  在配置主题中，Kafka Connect 存储用户已启动的所有连接器和任务的配置。每次用户更新连接器的配置或连接器请求重新配置时（例如，连接器检测到它可以启动更多任务），都会向此主题发出一条记录。此主题启用了压缩，因此它始终保留每个实体的最后状态。
+ 偏移量主题，通过 `offset.storage.topic` 指定

  在偏移量主题中，Kafka Connect 存储源连接器的偏移量。与配置主题一样，偏移量主题也启用了压缩。此主题仅用于写入从外部系统向 Kafka 生成数据的源连接器的源位置。从 Kafka 读取数据并发送到外部系统的接收器连接器使用常规 Kafka 消费者组存储其消费者偏移量。
+ 状态主题，通过 `status.storage.topic` 指定

  在状态主题中，Kafka Connect 存储连接器和任务的当前状态。此主题用作 REST API 用户查询的数据的中心位置。此主题允许用户查询任何工作程序，同时仍可获取所有正在运行的插件的状态。与配置和偏移量主题一样，状态主题也启用了压缩。

除了这些主题之外，Kafka Connect 还大量使用了 Kafka 的组成员资格 API。这些组以连接器名称命名。例如，对于名为 file-sink 的连接器，该组被命名为。 connect-file-sink组中的每个消费者都会向单个任务提供记录。可以使用常规消费者组工具（例如 `Kafka-consumer-group.sh`）检索这些组及其偏移量。对于每个接收器连接器，Connect 运行时都会运行一个从 Kafka 中提取记录的常规消费者组。

# Amazon MSK Connect 应用程序的状态管理
<a name="msk-connect-state-management"></a>

默认情况下，Amazon MSK Connect 在 Kafka 集群中为每个 Amazon MSK Connector 创建三个单独的主题，用于存储连接器的配置、偏移量和状态。默认主题名称的结构如下：
+ \$1\$1msk\$1connect\$1configs\$1 \$1 *connector-name* *connector-id*
+ \$1\$1msk\$1connect\$1status\$1 \$1 *connector-name* *connector-id*
+ \$1\$1msk\$1connect\$1offsets\$1 \$1 *connector-name* *connector-id*

**注意**  
要在源连接器之间提供偏移连续性，您可以使用自己选择的偏移存储主题来代替默认主题。指定偏移存储主题可以帮助您完成创建源连接器之类的任务，该连接器可从上一个连接器的最后一个偏移恢复读取。要指定偏移存储主题，请在创建连接器之前为 Amazon MSK Connect 工作程序配置中的 [https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-manage-connector-offsets](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-manage-connector-offsets) 属性提供一个值。

# 将源连接器迁移到 Amazon MSK Connect
<a name="msk-connect-migrate-source-connectors"></a>

源连接器是将记录从外部系统导入 Kafka 的 Apache Kafka Connect 应用程序。本节介绍将本地运行的 Apache Kafka Connect 源连接器应用程序迁移到亚马逊 MSK Connect 的过程，或者将运行的自管理 Kafka Connect 集群迁移到 AWS Amazon MSK Connect。

Kafka Connect 源连接器应用程序将偏移量存储在一个主题中，该主题以为配置属性 `offset.storage.topic` 设置的值命名。以下是 JDBC 连接器的示例偏移量消息，该连接器运行两个任务，从名为 `movies` 和 `shows` 的两个不同表中导入数据。从表 movies 导入的最新行的主 ID 为 `18343`。从 shows 表导入的最新行的主 ID 为 `732`。

```
["jdbcsource",{"protocol":"1","table":"sample.movies"}] {"incrementing":18343}
["jdbcsource",{"protocol":"1","table":"sample.shows"}] {"incrementing":732}
```

要将源连接器迁移到 Amazon MSK Connect，请执行以下操作：

1. 通过从本地或自行管理的 Kafka Connect 集群中提取连接器库来创建 Amazon MSK Connect [自定义插件](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html)。

1. 创建 Amazon MSK Connect [工作程序属性](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config)，并将属性 `key.converter`、`value.converter` 和 `offset.storage.topic` 设置为与为现有 Kafka Connect 集群中运行的 Kafka 连接器设置的值相同的值。

1. 通过在现有 Kafka Connect 集群上发出 `PUT /connectors/connector-name/pause` 请求来暂停现有集群上的连接器应用程序。

1. 确保所有连接器应用程序的任务都已完全停止。您可以通过在现有 Kafka Connect 集群上发出 `GET /connectors/connector-name/status` 请求或使用来自为属性 `status.storage.topic` 设置的主题名称的消息来停止任务。

1. 从现有集群获取连接器配置。您可以通过在现有集群上发出 `GET /connectors/connector-name/config/` 请求或使用来自为属性 `config.storage.topic` 设置的主题名称的消息来获取连接器配置。

1. 创建与现有集群同名的新 [Amazon MSK 连接器](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html)。使用您在步骤 1 中创建的连接器自定义插件、在步骤 2 中创建的 Worker 属性和在步骤 5 中提取的连接器配置来创建此连接器。

1. 当 Amazon MSK 连接器状态为 `active` 时，请查看日志以验证连接器是否已开始从源系统导入数据。

1. 通过发出 `DELETE /connectors/connector-name` 请求来删除现有集群中的连接器。

# 将接收器连接器迁移到 Amazon MSK Connect
<a name="msk-connect-migrate-sink-connectors"></a>

接收器连接器是将数据从 Kafka 导出到外部系统的 Apache Kafka Connect 应用程序。本节介绍将本地运行的 Apache Kafka Connect 接收器应用程序迁移到亚马逊 MSK Connect 的过程，或者将运行的自管理 Kafka Connect 集群迁移到 AWS Amazon MSK Connect。

Kafka Connect 接收器连接器使用 Kafka 组成员资格 API，并将偏移量存储在与典型消费者应用程序相同的 `__consumer_offset` 主题中。此行为简化了将接收器连接器从自托管迁移到 Amazon MSK Connect 的过程。

要将接收器连接器迁移到 Amazon MSK Connect，请执行以下操作：

1. 通过从本地或自行管理的 Kafka Connect 集群中提取连接器库来创建 Amazon MSK Connect [自定义插件](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html)。

1. 创建 Amazon MSK Connect [工作程序属性](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config)，并将属性 `key.converter` 和 `value.converter` 设置为与为现有 Kafka Connect 集群中运行的 Kafka 连接器设置的值相同的值。

1. 通过在现有 Kafka Connect 集群上发出 `PUT /connectors/connector-name/pause` 请求来暂停现有集群上的连接器应用程序。

1. 确保所有连接器应用程序的任务都已完全停止。您可以通过在现有 Kafka Connect 集群上发出 `GET /connectors/connector-name/status` 请求或使用来自为属性 `status.storage.topic` 设置的主题名称的消息来停止任务。

1. 从现有集群获取连接器配置。您可以通过在现有集群上发出 `GET /connectors/connector-name/config` 请求或使用来自为属性 `config.storage.topic` 设置的主题名称的消息来获取连接器配置。

1. 创建与现有集群同名的新 [Amazon MSK 连接器](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html)。使用您在步骤 1 中创建的连接器自定义插件、在步骤 2 中创建的 Worker 属性和在步骤 5 中提取的连接器配置来创建此连接器。

1. 当 Amazon MSK 连接器状态为 `active` 时，请查看日志以验证连接器是否已开始从源系统导入数据。

1. 通过发出 `DELETE /connectors/connector-name` 请求来删除现有集群中的连接器。

# 排查 Amazon MSK Connect 中的问题
<a name="msk-connect-troubleshooting"></a>

以下信息可帮助您排查使用 MSK Connect 时可能存在的问题。您也可以将问题发布到 [AWS re:Post](https://repost.aws/)。

**连接器无法访问公有互联网上托管的资源**  
请参阅[为 Amazon MSK Connect 启用互联网访问](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-internet-access.html)。

**连接器正在运行的任务数不等于 tasks.max 中指定的任务数量**  
以下是连接器使用的任务可能少于指定的 tasks.max 配置的一些原因：
+ 某些连接器实现限制了可使用的任务数量。例如，适用于 MySQL 的 Debezium 连接器仅限于使用单个任务。
+ 使用自动扩展容量模式时，Amazon MSK Connect 会覆盖连接器的 tasks.max 属性，其值与连接器中运行的工作器数量和每个工作线程的数量成正比。 MCUs 如果您配置了可选`maxAutoscalingTaskCount`参数，则该`tasks.max`值将不会超过此限制。有关更多信息，请参阅[了解最大自动缩放任务数](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html#msk-connect-max-autoscaling-task-count)。
+ 对于接收器连接器，并行度（任务数量）不能超过主题分区的数量。虽然您可以将 tasks.max 设置为大于该值，但单个分区一次只能由一个任务处理。
+ 在 Kafka Connect 2.7.x 中，默认的使用器分区分配器是 `RangeAssignor`。该分配器的行为是将每个主题的第一个分区分配给单个使用器，将每个主题的第二个分区分配给单个使用器，依此类推。这意味着，使用 `RangeAssignor` 的接收器连接器的最大活动任务数等于正在消耗的任何单个主题中的最大分区数。如果这不适用于您的用例，则应[创建一个工作程序配置](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config)，其中将 `consumer.partition.assignment.strategy` 属性设置为更合适的使用器分区分配器。参见 [Kafka 2.7 接口 ConsumerPartitionAssignor：*所有已知的实现类*](https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html)。