使用 Lambda 處理 Amazon Kinesis Data Streams 記錄 - AWS Lambda

使用 Lambda 處理 Amazon Kinesis Data Streams 記錄

若要使用 Lambda 處理 Amazon Kinesis Data Streams 記錄,請建立 Lambda 事件來源映射。您可以將 Lambda 函式映射至標準迭代器或增強型散發取用者。如需更多詳細資訊,請參閱 輪詢和批次處理串流

建立 Kinesis 事件來源映射

若要使用資料串流中的記錄調用您的 Lambda 函數,請建立一個事件來源映射。您可以建立多個事件來源映射,來使用多個 Lambda 函數處理相同資料,或使用單一函數處理來自多個資料串流的項目。處理來自多個串流的項目時,每個批次將僅包含來自單一碎片或串流的記錄。

您可以設定事件來源映射,以處理來自不同 AWS 帳戶中的串流的記錄。如需詳細資訊,請參閱 建立跨帳戶事件來源映射

建立事件來源映射之前,您需要授予 Lambda 函數從 Kinesis 資料串流讀取的許可。Lambda 需要以下許可,才能管理與 Kinesis 資料串流相關的資源:

AWS 受管政策 AWSLambdaKinesisExecutionRole 包含這些許可。如下列程序所述,將此受管政策新增至您的函數。

注意
  • 無需 kinesis:ListStreams 許可,即可為 Kinesis 建立並管理事件來源映射。但是,如果您在主控台中建立事件來源映射卻並不具備此許可,將無法從下拉式清單中選取 Kinesis 串流,且主控台會顯示錯誤。若要建立事件來源映射,需要手動輸入串流的 Amazon Resource Name (ARN)。

  • Lambda 會在重試失敗的調用時發起 kinesis:GetRecordskinesis:GetShardIterator API 呼叫。

AWS 管理主控台
若要將 Kinesis 許可新增至函數
  1. 開啟 Lambda 主控台的函數頁面,然後選取您的函數。

  2. 組態索引標籤中,選擇許可

  3. 執行角色窗格的角色名稱下,選擇函數執行角色的連結。此連結會在 IAM 主控台中開啟該角色的頁面。

  4. 許可政策窗格中,選擇新增許可,然後選取附接政策

  5. 在搜尋欄位中輸入 AWSLambdaKinesisExecutionRole

  6. 選取政策旁邊的核取方塊,然後選擇新增許可

AWS CLI
若要將 Kinesis 許可新增至函數
  • 執行下列 CLI 命令,將 AWSLambdaKinesisExecutionRole 政策新增至函數的執行角色:

    aws iam attach-role-policy \ --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
若要將 Kinesis 許可新增至函數
  • 在函數定義中,新增 Policies 屬性,如下列範例所示:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole

設定需要的許可後,建立事件來源映射。

AWS 管理主控台
若要建立 Kinesis 事件來源映射
  1. 開啟 Lambda 主控台的函數頁面,然後選取您的函數。

  2. 函數概觀窗格中,選擇新增觸發條件

  3. 觸發條件組態下,針對來源選取 Kinesis

  4. 選取您要為之建立事件來源映射的 Kinesis 串流,以及 (選用) 串流的取用者。

  5. (選用) 編輯事件來源映射的批次大小開始位置批次時段

  6. 選擇新增

當從主控台建立事件來源映射時,您的 IAM 角色必須有 kinesis:ListStreamskinesis:ListStreamConsumers 許可。

AWS CLI
若要建立 Kinesis 事件來源映射
  • 執行下列 CLI 命令以建立 Kinesis 事件來源映射。根據您的使用案例,選擇您自己的批次大小和開始位置。

    aws lambda create-event-source-mapping \ --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --starting-position LATEST \ --batch-size 100

若要指定批次處理時段,請新增 --maximum-batching-window-in-seconds 選項。如需使用此參數和其他參數的詳細資訊,請參閱《AWS CLI 命令參考》中的 create-event-source-mapping

AWS SAM
若要建立 Kinesis 事件來源映射
  • 在函數定義中,新增 KinesisEvent 屬性,如下列範例所示:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

若要進一步了解在 AWS SAM 中為 Kinesis Data Streams 建立事件來源映射,請參閱《AWS Serverless Application Model 開發人員指南》中的 Kinesis

輪詢和串流開始位置

請注意,建立和更新事件來源映射期間的串流輪詢最終會一致。

  • 在建立事件來源映射期間,從串流開始輪詢事件可能需要幾分鐘時間。

  • 在更新事件來源映射期間,從串流停止並重新開始輪詢事件可能需要幾分鐘時間。

這種行為表示如果您指定 LATEST 當作串流的開始位置,事件來源映射可能會在建立或更新期間遺漏事件。若要確保沒有遺漏任何事件,請將串流開始位置指定為 TRIM_HORIZONAT_TIMESTAMP

建立跨帳戶事件來源映射

Amazon Kinesis Data Streams 支援資源型政策。因此,您可以使用一個 AWS 帳戶中的 Lambda 函數來處理擷取至另一個帳戶中的串流的資料。

若要使用另一個 AWS 帳戶中的 Kinesis 串流為您的 Lambda 函數建立事件來源映射,您必須使用資源型政策來設定該串流,以授予 Lambda 函數讀取項目的許可。若要了解如何設定串流以允許跨帳戶存取權,請參閱《Amazon Kinesis Streams 開發人員指南》中的與跨帳戶 AWS Lambda 函數共用存取權

使用為您的 Lambda 函數提供所需許可的資源型政策設定串流後,使用上一節所述的任何方法建立事件來源映射。

如果您選擇使用 Lambda 主控台來建立事件來源映射,則請將串流的 ARN 直接貼入輸入欄位。如果您想要為串流指定取用者,則貼上取用者的 ARN 即會自動填入串流欄位。