使用 Lambda 处理 Amazon Kinesis Data Streams 记录 - AWS Lambda

使用 Lambda 处理 Amazon Kinesis Data Streams 记录

要使用 Lambda 处理 Amazon Kinesis Data Streams 记录,可创建 Lambda 事件源映射。您可以将 Lambda 函数映射到标准迭代器或增强型扇出功能使用者。有关更多信息,请参阅轮询和批处理流

创建 Kinesis 事件源映射

要使用来自数据流的记录调用 Lambda 函数,请创建一个事件源映射。您可以创建多个事件源映射,以使用多个 Lambda 函数处理相同的数据,或使用单个函数处理来自多个数据流的项目。处理来自多个流的项目时,每个批处理将只包含来自单个分片或流的记录。

您可以配置事件源映射来处理来自不同 AWS 账户中的流的记录。要了解更多信息,请参阅创建跨账户事件源映射

在创建事件源映射之前,您需要向您的 Lambda 函数授予读取 Kinesis 数据流中数据的权限。Lambda 需要以下权限才能管理与您的 Kinesis 数据流相关的资源:

AWS 托管式策略 AWSLambdaKinesisExecutionRole 包含这些权限。按照以下过程所述将此托管式策略添加到您的函数。

注意
  • 您不需要 kinesis:ListStreams 权限来创建和管理 Kinesis 的事件源映射。但是,如果您在控制台中创建事件源映射但没有此权限,则无法从下拉列表中选择 Kinesis 流,并且控制台将显示错误。要创建事件源映射,您需要手动输入流的 Amazon 资源名称(ARN)。

  • Lambda 在重试失败的调用时会进行 kinesis:GetRecordskinesis:GetShardIterator API 调用。

AWS 管理控制台
为您的函数添加 Kinesis 权限
  1. 打开 Lambda 控制台的“函数”页面,然后选择函数。

  2. 配置选项卡中,选择权限

  3. 执行角色窗格的角色名称下,选择指向函数的执行角色的链接。此链接将在 IAM 控制台中打开该角色的页面。

  4. 权限策略窗格中,选择添加权限,然后选择附加策略

  5. 在搜索字段中输入 AWSLambdaKinesisExecutionRole

  6. 选中该策略名称旁边的复选框,然后选择添加权限

AWS CLI
为您的函数添加 Kinesis 权限
  • 运行以下 CLI 命令,以将 AWSLambdaKinesisExecutionRole 策略附加到函数的执行角色。

    aws iam attach-role-policy \ --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
为您的函数添加 Kinesis 权限
  • 在函数定义中添加 Policies 属性,如以下示例所示:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole

配置所需的权限后,创建事件源映射。

AWS 管理控制台
创建 Kinesis 事件源映射
  1. 打开 Lambda 控制台的“函数”页面,然后选择函数。

  2. 函数概述窗格中,选择添加触发器

  3. 触发器配置下,对于源,请选择 Kinesis

  4. 选择要为其创建事件源映射的 Kinesis 流,也可以选择流的使用者。

  5. (可选)编辑事件源映射的批处理大小起始位置批处理窗口

  6. 选择添加

在控制台中创建事件源映射时,您的 IAM 角色必须拥有 kinesis:ListStreamskinesis:ListStreamConsumers 权限。

AWS CLI
创建 Kinesis 事件源映射
  • 运行以下 CLI 命令以创建 Kinesis 事件源映射。根据您的应用场景选择自己的批量大小和起始位置。

    aws lambda create-event-source-mapping \ --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --starting-position LATEST \ --batch-size 100

要指定批处理时间窗,请添加 --maximum-batching-window-in-seconds 选项。有关使用此参数和其他参数的更多信息,请参阅《AWS CLI Command Reference》中的 create-event-source-mapping

AWS SAM
创建 Kinesis 事件源映射
  • 在函数定义中添加 KinesisEvent 属性,如以下示例所示:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

要了解有关在 AWS SAM 中创建 Kinesis 数据流事件源映射的更多信息,请参阅《AWS Serverless Application Model Developer Guide》中的 Kinesis

轮询和流的起始位置

请注意,事件源映射创建和更新期间的流轮询最终是一致的。

  • 在事件源映射创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在事件源映射更新期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP

创建跨账户事件源映射

Amazon Kinesis Data Streams 支持基于资源的策略。因此,您可以在一个 AWS 账户中使用 Lambda 函数来处理另一个账户的流中摄入的数据。

要使用其他 AWS 账户中的 Kinesis 流为您的 Lambda 函数创建事件源映射,您必须使用基于资源的策略配置该流,以向您的 Lambda 函数授予读取相关项目的权限。要了解如何配置流以允许跨账户存取,请参阅《Amazon Kinesis Streams 开发人员指南》中的 Sharing access with cross-account AWS Lambda functions

使用基于资源的策略配置流以向您的 Lambda 函数授予所需的权限后,请使用上一节中描述的任何方法创建事件源映射。

如果您选择使用 Lambda 控制台创建事件源映射,请将流的 ARN 直接粘贴到输入字段中。如果您想指定流的使用者,粘贴使用者的 ARN 会自动填充流字段。