

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 入门：创建 Amazon EventBridge 管道
<a name="pipes-get-started"></a>

为了熟悉管道及其功能，我们将使用 CloudFormation 模板来设置 EventBridge 管道和相关组件。随后，我们便可以探索管道的各项特征。

**提示**  
要获得更全面的动手学习体验，可以试试 Pipes Works [EventBridge hop](https://catalog.workshops.aws/eb-pipes)。本次交互式研讨会将引导您完成通过 Lambda 扩充功能将 DynamoDB 连接到 API Gateway 的管道构建和故障排除。

该模板创建了一个 EventBridge 管道，用于将来自 DynamoDB 表的流连接到 Amazon SQS 队列。每当数据库表中创建或修改记录时，管道都会将生成的事件发送到该队列。

部署的管道包含以下组件：
+ 一个作为管道源的 DynamoDB 表（及数据流），以及一个作为管道目标的 Amazon SQS 队列。
+ 一个执行角色，用于授予 EventBridge 访问 DynamoDB 表和 Amazon SQS 队列的必要权限。
+ 管道本身，其中包含一个事件筛选条件，仅选择在表项创建（插入）或修改时生成的事件。

有关模板的具体技术细节，请参阅 [模板详细信息](#pipes-get-started-template-details)。

![\[数据库事件会与筛选条件进行匹配，符合条件则发送至队列。\]](http://docs.aws.amazon.com/zh_cn/eventbridge/latest/userguide/images/pipes-get-started_eventbridge_architectural.svg)


## 使用创建管道 CloudFormation
<a name="pipes-get-started-create"></a>

要创建管道及其关联资源，我们将创建一个 CloudFormation 模板并使用它来创建一个包含示例管道的堆栈，其中包含源和目标。

**重要**  
如果您通过此模板创建堆栈，则需为使用的 Amazon 资源支付相应费用。

### 创建模板
<a name="pipes-get-started-file"></a>

首先，创建 CloudFormation 模板。

1. 在 [模板](#pipes-get-started-template) 部分中，单击 **JSON** 或 **YAML** 选项卡上的复制图标，以复制模板内容。

1. 将模板内容粘贴到新文件中。

1. 将该文件保存在本地。

### 创建堆栈
<a name="pipes-get-started-stack"></a>

接下来，使用您保存的模板来配置堆 CloudFormation 栈。

1. 打开 CloudFormation 控制台，网址为[https://console.aws.amazon.com/cloudformation/](https://console.aws.amazon.com/cloudformation/)。

1. 在**堆栈**页面，从**创建堆栈**菜单中选择**（采用新资源（标准））**。

1. 指定模板：

   1. 在**先决条件**下，选择**选择现有模板**。

   1. 在**指定模板**下，选择**上传模板文件**。

   1. 选择**选择文件**，导航到模板文件并将其选中。

   1. 选择**下一步**。

1. 指定堆栈详细信息：

   1. 输入堆栈名称。

   1. 对于其他参数，可接受默认值或输入自定义值。

   1. 选择**下一步**。

1. 配置堆栈选项：

   1. 在**堆栈故障选项**下，选择**删除所有新创建的资源**。
**注意**  
如果选择此选项，即使堆栈创建失败，您可能也不会因为删除策略规定保留资源而向资源付费。有关更多信息，请参阅《CloudFormation 用户指南》**中的 [`DeletionPolicy` 属性](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-attribute-deletionpolicy.html)。

   1. 接受其他所有默认值。

   1. 在 “**能力**” 下，选中复选框以确认 CloudFormation 可能会在您的账户中创建 IAM 资源。

   1. 选择**下一步**。

1. 查看堆栈详细信息并选择**提交**。

**使用 CloudFormation (AWS CLI) 创建堆栈**

您也可以使用 AWS CLI 来创建堆栈。
+ 使用 [https://docs.aws.amazon.com/cli/latest/reference/cloudformation/create-stack.html](https://docs.aws.amazon.com/cli/latest/reference/cloudformation/create-stack.html) 命令。
  + 接受模板参数的默认值，同时指定堆栈名称。使用 `template-body` 参数传递模板内容，或使用 `template-url` 指定 URL 位置。

    ```
    aws cloudformation create-stack \
      --stack-name eventbridge-rule-tutorial \
      --template-body template-contents \
      --capabilities CAPABILITY_IAM
    ```
  + 覆盖一个或多个模板参数的默认值。例如：

    ```
    aws cloudformation create-stack \
      --stack-name eventbridge-rule-tutorial \
      --template-body template-contents \
      --parameters \
        ParameterKey=SourceTableName,ParameterValue=pipe-example-source \
        ParameterKey=TargetQueueName,ParameterValue=pipe-example-target \
        ParameterKey=PipeName,ParameterValue=pipe-with-filtering-example \
      --capabilities CAPABILITY_IAM
    ```

CloudFormation 创建堆栈。一旦堆栈创建完成之后，堆栈资源就即可投入使用。可使用堆栈详细信息页面上的**资源**选项卡来查看账户中预置的资源。

## 探索管道功能
<a name="pipes-get-started-using"></a>

创建管道后，您可以使用 EventBridge 控制台观察管道操作和测试事件传送。

1. 在[https://console.aws.amazon.com/events/家打开 EventBridge 控制台？ \$1/p](https://console.aws.amazon.com/events/home?#/pipes) ipes。

1. 选择您创建的管道。

   在管道详细信息页面上，**管道组件**部分会显示构成管道的资源，并包含提供各组件更多详细信息的选项卡。  
![\[管道详细信息页面以图形方式显示管道的源、筛选器和目标组件。\]](http://docs.aws.amazon.com/zh_cn/eventbridge/latest/userguide/images/pipes-get-started_eventbridge_pipe-detail.png)

   您可以在**设置**选项卡的**权限**部分中，找到我们为该管道创建的执行角色。

### 检查管道筛选器
<a name="pipes-get-started-using-filter"></a>

在测试管道运行状况之前，我们先来检查一下用于控制将哪些事件发送到目标的筛选器。管道只会将符合筛选条件的事件发送到目标，所有其他事件都将被丢弃。本示例中，我们仅希望将创建或修改表条目时生成的事件发送到 Amazon SQS 队列。
+ 在管道详细信息页面的**管道组件**下，选择**筛选**选项卡。

  我们已添加了一个筛选器，该筛选器仅选择 `eventName` 设置为 `INSERT` 或 `MODIFY` 的事件。

  ```
  {
    "eventName": ["INSERT", "MODIFY"]
  }
  ```

### 通过管道发送事件
<a name="pipes-get-started-using-source"></a>

接下来，我们将在管道源中生成事件，以测试管道的过滤和传送功能是否正常运行。为此，我们将在指定为管道源的 DynamoDB 表中创建并编辑一个表项。

1. 在管道详细信息页面的**管道组件**下，选择**源**选项卡。

1. 在**源**下，选择 DynamoDB 数据流名称。

   这将在单独的窗口中打开 DynamoDB 控制台，并显示源表的详细信息。

1. 选择 “**探索物品**”。

1. 通过在表中创建项目来生成 `INSERT` 事件：

   1. 选择**创建项目**。

   1. 为 **Album** 和 **Artist** 属性添加值。

   1. 选择**创建项目**。

1. 通过编辑项目生成 `DELETE` 和 `INSERT` 事件：

   1. 从列表中选择该项目，然后从**操作**菜单中选择**编辑项目**。

   1. 为 **Album** 或 **Artist** 属性输入新值。

   1. 勾选确认您要更改项目键值的复选框，然后选择**重新创建项目**。

      此操作会先删除该表项，再重新创建，从而生成一个 `DELETE` 事件，接着生成一个新的 `INSERT` 事件。

1. 通过向项目添加属性来生成 `MODIFY` 事件：

   1. 从列表中选择该项目，然后从**操作**菜单中选择**编辑项目**。

   1. 从**添加新属性**菜单中，选择 **Number**。

   1. 在属性名称中输入 **Year**，然后为该属性输入值。选择**保存并关闭**。

### 确认事件通过管道成功传送
<a name="pipes-get-started-using-target"></a>

最后，我们将验证管道是否成功筛选并传送了在 DynamoDB 中创建和编辑表项时生成的事件。

1. 在管道详细信息页面的**管道组件**下，选择**目标**选项卡。

1. 在**目标**下，选择 Amazon SQS 队列名称。

   这将在单独的窗口中打开 Amazon SQS 控制台，并显示目标队列的详细信息。

1. 选择**发送和接收消息**。

1. 在**接收消息**下，选择**轮询消息**。

   Amazon SQS 会将收到的消息加载到队列中。单击各个消息可查看其详细信息。

   队列中应有三条事件消息：
   + 两条 `INSERT` 类型消息，一条是您首次创建表项时生成的，另一条是您通过更改键值重新创建表项时生成的。
   + 一条 `MODIFY` 类型消息，是您向表项添加属性时生成的。

   请注意，队列中没有 `DELETE` 类型的事件消息，尽管在您通过更改键值删除并重新创建表项时生成了一个该类事件。这是因为我们指定的管道筛选器仅筛选 `INSERT` 和 `MODIFY` 类型事件，因此管道会过滤掉 `DELETE` 事件，而不会将其传送到队列。

## 清理：删除资源
<a name="pipes-get-started-delete"></a>

最后一步，我们将删除该堆栈及其包含的资源。

**重要**  
只要堆栈存在，您就需要为堆栈中包含的 Amazon 资源付费。

1. 打开 CloudFormation 控制台，网址为[https://console.aws.amazon.com/cloudformation/](https://console.aws.amazon.com/cloudformation/)。

1. 在**堆栈**页面上，选择根据该模板创建的堆栈，选择**删除**，然后确认**删除**。

   CloudFormation 启动删除堆栈及其包含的所有资源。

## CloudFormation 模板详情
<a name="pipes-get-started-template-details"></a>

此模板会在您的账户中创建资源并授予相应权限。

### 资源
<a name="pipes-get-started-template-resources"></a>

本教程的 CloudFormation 模板将在您的账户中创建以下资源：

**重要**  
如果您通过此模板创建堆栈，则需为使用的 Amazon 资源支付相应费用。
+ [https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html)：充当管道事件源的 DynamoDB 表。
+ [https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sqs-queue.html](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sqs-queue.html)：Amazon SQS 队列，用作流经管道的事件的目标。
+ [https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-role.html](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-role.html)：一个 IAM 执行角色，用于向您的账户中的 Pip EventBridge es 服务授予权限。
+ [https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html)：连接 DynamoDB 表和 Amazon SQS 队列的管道。

### Permissions
<a name="pipes-get-started-template-perms"></a>

该模板包含一个代表执行角色的 `AWS::IAM::Role` 资源。此角色在您的账户中向 Pip EventBridge es 服务 (`pipes.amazonaws.com`) 授予以下权限。

以下权限的作用范围限定于模板创建的作为管道事件源的 DynamoDB 表及其数据流：
+ `dynamodb:DescribeStream`
+ `dynamodb:GetRecords`
+ `dynamodb:GetShardIterator`
+ `dynamodb:ListStreams`

以下权限的作用范围限定于堆栈创建的作为管道目标的 Amazon SQS 队列：
+ `sqs:SendMessage`

## CloudFormation 模板
<a name="pipes-get-started-template"></a>

将以下 JSON 或 YAML 代码另存为单独文件，用作本教程的 CloudFormation 模板。

------
#### [ JSON ]

```
{
  "AWSTemplateFormatVersion": "2010-09-09",

 "Description" : "[AWSDocs] EventBridge: pipes-get-started",

  "Parameters" : {
    "SourceTableName" : {
      "Type" : "String",
      "Default" : "pipe-example-source",
      "Description" : "Specify the name of the table to provision as the pipe source, or accept the default."
    },
  "TargetQueueName" : {
    "Type" : "String",
    "Default" : "pipe-example-target",
    "Description" : "Specify the name of the queue to provision as the pipe target, or accept the default."
  },
    "PipeName" : {
      "Type" : "String",
      "Default" : "pipe-with-filtering-example",
      "Description" : "Specify the name of the table to provision as the pipe source, or accept the default."
    }
},
  "Resources": {
    "PipeSourceDynamoDBTable": {
      "Type": "AWS::DynamoDB::Table",
      "Properties": {
        "AttributeDefinitions": [{
            "AttributeName": "Album",
            "AttributeType": "S"
          },
          {
            "AttributeName": "Artist",
            "AttributeType": "S"
          }

        ],
        "KeySchema": [{
            "AttributeName": "Album",
            "KeyType": "HASH"

          },
          {
            "AttributeName": "Artist",
            "KeyType": "RANGE"
          }
        ],
        "ProvisionedThroughput": {
          "ReadCapacityUnits": 10,
          "WriteCapacityUnits": 10
        },
        "StreamSpecification": {
          "StreamViewType": "NEW_AND_OLD_IMAGES"
        },
        "TableName": { "Ref" : "SourceTableName" }
      }
    },
    "PipeTargetQueue": {
      "Type": "AWS::SQS::Queue",
      "Properties": {
        "QueueName": { "Ref" : "TargetQueueName" }
      }
    },
    "PipeTutorialPipeRole": {
      "Type": "AWS::IAM::Role",
      "Properties": {
        "AssumeRolePolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [{
            "Effect": "Allow",
            "Principal": {
              "Service": "pipes.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
              "StringLike": {
                "aws:SourceArn": {
                  "Fn::Join": [
                    "",
                    [
                      "arn:",
                      { "Ref": "AWS::Partition" },
                      ":pipes:",
                      { "Ref": "AWS::Region" },
                      ":",
                      { "Ref": "AWS::AccountId" },
                      ":pipe/",
                      { "Ref": "PipeName" }
                    ]
                  ]
                },
                "aws:SourceAccount": { "Ref" : "AWS::AccountId" }
              }
            }
          }]
        },
        "Description" : "EventBridge Pipe template example. Execution role that grants the pipe the permissions necessary to send events to the specified pipe.",
        "Path": "/",
        "Policies": [{
            "PolicyName": "SourcePermissions",
            "PolicyDocument": {
              "Version": "2012-10-17",
              "Statement": [{
                "Effect": "Allow",
                "Action": [
                  "dynamodb:DescribeStream",
                  "dynamodb:GetRecords",
                  "dynamodb:GetShardIterator",
                  "dynamodb:ListStreams"
                ],
                "Resource": [
                  { "Fn::GetAtt" : [ "PipeSourceDynamoDBTable", "StreamArn" ] }
                ]
              }]
            }
          },
          {
            "PolicyName": "TargetPermissions",
            "PolicyDocument": {
              "Version": "2012-10-17",
              "Statement": [{
                "Effect": "Allow",
                "Action": [
                  "sqs:SendMessage"
                ],
                "Resource": [
                  { "Fn::GetAtt" : [ "PipeTargetQueue", "Arn" ] }
                ]
              }]
            }
          }
        ]
      }
  },
    "PipeWithFiltering": {
      "Type": "AWS::Pipes::Pipe",
      "Properties": {
        "Description" : "EventBridge Pipe template example. Pipe that receives events from a DynamoDB stream, applies a filter, and sends matching events on to an SQS Queue.",
        "Name": { "Ref" : "PipeName" },
        "RoleArn": {"Fn::GetAtt" : ["PipeTutorialPipeRole", "Arn"] },
        "Source": { "Fn::GetAtt" : [ "PipeSourceDynamoDBTable", "StreamArn" ] },
        "SourceParameters": {
          "DynamoDBStreamParameters" : {
            "StartingPosition" : "LATEST"
         },
        "FilterCriteria" : {
          "Filters" : [ {
            "Pattern" : "{ \"eventName\": [\"INSERT\", \"MODIFY\"] }"
         }]
        }
        },
        "Target": { "Fn::GetAtt" : [ "PipeTargetQueue", "Arn" ] }
      }
    }
  }
}
```

------
#### [ YAML ]

```
AWSTemplateFormatVersion: '2010-09-09'
Description: '[AWSDocs] EventBridge: pipes-get-started'
Parameters:
  SourceTableName:
    Type: String
    Default: pipe-example-source
    Description: Specify the name of the table to provision as the pipe source, or accept the default.
  TargetQueueName:
    Type: String
    Default: pipe-example-target
    Description: Specify the name of the queue to provision as the pipe target, or accept the default.
  PipeName:
    Type: String
    Default: pipe-with-filtering-example
    Description: Specify the name of the table to provision as the pipe source, or accept the default.
Resources:
  PipeSourceDynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: Album
          AttributeType: S
        - AttributeName: Artist
          AttributeType: S
      KeySchema:
        - AttributeName: Album
          KeyType: HASH
        - AttributeName: Artist
          KeyType: RANGE
      ProvisionedThroughput:
        ReadCapacityUnits: 10
        WriteCapacityUnits: 10
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      TableName: !Ref SourceTableName
  PipeTargetQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Ref TargetQueueName
  PipeTutorialPipeRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: pipes.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringLike:
                aws:SourceArn: !Join
                  - ''
                  - - 'arn:'
                    - !Ref AWS::Partition
                    - ':pipes:'
                    - !Ref AWS::Region
                    - ':'
                    - !Ref AWS::AccountId
                    - ':pipe/'
                    - !Ref PipeName
                aws:SourceAccount: !Ref AWS::AccountId
      Description: EventBridge Pipe template example. Execution role that grants the pipe the permissions necessary to send events to the specified pipe.
      Path: /
      Policies:
        - PolicyName: SourcePermissions
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource:
                  - !GetAtt PipeSourceDynamoDBTable.StreamArn
        - PolicyName: TargetPermissions
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource:
                  - !GetAtt PipeTargetQueue.Arn
  PipeWithFiltering:
    Type: AWS::Pipes::Pipe
    Properties:
      Description: EventBridge Pipe template example. Pipe that receives events from a DynamoDB stream, applies a filter, and sends matching events on to an SQS Queue.
      Name: !Ref PipeName
      RoleArn: !GetAtt PipeTutorialPipeRole.Arn
      Source: !GetAtt PipeSourceDynamoDBTable.StreamArn
      SourceParameters:
        DynamoDBStreamParameters:
          StartingPosition: LATEST
        FilterCriteria:
          Filters:
            - Pattern: '{ "eventName": ["INSERT", "MODIFY"] }'
      Target: !GetAtt PipeTargetQueue.Arn
```

------