在 Lambda 中保留 Kinesis Data Streams 事件來源的捨棄批次記錄 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Lambda 中保留 Kinesis Data Streams 事件來源的捨棄批次記錄

Kinesis 事件來源映射的錯誤處理取決於錯誤是在調用函數之前還是在調用函數期間發生:

  • 調用前:如果 Lambda 事件來源映射因為限流或其他問題而無法調用函數,它會重試,直到記錄過期或超過事件來源映射上設定的最長存留期 (MaximumRecordAgeInSeconds)。

  • 在調用期間:如果調用函數但傳回錯誤,Lambda 會重試,直到記錄過期、超過最長存留期 (MaximumRecordAgeInSeconds),或達到設定的重試配額 (MaximumRetryAttempts)。對於函數錯誤,您也可以設定 BisectBatchOnFunctionError,將失敗的批次分割為兩個較小的批次,隔離錯誤的記錄並避免逾時。分割批次不會消耗重試配額。

如果錯誤處理措施失敗,Lambda 會捨棄相應記錄,並繼續處理串流中的批次。使用預設設定時,這表示不良的記錄可能會封鎖受影響碎片上的處理長達一週。若要避免此情況,在設定函數的事件來源映射時,請使用合理的重試次數和符合您使用案例的記錄最大保留期。

設定失敗調用的目的地

若要保留失敗的事件來源映射調用記錄,請將目標地新增到函數的事件來源映射中。傳送至該目的地的每筆記錄都是包含失敗調用之中繼資料的 JSON 文件。對於 Amazon S3 目的地,Lambda 還會將整個調用記錄與中繼資料一起傳送。您可以將任何 Amazon SNS 主題、Amazon SQS 佇列或 S3 儲存貯體設定為目的地。

透過 Amazon S3 目的地,您可以使用 Amazon S3 事件通知功能,在物件上傳至您的目的地 S3 儲存貯體時接收通知。您也可以設定 S3 事件通知來調用另一個 Lambda 函數,以對失敗的批次執行自動處理。

執行角色必須具有目的地的許可:

如果您已使用自己的 KMS 金鑰為 S3 目的地啟用加密,則函數的執行角色也必須具有呼叫 kms:GenerateDataKey 的許可。如果 KMS 金鑰和 S3 儲存貯體目的地與 Lambda 函數和執行角色位於不同的帳戶中,請將 KMS 金鑰設定為信任執行角色以允許 kms:GenerateDataKey。

若要使用主控台設定失敗時的目的地,請依照下列步驟執行:

  1. 開啟 Lambda 主控台中的 函數頁面

  2. 選擇一個函數。

  3. 函數概觀下,選擇新增目的地

  4. 針對來源,請選擇事件來源映射調用

  5. 對於事件來源映射,請選擇針對此函數設定的事件來源。

  6. 對於條件,選取失敗時。對於事件來源映射調用,這是唯一可接受的條件。

  7. 對於目標類型,請選擇 Lambda 將調用記錄傳送至的目標類型。

  8. 對於目的地,請選擇一個資源。

  9. 選擇儲存

您也可以使用 AWS Command Line Interface () 設定失敗時的目的地AWS CLI。例如,下列 create-event-source-mapping 命令會將具有 SQS 失敗時的目的地的事件來源映射新增至 MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

下列 update-event-source-mapping 命令會更新事件來源映射,以在嘗試兩次重試後,或在記錄超過一小時時,將失敗的調用記錄傳送至 SNS 目的地。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

系統會以非同步的方式套用更新的設定,在處理完成之前不會反映在輸出中。使用 get-event-source-mapping 命令檢視目前的狀態。

若要移除目的地,請提供空白字串作為 destination-config 參數的引數:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Amazon S3 目的地的安全最佳實務

刪除已設定為目的地的 S3 儲存貯體而不從函數的組態中移除目的地,可能會產生安全風險。如果其他使用者知道目的地儲存貯體的名稱,他們可以在其 AWS 帳戶中重新建立儲存貯體。失敗調用的記錄會被傳送到其儲存貯體,可能公開來自您函數的資料。

警告

為了確保無法將來自函數的調用記錄傳送到另一個 中的 S3 儲存貯體 AWS 帳戶,請將條件新增至函數的執行角色,以限制您帳戶中儲存貯體的s3:PutObject許可。

下列範例顯示的 IAM 政策,將函數的 s3:PutObject 許可限制為帳戶中的儲存貯體。此政策也為 Lambda 提供了使用 S3 儲存貯體做為目的地所需的 s3:ListBucket 許可。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::*/*", "arn:aws:s3:::*" ], "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

若要使用 AWS Management Console 或 將許可政策新增至您 funcion 的執行角色 AWS CLI,請參閱下列程序中的指示:

Console
若要將許可政策新增至函數的執行角色 (主控台)
  1. 開啟 Lambda 主控台中的函數頁面

  2. 選取您要修改其執行角色的 Lambda 函數。

  3. 組態索引標籤中,選擇許可

  4. 執行角色索引標籤中,選取函數的角色名稱,以開啟角色的 IAM 主控台頁面。

  5. 透過下列步驟將許可政策新增至角色:

    1. 許可政策窗格中,選擇新增許可 ,然後選取建立內嵌政策

    2. 政策編輯器中,選取 JSON

    3. 將您要新增的政策貼入編輯器 (取代現有的 JSON),然後選擇下一步

    4. 政策詳細資訊下,輸入政策名稱

    5. 選擇建立政策

AWS CLI
若要將許可政策新增至函數的執行角色 (CLI)
  1. 建立具有所需許可的 JSON 政策文件,並將其儲存在本機目錄中。

  2. 使用 IAM put-role-policy CLI 命令,將許可新增至函數的執行角色。從您儲存 JSON 政策文件的目錄執行下列命令,並將角色名稱、政策名稱和政策文件取代為您自己的值。

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

Amazon SNS 和 Amazon SQS 調用記錄範例

下列範例顯示 Lambda 針對失敗的 Kinesis 事件來源調用而傳送至 SQS 佇列或 SNS 主題。由於 Lambda 只會傳送這些目標類型的中繼資料,因此請使用 streamArnshardIdstartSequenceNumberendSequenceNumber 欄位來取得完整的原始記錄。KinesisBatchInfo 屬性中顯示的所有欄位一律都會存在。

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

您可以使用此資訊來從串流擷取受影響的記錄,以進行疑難排解。實際的記錄不包含在內,因此您必須處理此記錄,並在因過期而遺失之前從資料串流中擷取它們。

Amazon S3 調用記錄範例

下列範例顯示 Lambda 針對失敗的 Kinesis 事件來源調用而傳送至 Amazon S3 儲存貯體的內容。除了上一個 SQS 和 SNS 目的地範例中的所有欄位之外,此 payload 欄位還包含原始調用記錄做為逸出 JSON 字串。

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" }, "payload": "<Whole Event>" // Only available in S3 }

包含調用記錄的 S3 物件使用以下命名慣例:

aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>