入门:创建 Amazon EventBridge 管道 - Amazon EventBridge

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

入门:创建 Amazon EventBridge 管道

为了熟悉管道及其功能,我们将使用 AWS CloudFormation 模板来设置 EventBridge 管道和相关组件。然后我们可以探索各种管道特征。

该模板创建了一个 EventBridge 管道,用于将来自 DynamoDB 表的流连接到 Amazon SQS 队列。每次在数据库表中创建或修改记录时,管道都会将生成的事件发送到队列。

部署的管道包括:

  • 一个 DynamoDB 表(和流)作为管道源,一个 Amazon SQS 队列作为目标。

  • 一个执行角色,用于授予 EventBridge 访问 DynamoDB 表和 Amazon SQS 队列的必要权限。

  • 管道本身,它包含一个事件过滤器,该过滤器仅选择创建(插入)或修改表项目时生成的事件。

有关模板的具体技术细节,请参阅模板详情

数据库事件与筛选器匹配,如果匹配则发送到队列。

使用创建管道 CloudFormation

要创建管道及其关联资源,我们将创建一个 CloudFormation 模板并使用它来创建一个包含示例管道的堆栈,其中包含源和目标。

重要

如果您使用此模板创建堆栈,则需要为所使用的 Amazon 资源付费。

首先,创建 CloudFormation 模板。

  1. 在该模板部分中,单击 JSONYAML 选项卡上的复制图标以复制模板内容。

  2. 将模板内容粘贴到新文件中。

  3. 将该文件保存在本地。

接下来,使用您保存的模板来配置堆 CloudFormation 栈。

  1. 打开 CloudFormation 控制台,网址为https://console.aws.amazon.com/cloudformation/

  2. 堆栈页面上,从创建堆栈菜单中选择使用新资源(标准)

  3. 指定模板:

    1. 在 “先决条件” 下,选择选择现有模板

    2. 指定模板下,选择上传模板文件

    3. 选择 “选择文件”,导航到模板文件并将其选中。

    4. 选择下一步

  4. 指定堆栈详细信息:

    1. 输入堆栈名称。

    2. 对于参数,请接受默认值或输入自己的值。

    3. 选择下一步

  5. 配置堆栈选项:

    1. 堆栈故障选项下,选择删除所有新创建的资源

      注意

      如果选择此选项,则即使堆栈创建失败,也不会因为删除策略规定保留这些资源的费用而向您收费。有关更多信息,请参阅《CloudFormation 用户指南》中的DeletionPolicy属性

    2. 接受所有其他默认值。

    3. 在 “能力” 下,选中复选框以确认 CloudFormation 可能会在您的账户中创建 IAM 资源。

    4. 选择下一步

  6. 查看堆栈详细信息并选择提交

使用 CloudFormation (AWS CLI) 创建堆栈

您也可以使用 AWS CLI 来创建堆栈。

  • 使用 create-stack 命令。

    • 接受默认模板参数值,指定堆栈名称。使用template-body参数传递模板内容或template-url指定 URL 位置。

      aws cloudformation create-stack \ --stack-name eventbridge-rule-tutorial \ --template-body template-contents \ --capabilities CAPABILITY_IAM
    • 覆盖一个或多个模板参数的默认值。例如:

      aws cloudformation create-stack \ --stack-name eventbridge-rule-tutorial \ --template-body template-contents \ --parameters \ ParameterKey=SourceTableName,ParameterValue=pipe-example-source \ ParameterKey=TargetQueueName,ParameterValue=pipe-example-target \ ParameterKey=PipeName,ParameterValue=pipe-with-filtering-example \ --capabilities CAPABILITY_IAM

CloudFormation 创建堆栈。堆栈创建完成后,堆栈资源就可以使用了。您可以使用堆栈详细信息页面上的资源选项卡来查看您的账户中配置的资源。

探索管道功能

创建管道后,您可以使用 EventBridge 控制台观察管道操作和测试事件传送。

  1. https://console.aws.amazon.com/events/家打开 EventBridge 控制台? #/p ipes。

  2. 选择您创建的管道。

    在管道详细信息页面上,“管道组件” 部分显示了构成管道的资源,并包含提供每个组件更多详细信息的选项卡。

    管道详细信息页面以图形方式显示管道的源组件、过滤器和目标组件。

    你可以在 “设置” 选项卡的 “权限” 部分中找到我们为管道创建的执行角色。

检查管道过滤器

在测试管道操作之前,让我们检查一下我们为控制将哪些事件发送到目标而指定的过滤器。管道只会向目标发送符合过滤条件的事件;所有其他事件都将被丢弃。在这种情况下,我们只希望将创建或修改表条目时生成的事件发送到 Amazon SQS 队列。

  • 在管道详细信息页面的 “管道组件” 下,选择 “筛选” 选项卡。

    我们添加了一个过滤器,该过滤器仅选择设置eventNameINSERT或的事件MODIFY

    { "eventName": ["INSERT", "MODIFY"] }

通过管道发送事件

接下来,我们将在管道源中生成事件,以测试管道过滤和交付是否正常运行。为此,我们将在指定为管道源的 DynamoDB 表中创建和编辑一个项目。

  1. 在管道详细信息页面的 “管道组件” 下,选择 “” 选项卡。

  2. 在 “来源” 下,选择 DynamoDB 直播名称。

    这将在单独的窗口中打开 DynamoDB 控制台,并显示源表的详细信息。

  3. 选择 Explore table items(浏览表项目)。

  4. 通过在表中创建项目来生成INSERT事件:

    1. 选择创建项目

    2. 为 “专辑” 和 “艺术家” 属性添加值。

    3. 选择创建项目

  5. 通过编辑项目生成DELETEINSERT事件:

    1. 从列表中选择项目,然后从 “操作” 菜单中选择 “编辑项目”

    2. 为 “专辑” 或 “艺术家” 属性输入新值。

    3. 勾选确认您正在更改项目键值的复选框,然后选择 “重新创建项目”。

      这会导致项目被删除,然后重新创建,生成一个DELETE事件,然后生成一个新INSERT事件。

  6. 通过向项目添加属性来生成MODIFY事件:

    1. 从列表中选择项目,然后从 “操作” 菜单中选择 “编辑项目”

    2. 从 “添加新属性” 菜单中,选择 “数字”。

    3. 在属性名称中输入 Year,然后输入该属性的值。选择保存并关闭

通过管道确认事件传送

最后,我们将确认管道成功筛选并传送了我们通过在 DynamoDB 中创建和编辑表格项目而生成的事件。

  1. 在管道详细信息页面的 “管道组件” 下,选择 “目标” 选项卡。

  2. 在 “目标” 下,选择 Amazon SQS 队列名称。

    这将在单独的窗口中打开 Amazon SQS 控制台,并显示目标队列的详细信息。

  3. 选择发送和接收消息

  4. 在 “接收消息” 下,选择 “轮询留言”。

    Amazon SQS 将收到的消息加载到队列中。单击单个消息以查看其详细信息。

    队列中应该有三条事件消息:

    • 两种类型INSERT,一种是在您首次创建表项目时生成的,另一种是在您通过更改键值重新创建该项目时生成的。

    • 类型之一MODIFY,在向表格项目添加属性时生成。

    请注意,队列DELETE中没有类型的事件消息,尽管事件消息是在您通过更改键值删除和重新创建表格项目时生成的。我们指定的管道过滤器仅选择 on INSERT an MODIFY d,因此管道会过滤掉DELETE事件,而不是将其传送到队列。

清理:删除资源

最后一步,我们将删除堆栈及其包含的资源。

重要

只要堆栈中包含的 Amazon 资源存在,您就需要为其付费。

  1. 打开 CloudFormation 控制台,网址为https://console.aws.amazon.com/cloudformation/

  2. 堆栈页面上,选择根据模板创建的堆栈,选择删除,然后确认删除

    CloudFormation 启动删除堆栈及其包含的所有资源。

CloudFormation 模板详情

此模板在您的账户中创建资源并授予权限。

资源

本教程的 CloudFormation 模板将在您的账户中创建以下资源:

重要

如果您使用此模板创建堆栈,则需要为所使用的 Amazon 资源付费。

  • AWS::DynamoDB::Table: 充当管道事件源的 DynamoDB 表。

  • AWS::SQS::Queue:一个 Amazon SQS 队列,它充当流经管道的事件的目标。

  • AWS::IAM::Role:一个 IAM 执行角色,用于向您的账户中的 Pip EventBridge es 服务授予权限。

  • AWS::Pipes::Pipe: 连接 DynamoDB 表和亚马逊 SQS 队列的管道。

权限

该模板包含一个代表执行角色的AWS::IAM::Role资源。此角色在您的账户中向 Pip EventBridge es 服务 (pipes.amazonaws.com) 授予以下权限。

以下权限仅限于 DynamoDB 表和模板作为管道事件源创建的流式传输:

  • dynamodb:DescribeStream

  • dynamodb:GetRecords

  • dynamodb:GetShardIterator

  • dynamodb:ListStreams

以下权限仅限于堆栈创建的作为管道目标的 Amazon SQS 队列:

  • sqs:SendMessage

CloudFormation 模板

将以下 JSON 或 YAML 代码另存为单独文件,用作本教程的 CloudFormation 模板。

JSON
{ "AWSTemplateFormatVersion": "2010-09-09", "Description" : "[AWSDocs] EventBridge: pipes-get-started", "Parameters" : { "SourceTableName" : { "Type" : "String", "Default" : "pipe-example-source", "Description" : "Specify the name of the table to provision as the pipe source, or accept the default." }, "TargetQueueName" : { "Type" : "String", "Default" : "pipe-example-target", "Description" : "Specify the name of the queue to provision as the pipe target, or accept the default." }, "PipeName" : { "Type" : "String", "Default" : "pipe-with-filtering-example", "Description" : "Specify the name of the table to provision as the pipe source, or accept the default." } }, "Resources": { "PipeSourceDynamoDBTable": { "Type": "AWS::DynamoDB::Table", "Properties": { "AttributeDefinitions": [{ "AttributeName": "Album", "AttributeType": "S" }, { "AttributeName": "Artist", "AttributeType": "S" } ], "KeySchema": [{ "AttributeName": "Album", "KeyType": "HASH" }, { "AttributeName": "Artist", "KeyType": "RANGE" } ], "ProvisionedThroughput": { "ReadCapacityUnits": 10, "WriteCapacityUnits": 10 }, "StreamSpecification": { "StreamViewType": "NEW_AND_OLD_IMAGES" }, "TableName": { "Ref" : "SourceTableName" } } }, "PipeTargetQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": { "Ref" : "TargetQueueName" } } }, "PipeTutorialPipeRole": { "Type": "AWS::IAM::Role", "Properties": { "AssumeRolePolicyDocument": { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": { "Service": "pipes.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringLike": { "aws:SourceArn": { "Fn::Join": [ "", [ "arn:", { "Ref": "AWS::Partition" }, ":pipes:", { "Ref": "AWS::Region" }, ":", { "Ref": "AWS::AccountId" }, ":pipe/", { "Ref": "PipeName" } ] ] }, "aws:SourceAccount": { "Ref" : "AWS::AccountId" } } } }] }, "Description" : "EventBridge Pipe template example. Execution role that grants the pipe the permissions necessary to send events to the specified pipe.", "Path": "/", "Policies": [{ "PolicyName": "SourcePermissions", "PolicyDocument": { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams" ], "Resource": [ { "Fn::GetAtt" : [ "PipeSourceDynamoDBTable", "StreamArn" ] } ] }] } }, { "PolicyName": "TargetPermissions", "PolicyDocument": { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "sqs:SendMessage" ], "Resource": [ { "Fn::GetAtt" : [ "PipeTargetQueue", "Arn" ] } ] }] } } ] } }, "PipeWithFiltering": { "Type": "AWS::Pipes::Pipe", "Properties": { "Description" : "EventBridge Pipe template example. Pipe that receives events from a DynamoDB stream, applies a filter, and sends matching events on to an SQS Queue.", "Name": { "Ref" : "PipeName" }, "RoleArn": {"Fn::GetAtt" : ["PipeTutorialPipeRole", "Arn"] }, "Source": { "Fn::GetAtt" : [ "PipeSourceDynamoDBTable", "StreamArn" ] }, "SourceParameters": { "DynamoDBStreamParameters" : { "StartingPosition" : "LATEST" }, "FilterCriteria" : { "Filters" : [ { "Pattern" : "{ \"eventName\": [\"INSERT\", \"MODIFY\"] }" }] } }, "Target": { "Fn::GetAtt" : [ "PipeTargetQueue", "Arn" ] } } } } }
YAML
AWSTemplateFormatVersion: '2010-09-09' Description: '[AWSDocs] EventBridge: pipes-get-started' Parameters: SourceTableName: Type: String Default: pipe-example-source Description: Specify the name of the table to provision as the pipe source, or accept the default. TargetQueueName: Type: String Default: pipe-example-target Description: Specify the name of the queue to provision as the pipe target, or accept the default. PipeName: Type: String Default: pipe-with-filtering-example Description: Specify the name of the table to provision as the pipe source, or accept the default. Resources: PipeSourceDynamoDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: Album AttributeType: S - AttributeName: Artist AttributeType: S KeySchema: - AttributeName: Album KeyType: HASH - AttributeName: Artist KeyType: RANGE ProvisionedThroughput: ReadCapacityUnits: 10 WriteCapacityUnits: 10 StreamSpecification: StreamViewType: NEW_AND_OLD_IMAGES TableName: !Ref SourceTableName PipeTargetQueue: Type: AWS::SQS::Queue Properties: QueueName: !Ref TargetQueueName PipeTutorialPipeRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: pipes.amazonaws.com Action: sts:AssumeRole Condition: StringLike: aws:SourceArn: !Join - '' - - 'arn:' - !Ref AWS::Partition - ':pipes:' - !Ref AWS::Region - ':' - !Ref AWS::AccountId - ':pipe/' - !Ref PipeName aws:SourceAccount: !Ref AWS::AccountId Description: EventBridge Pipe template example. Execution role that grants the pipe the permissions necessary to send events to the specified pipe. Path: / Policies: - PolicyName: SourcePermissions PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: - !GetAtt PipeSourceDynamoDBTable.StreamArn - PolicyName: TargetPermissions PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - sqs:SendMessage Resource: - !GetAtt PipeTargetQueue.Arn PipeWithFiltering: Type: AWS::Pipes::Pipe Properties: Description: EventBridge Pipe template example. Pipe that receives events from a DynamoDB stream, applies a filter, and sends matching events on to an SQS Queue. Name: !Ref PipeName RoleArn: !GetAtt PipeTutorialPipeRole.Arn Source: !GetAtt PipeSourceDynamoDBTable.StreamArn SourceParameters: DynamoDBStreamParameters: StartingPosition: LATEST FilterCriteria: Filters: - Pattern: '{ "eventName": ["INSERT", "MODIFY"] }' Target: !GetAtt PipeTargetQueue.Arn