入门:创建 Amazon EventBridge 管道 - Amazon EventBridge

入门:创建 Amazon EventBridge 管道

为帮助您熟悉管道及其功能,我们将使用 CloudFormation 模板搭建一个 EventBridge 管道和相关组件。随后,我们便可以探索管道的各项特征。

该模板会创建一个 EventBridge 管道,用于将 DynamoDB 表中的数据流连接到 Amazon SQS 队列。每当数据库表中创建或修改记录时,管道都会将生成的事件发送到该队列。

部署的管道包含以下组件:

  • 一个作为管道源的 DynamoDB 表(及数据流),以及一个作为管道目标的 Amazon SQS 队列。

  • 一个执行角色,用于授予 EventBridge 访问 DynamoDB 表和 Amazon SQS 队列的必要权限。

  • 管道本身,其中包含一个事件筛选条件,仅选择在表项创建(插入)或修改时生成的事件。

有关模板的具体技术细节,请参阅 模板详细信息

数据库事件会与筛选条件进行匹配,符合条件则发送至队列。

使用 CloudFormation 创建管道

为了创建管道及其相关资源,我们将创建一个 CloudFormation 模板,并使用该模板创建一个包含示例管道且已配置源和目标的堆栈。

重要

如果您通过此模板创建堆栈,则需为使用的 Amazon 资源支付相应费用。

首先,创建 CloudFormation 模板。

  1. 模板 部分中,单击 JSONYAML 选项卡上的复制图标,以复制模板内容。

  2. 将模板内容粘贴到新文件中。

  3. 将该文件保存在本地。

接下来,使用已保存的模板来预置 CloudFormation 堆栈。

  1. 通过以下网址打开 CloudFormation 控制台:https://console.aws.amazon.com/cloudformation/

  2. 堆栈页面,从创建堆栈菜单中选择(采用新资源(标准))

  3. 指定模板:

    1. 先决条件下,选择选择现有模板

    2. 指定模板下,选择上传模板文件

    3. 选择选择文件,导航到模板文件并将其选中。

    4. 选择下一步

  4. 指定堆栈详细信息:

    1. 输入堆栈名称。

    2. 对于其他参数,可接受默认值或输入自定义值。

    3. 选择下一步

  5. 配置堆栈选项:

    1. 堆栈故障选项下,选择删除所有新创建的资源

      注意

      如果选择此选项,即使堆栈创建失败,您可能也不会因为删除策略规定保留资源而向资源付费。有关更多信息,请参阅《CloudFormation 用户指南》中的 DeletionPolicy 属性

    2. 接受其他所有默认值。

    3. 功能下勾选复选框,确认 CloudFormation 可能在您的账户中创建 IAM 资源。

    4. 选择下一步

  6. 查看堆栈详细信息并选择提交

使用 CloudFormation 创建堆栈(AWS CLI)

您还可以使用 AWS CLI 创建堆栈。

  • 使用 create-stack 命令。

    • 接受模板参数的默认值,同时指定堆栈名称。使用 template-body 参数传递模板内容,或使用 template-url 指定 URL 位置。

      aws cloudformation create-stack \ --stack-name eventbridge-rule-tutorial \ --template-body template-contents \ --capabilities CAPABILITY_IAM
    • 覆盖一个或多个模板参数的默认值。例如:

      aws cloudformation create-stack \ --stack-name eventbridge-rule-tutorial \ --template-body template-contents \ --parameters \ ParameterKey=SourceTableName,ParameterValue=pipe-example-source \ ParameterKey=TargetQueueName,ParameterValue=pipe-example-target \ ParameterKey=PipeName,ParameterValue=pipe-with-filtering-example \ --capabilities CAPABILITY_IAM

CloudFormation 会开始创建堆栈。一旦堆栈创建完成之后,堆栈资源就即可投入使用。可使用堆栈详细信息页面上的资源选项卡来查看账户中预置的资源。

探索管道功能

管道创建完成后,您可以通过 EventBridge 控制台观察管道运行状态并测试事件传送效果。

  1. 访问 https://console.aws.amazon.com/events/home?#/pipes,打开 EventBridge 控制台。

  2. 选择您创建的管道。

    在管道详细信息页面上,管道组件部分会显示构成管道的资源,并包含提供各组件更多详细信息的选项卡。

    管道详细信息页面以图形方式显示管道的源、筛选器和目标组件。

    您可以在设置选项卡的权限部分中,找到我们为该管道创建的执行角色。

检查管道筛选器

在测试管道运行状况之前,我们先来检查一下用于控制将哪些事件发送到目标的筛选器。管道只会将符合筛选条件的事件发送到目标,所有其他事件都将被丢弃。本示例中,我们仅希望将创建或修改表条目时生成的事件发送到 Amazon SQS 队列。

  • 在管道详细信息页面的管道组件下,选择筛选选项卡。

    我们已添加了一个筛选器,该筛选器仅选择 eventName 设置为 INSERTMODIFY 的事件。

    { "eventName": ["INSERT", "MODIFY"] }

通过管道发送事件

接下来,我们将在管道源中生成事件,以测试管道的过滤和传送功能是否正常运行。为此,我们将在指定为管道源的 DynamoDB 表中创建并编辑一个表项。

  1. 在管道详细信息页面的管道组件下,选择选项卡。

  2. 下,选择 DynamoDB 数据流名称。

    这将在单独的窗口中打开 DynamoDB 控制台,并显示源表的详细信息。

  3. 选择浏览表项目

  4. 通过在表中创建项目来生成 INSERT 事件:

    1. 选择创建项目

    2. AlbumArtist 属性添加值。

    3. 选择创建项目

  5. 通过编辑项目生成 DELETEINSERT 事件:

    1. 从列表中选择该项目,然后从操作菜单中选择编辑项目

    2. AlbumArtist 属性输入新值。

    3. 勾选确认您要更改项目键值的复选框,然后选择重新创建项目

      此操作会先删除该表项,再重新创建,从而生成一个 DELETE 事件,接着生成一个新的 INSERT 事件。

  6. 通过向项目添加属性来生成 MODIFY 事件:

    1. 从列表中选择该项目,然后从操作菜单中选择编辑项目

    2. 添加新属性菜单中,选择 Number

    3. 在属性名称中输入 Year,然后为该属性输入值。选择保存并关闭

确认事件通过管道成功传送

最后,我们将验证管道是否成功筛选并传送了在 DynamoDB 中创建和编辑表项时生成的事件。

  1. 在管道详细信息页面的管道组件下,选择目标选项卡。

  2. 目标下,选择 Amazon SQS 队列名称。

    这将在单独的窗口中打开 Amazon SQS 控制台,并显示目标队列的详细信息。

  3. 选择发送和接收消息

  4. 接收消息下,选择轮询消息

    Amazon SQS 会将收到的消息加载到队列中。单击各个消息可查看其详细信息。

    队列中应有三条事件消息:

    • 两条 INSERT 类型消息,一条是您首次创建表项时生成的,另一条是您通过更改键值重新创建表项时生成的。

    • 一条 MODIFY 类型消息,是您向表项添加属性时生成的。

    请注意,队列中没有 DELETE 类型的事件消息,尽管在您通过更改键值删除并重新创建表项时生成了一个该类事件。这是因为我们指定的管道筛选器仅筛选 INSERTMODIFY 类型事件,因此管道会过滤掉 DELETE 事件,而不会将其传送到队列。

清理:删除资源

最后一步,我们将删除该堆栈及其包含的资源。

重要

只要堆栈存在,您就需要为堆栈中包含的 Amazon 资源付费。

  1. 通过以下网址打开 CloudFormation 控制台:https://console.aws.amazon.com/cloudformation/

  2. 堆栈页面上,选择根据该模板创建的堆栈,选择删除,然后确认删除

    CloudFormation 启动删除堆栈及其包含的所有资源。

CloudFormation 模板详细信息

此模板会在您的账户中创建资源并授予相应权限。

资源

本教程的 CloudFormation 模板将在您的账户中创建以下资源:

重要

如果您通过此模板创建堆栈,则需为使用的 Amazon 资源支付相应费用。

权限

该模板包含一个代表执行角色的 AWS::IAM::Role 资源。此角色向 EventBridge 管道服务(pipes.amazonaws.com)授予您账户中的以下权限。

以下权限的作用范围限定于模板创建的作为管道事件源的 DynamoDB 表及其数据流:

  • dynamodb:DescribeStream

  • dynamodb:GetRecords

  • dynamodb:GetShardIterator

  • dynamodb:ListStreams

以下权限的作用范围限定于堆栈创建的作为管道目标的 Amazon SQS 队列:

  • sqs:SendMessage

CloudFormation 模板

将以下 JSON 或 YAML 代码另存为单独文件,以用作本教程的 CloudFormation 模板。

JSON
{ "AWSTemplateFormatVersion": "2010-09-09", "Description" : "[AWSDocs] EventBridge: pipes-get-started", "Parameters" : { "SourceTableName" : { "Type" : "String", "Default" : "pipe-example-source", "Description" : "Specify the name of the table to provision as the pipe source, or accept the default." }, "TargetQueueName" : { "Type" : "String", "Default" : "pipe-example-target", "Description" : "Specify the name of the queue to provision as the pipe target, or accept the default." }, "PipeName" : { "Type" : "String", "Default" : "pipe-with-filtering-example", "Description" : "Specify the name of the table to provision as the pipe source, or accept the default." } }, "Resources": { "PipeSourceDynamoDBTable": { "Type": "AWS::DynamoDB::Table", "Properties": { "AttributeDefinitions": [{ "AttributeName": "Album", "AttributeType": "S" }, { "AttributeName": "Artist", "AttributeType": "S" } ], "KeySchema": [{ "AttributeName": "Album", "KeyType": "HASH" }, { "AttributeName": "Artist", "KeyType": "RANGE" } ], "ProvisionedThroughput": { "ReadCapacityUnits": 10, "WriteCapacityUnits": 10 }, "StreamSpecification": { "StreamViewType": "NEW_AND_OLD_IMAGES" }, "TableName": { "Ref" : "SourceTableName" } } }, "PipeTargetQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": { "Ref" : "TargetQueueName" } } }, "PipeTutorialPipeRole": { "Type": "AWS::IAM::Role", "Properties": { "AssumeRolePolicyDocument": { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": { "Service": "pipes.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringLike": { "aws:SourceArn": { "Fn::Join": [ "", [ "arn:", { "Ref": "AWS::Partition" }, ":pipes:", { "Ref": "AWS::Region" }, ":", { "Ref": "AWS::AccountId" }, ":pipe/", { "Ref": "PipeName" } ] ] }, "aws:SourceAccount": { "Ref" : "AWS::AccountId" } } } }] }, "Description" : "EventBridge Pipe template example. Execution role that grants the pipe the permissions necessary to send events to the specified pipe.", "Path": "/", "Policies": [{ "PolicyName": "SourcePermissions", "PolicyDocument": { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams" ], "Resource": [ { "Fn::GetAtt" : [ "PipeSourceDynamoDBTable", "StreamArn" ] } ] }] } }, { "PolicyName": "TargetPermissions", "PolicyDocument": { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "sqs:SendMessage" ], "Resource": [ { "Fn::GetAtt" : [ "PipeTargetQueue", "Arn" ] } ] }] } } ] } }, "PipeWithFiltering": { "Type": "AWS::Pipes::Pipe", "Properties": { "Description" : "EventBridge Pipe template example. Pipe that receives events from a DynamoDB stream, applies a filter, and sends matching events on to an SQS Queue.", "Name": { "Ref" : "PipeName" }, "RoleArn": {"Fn::GetAtt" : ["PipeTutorialPipeRole", "Arn"] }, "Source": { "Fn::GetAtt" : [ "PipeSourceDynamoDBTable", "StreamArn" ] }, "SourceParameters": { "DynamoDBStreamParameters" : { "StartingPosition" : "LATEST" }, "FilterCriteria" : { "Filters" : [ { "Pattern" : "{ \"eventName\": [\"INSERT\", \"MODIFY\"] }" }] } }, "Target": { "Fn::GetAtt" : [ "PipeTargetQueue", "Arn" ] } } } } }
YAML
AWSTemplateFormatVersion: '2010-09-09' Description: '[AWSDocs] EventBridge: pipes-get-started' Parameters: SourceTableName: Type: String Default: pipe-example-source Description: Specify the name of the table to provision as the pipe source, or accept the default. TargetQueueName: Type: String Default: pipe-example-target Description: Specify the name of the queue to provision as the pipe target, or accept the default. PipeName: Type: String Default: pipe-with-filtering-example Description: Specify the name of the table to provision as the pipe source, or accept the default. Resources: PipeSourceDynamoDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: Album AttributeType: S - AttributeName: Artist AttributeType: S KeySchema: - AttributeName: Album KeyType: HASH - AttributeName: Artist KeyType: RANGE ProvisionedThroughput: ReadCapacityUnits: 10 WriteCapacityUnits: 10 StreamSpecification: StreamViewType: NEW_AND_OLD_IMAGES TableName: !Ref SourceTableName PipeTargetQueue: Type: AWS::SQS::Queue Properties: QueueName: !Ref TargetQueueName PipeTutorialPipeRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17 ' Statement: - Effect: Allow Principal: Service: pipes.amazonaws.com Action: sts:AssumeRole Condition: StringLike: aws:SourceArn: !Join - '' - - 'arn:' - !Ref AWS::Partition - ':pipes:' - !Ref AWS::Region - ':' - !Ref AWS::AccountId - ':pipe/' - !Ref PipeName aws:SourceAccount: !Ref AWS::AccountId Description: EventBridge Pipe template example. Execution role that grants the pipe the permissions necessary to send events to the specified pipe. Path: / Policies: - PolicyName: SourcePermissions PolicyDocument: Version: '2012-10-17 ' Statement: - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: - !GetAtt PipeSourceDynamoDBTable.StreamArn - PolicyName: TargetPermissions PolicyDocument: Version: '2012-10-17 ' Statement: - Effect: Allow Action: - sqs:SendMessage Resource: - !GetAtt PipeTargetQueue.Arn PipeWithFiltering: Type: AWS::Pipes::Pipe Properties: Description: EventBridge Pipe template example. Pipe that receives events from a DynamoDB stream, applies a filter, and sends matching events on to an SQS Queue. Name: !Ref PipeName RoleArn: !GetAtt PipeTutorialPipeRole.Arn Source: !GetAtt PipeSourceDynamoDBTable.StreamArn SourceParameters: DynamoDBStreamParameters: StartingPosition: LATEST FilterCriteria: Filters: - Pattern: '{ "eventName": ["INSERT", "MODIFY"] }' Target: !GetAtt PipeTargetQueue.Arn