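Configuring partial batch response with DynamoDB and Lambda
When consuming and processing streaming data from an event source, by default Lambda checkpoints to the highest sequence number of a batch only when the batch is a complete success. Lambda treats all other results as a complete failure and retries processing the batch up to the retry limit. To allow for partial successes while processing batches from a stream, turn on ReportBatchItemFailures. Allowing partial successes can reduce the number of retries on a record, but it does not rule out retries of records that were already processed successfully.
To turn on ReportBatchItemFailures, include the enum value ReportBatchItemFailures in the FunctionResponseTypes list. This list indicates which response types are enabled for your function. You can configure this list when you create or update an event source mapping.
Even when your function code returns partial batch failure responses, Lambda does not process these responses unless the ReportBatchItemFailures feature is explicitly turned on for your event source mapping.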
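As a minimal sketch of the configuration step described above, the following Python snippet builds the arguments for an UpdateEventSourceMapping call with boto3. The helper names and the mapping UUID are hypothetical and chosen for illustration only; the call itself needs valid AWS credentials and an existing event source mapping.

```python
def build_mapping_update(mapping_uuid: str) -> dict:
    """Build keyword arguments that turn on partial batch responses."""
    return {
        "UUID": mapping_uuid,
        # The only response type named in this section.
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
    }


def enable_partial_batch_response(mapping_uuid: str) -> dict:
    """Apply the update to an existing event source mapping."""
    # Imported lazily so build_mapping_update works without the SDK installed.
    import boto3

    client = boto3.client("lambda")
    return client.update_event_source_mapping(**build_mapping_update(mapping_uuid))
```

Calling `enable_partial_batch_response("<your-mapping-uuid>")` would apply the setting. The same FunctionResponseTypes list can also be passed when the mapping is created.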
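Report syntax
When configuring reporting on batch item failures, the StreamsEventResponse class is returned with a list of batch item failures. You can use a StreamsEventResponse object to return the sequence number of the first failed record in the batch. You can also create your own custom class using the correct response syntax. The following JSON structure shows the required response syntax: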
{
  "batchItemFailures": [
    {
      "itemIdentifier": "<SequenceNumber>"
    }
  ]
}
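If the batchItemFailures array contains multiple items, Lambda uses the record with the lowest sequence number as the checkpoint. Lambda then retries all records starting from that checkpoint.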
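The checkpoint rule can be illustrated with a small simulation. This is a sketch only, not Lambda's implementation: the function name `records_to_retry` is hypothetical, and it assumes sequence numbers are numeric strings that can be compared as integers.

```python
def records_to_retry(batch_sequence_numbers, response):
    """Return the sequence numbers that would be retried for a response."""
    failures = (response or {}).get("batchItemFailures") or []
    if not failures:
        return []  # nothing reported, so nothing is retried
    # The lowest reported sequence number becomes the checkpoint.
    checkpoint = min(int(item["itemIdentifier"]) for item in failures)
    # Every record from the checkpoint onward is retried,
    # including records that were processed successfully.
    return [seq for seq in batch_sequence_numbers if int(seq) >= checkpoint]
```

For example, with a batch of `["100", "200", "300", "400"]` and reported failures `"300"` and `"200"`, the sketch returns `["200", "300", "400"]`. This is why reporting more than the first failed record does not shrink the retried range.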
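Success and failure conditions
If you return any of the following, Lambda treats a batch as a complete success:
- An empty batchItemFailure list
- A null batchItemFailure list
- An empty EventResponse
- A null EventResponse
If you return any of the following, Lambda treats a batch as a complete failure:
- An empty string itemIdentifier
- A null itemIdentifier
- An itemIdentifier with a bad key name
Lambda retries failures based on your retry strategy.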
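The conditions above can be summarized as a small classifier. This is a hedged sketch for reasoning about handler return values, not Lambda's actual validation code; the function name and the three labels are assumptions made for illustration.

```python
def classify_response(response):
    """Classify a handler response as 'success', 'failure', or 'partial'."""
    # A null or empty EventResponse counts as a complete success.
    if not response:
        return "success"
    failures = response.get("batchItemFailures")
    # A null or empty batchItemFailure list also counts as a complete success.
    if not failures:
        return "success"
    for item in failures:
        # A missing key (bad key name), a null value, or an empty string
        # makes Lambda treat the whole batch as failed.
        if not isinstance(item, dict) or not item.get("itemIdentifier"):
            return "failure"
    return "partial"
```

A response such as `{"batchItemFailures": [{"ItemId": "123"}]}` is classified as `"failure"` because the expected `itemIdentifier` key is absent.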
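Bisecting a batch
If your invocation fails and BisectBatchOnFunctionError is turned on, the batch is bisected regardless of your ReportBatchItemFailures setting.
When a partial batch success response is received and both BisectBatchOnFunctionError and ReportBatchItemFailures are turned on, the batch is bisected at the returned sequence number, and Lambda retries only the remaining records.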
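The two bisecting cases can be contrasted with a short sketch. Both function names are hypothetical, and the exact midpoint Lambda picks when the whole invocation fails is an assumption here; the text above only states that the batch is bisected.

```python
def bisect_on_function_error(batch):
    """Whole invocation failed: split the batch into two halves (assumed midpoint)."""
    middle = len(batch) // 2
    return batch[:middle], batch[middle:]


def bisect_at_reported_failure(batch, failed_sequence_number):
    """Partial response: split at the reported sequence number.

    Records before the split are treated as done; the rest are retried.
    """
    index = batch.index(failed_sequence_number)
    return batch[:index], batch[index:]
```

With a batch of `["1", "2", "3", "4", "5"]`, a reported failure at `"4"` leaves `["4", "5"]` to retry, whereas a function error would split the batch near the middle regardless of which record caused the problem.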
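To simplify the implementation of partial batch response logic, consider using the Batch Processor utility from Powertools for AWS Lambda, which handles these complexities for you automatically.
The following examples of function code return the sequence numbers of failed records in the batch: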
- .NET
  - SDK for .NET
There's more on GitHub. Find the complete example and learn how to set up and run it in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using .NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
        StreamsEventResponse streamsEventResponse = new StreamsEventResponse();

        foreach (var record in dynamoEvent.Records)
        {
            try
            {
                var sequenceNumber = record.Dynamodb.SequenceNumber;
                context.Logger.LogInformation(sequenceNumber);
            }
            catch (Exception ex)
            {
                context.Logger.LogError(ex.Message);
                batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
            }
        }

        if (batchItemFailures.Count > 0)
        {
            streamsEventResponse.BatchItemFailures = batchItemFailures;
        }

        context.Logger.LogInformation("Stream processing complete.");
        return streamsEventResponse;
    }
}
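- Go
  - SDK for Go V2
There's more on GitHub. Find the complete example and learn how to set up and run it in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using Go.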
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

// The JSON keys match the response syntax shown in "Report syntax".
type BatchItemFailure struct {
	ItemIdentifier string `json:"itemIdentifier"`
}

type BatchResult struct {
	BatchItemFailures []BatchItemFailure `json:"batchItemFailures"`
}

// processRecord is a placeholder for your business logic.
func processRecord(record events.DynamoDBEventRecord) error {
	return nil
}

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
	batchItemFailures := []BatchItemFailure{}
	for _, record := range event.Records {
		if err := processRecord(record); err != nil {
			// Report the first failed record. Lambda retries from this sequence number onward.
			batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: record.Change.SequenceNumber})
			break
		}
	}
	return &BatchResult{BatchItemFailures: batchItemFailures}, nil
}

func main() {
	lambda.Start(HandleRequest)
}
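- Java
  - SDK for Java 2.x
There's more on GitHub. Find the complete example and learn how to set up and run it in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using Java.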
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;

import java.util.ArrayList;
import java.util.List;

public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
            try {
                // Capture the sequence number first so that a failure below reports this record.
                StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
                curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
                // Process your record
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }

        return new StreamsEventResponse();
    }
}
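- JavaScript
  - SDK for JavaScript (v3)
There's more on GitHub. Find the complete example and learn how to set up and run it in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using JavaScript.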
export const handler = async (event) => {
  const records = event.Records;
  let curRecordSequenceNumber = "";

  for (const record of records) {
    try {
      // Capture the sequence number first so that a failure below reports this record.
      curRecordSequenceNumber = record.dynamodb.SequenceNumber;
      // Process your record
    } catch (e) {
      // Return failed record's sequence number
      return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
    }
  }

  return { batchItemFailures: [] };
};
Reporting DynamoDB batch item failures with Lambda using TypeScript.
import {
  DynamoDBBatchResponse,
  DynamoDBBatchItemFailure,
  DynamoDBStreamEvent,
} from "aws-lambda";

export const handler = async (
  event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
  const batchItemFailures: DynamoDBBatchItemFailure[] = [];

  for (const record of event.Records) {
    try {
      // Process your record
    } catch (e) {
      // Report only the record that failed. If the sequence number is missing,
      // the empty identifier makes Lambda treat the whole batch as failed.
      batchItemFailures.push({
        itemIdentifier: record.dynamodb?.SequenceNumber ?? "",
      });
      // Lambda retries from the lowest reported sequence number, so stop here.
      break;
    }
  }

  return { batchItemFailures };
};
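- PHP
  - SDK for PHP
There's more on GitHub. Find the complete example and learn how to set up and run it in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using PHP.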
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $dynamoDbEvent = new DynamoDbEvent($event);
        $this->logger->info("Processing records");

        $records = $dynamoDbEvent->getRecords();
        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
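- Python
  - SDK for Python (Boto3)
There's more on GitHub. Find the complete example and learn how to set up and run it in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using Python.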
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records", [])
    curRecordSequenceNumber = ""

    for record in records:
        try:
            # Capture the sequence number first so that a failure below reports this record.
            curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
            # Process your record
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures": [{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures": []}
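- Ruby
  - SDK for Ruby
There's more on GitHub. Find the complete example and learn how to set up and run it in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using Ruby.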
def lambda_handler(event:, context:)
  records = event["Records"]
  cur_record_sequence_number = ""

  records.each do |record|
    begin
      # Capture the sequence number first so that a failure below reports this record.
      cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
      # Process your record
    rescue StandardError => e
      # Return failed record's sequence number
      return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
    end
  end

  {"batchItemFailures" => []}
end
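- Rust
  - SDK for Rust
There's more on GitHub. Find the complete example and learn how to set up and run it in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using Rust.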
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord, StreamRecord},
    streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
    let stream_record: &StreamRecord = &record.change;

    // process your stream record here...
    tracing::info!("Data: {:?}", stream_record);

    Ok(())
}

/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
    let mut response = DynamoDbEventResponse {
        batch_item_failures: vec![],
    };

    let records = &event.payload.records;

    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in records {
        tracing::info!("EventId: {}", record.event_id);

        // Couldn't find a sequence number
        if record.change.sequence_number.is_none() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: Some("".to_string()),
            });
            return Ok(response);
        }

        // Process your record here...
        if process_record(record).is_err() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: record.change.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!("Successfully processed {} record(s)", records.len());

    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
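The Batch Processor utility from Powertools for AWS Lambda handles partial batch response logic automatically, which reduces the complexity of implementing batch failure reporting. The following examples use the batch processor: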
- Python
Processing DynamoDB stream records with the Powertools for AWS Lambda batch processor.
import json

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger()


def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed


def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
- TypeScript
Processing DynamoDB stream records with the Powertools for AWS Lambda batch processor.
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBStreamEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
  logger.info('Processing record', { record });
  // Your business logic here
  // Throw an error to mark this record as failed
};

export const handler = async (event: DynamoDBStreamEvent, context: Context) => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
  });
};
- Java
Processing DynamoDB stream records with the Powertools for AWS Lambda batch processor.
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;

    public DynamoDBStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withDynamoDbBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
        return handler.processBatch(ddbEvent, context);
    }

    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
        // Process the change record
    }
}
- .NET
Processing DynamoDB stream records with the Powertools for AWS Lambda batch processor.
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer>
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(customer.Email))
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None);
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
    {
        return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
    }
}