

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 创建亚马逊 EventBridge 管道
<a name="eb-pipes-create"></a>

EventBridge Pipes 使您能够在源和目标之间创建 point-to-point集成，包括高级事件转换和扩展。

要创建 EventBridge 管道，请执行以下步骤：

1. [指定源](#pipes-configure-source)

1. [配置事件筛选（可选）](#pipes-configure-filtering)

1. [定义事件富集（可选）](#pipes-define-enrichment)

1. [配置目标](#pipes-configure-target)

1. [配置管道设置](#pipes-configure-pipe-settings)

要快速建立示例管道，请参见 [入门：创建 Amazon EventBridge 管道](pipes-get-started.md)。本主题 CloudFormation 用于部署管道及其相关资源，并引导您概述管道的功能。

有关如何使用 CLI 创建管道的信息，请参阅 AWS CL *AWS I 命令*参考中的 [create-](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/pipes/create-pipe.html) pipe。

## 指定源
<a name="pipes-configure-source"></a>

首先，请指定您希望管道接收事件的源。

**使用控制台指定管道源**

1. 打开 Amazon EventBridge 控制台，网址为[https://console.aws.amazon.com/events/](https://console.aws.amazon.com/events/)。

1. 在导航窗格中，选择**管道**。

1. 选择**创建管道**。

1. 输入管道的名称。

1. （可选）添加管道的描述。

1. 在**构建管道**选项卡上，对于**源**，选择要为此管道指定的源类型，然后配置此源。

   配置属性因您所选的源类型而异：

------
#### [ Confluent ]

**使用控制台将 Confluent 云流配置为源**

   1. 对于**源**，选择 **Confluent 云**。

   1. 对于**引导服务器**，输入代理的 `host:port` 配对地址。

   1. 对于**主题名称**，输入管道将要读取的主题名称。

   1. （可选）对于 **VPC**，选择所需的 VPC。然后，对于 **VPC 子网**，选择所需的子网。对于 **VPC 安全组**，选择安全组。

   1. 对于**身份验证 - 可选**，启用**使用身份验证**并执行以下操作：

      1. 对于**身份验证方法**，选择身份验证类型。

      1. 在**密钥**中选择密钥。

      有关更多信息，请参阅 Confluent 文档中的 [Authenticate to Confluent Cloud resources](https://docs.confluent.io/cloud/current/access-management/authenticate/overview.html)。

   1. （可选）对于**其他设置 - 可选**，执行以下操作：

      1. 对于**起始位置**，请选择以下选项之一：
         + **最新** - 开始读取分片中包含最新记录的流。
         + **修剪水平线** - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。

      1. 对于**批次大小 - 可选**，输入每个批次的最大记录数。默认值是 100。

      1. 对于**批次时段 - 可选**，输入在继续操作之前收集记录的最大秒数。

------
#### [ DynamoDB ]

   1. 为**源**选择 **DynamoDB**。

   1. 对于 **DynamoDB 流**，选择要用作源的流。

   1. 对于**起始位置**，请选择以下选项之一：
      + **最新** - 开始读取分片中包含最新记录的流。
      + **修剪水平线** - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。

   1. （可选）对于**其他设置 - 可选**，执行以下操作：

      1. 对于**批次大小 - 可选**，输入每个批次的最大记录数。默认值是 10。

      1. 对于**批次时段 - 可选**，输入在继续操作之前收集记录的最大秒数。

      1. 对于**每个分片的并发批次 - 可选**，输入同一分片中可以同时读取的批次数。

      1. 对于**部分批次项目失败时**，请选择以下选项：
         + **AUTOMATIC\$1BISECT** - 将每个批次一分为二，并分别重试，直到所有记录都处理完毕或批次中还剩下一条失败消息。
**注意**  
如果不选择 **AUTOMATIC\$1BISECT**，则可以返回特定的失败记录，只会重试这些记录。

------
#### [ Kinesis ]

**使用控制台配置 Kinesis 源**

   1. 对于**源**，选择 **Kinesis**。

   1. 对于 **Kinesis 流**，选择要用作源的流。

   1. 对于**起始位置**，请选择以下选项之一：
      + **最新** - 开始读取分片中包含最新记录的流。
      + **修剪水平线** - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。
      + **在时间戳处** - 从指定时间开始读取流。在 “**时间戳**” 下，使用和 hh: mm: ss 格式输入数据YYYY/MM/DD和时间。

   1. （可选）对于**其他设置 - 可选**，执行以下操作：

      1. 对于**批次大小 - 可选**，输入每个批次的最大记录数。默认值是 10。

      1. （可选）对于**批次时段 - 可选**，输入在继续操作之前收集记录的最大秒数。

      1. 对于**每个分片的并发批次 - 可选**，输入同一分片中可以同时读取的批次数。

      1. 对于**部分批次项目失败时**，请选择以下选项：
         + **AUTOMATIC\$1BISECT** - 将每个批次一分为二，并分别重试，直到所有记录都处理完毕或批次中还剩下一条失败消息。
**注意**  
如果不选择 **AUTOMATIC\$1BISECT**，则可以返回特定的失败记录，只会重试这些记录。

------
#### [ Amazon MQ ]

**使用控制台配置 Amazon MQ 源**

   1. 对于**源**，选择 **Amazon MQ**。

   1. 对于 **Amazon MQ 代理**，选择要用作源的流。

   1. 对于**队列名称**，输入管道将要读取的队列名称。

   1. 对于**身份验证方法**，选择 **BASIC\$1AUTH**。

   1. 在**密钥**中选择密钥。

   1. （可选）对于**其他设置 - 可选**，执行以下操作：

      1. 对于**批次大小 - 可选**，输入每个批次的最大消息数。默认值是 100。

      1. 对于**批次时段 - 可选**，输入在继续操作之前收集记录的最大秒数。

------
#### [ Amazon MSK ]

**使用控制台配置 Amazon MSK 源**

   1. 对于**源**，选择 **Amazon MSK**。

   1. 对于 **Amazon MSK 集群**，选择要使用的集群。

   1. 对于**主题名称**，输入管道将要读取的主题名称。

   1. （可选）对于**使用者组 ID**，输入希望管道加入的使用者组的 ID。

   1. （可选）对于**身份验证 - 可选**，启用**使用身份验证**并执行以下操作：

      1. 对于**身份验证方法**，选择所需的类型。

      1. 在**密钥**中选择密钥。

   1. （可选）对于**其他设置 - 可选**，执行以下操作：

      1. 对于**批次大小 - 可选**，输入每个批次的最大记录数。默认值是 100。

      1. 对于**批次时段 - 可选**，输入在继续操作之前收集记录的最大秒数。

      1. 对于**起始位置**，请选择以下选项之一：
         + **最新** - 开始读取分片中包含最新记录的主题。
         + **修剪水平线** - 开始读取分片中包含最后一条未修剪过的记录的和题。这是分片中最早的记录。
**注意**  
**修剪水平线**与 Apache Kafka 中的**最早**相同。

------
#### [ Self managed Apache Kafka ]

**使用控制台配置自托管 Apache Kafka 源**

   1. 对于**源**，选择**自我托管式 Apache Kafka**。

   1. 对于**引导服务器**，输入代理的 `host:port` 配对地址。

   1. 对于**主题名称**，输入管道将要读取的主题名称。

   1. （可选）对于 **VPC**，选择所需的 VPC。然后，对于 **VPC 子网**，选择所需的子网。对于 **VPC 安全组**，选择安全组。

   1. （可选）对于**身份验证 - 可选**，启用**使用身份验证**并执行以下操作：

      1. 对于**身份验证方法**，选择身份验证类型。

      1. 在**密钥**中选择密钥。

   1. （可选）对于**其他设置 - 可选**，执行以下操作：

      1. 对于**起始位置**，请选择以下选项之一：
         + **最新** - 开始读取分片中包含最新记录的流。
         + **修剪水平线** - 开始读取分片中包含最后一条未修剪过的记录的流。这是分片中最早的记录。

      1. 对于**批次大小 - 可选**，输入每个批次的最大记录数。默认值是 100。

      1. 对于**批次时段 - 可选**，输入在继续操作之前收集记录的最大秒数。

------
#### [ Amazon SQS ]

**使用控制台配置 Amazon SQS 源**

   1. 对于**源**，选择 **SQS**。

   1. 对于 **SQS 队列**，请选择要使用的队列。

   1. （可选）对于**其他设置 - 可选**，执行以下操作：

      1. 对于**批次大小 - 可选**，输入每个批次的最大记录数。默认值是 100。

      1. 对于**批次时段 - 可选**，输入在继续操作之前收集记录的最大秒数。

------

## 配置事件筛选（可选）
<a name="pipes-configure-filtering"></a>

您可以向管道添加筛选功能，这样就可以只将部分事件从源发送到目标。

**使用控制台配置筛选**

1. 选择**筛选**。

1. 在**示例事件 - 可选**下，您将看到一个示例事件，可以使用它来构建您自己的事件模式，您也可以选择**输入您自己的**，输入您自己的事件。

1. 在**事件模式**下，输入要用于筛选事件的事件模式。有关构造筛选条件的更多信息，请参阅 [Amazon P EventBridge ipes 中的事件筛选](eb-pipes-event-filtering.md)。

   以下是一个事件模式示例，仅发送**城市**字段中的值为**西雅图**的事件。

   ```
   {
     "data": {
       "City": ["Seattle"]
     }
   }
   ```

现在已对事件进行筛选，您可以为管道添加可选的富集和目标。

## 定义事件富集（可选）
<a name="pipes-define-enrichment"></a>

您可以将用于丰富的事件数据发送到 Lambda 函数、 AWS Step Functions 状态机、Amazon API Gateway 或 API 目标。

**选择富集**

1. 选择**富集**。

1. 在**详细信息**下，为**服务**选择要用于富集的服务和相关设置。

您也可以先转换数据，再发送以进行增强。

**（可选）定义输入转换器**

1. 选择**富集输入转换器 - 可选**。

1. 在 “**示例负 events/Event 载**” 中，选择示例事件类型。

1. 对于**转换器**，请输入转换器语法，例如 `"Event happened at <$.detail.field>."`，其中 `<$.detail.field>` 是对示例事件中某个字段的引用。您也可以双击示例事件中的某个字段，将其添加到转换器中。

1. 对于**输出**，请确认输出符合您的要求。

现在，数据已经过筛选和增强，您必须定义要将事件数据发送到的目标。

## 配置目标
<a name="pipes-configure-target"></a>

**配置目标**

1. 选择**目标**。

1. 在**详细信息**下，为**目标服务**选择目标。显示的字段因您选择的目标而异。根据需要输入此目标类型的特定信息。

您也可以先转换数据，再发送到目标。

**（可选）定义输入转换器**

1. 选择**目标输入变压器 - 可选**。

1. 在 “**示例负 events/Event 载**” 中，选择示例事件类型。

1. 对于**转换器**，请输入转换器语法，例如 `"Event happened at <$.detail.field>."`，其中 `<$.detail.field>` 是对示例事件中某个字段的引用。您也可以双击示例事件中的某个字段，将其添加到转换器中。

1. 对于**输出**，请确认输出符合您的要求。

现在，管道已配置完毕，请确保其设置配置正确。

## 配置管道设置
<a name="pipes-configure-pipe-settings"></a>

默认情况下管道处于活动状态，但您可以将其停用。您还可以指定管道的权限、设置管道日志记录和添加标签。

**配置管道设置**

1. 选择**管道设置**选项卡。

1. 默认情况下，新创建的管道创建好就处于活动状态。如果要创建非活动管道，请在**激活**下，为**激活管道**关闭**活动**。

1. 在**权限**下，对于**执行角色**，执行以下操作之一：

   1. 要为此管道 EventBridge 创建新的执行角色，请选择**为此特定资源创建新角色。**在**角色名称**下，您可以选择编辑角色名称。

   1. 要使用现有的执行角色，请选择**使用现有角色**。在**角色名称**下，选择角色。

1. （可选）如果您已将 Kinesis 或 DynamoDB 流指定为管道源，则可以配置重试策略和死信队列 (DLQ)。

   对于**重试策略和死信队列 - 可选**，请执行以下操作：

   在**重试策略**下，执行以下操作：

   1. 如果要启用重试策略，请打开**重试**。默认情况下，新创建的管道未启用重试策略。

   1. 对于 **Maximum age of event**（事件的最大时长），输入一分钟（00:01）与 24 小时（24:00）之间的值。

   1. 对于**重试尝试**，输入 0 到 185 之间的数字。

   1. 如果要使用死信队列 (DLQ)，请打开**死信队列**，选择您的方法，然后选择要使用的队列或主题。默认情况下，新创建的管道不使用 DLQ。

1. 选择加密管道数据时 EventBridge 要使用的。 KMS key 

   有关如何 EventBridge 使用的更多信息 KMS keys，请参阅[静态加密](eb-data-protection.md#eb-encryption-at-rest)。
   + 选择 “** AWS 拥有的密钥使用**” EventBridge 以使用加密数据 AWS 拥有的密钥。

      AWS 拥有的密钥 这是一 KMS key 款 EventBridge 拥有和管理的账户，可在多个 AWS 账户中使用。通常， AWS 拥有的密钥 是一个不错的选择，除非您需要审计或控制保护资源的加密密钥。

     这是默认值。
   + 选择 “**用 客户自主管理型密钥** EventBridge 于”，使用您指定或创建 客户自主管理型密钥 的对数据进行加密。

     客户自主管理型密钥 KMS keys 位于您创建、拥有和管理的 AWS 账户中。您对这些 KMS keys拥有完全控制权。

     1. 指定现有的 客户自主管理型密钥，或选择 “**新建**” KMS key。

       EventBridge 显示密钥状态以及与指定 客户自主管理型密钥密钥关联的所有密钥别名。

1. （可选）在**日志 - 可选**下，您可以设置 EventBridge Pipes 如何向支持的服务发送日志信息，包括如何配置这些日志。

   有关管道记录日志的更多信息，请参阅 [记录 Amazon Pi EventBridge pes 的性能](eb-pipes-logs.md)。

   CloudWatch 默认情况下，日志被选为日志目标，`ERROR`日志级别也是如此。因此，默认情况下，Pip EventBridge es 会创建一个新的 CloudWatch 日志组，向该组发送包含详细`ERROR`级别的日志记录。

   要让 Pip EventBridge es 将日志记录发送到任何支持的日志目的地，请执行以下操作：

   1. 在**日志-可选**下，选择要将日志记录传送到的目的地。

   1. 对于**日志级别**，选择 EventBridge 要包含在日志记录中的信息级别。默认情况下选中 `ERROR` 日志级别。

      有关更多信息，请参阅 [指定 EventBridge 管道日志级别](eb-pipes-logs.md#eb-pipes-logs-level)。

   1. 如果 EventBridge 要在日志记录中包含事件负载信息以及服务请求和响应信息，请选择 “包括**执行数据**”。

      有关更多信息，请参阅 [在 Pip EventBridge es 日志中包含执行数据](eb-pipes-logs.md#eb-pipes-logs-execution-data)。

   1. 配置您选择的每个日志目标：

      对于 CloudWatch Logs 日志，请在**CloudWatch 日志**下执行以下操作：
      + 对于**CloudWatch 日志组**，请选择是否 EventBridge 创建新的日志组，或者您可以选择现有日志组或指定现有日志组的 ARN。
      + 对于新的日志组，请根据需要编辑日志组名称。

      CloudWatch 默认情况下，日志处于选中状态。

      对于 Firehose 流日志，在**Firehose 流日志**下，选择该 Firehose 流。

      对于 Amazon S3 日志，在 **S3 日志**下执行以下操作：
      + 输入要用作日志目标的桶的名称。
      + 输入存储桶所有者的 AWS 账户 ID。
      + 输入 EventBridge 创建 S3 对象时要使用的任何前缀文本。

        有关更多信息，请参阅《Amazon Simple Storage Service 用户指南》中的**[使用前缀组织对象](https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html)。
      + 选择 EventBridge 要如何格式化 S3 日志记录：
        + `json`：JSON 
        + `plain`：纯文本
        + `w3c`：[W3C 扩展日志记录文件格式](https://www.w3.org/TR/WD-logfile)

1. （可选）在**标签 - 可选**下，选择**添加新标签**，然后为规则输入一个或多个标签。有关更多信息，请参阅 [在 Amazon 中为资源添加标签 EventBridge](eb-tagging.md)。

1. 选择**创建管道**。

## 验证配置参数
<a name="pipes-validation"></a>

创建管道后， EventBridge 验证以下配置参数：
+ **IAM 角色** — 由于创建管道后无法更改管道的来源，因此需要 EventBridge 验证提供的 IAM 角色是否可以访问该源。
**注意**  
EventBridge 不会对浓缩或目标执行相同的验证，因为它们可以在创建管道后进行更新。
+ **批处理**- EventBridge 验证源的批量大小是否超过目标的最大批次大小。如果是，则 EventBridge 需要较小的批次大小。此外，如果目标不支持批处理，则无法 EventBridge 为源配置批处理。
+ **扩充** — EventBridge 验证 API Gateway 和 API 目标丰富版的批量大小是否为 1，因为仅支持批量大小为 1。