DynamoDB 및 Lambda로 부분 배치 응답 구성
이벤트 소스에서 스트리밍 데이터를 사용하고 처리할 때 기본적으로 Lambda는 배치가 완전히 성공한 경우에만 배치의 가장 높은 시퀀스 번호로 체크포인트를 수행합니다. Lambda는 다른 모든 결과를 완전한 실패로 처리하고 재시도 제한까지 배치 처리를 재시도합니다. 스트림에서 배치를 처리하는 동안 부분적인 성공을 허용하려면 ReportBatchItemFailures를 설정합니다. 부분적인 성공을 허용하면 레코드에 대한 재시도 횟수를 줄이는 데 도움이 되지만 성공한 레코드의 재시도 가능성을 완전히 막지는 못합니다.
ReportBatchItemFailures를 켜려면 FunctionResponseTypes 목록에 열거형 값 ReportBatchItemFailures를 포함시킵니다. 이 목록은 함수에 대해 활성화된 응답 유형을 나타냅니다. 이벤트 소스 매핑을 생성하거나 업데이트할 때 이 목록을 구성할 수 있습니다.
함수 코드가 부분 배치 실패 응답을 반환하더라도 이벤트 소스 매핑에 대해 ReportBatchItemFailures 기능이 명시적으로 활성화되지 않으면 Lambda가 해당 응답을 처리하지 않습니다.
보고서 구문
배치 항목 실패에 대한 보고를 구성할 때 StreamsEventResponse 클래스는 배치 항목 실패 목록과 함께 반환됩니다. StreamsEventResponse 객체를 사용하여 배치에서 첫 번째 실패한 레코드의 시퀀스 번호를 반환할 수 있습니다. 올바른 응답 구문을 사용하여 고유한 사용자 지정 클래스를 생성할 수도 있습니다. 다음 JSON 구조는 필요한 응답 구문을 보여줍니다.
{
"batchItemFailures": [
{
"itemIdentifier": "<SequenceNumber>"
}
]
}
batchItemFailures 어레이에 여러 항목이 포함되어 있으면 Lambda는 시퀀스 번호가 가장 낮은 레코드를 체크포인트로 사용합니다. 그런 다음 Lambda는 해당 체크포인트에서 시작하여 모든 레코드를 다시 시도합니다.
성공 및 실패 조건
Lambda는 다음 중 하나를 반환할 경우 배치를 완전한 성공으로 처리합니다.
Lambda는 다음 중 하나를 반환할 경우 배치를 완전한 실패로 처리합니다.
-
빈 문자열 itemIdentifier
-
null itemIdentifier
-
키 이름이 잘못된 itemIdentifier
Lambda는 재시도 전략에 따라 실패를 재시도합니다.
배치 이등분
호출이 실패하고 BisectBatchOnFunctionError가 활성화되어 있으면 ReportBatchItemFailures 설정에 관계 없이 배치가 이등분됩니다.
부분적 배치 성공 응답이 수신되고 BisectBatchOnFunctionError 및 ReportBatchItemFailures가 모두 활성화되면 배치가 반환된 시퀀스 번호에서 이등분되고 Lambda는 나머지 레코드만 재시도합니다.
부분 배치 응답 로직 구현을 간소화하려면 이러한 복잡성을 자동 처리할 수 있는 Powertools for AWS Lambda의 배치 프로세서 유틸리티를 사용하는 것이 좋습니다.
다음은 일괄적으로 실패한 메시지 ID 목록을 반환하는 함수 코드의 몇 가지 예입니다.
- .NET
-
- SDK for .NET
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
.NET을 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace AWSLambda_DDB;
public class Function
{
public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
{
context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
StreamsEventResponse streamsEventResponse = new StreamsEventResponse();
foreach (var record in dynamoEvent.Records)
{
try
{
var sequenceNumber = record.Dynamodb.SequenceNumber;
context.Logger.LogInformation(sequenceNumber);
}
catch (Exception ex)
{
context.Logger.LogError(ex.Message);
batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
}
}
if (batchItemFailures.Count > 0)
{
streamsEventResponse.BatchItemFailures = batchItemFailures;
}
context.Logger.LogInformation("Stream processing complete.");
return streamsEventResponse;
}
}
- Go
-
- SDK for Go V2
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Go를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
type BatchItemFailure struct {
ItemIdentifier string `json:"ItemIdentifier"`
}
type BatchResult struct {
BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"`
}
func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
var batchItemFailures []BatchItemFailure
curRecordSequenceNumber := ""
for _, record := range event.Records {
// Process your record
curRecordSequenceNumber = record.Change.SequenceNumber
}
if curRecordSequenceNumber != "" {
batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber})
}
batchResult := BatchResult{
BatchItemFailures: batchItemFailures,
}
return &batchResult, nil
}
func main() {
lambda.Start(HandleRequest)
}
- Java
-
- SDK for Java 2.x
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Java를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;
import java.util.ArrayList;
import java.util.List;
public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, StreamsEventResponse> {
@Override
public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
String curRecordSequenceNumber = "";
for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
try {
//Process your record
StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
} catch (Exception e) {
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
return new StreamsEventResponse(batchItemFailures);
}
}
return new StreamsEventResponse();
}
}
- JavaScript
-
- SDK for JavaScript (v3)
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
JavaScript를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.
export const handler = async (event) => {
const records = event.Records;
let curRecordSequenceNumber = "";
for (const record of records) {
try {
// Process your record
curRecordSequenceNumber = record.dynamodb.SequenceNumber;
} catch (e) {
// Return failed record's sequence number
return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
}
}
return { batchItemFailures: [] };
};
TypeScript를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.
import {
DynamoDBBatchResponse,
DynamoDBBatchItemFailure,
DynamoDBStreamEvent,
} from "aws-lambda";
export const handler = async (
event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
const batchItemFailures: DynamoDBBatchItemFailure[] = [];
let curRecordSequenceNumber;
for (const record of event.Records) {
curRecordSequenceNumber = record.dynamodb?.SequenceNumber;
if (curRecordSequenceNumber) {
batchItemFailures.push({
itemIdentifier: curRecordSequenceNumber,
});
}
}
return { batchItemFailures: batchItemFailures };
};
- PHP
-
- SDK for PHP
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
PHP를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler implements StdHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handle(mixed $event, Context $context): array
{
$dynamoDbEvent = new DynamoDbEvent($event);
$this->logger->info("Processing records");
$records = $dynamoDbEvent->getRecords();
$failedRecords = [];
foreach ($records as $record) {
try {
$data = $record->getData();
$this->logger->info(json_encode($data));
// TODO: Do interesting work based on the new data
} catch (Exception $e) {
$this->logger->error($e->getMessage());
// failed processing the record
$failedRecords[] = $record->getSequenceNumber();
}
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
// change format for the response
$failures = array_map(
fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
$failedRecords
);
return [
'batchItemFailures' => $failures
];
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK for Python (Boto3)
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Python을 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
records = event.get("Records")
curRecordSequenceNumber = ""
for record in records:
try:
# Process your record
curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
except Exception as e:
# Return failed record's sequence number
return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}
return {"batchItemFailures":[]}
- Ruby
-
- SDK for Ruby
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Ruby를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.
def lambda_handler(event:, context:)
records = event["Records"]
cur_record_sequence_number = ""
records.each do |record|
begin
# Process your record
cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
rescue StandardError => e
# Return failed record's sequence number
return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
end
end
{"batchItemFailures" => []}
end
- Rust
-
- SDK for Rust
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Rust를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.
use aws_lambda_events::{
event::dynamodb::{Event, EventRecord, StreamRecord},
streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
let stream_record: &StreamRecord = &record.change;
// process your stream record here...
tracing::info!("Data: {:?}", stream_record);
Ok(())
}
/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
let mut response = DynamoDbEventResponse {
batch_item_failures: vec![],
};
let records = &event.payload.records;
if records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(response);
}
for record in records {
tracing::info!("EventId: {}", record.event_id);
// Couldn't find a sequence number
if record.change.sequence_number.is_none() {
response.batch_item_failures.push(DynamoDbBatchItemFailure {
item_identifier: Some("".to_string()),
});
return Ok(response);
}
// Process your record here...
if process_record(record).is_err() {
response.batch_item_failures.push(DynamoDbBatchItemFailure {
item_identifier: record.change.sequence_number.clone(),
});
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
return Ok(response);
}
}
tracing::info!("Successfully processed {} record(s)", records.len());
Ok(response)
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}
Powertools for AWS Lambda의 배치 프로세서 유틸리티는 부분 배치 응답 로직을 자동으로 처리하여 배치 실패 보고 구현에 따르는 복잡성을 줄입니다. 다음은 배치 프로세서를 사용하는 예제입니다.
- Python
-
AWS Lambda 배치 프로세서를 사용하여 DynamoDB 스트림 레코드를 처리합니다.
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext
processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger()
def record_handler(record):
logger.info(record)
# Your business logic here
# Raise an exception to mark this record as failed
def lambda_handler(event, context: LambdaContext):
return process_partial_response(
event=event,
record_handler=record_handler,
processor=processor,
context=context
)
- TypeScript
-
AWS Lambda 배치 프로세서를 사용하여 DynamoDB 스트림 레코드를 처리합니다.
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBStreamEvent, Context } from 'aws-lambda';
const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();
const recordHandler = async (record: any): Promise<void> => {
logger.info('Processing record', { record });
// Your business logic here
// Throw an error to mark this record as failed
};
export const handler = async (event: DynamoDBStreamEvent, context: Context) => {
return processPartialResponse(event, recordHandler, processor, {
context,
});
};
- Java
-
AWS Lambda 배치 프로세서를 사용하여 DynamoDB 스트림 레코드를 처리합니다.
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {
private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;
public DynamoDBStreamBatchHandler() {
handler = new BatchMessageHandlerBuilder()
.withDynamoDbBatchHandler()
.buildWithRawMessageHandler(this::processMessage);
}
@Override
public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
return handler.processBatch(ddbEvent, context);
}
private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
// Process the change record
}
}
- .NET
-
AWS Lambda 배치 프로세서를 사용하여 DynamoDB 스트림 레코드를 처리합니다.
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;
[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]
namespace HelloWorld;
public class Customer
{
public string? CustomerId { get; set; }
public string? Name { get; set; }
public string? Email { get; set; }
public DateTime CreatedAt { get; set; }
}
internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer>
{
public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
{
if (string.IsNullOrEmpty(customer.Email))
{
throw new ArgumentException("Customer email is required");
}
return await Task.FromResult(RecordHandlerResult.None);
}
}
public class Function
{
[BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
{
return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
}
}