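Configuring partial batch response with DynamoDB and Lambda
When consuming and processing streaming data from an event source, by default Lambda checkpoints to the highest sequence number of a batch only when the batch is a complete success. Lambda treats all other results as a complete failure and retries processing the batch up to the retry limit. To allow for partial successes while processing batches from a stream, turn on ReportBatchItemFailures. Allowing partial successes can help to reduce the number of retries on a record, though it doesn't entirely prevent the possibility of retries in a successful record.
To turn on ReportBatchItemFailures, include the enum value ReportBatchItemFailures in the FunctionResponseTypes list. This list indicates which response types are enabled for your function. You can configure this list when you create or update an event source mapping.
Even if your function code returns partial batch failure responses, Lambda does not process these responses unless the ReportBatchItemFailures feature is explicitly turned on for your event source mapping.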
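As a rough illustration, the setting can be applied to an existing event source mapping with the Boto3 `update_event_source_mapping` call. This is a minimal sketch: the helper names and the mapping UUID are placeholders introduced here, and the second function needs Boto3 and AWS credentials to run.

```python
def build_update_request(mapping_uuid: str) -> dict:
    """Build keyword arguments for Lambda's UpdateEventSourceMapping API."""
    return {
        "UUID": mapping_uuid,
        # Enable partial batch responses for this mapping.
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
    }


def enable_partial_batch_response(mapping_uuid: str) -> dict:
    """Apply the setting; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the request builder works without the SDK

    client = boto3.client("lambda")
    return client.update_event_source_mapping(**build_update_request(mapping_uuid))
```

Separating the request builder from the API call keeps the configuration easy to inspect before it is sent.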
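Report syntax
When configuring reporting on batch item failures, the StreamsEventResponse class is returned with a list of batch item failures. You can use a StreamsEventResponse object to return the sequence number of the first failed record in the batch. You can also create your own custom class using the correct response syntax. The following JSON structure shows the required response syntax: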
{
  "batchItemFailures": [
    {
      "itemIdentifier": "<SequenceNumber>"
    }
  ]
}
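If the batchItemFailures array contains multiple items, Lambda uses the record with the lowest sequence number as the checkpoint. Lambda then retries all records starting from that checkpoint.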
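The checkpoint rule can be pictured with a small simulation. This is a sketch rather than Lambda's implementation: the function name is hypothetical, and it assumes the batch is ordered by ascending sequence number and that sequence numbers are numeric strings.

```python
def records_to_retry(batch: list[str], response: dict) -> list[str]:
    """Return the sequence numbers that would be retried under the checkpoint rule.

    Assumes `batch` is ordered by ascending sequence number.
    """
    failures = response.get("batchItemFailures") or []
    if not failures:
        return []  # nothing reported: the whole batch counts as processed
    # Sequence numbers are numeric strings, so compare them as integers.
    checkpoint = min(int(f["itemIdentifier"]) for f in failures)
    return [seq for seq in batch if int(seq) >= checkpoint]


batch = ["100", "200", "300", "400"]
response = {"batchItemFailures": [{"itemIdentifier": "300"}, {"itemIdentifier": "200"}]}
print(records_to_retry(batch, response))  # ['200', '300', '400']
```

Because everything from the lowest failed sequence number onwards is retried, reporting only the first failed record has the same effect as reporting all of them.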
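Success and failure conditions
Lambda treats a batch as a complete success if you return any of the following:
- An empty batchItemFailures list
- A null batchItemFailures list
- An empty EventResponse
- A null EventResponse
Lambda treats a batch as a complete failure if you return any of the following:
- An empty string itemIdentifier
- A null itemIdentifier
- An itemIdentifier with a bad key name
Lambda retries failures based on your retry strategy.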
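The conditions listed above can be summarized as a small classifier. This is a sketch that models only those conditions. The function name and the three labels are hypothetical, and it does not describe how Lambda parses responses internally.

```python
def classify_response(response) -> str:
    """Classify a handler return value as 'success', 'failure', or 'partial'."""
    if not response:
        return "success"  # null or empty response
    failures = response.get("batchItemFailures")
    if not failures:
        return "success"  # null or empty batchItemFailures list
    for item in failures:
        # A missing key (bad key name), null, or empty string all fail the batch.
        if not item.get("itemIdentifier"):
            return "failure"
    return "partial"


print(classify_response({"batchItemFailures": []}))                          # success
print(classify_response({"batchItemFailures": [{"itemIdentifier": ""}]}))    # failure
print(classify_response({"batchItemFailures": [{"itemIdentifier": "200"}]})) # partial
```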
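Bisecting a batch
If your invocation fails and BisectBatchOnFunctionError is turned on, the batch is bisected regardless of your ReportBatchItemFailures setting.
When a partial batch success response is received and both BisectBatchOnFunctionError and ReportBatchItemFailures are turned on, the batch is bisected at the returned sequence number, and Lambda retries only the remaining records.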
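The two cases can be contrasted with a simplified model. Both function names are hypothetical. Splitting at the midpoint is an assumption, since the text does not say how an odd-sized batch is divided, and the model does not cover how the retried remainder is handled afterwards.

```python
def split_on_function_error(batch: list[str]) -> tuple[list[str], list[str]]:
    """Model a whole-invocation failure: halve the batch and retry each half."""
    mid = len(batch) // 2  # assumption: exact split point is not specified
    return batch[:mid], batch[mid:]


def split_on_partial_response(batch: list[str], failed_seq: str) -> tuple[list[str], list[str]]:
    """Model a partial response: split at the returned sequence number."""
    idx = batch.index(failed_seq)
    return batch[:idx], batch[idx:]  # (treated as processed, retried)


batch = ["100", "200", "300", "400", "500"]
print(split_on_function_error(batch))           # (['100', '200'], ['300', '400', '500'])
print(split_on_partial_response(batch, "400"))  # (['100', '200', '300'], ['400', '500'])
```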
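To simplify the implementation of partial batch response logic, consider using the Batch Processor utility from Powertools for AWS Lambda, which handles these complexities for you automatically.
The following function code examples return the list of IDs for the failed messages in the batch: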
- .NET
- SDK for .NET
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using .NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System;
using System.Collections.Generic;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
        StreamsEventResponse streamsEventResponse = new StreamsEventResponse();

        foreach (var record in dynamoEvent.Records)
        {
            try
            {
                // Process your record here; an exception marks the record as failed.
                var sequenceNumber = record.Dynamodb.SequenceNumber;
                context.Logger.LogInformation(sequenceNumber);
            }
            catch (Exception ex)
            {
                context.Logger.LogError(ex.Message);
                batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
            }
        }

        if (batchItemFailures.Count > 0)
        {
            streamsEventResponse.BatchItemFailures = batchItemFailures;
        }

        context.Logger.LogInformation("Stream processing complete.");
        return streamsEventResponse;
    }
}
- Go
- SDK for Go V2
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using Go.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

// BatchItemFailure identifies one failed record by its sequence number.
// The JSON key matches the response syntax shown earlier.
type BatchItemFailure struct {
	ItemIdentifier string `json:"itemIdentifier"`
}

// BatchResult is the partial batch response returned to Lambda.
type BatchResult struct {
	BatchItemFailures []BatchItemFailure `json:"batchItemFailures"`
}

// processRecord is a placeholder for your own record handling.
func processRecord(record events.DynamoDBEventRecord) error {
	// Process your record
	return nil
}

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
	batchItemFailures := []BatchItemFailure{}

	for _, record := range event.Records {
		if err := processRecord(record); err != nil {
			// Report the failed record; Lambda retries from this record onwards.
			batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: record.Change.SequenceNumber})
			break
		}
	}

	return &BatchResult{BatchItemFailures: batchItemFailures}, nil
}

func main() {
	lambda.Start(HandleRequest)
}
- Java
- SDK for Java 2.x
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;

import java.util.ArrayList;
import java.util.List;

public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
            try {
                // Capture the sequence number first so the catch block reports this record.
                StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
                curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
                // Process your record
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }

        return new StreamsEventResponse();
    }
}
- JavaScript
- SDK for JavaScript (v3)
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using JavaScript.
export const handler = async (event) => {
  const records = event.Records;
  let curRecordSequenceNumber = "";

  for (const record of records) {
    try {
      // Capture the sequence number first so the catch block reports this record
      curRecordSequenceNumber = record.dynamodb.SequenceNumber;
      // Process your record
    } catch (e) {
      // Return failed record's sequence number
      return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
    }
  }

  return { batchItemFailures: [] };
};
Reporting DynamoDB batch item failures with Lambda using TypeScript.
import {
  DynamoDBBatchResponse,
  DynamoDBBatchItemFailure,
  DynamoDBStreamEvent,
} from "aws-lambda";

export const handler = async (
  event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
  const batchItemFailures: DynamoDBBatchItemFailure[] = [];

  for (const record of event.Records) {
    const curRecordSequenceNumber = record.dynamodb?.SequenceNumber;
    try {
      // Process your record
    } catch (e) {
      // Report only the failed record. A missing sequence number becomes an
      // empty itemIdentifier, which Lambda treats as a complete failure.
      batchItemFailures.push({
        itemIdentifier: curRecordSequenceNumber ?? "",
      });
      return { batchItemFailures: batchItemFailures };
    }
  }

  return { batchItemFailures: batchItemFailures };
};
- PHP
- SDK for PHP
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using PHP.
<?php
# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $dynamoDbEvent = new DynamoDbEvent($event);
        $this->logger->info("Processing records");

        $records = $dynamoDbEvent->getRecords();
        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $failedCount = count($failedRecords);
        $this->logger->info("Processed $totalRecords records, $failedCount failed");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
- Python
- SDK for Python (Boto3)
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""

    for record in records:
        try:
            # Capture the sequence number first so the except block reports this record
            curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
            # Process your record
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures": [{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures": []}
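A handler like this can be exercised locally before deployment. The following is a minimal sketch, not part of the original example: `process_record` and `make_record` are hypothetical helpers, and the synthetic event contains only the fields the handler reads.

```python
def process_record(record: dict) -> None:
    """Hypothetical business logic: reject records whose Id is 'bad'."""
    if record["dynamodb"].get("Keys", {}).get("Id", {}).get("S") == "bad":
        raise ValueError("cannot process record")


def handler(event, context):
    for record in event.get("Records", []):
        sequence_number = record["dynamodb"]["SequenceNumber"]
        try:
            process_record(record)
        except Exception:
            # Report the first failure; Lambda retries from this record onwards.
            return {"batchItemFailures": [{"itemIdentifier": sequence_number}]}
    return {"batchItemFailures": []}


def make_record(sequence_number: str, item_id: str) -> dict:
    """Build a minimal stream record with only the fields used here."""
    return {"dynamodb": {"SequenceNumber": sequence_number, "Keys": {"Id": {"S": item_id}}}}


event = {"Records": [make_record("100", "a"), make_record("200", "bad"), make_record("300", "c")]}
print(handler(event, None))  # {'batchItemFailures': [{'itemIdentifier': '200'}]}
```

Record 300 is never attempted in this run, which is consistent with Lambda retrying from the reported sequence number onwards.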
- Ruby
- SDK for Ruby
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using Ruby.
def lambda_handler(event:, context:)
  records = event["Records"]
  cur_record_sequence_number = ""

  records.each do |record|
    begin
      # Capture the sequence number first so the rescue block reports this record
      cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
      # Process your record
    rescue StandardError => e
      # Return failed record's sequence number
      return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
    end
  end

  {"batchItemFailures" => []}
end
- Rust
- SDK for Rust
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting DynamoDB batch item failures with Lambda using Rust.
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord, StreamRecord},
    streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
    let stream_record: &StreamRecord = &record.change;

    // process your stream record here...
    tracing::info!("Data: {:?}", stream_record);

    Ok(())
}

/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
    let mut response = DynamoDbEventResponse {
        batch_item_failures: vec![],
    };

    let records = &event.payload.records;

    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in records {
        tracing::info!("EventId: {}", record.event_id);

        // Couldn't find a sequence number
        if record.change.sequence_number.is_none() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: Some("".to_string()),
            });
            return Ok(response);
        }

        // Process your record here...
        if process_record(record).is_err() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: record.change.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!("Successfully processed {} record(s)", records.len());

    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
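The Batch Processor utility from Powertools for AWS Lambda handles partial batch response logic automatically, which reduces the complexity of implementing batch failure reporting. The following examples use the batch processor:
- Python
Processing DynamoDB stream records with the AWS Lambda batch processor.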
import json

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger()


def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed


def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context
    )
- TypeScript
Processing DynamoDB stream records with the AWS Lambda batch processor.
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBStreamEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
  logger.info('Processing record', { record });
  // Your business logic here
  // Throw an error to mark this record as failed
};

export const handler = async (event: DynamoDBStreamEvent, context: Context) => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
  });
};
- Java
Processing DynamoDB stream records with the AWS Lambda batch processor.
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;

    public DynamoDBStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withDynamoDbBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
        return handler.processBatch(ddbEvent, context);
    }

    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
        // Process the change record
    }
}
- .NET
Processing DynamoDB stream records with the AWS Lambda batch processor.
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer>
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(customer.Email))
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None);
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
    {
        return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
    }
}