入門:建立 Amazon EventBridge 管道 - Amazon EventBridge

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

入門:建立 Amazon EventBridge 管道

為了熟悉管道及其功能,我們將使用 AWS CloudFormation 範本來設定 EventBridge 管道和相關聯的元件。然後,我們可以探索各種管道功能。

範本會建立 EventBridge 管道,將串流從 DynamoDB 資料表連線至 Amazon SQS 佇列。每次在資料庫資料表中建立或修改記錄時,管道都會將產生的事件傳送至佇列。

部署的管道包含:

  • 做為管道來源的 DynamoDB 資料表 (和串流),以及做為目標的 Amazon SQS 佇列。

  • 授予 EventBridge 存取 DynamoDB 資料表和 Amazon SQS 佇列必要許可的執行角色。

  • 管道本身,其中包含事件篩選條件,只會選取建立 (插入) 或修改資料表項目時產生的事件。

如需範本的特定技術詳細資訊,請參閱 範本詳細資訊

資料庫事件會比對至篩選條件,並在相符時傳送至佇列。

使用 建立管道 AWS CloudFormation

若要建立管道及其相關資源,我們將建立 CloudFormation 範本,並使用它來建立包含範例管道的堆疊,並完整包含來源和目標。

重要

如果您從此範本建立堆疊,則會向您收取所使用的 Amazon 資源費用。

建立範本

首先,建立 CloudFormation 範本。

  1. 範本區段中,按一下 JSONYAML 標籤上的複製圖示,以複製範本內容。

  2. 將範本內容貼到新檔案中。

  3. 在本機儲存檔案。

建立 堆疊

接著,使用您儲存的範本來佈建 CloudFormation 堆疊。

  1. 開啟 AWS CloudFormation 主控台。

  2. 堆疊頁面上,從建立堆疊功能表中,選擇新資源 (標準)

  3. 指定範本:

    1. 先決條件下,選擇選擇現有範本

    2. 指定範本下,選擇上傳範本檔案

    3. 選擇選擇檔案,導覽至範本檔案,然後選擇它。

    4. 選擇下一步

  4. 指定堆疊詳細資訊:

    1. 輸入堆疊名稱。

    2. 對於參數,請接受預設值或輸入您自己的值。

    3. 選擇下一步

  5. 設定堆疊選項:

    1. 堆疊失敗選項下,選擇刪除所有新建立的資源

      注意

      選擇此選項可避免針對刪除政策指定的資源向您收費,即使堆疊建立失敗也一樣。如需詳細資訊,請參閱AWS CloudFormation 《 使用者指南》中的 DeletionPolicy 屬性

    2. 接受所有其他預設值。

    3. 功能下,勾選核取方塊以確認 CloudFormation 可能會在您的帳戶中建立 IAM 資源。

    4. 選擇下一步

  6. 檢閱堆疊詳細資訊,然後選擇提交

AWS CloudFormation 會建立堆疊。堆疊建立完成後,堆疊資源即可使用。您可以使用堆疊詳細資訊頁面上的資源索引標籤來檢視帳戶中佈建的資源。

探索管道功能

建立管道之後,您可以使用 EventBridge 主控台來觀察管道操作和測試事件交付。

  1. 開啟位於 https://https://console.aws.amazon.com/events/home?#/pipes 的 EventBridge 主控台。

  2. 選擇您建立的管道。

    在管道詳細資訊頁面上,管道元件區段會顯示構成管道的資源,並包含提供每個元件詳細資訊的索引標籤。

    管道詳細資訊頁面以圖形方式顯示管道的來源、篩選條件和目標元件。

    您可以在設定索引標籤的許可區段中找到我們為管道建立的執行角色。

檢查管道篩選條件

在我們測試管道操作之前,讓我們來檢查我們指定的篩選條件,以控制哪些事件會傳送到目標。管道只會將符合篩選條件的事件傳送至目標;所有其他事件都會遭到捨棄。在這種情況下,我們只希望在建立或修改資料表項目時產生的事件傳送到 Amazon SQS 佇列。

  • 在管道詳細資訊頁面的管道元件下,選擇篩選索引標籤。

    我們包含的篩選條件只會選取 eventName 設定為 INSERT或 的事件MODIFY

    { "eventName": ["INSERT", "MODIFY"] }

透過管道傳送事件

接下來,我們將在管道來源中產生事件,以測試管道篩選和交付是否正常運作。若要這樣做,我們會在指定為管道來源的 DynamoDB 資料表中建立和編輯項目。

  1. 在管道詳細資訊頁面的管道元件下,選擇來源索引標籤。

  2. 來源下,選擇 DynamoDB 串流名稱。

    這會在單獨的視窗中開啟 DynamoDB 主控台,並顯示來源資料表詳細資訊。

  3. 選擇 探索資料表項目

  4. 透過在 資料表中建立項目來產生INSERT事件:

    1. 選擇建立項目

    2. 新增專輯藝術家屬性的值。

    3. 選擇建立項目

  5. 透過編輯項目產生 DELETEINSERT事件:

    1. 從清單中選擇項目,然後從動作功能表中選擇編輯項目

    2. 輸入 AlbumArtist 屬性的新值。

    3. 勾選確認您要變更項目索引鍵值的方塊,然後選擇重新建立項目

      這會導致項目遭到刪除,然後重新建立、產生DELETE事件,然後是新的INSERT事件。

  6. 透過將屬性新增至項目來產生MODIFY事件:

    1. 從清單中選擇項目,然後從動作功能表中選擇編輯項目

    2. 新增屬性功能表中,選擇數字

    3. 針對屬性名稱,輸入年份,然後輸入屬性的值。選擇儲存與關閉

透過管道確認事件交付

最後,我們將確認管道已成功篩選並交付我們在 DynamoDB 中建立和編輯資料表項目所產生的事件。

  1. 在管道詳細資訊頁面的管道元件下,選擇目標索引標籤。

  2. 目標下,選擇 Amazon SQS 佇列名稱。

    這會在單獨的視窗中開啟 Amazon SQS 主控台,並顯示目標佇列詳細資訊。

  3. 選擇傳送及接收訊息

  4. 接收訊息下,選擇輪詢訊息

    Amazon SQS 會將收到的訊息載入佇列。按一下個別訊息以查看其詳細資訊。

    佇列中應該有三個事件訊息:

    • 類型 的兩個INSERT,一個是在您第一次建立資料表項目時產生的,另一個是在您透過變更索引鍵值重新建立項目時產生的。

    • 類型 之一MODIFY,當您將屬性新增至資料表項目時產生。

    請注意,佇列DELETE中沒有 類型的事件訊息,即使您刪除並變更索引鍵值來重新建立資料表項目時,也一樣。我們指定的管道篩選條件只會選取 INSERTMODIFY,因此管道會篩選出DELETE事件,而不是將它交付到佇列。

清除:刪除資源

最後一步,我們將刪除堆疊及其包含的資源。

重要

只要堆疊中包含的 Amazon 資源存在,就會向您收取費用。

  1. 開啟 AWS CloudFormation 主控台。

  2. 堆疊頁面上,選擇從範本建立的堆疊,然後選擇刪除,然後確認刪除

    CloudFormation 會啟動刪除堆疊及其包含的所有資源。

CloudFormation 範本詳細資訊

此範本會在您的帳戶中建立資源並授予許可。

資源

本教學課程的 AWS CloudFormation 範本會在您的帳戶中建立下列資源:

重要

如果您從此範本建立堆疊,則會向您收取所使用的 Amazon 資源費用。

許可

範本包含代表執行角色AWS::IAM::Role的資源。此角色會授予 EventBridge Pipes 服務 (pipes.amazonaws.com) 您帳戶中的下列許可。

下列許可範圍限定於 DynamoDB 資料表,並串流範本建立為管道的事件來源:

  • dynamodb:DescribeStream

  • dynamodb:GetRecords

  • dynamodb:GetShardIterator

  • dynamodb:ListStreams

下列許可的範圍僅限於堆疊建立為管道目標的 Amazon SQS 佇列:

  • sqs:SendMessage

CloudFormation 範本

將下列 JSON 或 YAML 程式碼儲存為個別檔案,以做為本教學課程的 CloudFormation 範本。

JSON
{ "AWSTemplateFormatVersion": "2010-09-09", "Description" : "EventBridge Pipe template example. Provisions a pipe, along with a DynamoDB stream as the pipe source and an SQS queue as the pipe target. Also provisions an execution role that contains the necessary permissions to access both the source and target. Once provisioned, the pipe receives events from the DynamoDB data stream, applies a filter, and sends matching events on to an SQS Queue. You will be billed for the Amazon resources used if you create a stack from this template.", "Parameters" : { "SourceTableName" : { "Type" : "String", "Default" : "pipe-example-source", "Description" : "Specify the name of the table to provision as the pipe source, or accept the default." }, "TargetQueueName" : { "Type" : "String", "Default" : "pipe-example-target", "Description" : "Specify the name of the queue to provision as the pipe target, or accept the default." }, "PipeName" : { "Type" : "String", "Default" : "pipe-with-filtering-example", "Description" : "Specify the name of the table to provision as the pipe source, or accept the default." } }, "Resources": { "PipeSourceDynamoDBTable": { "Type": "AWS::DynamoDB::Table", "Properties": { "AttributeDefinitions": [{ "AttributeName": "Album", "AttributeType": "S" }, { "AttributeName": "Artist", "AttributeType": "S" } ], "KeySchema": [{ "AttributeName": "Album", "KeyType": "HASH" }, { "AttributeName": "Artist", "KeyType": "RANGE" } ], "ProvisionedThroughput": { "ReadCapacityUnits": 10, "WriteCapacityUnits": 10 }, "StreamSpecification": { "StreamViewType": "NEW_AND_OLD_IMAGES" }, "TableName": { "Ref" : "SourceTableName" } } }, "PipeTargetQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": { "Ref" : "TargetQueueName" } } }, "PipeTutorialPipeRole": { "Type": "AWS::IAM::Role", "Properties": { "AssumeRolePolicyDocument": { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": { "Service": "pipes.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringLike": { "aws:SourceArn": { "Fn::Join": [ "", [ "arn:", { "Ref": "AWS::Partition" }, ":pipes:", { "Ref": "AWS::Region" }, ":", { "Ref": "AWS::AccountId" }, ":pipe/", { "Ref": "PipeName" } ] ] }, "aws:SourceAccount": { "Ref" : "AWS::AccountId" } } } }] }, "Description" : "EventBridge Pipe template example. Execution role that grants the pipe the permissions necessary to send events to the specified pipe.", "Path": "/", "Policies": [{ "PolicyName": "SourcePermissions", "PolicyDocument": { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams" ], "Resource": [ { "Fn::GetAtt" : [ "PipeSourceDynamoDBTable", "StreamArn" ] } ] }] } }, { "PolicyName": "TargetPermissions", "PolicyDocument": { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "sqs:SendMessage" ], "Resource": [ { "Fn::GetAtt" : [ "PipeTargetQueue", "Arn" ] } ] }] } } ] } }, "PipeWithFiltering": { "Type": "AWS::Pipes::Pipe", "Properties": { "Description" : "EventBridge Pipe template example. Pipe that receives events from a DynamoDB stream, applies a filter, and sends matching events on to an SQS Queue.", "Name": { "Ref" : "PipeName" }, "RoleArn": {"Fn::GetAtt" : ["PipeTutorialPipeRole", "Arn"] }, "Source": { "Fn::GetAtt" : [ "PipeSourceDynamoDBTable", "StreamArn" ] }, "SourceParameters": { "DynamoDBStreamParameters" : { "StartingPosition" : "LATEST" }, "FilterCriteria" : { "Filters" : [ { "Pattern" : "{ \"eventName\": [\"INSERT\", \"MODIFY\"] }" }] } }, "Target": { "Fn::GetAtt" : [ "PipeTargetQueue", "Arn" ] } } } } }
YAML
AWSTemplateFormatVersion: '2010-09-09' Description: EventBridge Pipe template example. Provisions a pipe, along with a DynamoDB stream as the pipe source and an SQS queue as the pipe target. Also provisions an execution role that contains the necessary permissions to access both the source and target. Once provisioned, the pipe receives events from the DynamoDB data stream, applies a filter, and sends matching events on to an SQS Queue. You will be billed for the Amazon resources used if you create a stack from this template. Parameters: SourceTableName: Type: String Default: pipe-example-source Description: Specify the name of the table to provision as the pipe source, or accept the default. TargetQueueName: Type: String Default: pipe-example-target Description: Specify the name of the queue to provision as the pipe target, or accept the default. PipeName: Type: String Default: pipe-with-filtering-example Description: Specify the name of the table to provision as the pipe source, or accept the default. Resources: PipeSourceDynamoDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: Album AttributeType: S - AttributeName: Artist AttributeType: S KeySchema: - AttributeName: Album KeyType: HASH - AttributeName: Artist KeyType: RANGE ProvisionedThroughput: ReadCapacityUnits: 10 WriteCapacityUnits: 10 StreamSpecification: StreamViewType: NEW_AND_OLD_IMAGES TableName: !Ref SourceTableName PipeTargetQueue: Type: AWS::SQS::Queue Properties: QueueName: !Ref TargetQueueName PipeTutorialPipeRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: pipes.amazonaws.com Action: sts:AssumeRole Condition: StringLike: aws:SourceArn: !Join - '' - - 'arn:' - !Ref AWS::Partition - ':pipes:' - !Ref AWS::Region - ':' - !Ref AWS::AccountId - ':pipe/' - !Ref PipeName aws:SourceAccount: !Ref AWS::AccountId Description: EventBridge Pipe template example. Execution role that grants the pipe the permissions necessary to send events to the specified pipe. Path: / Policies: - PolicyName: SourcePermissions PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: - !GetAtt PipeSourceDynamoDBTable.StreamArn - PolicyName: TargetPermissions PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - sqs:SendMessage Resource: - !GetAtt PipeTargetQueue.Arn PipeWithFiltering: Type: AWS::Pipes::Pipe Properties: Description: EventBridge Pipe template example. Pipe that receives events from a DynamoDB stream, applies a filter, and sends matching events on to an SQS Queue. Name: !Ref PipeName RoleArn: !GetAtt PipeTutorialPipeRole.Arn Source: !GetAtt PipeSourceDynamoDBTable.StreamArn SourceParameters: DynamoDBStreamParameters: StartingPosition: LATEST FilterCriteria: Filters: - Pattern: '{ "eventName": ["INSERT", "MODIFY"] }' Target: !GetAtt PipeTargetQueue.Arn