使用 Lambda 处理 Amazon Kinesis Data Streams 记录
要使用 Lambda 处理 Amazon Kinesis Data Streams 记录,可创建 Lambda 事件源映射。您可以将 Lambda 函数映射到标准迭代器或增强型扇出功能使用者。有关更多信息,请参阅轮询和批处理流。
创建 Kinesis 事件源映射
要使用来自数据流的记录调用 Lambda 函数,请创建一个事件源映射。您可以创建多个事件源映射,以使用多个 Lambda 函数处理相同的数据,或使用单个函数处理来自多个数据流的项目。处理来自多个流的项目时,每个批处理将只包含来自单个分片或流的记录。
您可以配置事件源映射来处理来自不同 AWS 账户中的流的记录。要了解更多信息,请参阅创建跨账户事件源映射。
在创建事件源映射之前,您需要向您的 Lambda 函数授予读取 Kinesis 数据流中数据的权限。Lambda 需要以下权限才能管理与您的 Kinesis 数据流相关的资源:
AWS 托管式策略 AWSLambdaKinesisExecutionRole 包含这些权限。按照以下过程所述将此托管式策略添加到您的函数。
-
您不需要 kinesis:ListStreams 权限来创建和管理 Kinesis 的事件源映射。但是,如果您在控制台中创建事件源映射但没有此权限,则无法从下拉列表中选择 Kinesis 流,并且控制台将显示错误。要创建事件源映射,您需要手动输入流的 Amazon 资源名称(ARN)。
-
Lambda 在重试失败的调用时会进行 kinesis:GetRecords 和 kinesis:GetShardIterator API 调用。
- AWS 管理控制台
-
为您的函数添加 Kinesis 权限
-
打开 Lambda 控制台的“函数”页面,然后选择函数。
-
在配置选项卡中,选择权限。
-
在执行角色窗格的角色名称下,选择指向函数的执行角色的链接。此链接将在 IAM 控制台中打开该角色的页面。
-
在权限策略窗格中,选择添加权限,然后选择附加策略。
-
在搜索字段中输入 AWSLambdaKinesisExecutionRole。
-
选中该策略名称旁边的复选框,然后选择添加权限。
- AWS CLI
-
- AWS SAM
-
为您的函数添加 Kinesis 权限
-
在函数定义中添加 Policies 属性,如以下示例所示:
Resources:
MyFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./my-function/
Handler: index.handler
Runtime: nodejs22.x
Policies:
- AWSLambdaKinesisExecutionRole
配置所需的权限后,创建事件源映射。
- AWS 管理控制台
-
创建 Kinesis 事件源映射
-
打开 Lambda 控制台的“函数”页面,然后选择函数。
-
在函数概述窗格中,选择添加触发器。
-
在触发器配置下,对于源,请选择 Kinesis 。
-
选择要为其创建事件源映射的 Kinesis 流,也可以选择流的使用者。
-
(可选)编辑事件源映射的批处理大小、起始位置和批处理窗口。
-
选择添加。
在控制台中创建事件源映射时,您的 IAM 角色必须拥有 kinesis:ListStreams 和 kinesis:ListStreamConsumers 权限。
- AWS CLI
-
创建 Kinesis 事件源映射
-
运行以下 CLI 命令以创建 Kinesis 事件源映射。根据您的应用场景选择自己的批量大小和起始位置。
aws lambda create-event-source-mapping \
--function-name MyFunction \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
--starting-position LATEST \
--batch-size 100
要指定批处理时间窗,请添加 --maximum-batching-window-in-seconds 选项。有关使用此参数和其他参数的更多信息,请参阅《AWS CLI Command Reference》中的 create-event-source-mapping。
- AWS SAM
-
创建 Kinesis 事件源映射
-
在函数定义中添加 KinesisEvent 属性,如以下示例所示:
Resources:
MyFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./my-function/
Handler: index.handler
Runtime: nodejs22.x
Policies:
- AWSLambdaKinesisExecutionRole
Events:
KinesisEvent:
Type: Kinesis
Properties:
Stream: !GetAtt MyKinesisStream.Arn
StartingPosition: LATEST
BatchSize: 100
MyKinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1
要了解有关在 AWS SAM 中创建 Kinesis 数据流事件源映射的更多信息,请参阅《AWS Serverless Application Model Developer Guide》中的 Kinesis。
轮询和流的起始位置
请注意,事件源映射创建和更新期间的流轮询最终是一致的。
此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP。
创建跨账户事件源映射
Amazon Kinesis Data Streams 支持基于资源的策略。因此,您可以在一个 AWS 账户中使用 Lambda 函数来处理另一个账户的流中摄入的数据。
要使用其他 AWS 账户中的 Kinesis 流为您的 Lambda 函数创建事件源映射,您必须使用基于资源的策略配置该流,以向您的 Lambda 函数授予读取相关项目的权限。要了解如何配置流以允许跨账户存取,请参阅《Amazon Kinesis Streams 开发人员指南》中的 Sharing access with cross-account AWS Lambda functions。
使用基于资源的策略配置流以向您的 Lambda 函数授予所需的权限后,请使用上一节中描述的任何方法创建事件源映射。
如果您选择使用 Lambda 控制台创建事件源映射,请将流的 ARN 直接粘贴到输入字段中。如果您想指定流的使用者,粘贴使用者的 ARN 会自动填充流字段。